edgex_sink_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // +build edgex
  15. package sink
  16. import (
  17. "fmt"
  18. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  19. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  20. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/testx"
  23. "github.com/lf-edge/ekuiper/internal/topo/context"
  24. "reflect"
  25. "testing"
  26. )
  27. var (
  28. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  29. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  30. )
  31. func compareEvent(expected, actual *dtos.Event) bool {
  32. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  33. for i, r := range expected.Readings {
  34. if !compareReading(r, actual.Readings[i]) {
  35. break
  36. }
  37. }
  38. return true
  39. }
  40. return false
  41. }
  42. func compareReading(expected, actual dtos.BaseReading) bool {
  43. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  44. return true
  45. }
  46. return false
  47. }
  48. func TestConfigure(t *testing.T) {
  49. var tests = []struct {
  50. conf map[string]interface{}
  51. expected *EdgexConf
  52. error string
  53. }{
  54. { // 0
  55. conf: map[string]interface{}{
  56. "metadata": "meta",
  57. },
  58. expected: &EdgexConf{
  59. Protocol: "redis",
  60. Host: "localhost",
  61. Port: 6379,
  62. Type: messaging.Redis,
  63. MessageType: MessageTypeEvent,
  64. ContentType: "application/json",
  65. DeviceName: "ekuiper",
  66. ProfileName: "ekuiperProfile",
  67. Metadata: "meta",
  68. },
  69. },
  70. { // 1
  71. conf: map[string]interface{}{
  72. "type": "redis",
  73. "protocol": "redis",
  74. "host": "edgex-redis",
  75. "port": 6379,
  76. "topic": "ekuiperResult",
  77. "deviceName": "ekuiper",
  78. "profileName": "ekuiper",
  79. "sourceName": "ekuiper",
  80. "contentType": "application/json",
  81. },
  82. expected: &EdgexConf{
  83. Protocol: "redis",
  84. Host: "edgex-redis",
  85. Port: 6379,
  86. Type: messaging.Redis,
  87. MessageType: MessageTypeEvent,
  88. ContentType: "application/json",
  89. DeviceName: "ekuiper",
  90. ProfileName: "ekuiper",
  91. SourceName: "ekuiper",
  92. Topic: "ekuiperResult",
  93. },
  94. },
  95. { // 2
  96. conf: map[string]interface{}{
  97. "protocol": "tcp",
  98. "host": "127.0.0.1",
  99. "port": 1883,
  100. "topic": "result",
  101. "type": "mqtt",
  102. "metadata": "edgex_meta",
  103. "contentType": "application/json",
  104. "optional": map[string]interface{}{
  105. "ClientId": "edgex_message_bus_001",
  106. },
  107. },
  108. expected: &EdgexConf{
  109. Protocol: "tcp",
  110. Host: "127.0.0.1",
  111. Port: 1883,
  112. Type: messaging.MQTT,
  113. MessageType: MessageTypeEvent,
  114. ContentType: "application/json",
  115. DeviceName: "ekuiper",
  116. ProfileName: "ekuiperProfile",
  117. SourceName: "",
  118. Metadata: "edgex_meta",
  119. Topic: "result",
  120. Optional: map[string]string{
  121. "ClientId": "edgex_message_bus_001",
  122. },
  123. },
  124. }, { // 3
  125. conf: map[string]interface{}{
  126. "type": "redis",
  127. "protocol": "redis",
  128. "host": "edgex-redis",
  129. "port": 6379,
  130. "topicPrefix": "edgex/events/device",
  131. "messageType": "request",
  132. "contentType": "application/json",
  133. },
  134. expected: &EdgexConf{
  135. Protocol: "redis",
  136. Host: "edgex-redis",
  137. Port: 6379,
  138. Type: messaging.Redis,
  139. MessageType: MessageTypeRequest,
  140. ContentType: "application/json",
  141. DeviceName: "ekuiper",
  142. ProfileName: "ekuiperProfile",
  143. SourceName: "",
  144. TopicPrefix: "edgex/events/device",
  145. },
  146. }, { // 4
  147. conf: map[string]interface{}{
  148. "type": "redis",
  149. "protocol": "redis",
  150. "host": "edgex-redis",
  151. "port": 6379,
  152. "topicPrefix": "edgex/events/device",
  153. "messageType": "requests",
  154. "contentType": "application/json",
  155. },
  156. error: "specified wrong messageType value requests",
  157. }, { // 5
  158. conf: map[string]interface{}{
  159. "type": 20,
  160. "protocol": "redis",
  161. "host": "edgex-redis",
  162. "port": 6379,
  163. "topicPrefix": "edgex/events/device",
  164. "messageType": "requests",
  165. "contentType": "application/json",
  166. },
  167. error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: 1 error(s) decoding:\n\n* 'type' expected type 'string', got unconvertible type 'int', value: '20'",
  168. }, { // 6
  169. conf: map[string]interface{}{
  170. "type": "redis",
  171. "protocol": "redis",
  172. "host": "edgex-redis",
  173. "port": -1,
  174. "topicPrefix": "edgex/events/device",
  175. "messageType": "requests",
  176. "contentType": "application/json",
  177. },
  178. error: "specified wrong port value, expect positive integer but got -1",
  179. }, { // 7
  180. conf: map[string]interface{}{
  181. "type": "zmq",
  182. "protocol": "redis",
  183. "host": "edgex-redis",
  184. "port": 6379,
  185. "topicPrefix": "edgex/events/device",
  186. "messageType": "requests",
  187. "contentType": "application/json",
  188. },
  189. error: "specified wrong type value zmq",
  190. }, { // 8
  191. conf: map[string]interface{}{
  192. "protocol": "redis",
  193. "host": "edgex-redis",
  194. "port": 6379,
  195. "topicPrefix": "edgex/events/device",
  196. "topic": "requests",
  197. "contentType": "application/json",
  198. },
  199. error: "not allow to specify both topic and topicPrefix, please set one only",
  200. },
  201. }
  202. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  203. for i, test := range tests {
  204. ems := EdgexMsgBusSink{}
  205. err := ems.Configure(test.conf)
  206. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  207. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  208. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  209. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  210. }
  211. }
  212. }
  213. func TestProduceEvents(t1 *testing.T) {
  214. var tests = []struct {
  215. input string
  216. conf map[string]interface{}
  217. expected *dtos.Event
  218. error string
  219. }{
  220. { // 0
  221. input: `[
  222. {"meta":{
  223. "correlationid":"","deviceName":"demo","id":"","origin":3,
  224. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  225. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  226. }
  227. },
  228. {"humidity":100},
  229. {"temperature":50}
  230. ]`,
  231. conf: map[string]interface{}{
  232. "metadata": "meta",
  233. },
  234. expected: &dtos.Event{
  235. Id: "",
  236. DeviceName: "demo",
  237. ProfileName: "ekuiperProfile",
  238. SourceName: "ruleTest",
  239. Origin: 3,
  240. Readings: []dtos.BaseReading{
  241. {
  242. ResourceName: "humidity",
  243. DeviceName: "test device name1",
  244. ProfileName: "ekuiperProfile",
  245. Id: "12",
  246. Origin: 14,
  247. ValueType: v2.ValueTypeInt64,
  248. SimpleReading: dtos.SimpleReading{Value: "100"},
  249. },
  250. {
  251. ResourceName: "temperature",
  252. DeviceName: "test device name2",
  253. ProfileName: "kuiperProfile",
  254. Id: "22",
  255. Origin: 24,
  256. ValueType: v2.ValueTypeFloat64,
  257. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  258. },
  259. },
  260. },
  261. error: "",
  262. },
  263. { // 1
  264. input: `[
  265. {"meta":{
  266. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  267. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  268. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  269. }
  270. },
  271. {"h1":100},
  272. {"h2":null}
  273. ]`,
  274. conf: map[string]interface{}{
  275. "metadata": "meta",
  276. },
  277. expected: &dtos.Event{
  278. Id: "abc",
  279. DeviceName: "demo",
  280. ProfileName: "demoProfile",
  281. SourceName: "demoSource",
  282. Origin: 3,
  283. Tags: map[string]string{"auth": "admin"},
  284. Readings: []dtos.BaseReading{
  285. {
  286. ResourceName: "h1",
  287. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  288. DeviceName: "demo",
  289. ProfileName: "demoProfile",
  290. ValueType: v2.ValueTypeFloat64,
  291. },
  292. },
  293. },
  294. error: "",
  295. },
  296. { // 2
  297. input: `[
  298. {"meta": 50},
  299. {"h1":100}
  300. ]`,
  301. conf: map[string]interface{}{
  302. "sourceName": "demo",
  303. },
  304. expected: &dtos.Event{
  305. DeviceName: "ekuiper",
  306. ProfileName: "ekuiperProfile",
  307. SourceName: "demo",
  308. Readings: []dtos.BaseReading{
  309. {
  310. ResourceName: "meta",
  311. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  312. DeviceName: "ekuiper",
  313. ProfileName: "ekuiperProfile",
  314. ValueType: v2.ValueTypeFloat64,
  315. },
  316. {
  317. ResourceName: "h1",
  318. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  319. DeviceName: "ekuiper",
  320. ProfileName: "ekuiperProfile",
  321. ValueType: v2.ValueTypeFloat64,
  322. },
  323. },
  324. },
  325. error: "",
  326. },
  327. { // 3
  328. input: `[
  329. {"meta1": "newmeta"},
  330. {"h1":true},
  331. {"sa":["1","2","3","4"]},
  332. {"fa":[1.1,2.2,3.3,4.4]}
  333. ]`,
  334. expected: &dtos.Event{
  335. DeviceName: "ekuiper",
  336. ProfileName: "ekuiperProfile",
  337. SourceName: "ruleTest",
  338. Readings: []dtos.BaseReading{
  339. {
  340. ResourceName: "meta1",
  341. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  342. DeviceName: "ekuiper",
  343. ProfileName: "ekuiperProfile",
  344. ValueType: v2.ValueTypeString,
  345. },
  346. {
  347. ResourceName: "h1",
  348. SimpleReading: dtos.SimpleReading{Value: "true"},
  349. DeviceName: "ekuiper",
  350. ProfileName: "ekuiperProfile",
  351. ValueType: v2.ValueTypeBool,
  352. },
  353. {
  354. ResourceName: "sa",
  355. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  356. DeviceName: "ekuiper",
  357. ProfileName: "ekuiperProfile",
  358. ValueType: v2.ValueTypeStringArray,
  359. },
  360. {
  361. ResourceName: "fa",
  362. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  363. DeviceName: "ekuiper",
  364. ProfileName: "ekuiperProfile",
  365. ValueType: v2.ValueTypeFloat64Array,
  366. },
  367. },
  368. },
  369. error: "",
  370. },
  371. { // 4
  372. input: `[]`,
  373. conf: map[string]interface{}{
  374. "deviceName": "kuiper",
  375. "profileName": "kp",
  376. "topic": "demo",
  377. },
  378. expected: &dtos.Event{
  379. ProfileName: "kp",
  380. DeviceName: "kuiper",
  381. SourceName: "ruleTest",
  382. Origin: 0,
  383. Readings: nil,
  384. },
  385. error: "",
  386. },
  387. { // 5
  388. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  389. expected: &dtos.Event{
  390. DeviceName: "ekuiper",
  391. ProfileName: "ekuiperProfile",
  392. SourceName: "ruleTest",
  393. Origin: 0,
  394. Readings: nil,
  395. },
  396. },
  397. { // 6
  398. input: `[
  399. {"meta1": "newmeta"},
  400. {"sa":"SGVsbG8gV29ybGQ="},
  401. {"meta":{
  402. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  403. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  404. }}
  405. ]`,
  406. conf: map[string]interface{}{
  407. "metadata": "meta",
  408. "profileName": "myprofile",
  409. "sourceName": "ds",
  410. },
  411. expected: &dtos.Event{
  412. DeviceName: "demo",
  413. ProfileName: "demoProfile",
  414. SourceName: "ds",
  415. Origin: 3,
  416. Tags: map[string]string{"auth": "admin"},
  417. Readings: []dtos.BaseReading{
  418. {
  419. DeviceName: "demo",
  420. ProfileName: "demoProfile",
  421. ResourceName: "meta1",
  422. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  423. ValueType: v2.ValueTypeString,
  424. },
  425. {
  426. ResourceName: "sa",
  427. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  428. ProfileName: "demoProfile",
  429. DeviceName: "test device name1",
  430. Id: "12",
  431. Origin: 14,
  432. ValueType: v2.ValueTypeBinary,
  433. },
  434. },
  435. },
  436. error: "",
  437. },
  438. }
  439. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  440. for i, t := range tests {
  441. ems := EdgexMsgBusSink{}
  442. err := ems.Configure(t.conf)
  443. if err != nil {
  444. t1.Errorf("%d: configure error %v", i, err)
  445. continue
  446. }
  447. if ems.c.SourceName == "" {
  448. ems.c.SourceName = "ruleTest"
  449. }
  450. result, err := ems.produceEvents(ctx, []byte(t.input))
  451. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  452. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  453. } else if t.error == "" && !compareEvent(t.expected, result) {
  454. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  455. }
  456. }
  457. }