edgex_sink_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package sink
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  21. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/context"
  25. "reflect"
  26. "testing"
  27. )
  28. var (
  29. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  30. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  31. )
  32. func compareEvent(expected, actual *dtos.Event) bool {
  33. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  34. for i, r := range expected.Readings {
  35. if !compareReading(r, actual.Readings[i]) {
  36. return false
  37. }
  38. }
  39. return true
  40. }
  41. return false
  42. }
  43. func compareReading(expected, actual dtos.BaseReading) bool {
  44. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  45. if expected.ValueType == v2.ValueTypeObject {
  46. if !reflect.DeepEqual(expected.ObjectValue, actual.ObjectValue) {
  47. return false
  48. }
  49. }
  50. return true
  51. }
  52. return false
  53. }
  54. func TestConfigure(t *testing.T) {
  55. var tests = []struct {
  56. conf map[string]interface{}
  57. expected *EdgexConf
  58. error string
  59. }{
  60. { // 0
  61. conf: map[string]interface{}{
  62. "metadata": "meta",
  63. },
  64. expected: &EdgexConf{
  65. MessageType: MessageTypeEvent,
  66. ContentType: "application/json",
  67. DeviceName: "ekuiper",
  68. ProfileName: "ekuiperProfile",
  69. Metadata: "meta",
  70. },
  71. },
  72. { // 1
  73. conf: map[string]interface{}{
  74. "type": "redis",
  75. "protocol": "redis",
  76. "host": "edgex-redis",
  77. "port": 6379,
  78. "topic": "ekuiperResult",
  79. "deviceName": "ekuiper",
  80. "profileName": "ekuiper",
  81. "sourceName": "ekuiper",
  82. "contentType": "application/json",
  83. },
  84. expected: &EdgexConf{
  85. MessageType: MessageTypeEvent,
  86. ContentType: "application/json",
  87. DeviceName: "ekuiper",
  88. ProfileName: "ekuiper",
  89. SourceName: "ekuiper",
  90. Topic: "ekuiperResult",
  91. },
  92. },
  93. { // 2
  94. conf: map[string]interface{}{
  95. "protocol": "tcp",
  96. "host": "127.0.0.1",
  97. "port": 1883,
  98. "topic": "result",
  99. "type": "mqtt",
  100. "metadata": "edgex_meta",
  101. "contentType": "application/json",
  102. "optional": map[string]interface{}{
  103. "ClientId": "edgex_message_bus_001",
  104. },
  105. },
  106. expected: &EdgexConf{
  107. MessageType: MessageTypeEvent,
  108. ContentType: "application/json",
  109. DeviceName: "ekuiper",
  110. ProfileName: "ekuiperProfile",
  111. SourceName: "",
  112. Metadata: "edgex_meta",
  113. Topic: "result",
  114. },
  115. },
  116. { // 3
  117. conf: map[string]interface{}{
  118. "type": "redis",
  119. "protocol": "redis",
  120. "host": "edgex-redis",
  121. "port": 6379,
  122. "topicPrefix": "edgex/events/device",
  123. "messageType": "request",
  124. "contentType": "application/json",
  125. },
  126. expected: &EdgexConf{
  127. MessageType: MessageTypeRequest,
  128. ContentType: "application/json",
  129. DeviceName: "ekuiper",
  130. ProfileName: "ekuiperProfile",
  131. SourceName: "",
  132. TopicPrefix: "edgex/events/device",
  133. },
  134. },
  135. { // 4
  136. conf: map[string]interface{}{
  137. "type": "redis",
  138. "protocol": "redis",
  139. "host": "edgex-redis",
  140. "port": 6379,
  141. "topicPrefix": "edgex/events/device",
  142. "messageType": "requests",
  143. "contentType": "application/json",
  144. },
  145. error: "specified wrong messageType value requests",
  146. },
  147. { // 5
  148. conf: map[string]interface{}{
  149. "protocol": "redis",
  150. "host": "edgex-redis",
  151. "port": 6379,
  152. "topicPrefix": "edgex/events/device",
  153. "topic": "requests",
  154. "contentType": "application/json",
  155. },
  156. error: "not allow to specify both topic and topicPrefix, please set one only",
  157. },
  158. }
  159. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  160. for i, test := range tests {
  161. ems := EdgexMsgBusSink{}
  162. err := ems.Configure(test.conf)
  163. if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
  164. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, test.error, err)
  165. } else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
  166. t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
  167. }
  168. }
  169. }
  170. func TestProduceEvents(t1 *testing.T) {
  171. var tests = []struct {
  172. input string
  173. conf map[string]interface{}
  174. expected *dtos.Event
  175. error string
  176. }{
  177. { // 0
  178. input: `[
  179. {"meta":{
  180. "correlationid":"","deviceName":"demo","id":"","origin":3,
  181. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  182. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  183. }
  184. },
  185. {"humidity":100},
  186. {"temperature":50}
  187. ]`,
  188. conf: map[string]interface{}{
  189. "metadata": "meta",
  190. },
  191. expected: &dtos.Event{
  192. Id: "",
  193. DeviceName: "demo",
  194. ProfileName: "ekuiperProfile",
  195. SourceName: "ruleTest",
  196. Origin: 3,
  197. Readings: []dtos.BaseReading{
  198. {
  199. ResourceName: "humidity",
  200. DeviceName: "test device name1",
  201. ProfileName: "ekuiperProfile",
  202. Id: "12",
  203. Origin: 14,
  204. ValueType: v2.ValueTypeInt64,
  205. SimpleReading: dtos.SimpleReading{Value: "100"},
  206. },
  207. {
  208. ResourceName: "temperature",
  209. DeviceName: "test device name2",
  210. ProfileName: "ekuiperProfile",
  211. Id: "22",
  212. Origin: 24,
  213. ValueType: v2.ValueTypeFloat64,
  214. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  215. },
  216. },
  217. },
  218. error: "",
  219. },
  220. { // 1
  221. input: `[
  222. {"meta":{
  223. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  224. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  225. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  226. }
  227. },
  228. {"h1":100},
  229. {"h2":null}
  230. ]`,
  231. conf: map[string]interface{}{
  232. "metadata": "meta",
  233. },
  234. expected: &dtos.Event{
  235. Id: "abc",
  236. DeviceName: "demo",
  237. ProfileName: "demoProfile",
  238. SourceName: "demoSource",
  239. Origin: 3,
  240. Tags: map[string]interface{}{"auth": "admin"},
  241. Readings: []dtos.BaseReading{
  242. {
  243. ResourceName: "h1",
  244. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  245. DeviceName: "demo",
  246. ProfileName: "demoProfile",
  247. ValueType: v2.ValueTypeFloat64,
  248. },
  249. },
  250. },
  251. error: "",
  252. },
  253. { // 2
  254. input: `[
  255. {"meta": 50},
  256. {"h1":100}
  257. ]`,
  258. conf: map[string]interface{}{
  259. "sourceName": "demo",
  260. },
  261. expected: &dtos.Event{
  262. DeviceName: "ekuiper",
  263. ProfileName: "ekuiperProfile",
  264. SourceName: "demo",
  265. Readings: []dtos.BaseReading{
  266. {
  267. ResourceName: "meta",
  268. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  269. DeviceName: "ekuiper",
  270. ProfileName: "ekuiperProfile",
  271. ValueType: v2.ValueTypeFloat64,
  272. },
  273. {
  274. ResourceName: "h1",
  275. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  276. DeviceName: "ekuiper",
  277. ProfileName: "ekuiperProfile",
  278. ValueType: v2.ValueTypeFloat64,
  279. },
  280. },
  281. },
  282. error: "",
  283. },
  284. { // 3
  285. input: `[
  286. {"meta1": "newmeta"},
  287. {"h1":true},
  288. {"sa":["1","2","3","4"]},
  289. {"fa":[1.1,2.2,3.3,4.4]}
  290. ]`,
  291. expected: &dtos.Event{
  292. DeviceName: "ekuiper",
  293. ProfileName: "ekuiperProfile",
  294. SourceName: "ruleTest",
  295. Readings: []dtos.BaseReading{
  296. {
  297. ResourceName: "meta1",
  298. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  299. DeviceName: "ekuiper",
  300. ProfileName: "ekuiperProfile",
  301. ValueType: v2.ValueTypeString,
  302. },
  303. {
  304. ResourceName: "h1",
  305. SimpleReading: dtos.SimpleReading{Value: "true"},
  306. DeviceName: "ekuiper",
  307. ProfileName: "ekuiperProfile",
  308. ValueType: v2.ValueTypeBool,
  309. },
  310. {
  311. ResourceName: "sa",
  312. SimpleReading: dtos.SimpleReading{Value: "[1, 2, 3, 4]"},
  313. DeviceName: "ekuiper",
  314. ProfileName: "ekuiperProfile",
  315. ValueType: v2.ValueTypeStringArray,
  316. },
  317. {
  318. ResourceName: "fa",
  319. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  320. DeviceName: "ekuiper",
  321. ProfileName: "ekuiperProfile",
  322. ValueType: v2.ValueTypeFloat64Array,
  323. },
  324. },
  325. },
  326. error: "",
  327. },
  328. { // 4
  329. input: `[]`,
  330. conf: map[string]interface{}{
  331. "deviceName": "kuiper",
  332. "profileName": "kp",
  333. "topic": "demo",
  334. },
  335. expected: &dtos.Event{
  336. ProfileName: "kp",
  337. DeviceName: "kuiper",
  338. SourceName: "ruleTest",
  339. Origin: 0,
  340. Readings: nil,
  341. },
  342. error: "",
  343. },
  344. { // 5
  345. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  346. expected: &dtos.Event{
  347. DeviceName: "ekuiper",
  348. ProfileName: "ekuiperProfile",
  349. SourceName: "ruleTest",
  350. Origin: 0,
  351. Readings: nil,
  352. },
  353. },
  354. { // 6
  355. input: `[
  356. {"meta1": "newmeta"},
  357. {"sa":"SGVsbG8gV29ybGQ="},
  358. {"meta":{
  359. "correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
  360. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  361. }}
  362. ]`,
  363. conf: map[string]interface{}{
  364. "metadata": "meta",
  365. "profileName": "myprofile",
  366. "sourceName": "ds",
  367. },
  368. expected: &dtos.Event{
  369. DeviceName: "demo",
  370. ProfileName: "demoProfile",
  371. SourceName: "ds",
  372. Origin: 3,
  373. Tags: map[string]interface{}{"auth": "admin"},
  374. Readings: []dtos.BaseReading{
  375. {
  376. DeviceName: "demo",
  377. ProfileName: "demoProfile",
  378. ResourceName: "meta1",
  379. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  380. ValueType: v2.ValueTypeString,
  381. },
  382. {
  383. ResourceName: "sa",
  384. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  385. ProfileName: "demoProfile",
  386. DeviceName: "test device name1",
  387. Id: "12",
  388. Origin: 14,
  389. ValueType: v2.ValueTypeBinary,
  390. },
  391. },
  392. },
  393. error: "",
  394. }, { // 7
  395. input: `[
  396. {"meta":{
  397. "correlationid":"","deviceName":"demo","id":"","origin":3,
  398. "obj":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Object"}
  399. }
  400. },
  401. {"obj":{"a":1,"b":"sttt"}}
  402. ]`,
  403. conf: map[string]interface{}{
  404. "metadata": "meta",
  405. },
  406. expected: &dtos.Event{
  407. Id: "",
  408. DeviceName: "demo",
  409. ProfileName: "ekuiperProfile",
  410. SourceName: "ruleTest",
  411. Origin: 3,
  412. Readings: []dtos.BaseReading{
  413. {
  414. ResourceName: "obj",
  415. DeviceName: "test device name1",
  416. ProfileName: "ekuiperProfile",
  417. Id: "12",
  418. Origin: 14,
  419. ValueType: v2.ValueTypeObject,
  420. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  421. "a": float64(1),
  422. "b": "sttt",
  423. }},
  424. },
  425. },
  426. },
  427. error: "",
  428. }, { // 8
  429. input: `[
  430. {"obj":{"a":1,"b":"sttt"}}
  431. ]`,
  432. conf: map[string]interface{}{},
  433. expected: &dtos.Event{
  434. Id: "",
  435. DeviceName: "ekuiper",
  436. ProfileName: "ekuiperProfile",
  437. SourceName: "ruleTest",
  438. Origin: 0,
  439. Readings: []dtos.BaseReading{
  440. {
  441. ResourceName: "obj",
  442. DeviceName: "ekuiper",
  443. ProfileName: "ekuiperProfile",
  444. Id: "",
  445. Origin: 0,
  446. ValueType: v2.ValueTypeObject,
  447. ObjectReading: dtos.ObjectReading{ObjectValue: map[string]interface{}{
  448. "a": float64(1),
  449. "b": "sttt",
  450. }},
  451. },
  452. },
  453. },
  454. error: "",
  455. },
  456. }
  457. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  458. for i, t := range tests {
  459. ems := EdgexMsgBusSink{}
  460. err := ems.Configure(t.conf)
  461. if err != nil {
  462. t1.Errorf("%d: configure error %v", i, err)
  463. continue
  464. }
  465. if ems.c.SourceName == "" {
  466. ems.c.SourceName = "ruleTest"
  467. }
  468. var payload []map[string]interface{}
  469. json.Unmarshal([]byte(t.input), &payload)
  470. result, err := ems.produceEvents(ctx, payload)
  471. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  472. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  473. } else if t.error == "" && !compareEvent(t.expected, result) {
  474. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  475. }
  476. }
  477. }