edgex_sink_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package sink
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  21. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/context"
  25. "github.com/lf-edge/ekuiper/internal/topo/transform"
  26. "github.com/lf-edge/ekuiper/pkg/cast"
  27. "reflect"
  28. "testing"
  29. )
var (
	// contextLogger is the logger obtained from conf.Log with the field
	// "rule" set to "testEdgexSink".
	contextLogger = conf.Log.WithField("rule", "testEdgexSink")
	// ctx is the context shared by the tests in this file: a background
	// context (from the project's topo/context package) that stores
	// contextLogger under context.LoggerKey.
	ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
)
  34. func compareEvent(expected, actual *dtos.Event) bool {
  35. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  36. for _, r := range expected.Readings {
  37. compared := false
  38. for _, a := range actual.Readings {
  39. if r.ResourceName == a.ResourceName {
  40. compared = true
  41. if !compareReading(r, a) {
  42. return false
  43. }
  44. }
  45. }
  46. if !compared {
  47. return false
  48. }
  49. }
  50. return true
  51. }
  52. return false
  53. }
  54. func compareReading(expected, actual dtos.BaseReading) bool {
  55. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  56. if expected.ValueType == v2.ValueTypeObject {
  57. if !reflect.DeepEqual(expected.ObjectValue, actual.ObjectValue) {
  58. return false
  59. }
  60. }
  61. return true
  62. }
  63. return false
  64. }
  65. func TestConfigure(t *testing.T) {
  66. var tests = []struct {
  67. conf map[string]interface{}
  68. expected *EdgexConf
  69. error string
  70. }{
  71. { // 0
  72. conf: map[string]interface{}{
  73. "metadata": "meta",
  74. },
  75. expected: &EdgexConf{
  76. MessageType: MessageTypeEvent,
  77. ContentType: "application/json",
  78. DeviceName: "ekuiper",
  79. ProfileName: "ekuiperProfile",
  80. Metadata: "meta",
  81. },
  82. },
  83. { // 1
  84. conf: map[string]interface{}{
  85. "type": "redis",
  86. "protocol": "redis",
  87. "host": "edgex-redis",
  88. "port": 6379,
  89. "topic": "ekuiperResult",
  90. "deviceName": "ekuiper",
  91. "profileName": "ekuiper",
  92. "sourceName": "ekuiper",
  93. "contentType": "application/json",
  94. },
  95. expected: &EdgexConf{
  96. MessageType: MessageTypeEvent,
  97. ContentType: "application/json",
  98. DeviceName: "ekuiper",
  99. ProfileName: "ekuiper",
  100. SourceName: "ekuiper",
  101. Topic: "ekuiperResult",
  102. },
  103. },
  104. { // 2
  105. conf: map[string]interface{}{
  106. "protocol": "tcp",
  107. "host": "127.0.0.1",
  108. "port": 1883,
  109. "topic": "result",
  110. "type": "mqtt",
  111. "metadata": "edgex_meta",
  112. "contentType": "application/json",
  113. "optional": map[string]interface{}{
  114. "ClientId": "edgex_message_bus_001",
  115. },
  116. },
  117. expected: &EdgexConf{
  118. MessageType: MessageTypeEvent,
  119. ContentType: "application/json",
  120. DeviceName: "ekuiper",
  121. ProfileName: "ekuiperProfile",
  122. SourceName: "",
  123. Metadata: "edgex_meta",
  124. Topic: "result",
  125. },
  126. },
  127. { // 3
  128. conf: map[string]interface{}{
  129. "type": "redis",
  130. "protocol": "redis",
  131. "host": "edgex-redis",
  132. "port": 6379,
  133. "topicPrefix": "edgex/events/device",
  134. "messageType": "request",
  135. "contentType": "application/json",
  136. },
  137. expected: &EdgexConf{
  138. MessageType: MessageTypeRequest,
  139. ContentType: "application/json",
  140. DeviceName: "ekuiper",
  141. ProfileName: "ekuiperProfile",
  142. SourceName: "",
  143. TopicPrefix: "edgex/events/device",
  144. },
  145. },
  146. { // 4
  147. conf: map[string]interface{}{
  148. "type": "redis",
  149. "protocol": "redis",
  150. "host": "edgex-redis",
  151. "port": 6379,
  152. "topicPrefix": "edgex/events/device",
  153. "messageType": "requests",
  154. "contentType": "application/json",
  155. },
  156. error: "specified wrong messageType value requests",
  157. },
  158. { // 5
  159. conf: map[string]interface{}{
  160. "protocol": "redis",
  161. "host": "edgex-redis",
  162. "port": 6379,
  163. "topicPrefix": "edgex/events/device",
  164. "topic": "requests",
  165. "contentType": "application/json",
  166. },
  167. error: "not allow to specify both topic and topicPrefix, please set one only",
  168. },
  169. }
  170. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  171. for i, test := range tests {
  172. ems := EdgexMsgBusSink{}
  173. err := ems.Configure(test.conf)
  174. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  175. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  176. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  177. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  178. }
  179. }
  180. }
  181. func TestProduceEvents(t1 *testing.T) {
  182. var tests = []struct {
  183. input string
  184. conf map[string]interface{}
  185. expected *dtos.Event
  186. error string
  187. }{
  188. { // 0
  189. input: `[
  190. {"meta":{
  191. "correlationid":"","deviceName":"demo","id":"","origin":3,
  192. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  193. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  194. }
  195. },
  196. {"humidity":100},
  197. {"temperature":50}
  198. ]`,
  199. conf: map[string]interface{}{
  200. "metadata": "meta",
  201. },
  202. expected: &dtos.Event{
  203. Id: "",
  204. DeviceName: "demo",
  205. ProfileName: "ekuiperProfile",
  206. SourceName: "ruleTest",
  207. Origin: 3,
  208. Readings: []dtos.BaseReading{
  209. {
  210. ResourceName: "humidity",
  211. DeviceName: "test device name1",
  212. ProfileName: "ekuiperProfile",
  213. Id: "12",
  214. Origin: 14,
  215. ValueType: v2.ValueTypeInt64,
  216. SimpleReading: dtos.SimpleReading{Value: "100"},
  217. },
  218. {
  219. ResourceName: "temperature",
  220. DeviceName: "test device name2",
  221. ProfileName: "ekuiperProfile",
  222. Id: "22",
  223. Origin: 24,
  224. ValueType: v2.ValueTypeFloat64,
  225. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  226. },
  227. },
  228. },
  229. error: "",
  230. },
  231. { // 1
  232. input: `[
  233. {"meta":{
  234. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  235. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  236. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  237. }
  238. },
  239. {"h1":100},
  240. {"h2":null}
  241. ]`,
  242. conf: map[string]interface{}{
  243. "metadata": "meta",
  244. },
  245. expected: &dtos.Event{
  246. Id: "abc",
  247. DeviceName: "demo",
  248. ProfileName: "demoProfile",
  249. SourceName: "demoSource",
  250. Origin: 3,
  251. Tags: map[string]interface{}{"auth": "admin"},
  252. Readings: []dtos.BaseReading{
  253. {
  254. ResourceName: "h1",
  255. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  256. DeviceName: "demo",
  257. ProfileName: "demoProfile",
  258. ValueType: v2.ValueTypeFloat64,
  259. },
  260. },
  261. },
  262. error: "",
  263. },
  264. { // 2
  265. input: `[
  266. {"meta": 50,"h1":100}
  267. ]`,
  268. conf: map[string]interface{}{
  269. "sourceName": "demo",
  270. },
  271. expected: &dtos.Event{
  272. DeviceName: "ekuiper",
  273. ProfileName: "ekuiperProfile",
  274. SourceName: "demo",
  275. Readings: []dtos.BaseReading{
  276. {
  277. ResourceName: "meta",
  278. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  279. DeviceName: "ekuiper",
  280. ProfileName: "ekuiperProfile",
  281. ValueType: v2.ValueTypeFloat64,
  282. },
  283. {
  284. ResourceName: "h1",
  285. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  286. DeviceName: "ekuiper",
  287. ProfileName: "ekuiperProfile",
  288. ValueType: v2.ValueTypeFloat64,
  289. },
  290. },
  291. },
  292. error: "",
  293. },
  294. { // 3
  295. input: `[
  296. {"meta1": "newmeta"},
  297. {"h1":true},
  298. {"sa":["1","2","3","4"]},
  299. {"fa":[1.1,2.2,3.3,4.4]}
  300. ]`,
  301. expected: &dtos.Event{
  302. DeviceName: "ekuiper",
  303. ProfileName: "ekuiperProfile",
  304. SourceName: "ruleTest",
  305. Readings: []dtos.BaseReading{
  306. {
  307. ResourceName: "meta1",
  308. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  309. DeviceName: "ekuiper",
  310. ProfileName: "ekuiperProfile",
  311. ValueType: v2.ValueTypeString,
  312. },
  313. {
  314. ResourceName: "h1",
  315. SimpleReading: dtos.SimpleReading{Value: "true"},
  316. DeviceName: "ekuiper",
  317. ProfileName: "ekuiperProfile",
  318. ValueType: v2.ValueTypeBool,
  319. },
  320. {
  321. ResourceName: "sa",
  322. SimpleReading: dtos.SimpleReading{Value: "[1, 2, 3, 4]"},
  323. DeviceName: "ekuiper",
  324. ProfileName: "ekuiperProfile",
  325. ValueType: v2.ValueTypeStringArray,
  326. },
  327. {
  328. ResourceName: "fa",
  329. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  330. DeviceName: "ekuiper",
  331. ProfileName: "ekuiperProfile",
  332. ValueType: v2.ValueTypeFloat64Array,
  333. },
  334. },
  335. },
  336. error: "",
  337. },
  338. { // 4
  339. input: `[]`,
  340. conf: map[string]interface{}{
  341. "deviceName": "kuiper",
  342. "profileName": "kp",
  343. "topic": "demo",
  344. },
  345. expected: &dtos.Event{
  346. ProfileName: "kp",
  347. DeviceName: "kuiper",
  348. SourceName: "ruleTest",
  349. Origin: 0,
  350. Readings: nil,
  351. },
  352. error: "",
  353. },
  354. { // 5
  355. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  356. expected: &dtos.Event{
  357. DeviceName: "ekuiper",
  358. ProfileName: "ekuiperProfile",
  359. SourceName: "ruleTest",
  360. Origin: 0,
  361. Readings: nil,
  362. },
  363. },
  364. { // 6
  365. input: `[
  366. {"meta1": "newmeta"},
  367. {"sa":"SGVsbG8gV29ybGQ="},
  368. {"meta":{
  369. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  370. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  371. }}
  372. ]`,
  373. conf: map[string]interface{}{
  374. "metadata": "meta",
  375. "profileName": "myprofile",
  376. "sourceName": "ds",
  377. },
  378. expected: &dtos.Event{
  379. DeviceName: "demo",
  380. ProfileName: "demoProfile",
  381. SourceName: "ds",
  382. Origin: 3,
  383. Tags: map[string]interface{}{"auth": "admin"},
  384. Readings: []dtos.BaseReading{
  385. {
  386. DeviceName: "demo",
  387. ProfileName: "demoProfile",
  388. ResourceName: "meta1",
  389. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  390. ValueType: v2.ValueTypeString,
  391. },
  392. {
  393. ResourceName: "sa",
  394. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  395. ProfileName: "demoProfile",
  396. DeviceName: "test device name1",
  397. Id: "12",
  398. Origin: 14,
  399. ValueType: v2.ValueTypeBinary,
  400. },
  401. },
  402. },
  403. error: "",
  404. }, { // 7
  405. input: `[
  406. {"meta":{
  407. "correlationid":"","deviceName":"demo","id":"","origin":3,
  408. "obj":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Object"}
  409. }
  410. },
  411. {"obj":{"a":1,"b":"sttt"}}
  412. ]`,
  413. conf: map[string]interface{}{
  414. "metadata": "meta",
  415. },
  416. expected: &dtos.Event{
  417. Id: "",
  418. DeviceName: "demo",
  419. ProfileName: "ekuiperProfile",
  420. SourceName: "ruleTest",
  421. Origin: 3,
  422. Readings: []dtos.BaseReading{
  423. {
  424. ResourceName: "obj",
  425. DeviceName: "test device name1",
  426. ProfileName: "ekuiperProfile",
  427. Id: "12",
  428. Origin: 14,
  429. ValueType: v2.ValueTypeObject,
  430. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  431. "a": float64(1),
  432. "b": "sttt",
  433. }},
  434. },
  435. },
  436. },
  437. error: "",
  438. }, { // 8
  439. input: `[
  440. {"obj":{"a":1,"b":"sttt"}}
  441. ]`,
  442. conf: map[string]interface{}{},
  443. expected: &dtos.Event{
  444. Id: "",
  445. DeviceName: "ekuiper",
  446. ProfileName: "ekuiperProfile",
  447. SourceName: "ruleTest",
  448. Origin: 0,
  449. Readings: []dtos.BaseReading{
  450. {
  451. ResourceName: "obj",
  452. DeviceName: "ekuiper",
  453. ProfileName: "ekuiperProfile",
  454. Id: "",
  455. Origin: 0,
  456. ValueType: v2.ValueTypeObject,
  457. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  458. "a": float64(1),
  459. "b": "sttt",
  460. }},
  461. },
  462. },
  463. },
  464. error: "",
  465. },
  466. }
  467. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  468. for i, t := range tests {
  469. ems := EdgexMsgBusSink{}
  470. err := ems.Configure(t.conf)
  471. if err != nil {
  472. t1.Errorf("%d: configure error %v", i, err)
  473. continue
  474. }
  475. if ems.c.SourceName == "" {
  476. ems.c.SourceName = "ruleTest"
  477. }
  478. var payload []map[string]interface{}
  479. json.Unmarshal([]byte(t.input), &payload)
  480. result, err := ems.produceEvents(ctx, payload)
  481. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  482. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  483. } else if t.error == "" && !compareEvent(t.expected, result) {
  484. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  485. }
  486. }
  487. }
  488. func TestEdgeXTemplate_Apply(t1 *testing.T) {
  489. var tests = []struct {
  490. input string
  491. conf map[string]interface{}
  492. expected *dtos.Event
  493. error string
  494. }{
  495. { // 0
  496. input: `[{"meta":{
  497. "correlationid":"","deviceName":"demo","id":"","origin":3,
  498. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  499. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  500. },
  501. "humidity":100,
  502. "temperature":50}
  503. ]`,
  504. conf: map[string]interface{}{
  505. "metadata": "meta",
  506. "dataTemplate": `{"wrapper":"w1","ab":"{{.humidity}}"}`,
  507. },
  508. expected: &dtos.Event{
  509. Id: "",
  510. DeviceName: "ekuiper",
  511. ProfileName: "ekuiperProfile",
  512. SourceName: "ruleTest",
  513. Origin: 0,
  514. Readings: []dtos.BaseReading{
  515. {
  516. ResourceName: "wrapper",
  517. DeviceName: "ekuiper",
  518. ProfileName: "ekuiperProfile",
  519. Id: "",
  520. Origin: 0,
  521. ValueType: v2.ValueTypeString,
  522. SimpleReading: dtos.SimpleReading{Value: "w1"},
  523. },
  524. {
  525. ResourceName: "ab",
  526. DeviceName: "ekuiper",
  527. ProfileName: "ekuiperProfile",
  528. Id: "",
  529. Origin: 0,
  530. ValueType: v2.ValueTypeString,
  531. SimpleReading: dtos.SimpleReading{Value: "100"},
  532. },
  533. },
  534. },
  535. error: "",
  536. }, {
  537. input: `[{"json":"{\"a\":24,\"b\":\"c\"}"}]`,
  538. conf: map[string]interface{}{
  539. "dataTemplate": `{{.json}}`,
  540. },
  541. expected: &dtos.Event{
  542. Id: "",
  543. DeviceName: "ekuiper",
  544. ProfileName: "ekuiperProfile",
  545. SourceName: "ruleTest",
  546. Origin: 0,
  547. Readings: []dtos.BaseReading{
  548. {
  549. ResourceName: "a",
  550. DeviceName: "ekuiper",
  551. ProfileName: "ekuiperProfile",
  552. Id: "",
  553. Origin: 0,
  554. ValueType: v2.ValueTypeFloat64,
  555. SimpleReading: dtos.SimpleReading{Value: "2.400000e+01"},
  556. },
  557. {
  558. ResourceName: "b",
  559. DeviceName: "ekuiper",
  560. ProfileName: "ekuiperProfile",
  561. Id: "",
  562. Origin: 0,
  563. ValueType: v2.ValueTypeString,
  564. SimpleReading: dtos.SimpleReading{Value: "c"},
  565. },
  566. },
  567. },
  568. error: "",
  569. },
  570. }
  571. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  572. for i, t := range tests {
  573. ems := EdgexMsgBusSink{}
  574. err := ems.Configure(t.conf)
  575. if err != nil {
  576. t1.Errorf("%d: configure error %v", i, err)
  577. continue
  578. }
  579. if ems.c.SourceName == "" {
  580. ems.c.SourceName = "ruleTest"
  581. }
  582. var payload []map[string]interface{}
  583. json.Unmarshal([]byte(t.input), &payload)
  584. dt := t.conf["dataTemplate"]
  585. tf, _ := transform.GenTransform(cast.ToStringAlways(dt))
  586. vCtx := context.WithValue(ctx, context.TransKey, tf)
  587. result, err := ems.produceEvents(vCtx, payload[0])
  588. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  589. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  590. } else if t.error == "" && !compareEvent(t.expected, result) {
  591. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  592. }
  593. }
  594. }