123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //go:build edgex
- // +build edgex
- package edgex
- import (
- "encoding/json"
- "fmt"
- "reflect"
- "testing"
- v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
- "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/testx"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/transform"
- "github.com/lf-edge/ekuiper/pkg/cast"
- )
- var (
- contextLogger = conf.Log.WithField("rule", "testEdgexSink")
- ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- )
- func compareEvent(expected, actual *dtos.Event) bool {
- if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
- for _, r := range expected.Readings {
- compared := false
- for _, a := range actual.Readings {
- if r.ResourceName == a.ResourceName {
- compared = true
- if !compareReading(r, a) {
- return false
- }
- }
- }
- if !compared {
- return false
- }
- }
- return true
- }
- return false
- }
- func compareReading(expected, actual dtos.BaseReading) bool {
- if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
- if expected.ValueType == v3.ValueTypeObject {
- if !reflect.DeepEqual(expected.ObjectValue, actual.ObjectValue) {
- return false
- }
- }
- return true
- }
- return false
- }
- func TestConfigure(t *testing.T) {
- tests := []struct {
- conf map[string]interface{}
- expected *SinkConf
- error string
- }{
- { // 0
- conf: map[string]interface{}{
- "metadata": "meta",
- },
- expected: &SinkConf{
- MessageType: MessageTypeEvent,
- ContentType: "application/json",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Metadata: "meta",
- },
- },
- { // 1
- conf: map[string]interface{}{
- "type": "redis",
- "protocol": "redis",
- "host": "edgex-redis",
- "port": 6379,
- "topic": "ekuiperResult",
- "deviceName": "ekuiper",
- "profileName": "ekuiper",
- "sourceName": "ekuiper",
- "contentType": "application/json",
- },
- expected: &SinkConf{
- MessageType: MessageTypeEvent,
- ContentType: "application/json",
- DeviceName: "ekuiper",
- ProfileName: "ekuiper",
- SourceName: "ekuiper",
- Topic: "ekuiperResult",
- },
- },
- { // 2
- conf: map[string]interface{}{
- "protocol": "tcp",
- "host": "127.0.0.1",
- "port": 1883,
- "topic": "result",
- "type": "mqtt",
- "metadata": "edgex_meta",
- "contentType": "application/json",
- "optional": map[string]interface{}{
- "ClientId": "edgex_message_bus_001",
- },
- },
- expected: &SinkConf{
- MessageType: MessageTypeEvent,
- ContentType: "application/json",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "",
- Metadata: "edgex_meta",
- Topic: "result",
- },
- },
- { // 3
- conf: map[string]interface{}{
- "type": "redis",
- "protocol": "redis",
- "host": "edgex-redis",
- "port": 6379,
- "topicPrefix": "edgex/events/device",
- "messageType": "request",
- "contentType": "application/json",
- },
- expected: &SinkConf{
- MessageType: MessageTypeRequest,
- ContentType: "application/json",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "",
- TopicPrefix: "edgex/events/device",
- },
- },
- { // 4
- conf: map[string]interface{}{
- "type": "redis",
- "protocol": "redis",
- "host": "edgex-redis",
- "port": 6379,
- "topicPrefix": "edgex/events/device",
- "messageType": "requests",
- "contentType": "application/json",
- },
- error: "specified wrong messageType value requests",
- },
- { // 5
- conf: map[string]interface{}{
- "protocol": "redis",
- "host": "edgex-redis",
- "port": 6379,
- "topicPrefix": "edgex/events/device",
- "topic": "requests",
- "contentType": "application/json",
- },
- error: "not allow to specify both topic and topicPrefix, please set one only",
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- for i, test := range tests {
- ems := EdgexMsgBusSink{}
- err := ems.Configure(test.conf)
- if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
- t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
- } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
- t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
- }
- }
- }
- func TestProduceEvents(t1 *testing.T) {
- tests := []struct {
- input string
- conf map[string]interface{}
- expected *dtos.Event
- error string
- }{
- { // 0
- input: `[
- {"meta":{
- "correlationid":"","deviceName":"demo","id":"","origin":3,
- "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
- "temperature":{"deviceName":"test device name2","id":"22","origin":24}
- }
- },
- {"humidity":100},
- {"temperature":50}
- ]`,
- conf: map[string]interface{}{
- "metadata": "meta",
- },
- expected: &dtos.Event{
- Id: "",
- DeviceName: "demo",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 3,
- Readings: []dtos.BaseReading{
- {
- ResourceName: "humidity",
- DeviceName: "test device name1",
- ProfileName: "ekuiperProfile",
- Id: "12",
- Origin: 14,
- ValueType: v3.ValueTypeInt64,
- SimpleReading: dtos.SimpleReading{Value: "100"},
- },
- {
- ResourceName: "temperature",
- DeviceName: "test device name2",
- ProfileName: "ekuiperProfile",
- Id: "22",
- Origin: 24,
- ValueType: v3.ValueTypeFloat64,
- SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
- },
- },
- },
- error: "",
- },
- { // 1
- input: `[
- {"meta":{
- "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
- "humidity":{"deviceName":"test device name1","id":"12","origin":14},
- "temperature":{"deviceName":"test device name2","id":"22","origin":24}
- }
- },
- {"h1":100},
- {"h2":null}
- ]`,
- conf: map[string]interface{}{
- "metadata": "meta",
- },
- expected: &dtos.Event{
- Id: "abc",
- DeviceName: "demo",
- ProfileName: "demoProfile",
- SourceName: "demoSource",
- Origin: 3,
- Tags: map[string]interface{}{"auth": "admin"},
- Readings: []dtos.BaseReading{
- {
- ResourceName: "h1",
- SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
- DeviceName: "demo",
- ProfileName: "demoProfile",
- ValueType: v3.ValueTypeFloat64,
- },
- },
- },
- error: "",
- },
- { // 2
- input: `[
- {"meta": 50,"h1":100}
- ]`,
- conf: map[string]interface{}{
- "sourceName": "demo",
- },
- expected: &dtos.Event{
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "demo",
- Readings: []dtos.BaseReading{
- {
- ResourceName: "meta",
- SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeFloat64,
- },
- {
- ResourceName: "h1",
- SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeFloat64,
- },
- },
- },
- error: "",
- },
- { // 3
- input: `[
- {"meta1": "newmeta"},
- {"h1":true},
- {"sa":["1","2","3","4"]},
- {"fa":[1.1,2.2,3.3,4.4]}
- ]`,
- expected: &dtos.Event{
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Readings: []dtos.BaseReading{
- {
- ResourceName: "meta1",
- SimpleReading: dtos.SimpleReading{Value: "newmeta"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeString,
- },
- {
- ResourceName: "h1",
- SimpleReading: dtos.SimpleReading{Value: "true"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeBool,
- },
- {
- ResourceName: "sa",
- SimpleReading: dtos.SimpleReading{Value: "[1, 2, 3, 4]"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeStringArray,
- },
- {
- ResourceName: "fa",
- SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- ValueType: v3.ValueTypeFloat64Array,
- },
- },
- },
- error: "",
- },
- { // 4
- input: `[]`,
- conf: map[string]interface{}{
- "deviceName": "kuiper",
- "profileName": "kp",
- "topic": "demo",
- },
- expected: &dtos.Event{
- ProfileName: "kp",
- DeviceName: "kuiper",
- SourceName: "ruleTest",
- Origin: 0,
- Readings: nil,
- },
- error: "",
- },
- { // 5
- input: `[{"sa":["1","2",3,"4"]}]`, // invalid array, return nil
- expected: &dtos.Event{
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 0,
- Readings: nil,
- },
- },
- { // 6
- input: `[
- {"meta1": "newmeta"},
- {"sa":"SGVsbG8gV29ybGQ="},
- {"meta":{
- "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
- "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
- }}
- ]`,
- conf: map[string]interface{}{
- "metadata": "meta",
- "profileName": "myprofile",
- "sourceName": "ds",
- },
- expected: &dtos.Event{
- DeviceName: "demo",
- ProfileName: "demoProfile",
- SourceName: "ds",
- Origin: 3,
- Tags: map[string]interface{}{"auth": "admin"},
- Readings: []dtos.BaseReading{
- {
- DeviceName: "demo",
- ProfileName: "demoProfile",
- ResourceName: "meta1",
- SimpleReading: dtos.SimpleReading{Value: "newmeta"},
- ValueType: v3.ValueTypeString,
- },
- {
- ResourceName: "sa",
- BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
- ProfileName: "demoProfile",
- DeviceName: "test device name1",
- Id: "12",
- Origin: 14,
- ValueType: v3.ValueTypeBinary,
- },
- },
- },
- error: "",
- },
- { // 7
- input: `[
- {"meta":{
- "correlationid":"","deviceName":"demo","id":"","origin":3,
- "obj":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Object"}
- }
- },
- {"obj":{"a":1,"b":"sttt"}}
- ]`,
- conf: map[string]interface{}{
- "metadata": "meta",
- },
- expected: &dtos.Event{
- Id: "",
- DeviceName: "demo",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 3,
- Readings: []dtos.BaseReading{
- {
- ResourceName: "obj",
- DeviceName: "test device name1",
- ProfileName: "ekuiperProfile",
- Id: "12",
- Origin: 14,
- ValueType: v3.ValueTypeObject,
- ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
- "a": float64(1),
- "b": "sttt",
- }},
- },
- },
- },
- error: "",
- },
- { // 8
- input: `[
- {"obj":{"a":1,"b":"sttt"}}
- ]`,
- conf: map[string]interface{}{},
- expected: &dtos.Event{
- Id: "",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 0,
- Readings: []dtos.BaseReading{
- {
- ResourceName: "obj",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Id: "",
- Origin: 0,
- ValueType: v3.ValueTypeObject,
- ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
- "a": float64(1),
- "b": "sttt",
- }},
- },
- },
- },
- error: "",
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- for i, t := range tests {
- ems := EdgexMsgBusSink{}
- err := ems.Configure(t.conf)
- if err != nil {
- t1.Errorf("%d: configure error %v", i, err)
- continue
- }
- if ems.c.SourceName == "" {
- ems.c.SourceName = "ruleTest"
- }
- var payload []map[string]interface{}
- json.Unmarshal([]byte(t.input), &payload)
- result, err := ems.produceEvents(ctx, payload)
- if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
- t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
- } else if t.error == "" && !compareEvent(t.expected, result) {
- t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
- }
- }
- }
- func TestEdgeXTemplate_Apply(t1 *testing.T) {
- tests := []struct {
- input string
- conf map[string]interface{}
- expected *dtos.Event
- error string
- }{
- { // 0
- input: `[{"meta":{
- "correlationid":"","deviceName":"demo","id":"","origin":3,
- "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
- "temperature":{"deviceName":"test device name2","id":"22","origin":24}
- },
- "humidity":100,
- "temperature":50}
- ]`,
- conf: map[string]interface{}{
- "metadata": "meta",
- "dataTemplate": `{"wrapper":"w1","ab":"{{.humidity}}"}`,
- },
- expected: &dtos.Event{
- Id: "",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 0,
- Readings: []dtos.BaseReading{
- {
- ResourceName: "wrapper",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Id: "",
- Origin: 0,
- ValueType: v3.ValueTypeString,
- SimpleReading: dtos.SimpleReading{Value: "w1"},
- },
- {
- ResourceName: "ab",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Id: "",
- Origin: 0,
- ValueType: v3.ValueTypeString,
- SimpleReading: dtos.SimpleReading{Value: "100"},
- },
- },
- },
- error: "",
- }, {
- input: `[{"json":"{\"a\":24,\"b\":\"c\"}"}]`,
- conf: map[string]interface{}{
- "dataTemplate": `{{.json}}`,
- },
- expected: &dtos.Event{
- Id: "",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- SourceName: "ruleTest",
- Origin: 0,
- Readings: []dtos.BaseReading{
- {
- ResourceName: "a",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Id: "",
- Origin: 0,
- ValueType: v3.ValueTypeFloat64,
- SimpleReading: dtos.SimpleReading{Value: "2.400000e+01"},
- },
- {
- ResourceName: "b",
- DeviceName: "ekuiper",
- ProfileName: "ekuiperProfile",
- Id: "",
- Origin: 0,
- ValueType: v3.ValueTypeString,
- SimpleReading: dtos.SimpleReading{Value: "c"},
- },
- },
- },
- error: "",
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- for i, t := range tests {
- ems := EdgexMsgBusSink{}
- err := ems.Configure(t.conf)
- if err != nil {
- t1.Errorf("%d: configure error %v", i, err)
- continue
- }
- if ems.c.SourceName == "" {
- ems.c.SourceName = "ruleTest"
- }
- var payload []map[string]interface{}
- json.Unmarshal([]byte(t.input), &payload)
- dt := t.conf["dataTemplate"]
- tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "", "", []string{})
- vCtx := context.WithValue(ctx, context.TransKey, tf)
- result, err := ems.produceEvents(vCtx, payload[0])
- if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
- t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
- } else if t.error == "" && !compareEvent(t.expected, result) {
- t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
- }
- }
- }
|