edgex_sink_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package sink
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  21. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  22. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/testx"
  25. "github.com/lf-edge/ekuiper/internal/topo/context"
  26. "reflect"
  27. "testing"
  28. )
  29. var (
  30. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  31. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  32. )
  33. func compareEvent(expected, actual *dtos.Event) bool {
  34. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  35. for i, r := range expected.Readings {
  36. if !compareReading(r, actual.Readings[i]) {
  37. break
  38. }
  39. }
  40. return true
  41. }
  42. return false
  43. }
  44. func compareReading(expected, actual dtos.BaseReading) bool {
  45. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  46. return true
  47. }
  48. return false
  49. }
  50. func TestConfigure(t *testing.T) {
  51. var tests = []struct {
  52. conf map[string]interface{}
  53. expected *EdgexConf
  54. error string
  55. }{
  56. { // 0
  57. conf: map[string]interface{}{
  58. "metadata": "meta",
  59. },
  60. expected: &EdgexConf{
  61. Protocol: "redis",
  62. Host: "localhost",
  63. Port: 6379,
  64. Type: messaging.Redis,
  65. MessageType: MessageTypeEvent,
  66. ContentType: "application/json",
  67. DeviceName: "ekuiper",
  68. ProfileName: "ekuiperProfile",
  69. Metadata: "meta",
  70. },
  71. },
  72. { // 1
  73. conf: map[string]interface{}{
  74. "type": "redis",
  75. "protocol": "redis",
  76. "host": "edgex-redis",
  77. "port": 6379,
  78. "topic": "ekuiperResult",
  79. "deviceName": "ekuiper",
  80. "profileName": "ekuiper",
  81. "sourceName": "ekuiper",
  82. "contentType": "application/json",
  83. },
  84. expected: &EdgexConf{
  85. Protocol: "redis",
  86. Host: "edgex-redis",
  87. Port: 6379,
  88. Type: messaging.Redis,
  89. MessageType: MessageTypeEvent,
  90. ContentType: "application/json",
  91. DeviceName: "ekuiper",
  92. ProfileName: "ekuiper",
  93. SourceName: "ekuiper",
  94. Topic: "ekuiperResult",
  95. },
  96. },
  97. { // 2
  98. conf: map[string]interface{}{
  99. "protocol": "tcp",
  100. "host": "127.0.0.1",
  101. "port": 1883,
  102. "topic": "result",
  103. "type": "mqtt",
  104. "metadata": "edgex_meta",
  105. "contentType": "application/json",
  106. "optional": map[string]interface{}{
  107. "ClientId": "edgex_message_bus_001",
  108. },
  109. },
  110. expected: &EdgexConf{
  111. Protocol: "tcp",
  112. Host: "127.0.0.1",
  113. Port: 1883,
  114. Type: messaging.MQTT,
  115. MessageType: MessageTypeEvent,
  116. ContentType: "application/json",
  117. DeviceName: "ekuiper",
  118. ProfileName: "ekuiperProfile",
  119. SourceName: "",
  120. Metadata: "edgex_meta",
  121. Topic: "result",
  122. Optional: map[string]string{
  123. "ClientId": "edgex_message_bus_001",
  124. },
  125. },
  126. }, { // 3
  127. conf: map[string]interface{}{
  128. "type": "redis",
  129. "protocol": "redis",
  130. "host": "edgex-redis",
  131. "port": 6379,
  132. "topicPrefix": "edgex/events/device",
  133. "messageType": "request",
  134. "contentType": "application/json",
  135. },
  136. expected: &EdgexConf{
  137. Protocol: "redis",
  138. Host: "edgex-redis",
  139. Port: 6379,
  140. Type: messaging.Redis,
  141. MessageType: MessageTypeRequest,
  142. ContentType: "application/json",
  143. DeviceName: "ekuiper",
  144. ProfileName: "ekuiperProfile",
  145. SourceName: "",
  146. TopicPrefix: "edgex/events/device",
  147. },
  148. }, { // 4
  149. conf: map[string]interface{}{
  150. "type": "redis",
  151. "protocol": "redis",
  152. "host": "edgex-redis",
  153. "port": 6379,
  154. "topicPrefix": "edgex/events/device",
  155. "messageType": "requests",
  156. "contentType": "application/json",
  157. },
  158. error: "specified wrong messageType value requests",
  159. }, { // 5
  160. conf: map[string]interface{}{
  161. "type": 20,
  162. "protocol": "redis",
  163. "host": "edgex-redis",
  164. "port": 6379,
  165. "topicPrefix": "edgex/events/device",
  166. "messageType": "requests",
  167. "contentType": "application/json",
  168. },
  169. error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: 1 error(s) decoding:\n\n* 'type' expected type 'string', got unconvertible type 'int', value: '20'",
  170. }, { // 6
  171. conf: map[string]interface{}{
  172. "type": "redis",
  173. "protocol": "redis",
  174. "host": "edgex-redis",
  175. "port": -1,
  176. "topicPrefix": "edgex/events/device",
  177. "messageType": "requests",
  178. "contentType": "application/json",
  179. },
  180. error: "specified wrong port value, expect positive integer but got -1",
  181. }, { // 7
  182. conf: map[string]interface{}{
  183. "type": "zmq",
  184. "protocol": "redis",
  185. "host": "edgex-redis",
  186. "port": 6379,
  187. "topicPrefix": "edgex/events/device",
  188. "messageType": "requests",
  189. "contentType": "application/json",
  190. },
  191. error: "specified wrong type value zmq",
  192. }, { // 8
  193. conf: map[string]interface{}{
  194. "protocol": "redis",
  195. "host": "edgex-redis",
  196. "port": 6379,
  197. "topicPrefix": "edgex/events/device",
  198. "topic": "requests",
  199. "contentType": "application/json",
  200. },
  201. error: "not allow to specify both topic and topicPrefix, please set one only",
  202. },
  203. }
  204. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  205. for i, test := range tests {
  206. ems := EdgexMsgBusSink{}
  207. err := ems.Configure(test.conf)
  208. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  209. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  210. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  211. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  212. }
  213. }
  214. }
  215. func TestProduceEvents(t1 *testing.T) {
  216. var tests = []struct {
  217. input string
  218. conf map[string]interface{}
  219. expected *dtos.Event
  220. error string
  221. }{
  222. { // 0
  223. input: `[
  224. {"meta":{
  225. "correlationid":"","deviceName":"demo","id":"","origin":3,
  226. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  227. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  228. }
  229. },
  230. {"humidity":100},
  231. {"temperature":50}
  232. ]`,
  233. conf: map[string]interface{}{
  234. "metadata": "meta",
  235. },
  236. expected: &dtos.Event{
  237. Id: "",
  238. DeviceName: "demo",
  239. ProfileName: "ekuiperProfile",
  240. SourceName: "ruleTest",
  241. Origin: 3,
  242. Readings: []dtos.BaseReading{
  243. {
  244. ResourceName: "humidity",
  245. DeviceName: "test device name1",
  246. ProfileName: "ekuiperProfile",
  247. Id: "12",
  248. Origin: 14,
  249. ValueType: v2.ValueTypeInt64,
  250. SimpleReading: dtos.SimpleReading{Value: "100"},
  251. },
  252. {
  253. ResourceName: "temperature",
  254. DeviceName: "test device name2",
  255. ProfileName: "kuiperProfile",
  256. Id: "22",
  257. Origin: 24,
  258. ValueType: v2.ValueTypeFloat64,
  259. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  260. },
  261. },
  262. },
  263. error: "",
  264. },
  265. { // 1
  266. input: `[
  267. {"meta":{
  268. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  269. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  270. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  271. }
  272. },
  273. {"h1":100},
  274. {"h2":null}
  275. ]`,
  276. conf: map[string]interface{}{
  277. "metadata": "meta",
  278. },
  279. expected: &dtos.Event{
  280. Id: "abc",
  281. DeviceName: "demo",
  282. ProfileName: "demoProfile",
  283. SourceName: "demoSource",
  284. Origin: 3,
  285. Tags: map[string]interface{}{"auth": "admin"},
  286. Readings: []dtos.BaseReading{
  287. {
  288. ResourceName: "h1",
  289. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  290. DeviceName: "demo",
  291. ProfileName: "demoProfile",
  292. ValueType: v2.ValueTypeFloat64,
  293. },
  294. },
  295. },
  296. error: "",
  297. },
  298. { // 2
  299. input: `[
  300. {"meta": 50},
  301. {"h1":100}
  302. ]`,
  303. conf: map[string]interface{}{
  304. "sourceName": "demo",
  305. },
  306. expected: &dtos.Event{
  307. DeviceName: "ekuiper",
  308. ProfileName: "ekuiperProfile",
  309. SourceName: "demo",
  310. Readings: []dtos.BaseReading{
  311. {
  312. ResourceName: "meta",
  313. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  314. DeviceName: "ekuiper",
  315. ProfileName: "ekuiperProfile",
  316. ValueType: v2.ValueTypeFloat64,
  317. },
  318. {
  319. ResourceName: "h1",
  320. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  321. DeviceName: "ekuiper",
  322. ProfileName: "ekuiperProfile",
  323. ValueType: v2.ValueTypeFloat64,
  324. },
  325. },
  326. },
  327. error: "",
  328. },
  329. { // 3
  330. input: `[
  331. {"meta1": "newmeta"},
  332. {"h1":true},
  333. {"sa":["1","2","3","4"]},
  334. {"fa":[1.1,2.2,3.3,4.4]}
  335. ]`,
  336. expected: &dtos.Event{
  337. DeviceName: "ekuiper",
  338. ProfileName: "ekuiperProfile",
  339. SourceName: "ruleTest",
  340. Readings: []dtos.BaseReading{
  341. {
  342. ResourceName: "meta1",
  343. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  344. DeviceName: "ekuiper",
  345. ProfileName: "ekuiperProfile",
  346. ValueType: v2.ValueTypeString,
  347. },
  348. {
  349. ResourceName: "h1",
  350. SimpleReading: dtos.SimpleReading{Value: "true"},
  351. DeviceName: "ekuiper",
  352. ProfileName: "ekuiperProfile",
  353. ValueType: v2.ValueTypeBool,
  354. },
  355. {
  356. ResourceName: "sa",
  357. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  358. DeviceName: "ekuiper",
  359. ProfileName: "ekuiperProfile",
  360. ValueType: v2.ValueTypeStringArray,
  361. },
  362. {
  363. ResourceName: "fa",
  364. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  365. DeviceName: "ekuiper",
  366. ProfileName: "ekuiperProfile",
  367. ValueType: v2.ValueTypeFloat64Array,
  368. },
  369. },
  370. },
  371. error: "",
  372. },
  373. { // 4
  374. input: `[]`,
  375. conf: map[string]interface{}{
  376. "deviceName": "kuiper",
  377. "profileName": "kp",
  378. "topic": "demo",
  379. },
  380. expected: &dtos.Event{
  381. ProfileName: "kp",
  382. DeviceName: "kuiper",
  383. SourceName: "ruleTest",
  384. Origin: 0,
  385. Readings: nil,
  386. },
  387. error: "",
  388. },
  389. { // 5
  390. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  391. expected: &dtos.Event{
  392. DeviceName: "ekuiper",
  393. ProfileName: "ekuiperProfile",
  394. SourceName: "ruleTest",
  395. Origin: 0,
  396. Readings: nil,
  397. },
  398. },
  399. { // 6
  400. input: `[
  401. {"meta1": "newmeta"},
  402. {"sa":"SGVsbG8gV29ybGQ="},
  403. {"meta":{
  404. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  405. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  406. }}
  407. ]`,
  408. conf: map[string]interface{}{
  409. "metadata": "meta",
  410. "profileName": "myprofile",
  411. "sourceName": "ds",
  412. },
  413. expected: &dtos.Event{
  414. DeviceName: "demo",
  415. ProfileName: "demoProfile",
  416. SourceName: "ds",
  417. Origin: 3,
  418. Tags: map[string]interface{}{"auth": "admin"},
  419. Readings: []dtos.BaseReading{
  420. {
  421. DeviceName: "demo",
  422. ProfileName: "demoProfile",
  423. ResourceName: "meta1",
  424. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  425. ValueType: v2.ValueTypeString,
  426. },
  427. {
  428. ResourceName: "sa",
  429. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  430. ProfileName: "demoProfile",
  431. DeviceName: "test device name1",
  432. Id: "12",
  433. Origin: 14,
  434. ValueType: v2.ValueTypeBinary,
  435. },
  436. },
  437. },
  438. error: "",
  439. },
  440. }
  441. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  442. for i, t := range tests {
  443. ems := EdgexMsgBusSink{}
  444. err := ems.Configure(t.conf)
  445. if err != nil {
  446. t1.Errorf("%d: configure error %v", i, err)
  447. continue
  448. }
  449. if ems.c.SourceName == "" {
  450. ems.c.SourceName = "ruleTest"
  451. }
  452. var payload []map[string]interface{}
  453. json.Unmarshal([]byte(t.input), &payload)
  454. result, err := ems.produceEvents(ctx, payload)
  455. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  456. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  457. } else if t.error == "" && !compareEvent(t.expected, result) {
  458. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  459. }
  460. }
  461. }