edgex_sink_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // +build edgex
  15. package sink
  16. import (
  17. "fmt"
  18. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  19. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/testx"
  22. "github.com/lf-edge/ekuiper/internal/topo/context"
  23. "reflect"
  24. "testing"
  25. )
  26. var (
  27. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  28. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  29. )
  30. func compareEvent(expected, actual *dtos.Event) bool {
  31. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  32. for i, r := range expected.Readings {
  33. if !compareReading(r, actual.Readings[i]) {
  34. break
  35. }
  36. }
  37. return true
  38. }
  39. return false
  40. }
  41. func compareReading(expected, actual dtos.BaseReading) bool {
  42. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  43. return true
  44. }
  45. return false
  46. }
  47. func TestProduceEvents(t1 *testing.T) {
  48. var tests = []struct {
  49. input string
  50. deviceName string
  51. profileName string
  52. topic string
  53. expected *dtos.Event
  54. error string
  55. }{
  56. {
  57. input: `[
  58. {"meta":{
  59. "correlationid":"","deviceName":"demo","id":"","origin":3,
  60. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  61. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  62. }
  63. },
  64. {"humidity":100},
  65. {"temperature":50}
  66. ]`,
  67. expected: &dtos.Event{
  68. Id: "",
  69. DeviceName: "demo",
  70. ProfileName: "kuiperProfile",
  71. Origin: 3,
  72. Readings: []dtos.BaseReading{
  73. {
  74. ResourceName: "humidity",
  75. DeviceName: "test device name1",
  76. ProfileName: "kuiperProfile",
  77. Id: "12",
  78. Origin: 14,
  79. ValueType: v2.ValueTypeInt64,
  80. SimpleReading: dtos.SimpleReading{Value: "100"},
  81. },
  82. {
  83. ResourceName: "temperature",
  84. DeviceName: "test device name2",
  85. ProfileName: "kuiperProfile",
  86. Id: "22",
  87. Origin: 24,
  88. ValueType: v2.ValueTypeFloat64,
  89. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  90. },
  91. },
  92. },
  93. error: "",
  94. },
  95. {
  96. input: `[
  97. {"meta":{
  98. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  99. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  100. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  101. }
  102. },
  103. {"h1":100}
  104. ]`,
  105. expected: &dtos.Event{
  106. Id: "abc",
  107. DeviceName: "demo",
  108. ProfileName: "demoProfile",
  109. SourceName: "demoSource",
  110. Origin: 3,
  111. Tags: map[string]string{"auth": "admin"},
  112. Readings: []dtos.BaseReading{
  113. {
  114. ResourceName: "h1",
  115. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  116. DeviceName: "demo",
  117. ProfileName: "demoProfile",
  118. ValueType: v2.ValueTypeFloat64,
  119. },
  120. },
  121. },
  122. error: "",
  123. },
  124. {
  125. input: `[
  126. {"meta": 50},
  127. {"h1":100}
  128. ]`,
  129. expected: &dtos.Event{
  130. ProfileName: "kuiperProfile",
  131. Readings: []dtos.BaseReading{
  132. {
  133. ResourceName: "h1",
  134. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  135. ProfileName: "kuiperProfile",
  136. ValueType: v2.ValueTypeFloat64,
  137. },
  138. },
  139. },
  140. error: "",
  141. },
  142. {
  143. input: `[
  144. {"meta1": "newmeta"},
  145. {"h1":true},
  146. {"sa":["1","2","3","4"]},
  147. {"fa":[1.1,2.2,3.3,4.4]}
  148. ]`,
  149. expected: &dtos.Event{
  150. ProfileName: "kuiperProfile",
  151. Readings: []dtos.BaseReading{
  152. {
  153. ResourceName: "meta1",
  154. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  155. ProfileName: "kuiperProfile",
  156. ValueType: v2.ValueTypeString,
  157. },
  158. {
  159. ResourceName: "h1",
  160. SimpleReading: dtos.SimpleReading{Value: "true"},
  161. ProfileName: "kuiperProfile",
  162. ValueType: v2.ValueTypeBool,
  163. },
  164. {
  165. ResourceName: "sa",
  166. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  167. ProfileName: "kuiperProfile",
  168. ValueType: v2.ValueTypeStringArray,
  169. },
  170. {
  171. ResourceName: "fa",
  172. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  173. ProfileName: "kuiperProfile",
  174. ValueType: v2.ValueTypeFloat64Array,
  175. },
  176. },
  177. },
  178. error: "",
  179. },
  180. {
  181. input: `[]`,
  182. deviceName: "kuiper",
  183. profileName: "kp",
  184. topic: "demo",
  185. expected: &dtos.Event{
  186. ProfileName: "kp",
  187. DeviceName: "kuiper",
  188. SourceName: "demo",
  189. Origin: 0,
  190. Readings: nil,
  191. },
  192. error: "",
  193. },
  194. {
  195. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  196. expected: &dtos.Event{
  197. ProfileName: "kuiperProfile",
  198. Origin: 0,
  199. Readings: nil,
  200. },
  201. },
  202. {
  203. input: `[
  204. {"meta1": "newmeta"},
  205. {"sa":"SGVsbG8gV29ybGQ="},
  206. {"meta":{
  207. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  208. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  209. }}
  210. ]`,
  211. expected: &dtos.Event{
  212. DeviceName: "demo",
  213. ProfileName: "demoProfile",
  214. SourceName: "demoSource",
  215. Origin: 3,
  216. Tags: map[string]string{"auth": "admin"},
  217. Readings: []dtos.BaseReading{
  218. {
  219. DeviceName: "demo",
  220. ProfileName: "demoProfile",
  221. ResourceName: "meta1",
  222. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  223. ValueType: v2.ValueTypeString,
  224. },
  225. {
  226. ResourceName: "sa",
  227. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  228. ProfileName: "demoProfile",
  229. DeviceName: "test device name1",
  230. Id: "12",
  231. Origin: 14,
  232. ValueType: v2.ValueTypeBinary,
  233. },
  234. },
  235. },
  236. error: "",
  237. },
  238. }
  239. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  240. for i, t := range tests {
  241. ems := EdgexMsgBusSink{deviceName: t.deviceName, profileName: t.profileName, topic: t.topic, metadata: "meta"}
  242. result, err := ems.produceEvents(ctx, []byte(t.input))
  243. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  244. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  245. } else if t.error == "" && !compareEvent(t.expected, result) {
  246. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  247. }
  248. }
  249. }