edgex_sink_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package edgex
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "reflect"
  21. "testing"
  22. v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
  23. "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/testx"
  26. "github.com/lf-edge/ekuiper/internal/topo/context"
  27. "github.com/lf-edge/ekuiper/internal/topo/transform"
  28. "github.com/lf-edge/ekuiper/pkg/cast"
  29. )
  30. var (
  31. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  32. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  33. )
  34. func compareEvent(expected, actual *dtos.Event) bool {
  35. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  36. for _, r := range expected.Readings {
  37. compared := false
  38. for _, a := range actual.Readings {
  39. if r.ResourceName == a.ResourceName {
  40. compared = true
  41. if !compareReading(r, a) {
  42. return false
  43. }
  44. }
  45. }
  46. if !compared {
  47. return false
  48. }
  49. }
  50. return true
  51. }
  52. return false
  53. }
  54. func compareReading(expected, actual dtos.BaseReading) bool {
  55. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  56. if expected.ValueType == v3.ValueTypeObject {
  57. if !reflect.DeepEqual(expected.ObjectValue, actual.ObjectValue) {
  58. return false
  59. }
  60. }
  61. return true
  62. }
  63. return false
  64. }
  65. func TestConfigure(t *testing.T) {
  66. tests := []struct {
  67. conf map[string]interface{}
  68. expected *SinkConf
  69. error string
  70. }{
  71. { // 0
  72. conf: map[string]interface{}{
  73. "metadata": "meta",
  74. },
  75. expected: &SinkConf{
  76. MessageType: MessageTypeEvent,
  77. ContentType: "application/json",
  78. DeviceName: "ekuiper",
  79. ProfileName: "ekuiperProfile",
  80. Metadata: "meta",
  81. },
  82. },
  83. { // 1
  84. conf: map[string]interface{}{
  85. "type": "redis",
  86. "protocol": "redis",
  87. "host": "edgex-redis",
  88. "port": 6379,
  89. "topic": "ekuiperResult",
  90. "deviceName": "ekuiper",
  91. "profileName": "ekuiper",
  92. "sourceName": "ekuiper",
  93. "contentType": "application/json",
  94. },
  95. expected: &SinkConf{
  96. MessageType: MessageTypeEvent,
  97. ContentType: "application/json",
  98. DeviceName: "ekuiper",
  99. ProfileName: "ekuiper",
  100. SourceName: "ekuiper",
  101. Topic: "ekuiperResult",
  102. },
  103. },
  104. { // 2
  105. conf: map[string]interface{}{
  106. "protocol": "tcp",
  107. "host": "127.0.0.1",
  108. "port": 1883,
  109. "topic": "result",
  110. "type": "mqtt",
  111. "metadata": "edgex_meta",
  112. "contentType": "application/json",
  113. "optional": map[string]interface{}{
  114. "ClientId": "edgex_message_bus_001",
  115. },
  116. },
  117. expected: &SinkConf{
  118. MessageType: MessageTypeEvent,
  119. ContentType: "application/json",
  120. DeviceName: "ekuiper",
  121. ProfileName: "ekuiperProfile",
  122. SourceName: "",
  123. Metadata: "edgex_meta",
  124. Topic: "result",
  125. },
  126. },
  127. { // 3
  128. conf: map[string]interface{}{
  129. "type": "redis",
  130. "protocol": "redis",
  131. "host": "edgex-redis",
  132. "port": 6379,
  133. "topicPrefix": "edgex/events/device",
  134. "messageType": "request",
  135. "contentType": "application/json",
  136. },
  137. expected: &SinkConf{
  138. MessageType: MessageTypeRequest,
  139. ContentType: "application/json",
  140. DeviceName: "ekuiper",
  141. ProfileName: "ekuiperProfile",
  142. SourceName: "",
  143. TopicPrefix: "edgex/events/device",
  144. },
  145. },
  146. { // 4
  147. conf: map[string]interface{}{
  148. "type": "redis",
  149. "protocol": "redis",
  150. "host": "edgex-redis",
  151. "port": 6379,
  152. "topicPrefix": "edgex/events/device",
  153. "messageType": "requests",
  154. "contentType": "application/json",
  155. },
  156. error: "specified wrong messageType value requests",
  157. },
  158. { // 5
  159. conf: map[string]interface{}{
  160. "protocol": "redis",
  161. "host": "edgex-redis",
  162. "port": 6379,
  163. "topicPrefix": "edgex/events/device",
  164. "topic": "requests",
  165. "contentType": "application/json",
  166. },
  167. error: "not allow to specify both topic and topicPrefix, please set one only",
  168. },
  169. }
  170. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  171. for i, test := range tests {
  172. ems := EdgexMsgBusSink{}
  173. err := ems.Configure(test.conf)
  174. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  175. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  176. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  177. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  178. }
  179. }
  180. }
  181. func TestProduceEvents(t1 *testing.T) {
  182. tests := []struct {
  183. input string
  184. conf map[string]interface{}
  185. expected *dtos.Event
  186. error string
  187. }{
  188. { // 0
  189. input: `[
  190. {"meta":{
  191. "correlationid":"","deviceName":"demo","id":"","origin":3,
  192. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  193. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  194. }
  195. },
  196. {"humidity":100},
  197. {"temperature":50}
  198. ]`,
  199. conf: map[string]interface{}{
  200. "metadata": "meta",
  201. },
  202. expected: &dtos.Event{
  203. Id: "",
  204. DeviceName: "demo",
  205. ProfileName: "ekuiperProfile",
  206. SourceName: "ruleTest",
  207. Origin: 3,
  208. Readings: []dtos.BaseReading{
  209. {
  210. ResourceName: "humidity",
  211. DeviceName: "test device name1",
  212. ProfileName: "ekuiperProfile",
  213. Id: "12",
  214. Origin: 14,
  215. ValueType: v3.ValueTypeInt64,
  216. SimpleReading: dtos.SimpleReading{Value: "100"},
  217. },
  218. {
  219. ResourceName: "temperature",
  220. DeviceName: "test device name2",
  221. ProfileName: "ekuiperProfile",
  222. Id: "22",
  223. Origin: 24,
  224. ValueType: v3.ValueTypeFloat64,
  225. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  226. },
  227. },
  228. },
  229. error: "",
  230. },
  231. { // 1
  232. input: `[
  233. {"meta":{
  234. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  235. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  236. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  237. }
  238. },
  239. {"h1":100},
  240. {"h2":null}
  241. ]`,
  242. conf: map[string]interface{}{
  243. "metadata": "meta",
  244. },
  245. expected: &dtos.Event{
  246. Id: "abc",
  247. DeviceName: "demo",
  248. ProfileName: "demoProfile",
  249. SourceName: "demoSource",
  250. Origin: 3,
  251. Tags: map[string]interface{}{"auth": "admin"},
  252. Readings: []dtos.BaseReading{
  253. {
  254. ResourceName: "h1",
  255. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  256. DeviceName: "demo",
  257. ProfileName: "demoProfile",
  258. ValueType: v3.ValueTypeFloat64,
  259. },
  260. },
  261. },
  262. error: "",
  263. },
  264. { // 2
  265. input: `[
  266. {"meta": 50,"h1":100}
  267. ]`,
  268. conf: map[string]interface{}{
  269. "sourceName": "demo",
  270. },
  271. expected: &dtos.Event{
  272. DeviceName: "ekuiper",
  273. ProfileName: "ekuiperProfile",
  274. SourceName: "demo",
  275. Readings: []dtos.BaseReading{
  276. {
  277. ResourceName: "meta",
  278. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  279. DeviceName: "ekuiper",
  280. ProfileName: "ekuiperProfile",
  281. ValueType: v3.ValueTypeFloat64,
  282. },
  283. {
  284. ResourceName: "h1",
  285. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  286. DeviceName: "ekuiper",
  287. ProfileName: "ekuiperProfile",
  288. ValueType: v3.ValueTypeFloat64,
  289. },
  290. },
  291. },
  292. error: "",
  293. },
  294. { // 3
  295. input: `[
  296. {"meta1": "newmeta"},
  297. {"h1":true},
  298. {"sa":["1","2","3","4"]},
  299. {"fa":[1.1,2.2,3.3,4.4]}
  300. ]`,
  301. expected: &dtos.Event{
  302. DeviceName: "ekuiper",
  303. ProfileName: "ekuiperProfile",
  304. SourceName: "ruleTest",
  305. Readings: []dtos.BaseReading{
  306. {
  307. ResourceName: "meta1",
  308. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  309. DeviceName: "ekuiper",
  310. ProfileName: "ekuiperProfile",
  311. ValueType: v3.ValueTypeString,
  312. },
  313. {
  314. ResourceName: "h1",
  315. SimpleReading: dtos.SimpleReading{Value: "true"},
  316. DeviceName: "ekuiper",
  317. ProfileName: "ekuiperProfile",
  318. ValueType: v3.ValueTypeBool,
  319. },
  320. {
  321. ResourceName: "sa",
  322. SimpleReading: dtos.SimpleReading{Value: "[1, 2, 3, 4]"},
  323. DeviceName: "ekuiper",
  324. ProfileName: "ekuiperProfile",
  325. ValueType: v3.ValueTypeStringArray,
  326. },
  327. {
  328. ResourceName: "fa",
  329. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  330. DeviceName: "ekuiper",
  331. ProfileName: "ekuiperProfile",
  332. ValueType: v3.ValueTypeFloat64Array,
  333. },
  334. },
  335. },
  336. error: "",
  337. },
  338. { // 4
  339. input: `[]`,
  340. conf: map[string]interface{}{
  341. "deviceName": "kuiper",
  342. "profileName": "kp",
  343. "topic": "demo",
  344. },
  345. expected: &dtos.Event{
  346. ProfileName: "kp",
  347. DeviceName: "kuiper",
  348. SourceName: "ruleTest",
  349. Origin: 0,
  350. Readings: nil,
  351. },
  352. error: "",
  353. },
  354. { // 5
  355. input: `[{"sa":["1","2",3,"4"]}]`, // invalid array, return nil
  356. expected: &dtos.Event{
  357. DeviceName: "ekuiper",
  358. ProfileName: "ekuiperProfile",
  359. SourceName: "ruleTest",
  360. Origin: 0,
  361. Readings: nil,
  362. },
  363. },
  364. { // 6
  365. input: `[
  366. {"meta1": "newmeta"},
  367. {"sa":"SGVsbG8gV29ybGQ="},
  368. {"meta":{
  369. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  370. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  371. }}
  372. ]`,
  373. conf: map[string]interface{}{
  374. "metadata": "meta",
  375. "profileName": "myprofile",
  376. "sourceName": "ds",
  377. },
  378. expected: &dtos.Event{
  379. DeviceName: "demo",
  380. ProfileName: "demoProfile",
  381. SourceName: "ds",
  382. Origin: 3,
  383. Tags: map[string]interface{}{"auth": "admin"},
  384. Readings: []dtos.BaseReading{
  385. {
  386. DeviceName: "demo",
  387. ProfileName: "demoProfile",
  388. ResourceName: "meta1",
  389. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  390. ValueType: v3.ValueTypeString,
  391. },
  392. {
  393. ResourceName: "sa",
  394. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  395. ProfileName: "demoProfile",
  396. DeviceName: "test device name1",
  397. Id: "12",
  398. Origin: 14,
  399. ValueType: v3.ValueTypeBinary,
  400. },
  401. },
  402. },
  403. error: "",
  404. },
  405. { // 7
  406. input: `[
  407. {"meta":{
  408. "correlationid":"","deviceName":"demo","id":"","origin":3,
  409. "obj":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Object"}
  410. }
  411. },
  412. {"obj":{"a":1,"b":"sttt"}}
  413. ]`,
  414. conf: map[string]interface{}{
  415. "metadata": "meta",
  416. },
  417. expected: &dtos.Event{
  418. Id: "",
  419. DeviceName: "demo",
  420. ProfileName: "ekuiperProfile",
  421. SourceName: "ruleTest",
  422. Origin: 3,
  423. Readings: []dtos.BaseReading{
  424. {
  425. ResourceName: "obj",
  426. DeviceName: "test device name1",
  427. ProfileName: "ekuiperProfile",
  428. Id: "12",
  429. Origin: 14,
  430. ValueType: v3.ValueTypeObject,
  431. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  432. "a": float64(1),
  433. "b": "sttt",
  434. }},
  435. },
  436. },
  437. },
  438. error: "",
  439. },
  440. { // 8
  441. input: `[
  442. {"obj":{"a":1,"b":"sttt"}}
  443. ]`,
  444. conf: map[string]interface{}{},
  445. expected: &dtos.Event{
  446. Id: "",
  447. DeviceName: "ekuiper",
  448. ProfileName: "ekuiperProfile",
  449. SourceName: "ruleTest",
  450. Origin: 0,
  451. Readings: []dtos.BaseReading{
  452. {
  453. ResourceName: "obj",
  454. DeviceName: "ekuiper",
  455. ProfileName: "ekuiperProfile",
  456. Id: "",
  457. Origin: 0,
  458. ValueType: v3.ValueTypeObject,
  459. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  460. "a": float64(1),
  461. "b": "sttt",
  462. }},
  463. },
  464. },
  465. },
  466. error: "",
  467. },
  468. }
  469. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  470. for i, t := range tests {
  471. ems := EdgexMsgBusSink{}
  472. err := ems.Configure(t.conf)
  473. if err != nil {
  474. t1.Errorf("%d: configure error %v", i, err)
  475. continue
  476. }
  477. if ems.c.SourceName == "" {
  478. ems.c.SourceName = "ruleTest"
  479. }
  480. var payload []map[string]interface{}
  481. json.Unmarshal([]byte(t.input), &payload)
  482. result, err := ems.produceEvents(ctx, payload)
  483. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  484. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  485. } else if t.error == "" && !compareEvent(t.expected, result) {
  486. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  487. }
  488. }
  489. }
  490. func TestEdgeXTemplate_Apply(t1 *testing.T) {
  491. tests := []struct {
  492. input string
  493. conf map[string]interface{}
  494. expected *dtos.Event
  495. error string
  496. }{
  497. { // 0
  498. input: `[{"meta":{
  499. "correlationid":"","deviceName":"demo","id":"","origin":3,
  500. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  501. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  502. },
  503. "humidity":100,
  504. "temperature":50}
  505. ]`,
  506. conf: map[string]interface{}{
  507. "metadata": "meta",
  508. "dataTemplate": `{"wrapper":"w1","ab":"{{.humidity}}"}`,
  509. },
  510. expected: &dtos.Event{
  511. Id: "",
  512. DeviceName: "ekuiper",
  513. ProfileName: "ekuiperProfile",
  514. SourceName: "ruleTest",
  515. Origin: 0,
  516. Readings: []dtos.BaseReading{
  517. {
  518. ResourceName: "wrapper",
  519. DeviceName: "ekuiper",
  520. ProfileName: "ekuiperProfile",
  521. Id: "",
  522. Origin: 0,
  523. ValueType: v3.ValueTypeString,
  524. SimpleReading: dtos.SimpleReading{Value: "w1"},
  525. },
  526. {
  527. ResourceName: "ab",
  528. DeviceName: "ekuiper",
  529. ProfileName: "ekuiperProfile",
  530. Id: "",
  531. Origin: 0,
  532. ValueType: v3.ValueTypeString,
  533. SimpleReading: dtos.SimpleReading{Value: "100"},
  534. },
  535. },
  536. },
  537. error: "",
  538. }, {
  539. input: `[{"json":"{\"a\":24,\"b\":\"c\"}"}]`,
  540. conf: map[string]interface{}{
  541. "dataTemplate": `{{.json}}`,
  542. },
  543. expected: &dtos.Event{
  544. Id: "",
  545. DeviceName: "ekuiper",
  546. ProfileName: "ekuiperProfile",
  547. SourceName: "ruleTest",
  548. Origin: 0,
  549. Readings: []dtos.BaseReading{
  550. {
  551. ResourceName: "a",
  552. DeviceName: "ekuiper",
  553. ProfileName: "ekuiperProfile",
  554. Id: "",
  555. Origin: 0,
  556. ValueType: v3.ValueTypeFloat64,
  557. SimpleReading: dtos.SimpleReading{Value: "2.400000e+01"},
  558. },
  559. {
  560. ResourceName: "b",
  561. DeviceName: "ekuiper",
  562. ProfileName: "ekuiperProfile",
  563. Id: "",
  564. Origin: 0,
  565. ValueType: v3.ValueTypeString,
  566. SimpleReading: dtos.SimpleReading{Value: "c"},
  567. },
  568. },
  569. },
  570. error: "",
  571. },
  572. }
  573. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  574. for i, t := range tests {
  575. ems := EdgexMsgBusSink{}
  576. err := ems.Configure(t.conf)
  577. if err != nil {
  578. t1.Errorf("%d: configure error %v", i, err)
  579. continue
  580. }
  581. if ems.c.SourceName == "" {
  582. ems.c.SourceName = "ruleTest"
  583. }
  584. var payload []map[string]interface{}
  585. json.Unmarshal([]byte(t.input), &payload)
  586. dt := t.conf["dataTemplate"]
  587. tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "", "", []string{})
  588. vCtx := context.WithValue(ctx, context.TransKey, tf)
  589. result, err := ems.produceEvents(vCtx, payload[0])
  590. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  591. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  592. } else if t.error == "" && !compareEvent(t.expected, result) {
  593. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  594. }
  595. }
  596. }