edgex_sink_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
// Copyright 2021 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build edgex

  15. package sink
  16. import (
  17. "encoding/json"
  18. "fmt"
  19. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  20. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  21. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/context"
  25. "reflect"
  26. "testing"
  27. )
  28. var (
  29. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  30. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  31. )
  32. func compareEvent(expected, actual *dtos.Event) bool {
  33. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  34. for i, r := range expected.Readings {
  35. if !compareReading(r, actual.Readings[i]) {
  36. break
  37. }
  38. }
  39. return true
  40. }
  41. return false
  42. }
  43. func compareReading(expected, actual dtos.BaseReading) bool {
  44. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  45. return true
  46. }
  47. return false
  48. }
  49. func TestConfigure(t *testing.T) {
  50. var tests = []struct {
  51. conf map[string]interface{}
  52. expected *EdgexConf
  53. error string
  54. }{
  55. { // 0
  56. conf: map[string]interface{}{
  57. "metadata": "meta",
  58. },
  59. expected: &EdgexConf{
  60. Protocol: "redis",
  61. Host: "localhost",
  62. Port: 6379,
  63. Type: messaging.Redis,
  64. MessageType: MessageTypeEvent,
  65. ContentType: "application/json",
  66. DeviceName: "ekuiper",
  67. ProfileName: "ekuiperProfile",
  68. Metadata: "meta",
  69. },
  70. },
  71. { // 1
  72. conf: map[string]interface{}{
  73. "type": "redis",
  74. "protocol": "redis",
  75. "host": "edgex-redis",
  76. "port": 6379,
  77. "topic": "ekuiperResult",
  78. "deviceName": "ekuiper",
  79. "profileName": "ekuiper",
  80. "sourceName": "ekuiper",
  81. "contentType": "application/json",
  82. },
  83. expected: &EdgexConf{
  84. Protocol: "redis",
  85. Host: "edgex-redis",
  86. Port: 6379,
  87. Type: messaging.Redis,
  88. MessageType: MessageTypeEvent,
  89. ContentType: "application/json",
  90. DeviceName: "ekuiper",
  91. ProfileName: "ekuiper",
  92. SourceName: "ekuiper",
  93. Topic: "ekuiperResult",
  94. },
  95. },
  96. { // 2
  97. conf: map[string]interface{}{
  98. "protocol": "tcp",
  99. "host": "127.0.0.1",
  100. "port": 1883,
  101. "topic": "result",
  102. "type": "mqtt",
  103. "metadata": "edgex_meta",
  104. "contentType": "application/json",
  105. "optional": map[string]interface{}{
  106. "ClientId": "edgex_message_bus_001",
  107. },
  108. },
  109. expected: &EdgexConf{
  110. Protocol: "tcp",
  111. Host: "127.0.0.1",
  112. Port: 1883,
  113. Type: messaging.MQTT,
  114. MessageType: MessageTypeEvent,
  115. ContentType: "application/json",
  116. DeviceName: "ekuiper",
  117. ProfileName: "ekuiperProfile",
  118. SourceName: "",
  119. Metadata: "edgex_meta",
  120. Topic: "result",
  121. Optional: map[string]string{
  122. "ClientId": "edgex_message_bus_001",
  123. },
  124. },
  125. }, { // 3
  126. conf: map[string]interface{}{
  127. "type": "redis",
  128. "protocol": "redis",
  129. "host": "edgex-redis",
  130. "port": 6379,
  131. "topicPrefix": "edgex/events/device",
  132. "messageType": "request",
  133. "contentType": "application/json",
  134. },
  135. expected: &EdgexConf{
  136. Protocol: "redis",
  137. Host: "edgex-redis",
  138. Port: 6379,
  139. Type: messaging.Redis,
  140. MessageType: MessageTypeRequest,
  141. ContentType: "application/json",
  142. DeviceName: "ekuiper",
  143. ProfileName: "ekuiperProfile",
  144. SourceName: "",
  145. TopicPrefix: "edgex/events/device",
  146. },
  147. }, { // 4
  148. conf: map[string]interface{}{
  149. "type": "redis",
  150. "protocol": "redis",
  151. "host": "edgex-redis",
  152. "port": 6379,
  153. "topicPrefix": "edgex/events/device",
  154. "messageType": "requests",
  155. "contentType": "application/json",
  156. },
  157. error: "specified wrong messageType value requests",
  158. }, { // 5
  159. conf: map[string]interface{}{
  160. "type": 20,
  161. "protocol": "redis",
  162. "host": "edgex-redis",
  163. "port": 6379,
  164. "topicPrefix": "edgex/events/device",
  165. "messageType": "requests",
  166. "contentType": "application/json",
  167. },
  168. error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: 1 error(s) decoding:\n\n* 'type' expected type 'string', got unconvertible type 'int', value: '20'",
  169. }, { // 6
  170. conf: map[string]interface{}{
  171. "type": "redis",
  172. "protocol": "redis",
  173. "host": "edgex-redis",
  174. "port": -1,
  175. "topicPrefix": "edgex/events/device",
  176. "messageType": "requests",
  177. "contentType": "application/json",
  178. },
  179. error: "specified wrong port value, expect positive integer but got -1",
  180. }, { // 7
  181. conf: map[string]interface{}{
  182. "type": "zmq",
  183. "protocol": "redis",
  184. "host": "edgex-redis",
  185. "port": 6379,
  186. "topicPrefix": "edgex/events/device",
  187. "messageType": "requests",
  188. "contentType": "application/json",
  189. },
  190. error: "specified wrong type value zmq",
  191. }, { // 8
  192. conf: map[string]interface{}{
  193. "protocol": "redis",
  194. "host": "edgex-redis",
  195. "port": 6379,
  196. "topicPrefix": "edgex/events/device",
  197. "topic": "requests",
  198. "contentType": "application/json",
  199. },
  200. error: "not allow to specify both topic and topicPrefix, please set one only",
  201. },
  202. }
  203. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  204. for i, test := range tests {
  205. ems := EdgexMsgBusSink{}
  206. err := ems.Configure(test.conf)
  207. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  208. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  209. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  210. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  211. }
  212. }
  213. }
  214. func TestProduceEvents(t1 *testing.T) {
  215. var tests = []struct {
  216. input string
  217. conf map[string]interface{}
  218. expected *dtos.Event
  219. error string
  220. }{
  221. { // 0
  222. input: `[
  223. {"meta":{
  224. "correlationid":"","deviceName":"demo","id":"","origin":3,
  225. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  226. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  227. }
  228. },
  229. {"humidity":100},
  230. {"temperature":50}
  231. ]`,
  232. conf: map[string]interface{}{
  233. "metadata": "meta",
  234. },
  235. expected: &dtos.Event{
  236. Id: "",
  237. DeviceName: "demo",
  238. ProfileName: "ekuiperProfile",
  239. SourceName: "ruleTest",
  240. Origin: 3,
  241. Readings: []dtos.BaseReading{
  242. {
  243. ResourceName: "humidity",
  244. DeviceName: "test device name1",
  245. ProfileName: "ekuiperProfile",
  246. Id: "12",
  247. Origin: 14,
  248. ValueType: v2.ValueTypeInt64,
  249. SimpleReading: dtos.SimpleReading{Value: "100"},
  250. },
  251. {
  252. ResourceName: "temperature",
  253. DeviceName: "test device name2",
  254. ProfileName: "kuiperProfile",
  255. Id: "22",
  256. Origin: 24,
  257. ValueType: v2.ValueTypeFloat64,
  258. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  259. },
  260. },
  261. },
  262. error: "",
  263. },
  264. { // 1
  265. input: `[
  266. {"meta":{
  267. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  268. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  269. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  270. }
  271. },
  272. {"h1":100},
  273. {"h2":null}
  274. ]`,
  275. conf: map[string]interface{}{
  276. "metadata": "meta",
  277. },
  278. expected: &dtos.Event{
  279. Id: "abc",
  280. DeviceName: "demo",
  281. ProfileName: "demoProfile",
  282. SourceName: "demoSource",
  283. Origin: 3,
  284. Tags: map[string]string{"auth": "admin"},
  285. Readings: []dtos.BaseReading{
  286. {
  287. ResourceName: "h1",
  288. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  289. DeviceName: "demo",
  290. ProfileName: "demoProfile",
  291. ValueType: v2.ValueTypeFloat64,
  292. },
  293. },
  294. },
  295. error: "",
  296. },
  297. { // 2
  298. input: `[
  299. {"meta": 50},
  300. {"h1":100}
  301. ]`,
  302. conf: map[string]interface{}{
  303. "sourceName": "demo",
  304. },
  305. expected: &dtos.Event{
  306. DeviceName: "ekuiper",
  307. ProfileName: "ekuiperProfile",
  308. SourceName: "demo",
  309. Readings: []dtos.BaseReading{
  310. {
  311. ResourceName: "meta",
  312. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  313. DeviceName: "ekuiper",
  314. ProfileName: "ekuiperProfile",
  315. ValueType: v2.ValueTypeFloat64,
  316. },
  317. {
  318. ResourceName: "h1",
  319. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  320. DeviceName: "ekuiper",
  321. ProfileName: "ekuiperProfile",
  322. ValueType: v2.ValueTypeFloat64,
  323. },
  324. },
  325. },
  326. error: "",
  327. },
  328. { // 3
  329. input: `[
  330. {"meta1": "newmeta"},
  331. {"h1":true},
  332. {"sa":["1","2","3","4"]},
  333. {"fa":[1.1,2.2,3.3,4.4]}
  334. ]`,
  335. expected: &dtos.Event{
  336. DeviceName: "ekuiper",
  337. ProfileName: "ekuiperProfile",
  338. SourceName: "ruleTest",
  339. Readings: []dtos.BaseReading{
  340. {
  341. ResourceName: "meta1",
  342. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  343. DeviceName: "ekuiper",
  344. ProfileName: "ekuiperProfile",
  345. ValueType: v2.ValueTypeString,
  346. },
  347. {
  348. ResourceName: "h1",
  349. SimpleReading: dtos.SimpleReading{Value: "true"},
  350. DeviceName: "ekuiper",
  351. ProfileName: "ekuiperProfile",
  352. ValueType: v2.ValueTypeBool,
  353. },
  354. {
  355. ResourceName: "sa",
  356. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  357. DeviceName: "ekuiper",
  358. ProfileName: "ekuiperProfile",
  359. ValueType: v2.ValueTypeStringArray,
  360. },
  361. {
  362. ResourceName: "fa",
  363. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  364. DeviceName: "ekuiper",
  365. ProfileName: "ekuiperProfile",
  366. ValueType: v2.ValueTypeFloat64Array,
  367. },
  368. },
  369. },
  370. error: "",
  371. },
  372. { // 4
  373. input: `[]`,
  374. conf: map[string]interface{}{
  375. "deviceName": "kuiper",
  376. "profileName": "kp",
  377. "topic": "demo",
  378. },
  379. expected: &dtos.Event{
  380. ProfileName: "kp",
  381. DeviceName: "kuiper",
  382. SourceName: "ruleTest",
  383. Origin: 0,
  384. Readings: nil,
  385. },
  386. error: "",
  387. },
  388. { // 5
  389. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  390. expected: &dtos.Event{
  391. DeviceName: "ekuiper",
  392. ProfileName: "ekuiperProfile",
  393. SourceName: "ruleTest",
  394. Origin: 0,
  395. Readings: nil,
  396. },
  397. },
  398. { // 6
  399. input: `[
  400. {"meta1": "newmeta"},
  401. {"sa":"SGVsbG8gV29ybGQ="},
  402. {"meta":{
  403. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  404. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  405. }}
  406. ]`,
  407. conf: map[string]interface{}{
  408. "metadata": "meta",
  409. "profileName": "myprofile",
  410. "sourceName": "ds",
  411. },
  412. expected: &dtos.Event{
  413. DeviceName: "demo",
  414. ProfileName: "demoProfile",
  415. SourceName: "ds",
  416. Origin: 3,
  417. Tags: map[string]string{"auth": "admin"},
  418. Readings: []dtos.BaseReading{
  419. {
  420. DeviceName: "demo",
  421. ProfileName: "demoProfile",
  422. ResourceName: "meta1",
  423. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  424. ValueType: v2.ValueTypeString,
  425. },
  426. {
  427. ResourceName: "sa",
  428. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  429. ProfileName: "demoProfile",
  430. DeviceName: "test device name1",
  431. Id: "12",
  432. Origin: 14,
  433. ValueType: v2.ValueTypeBinary,
  434. },
  435. },
  436. },
  437. error: "",
  438. },
  439. }
  440. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  441. for i, t := range tests {
  442. ems := EdgexMsgBusSink{}
  443. err := ems.Configure(t.conf)
  444. if err != nil {
  445. t1.Errorf("%d: configure error %v", i, err)
  446. continue
  447. }
  448. if ems.c.SourceName == "" {
  449. ems.c.SourceName = "ruleTest"
  450. }
  451. var payload []map[string]interface{}
  452. json.Unmarshal([]byte(t.input), &payload)
  453. result, err := ems.produceEvents(ctx, payload)
  454. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  455. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  456. } else if t.error == "" && !compareEvent(t.expected, result) {
  457. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  458. }
  459. }
  460. }