// edgex_sink_test.go
  1. // +build edgex
  2. package sink
  3. import (
  4. "fmt"
  5. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  6. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  7. "github.com/emqx/kuiper/internal/conf"
  8. "github.com/emqx/kuiper/internal/testx"
  9. "github.com/emqx/kuiper/internal/topo/context"
  10. "reflect"
  11. "testing"
  12. )
  13. var (
  14. contextLogger = conf.Log.WithField("rule", "testEdgexSink")
  15. ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  16. )
  17. func compareEvent(expected, actual *dtos.Event) bool {
  18. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  19. for i, r := range expected.Readings {
  20. if !compareReading(r, actual.Readings[i]) {
  21. break
  22. }
  23. }
  24. return true
  25. }
  26. return false
  27. }
  28. func compareReading(expected, actual dtos.BaseReading) bool {
  29. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  30. return true
  31. }
  32. return false
  33. }
  34. func TestProduceEvents(t1 *testing.T) {
  35. var tests = []struct {
  36. input string
  37. deviceName string
  38. profileName string
  39. topic string
  40. expected *dtos.Event
  41. error string
  42. }{
  43. {
  44. input: `[
  45. {"meta":{
  46. "correlationid":"","deviceName":"demo","id":"","origin":3,
  47. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  48. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  49. }
  50. },
  51. {"humidity":100},
  52. {"temperature":50}
  53. ]`,
  54. expected: &dtos.Event{
  55. Id: "",
  56. DeviceName: "demo",
  57. ProfileName: "kuiperProfile",
  58. Origin: 3,
  59. Readings: []dtos.BaseReading{
  60. {
  61. ResourceName: "humidity",
  62. DeviceName: "test device name1",
  63. ProfileName: "kuiperProfile",
  64. Id: "12",
  65. Origin: 14,
  66. ValueType: v2.ValueTypeInt64,
  67. SimpleReading: dtos.SimpleReading{Value: "100"},
  68. },
  69. {
  70. ResourceName: "temperature",
  71. DeviceName: "test device name2",
  72. ProfileName: "kuiperProfile",
  73. Id: "22",
  74. Origin: 24,
  75. ValueType: v2.ValueTypeFloat64,
  76. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  77. },
  78. },
  79. },
  80. error: "",
  81. },
  82. {
  83. input: `[
  84. {"meta":{
  85. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  86. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  87. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  88. }
  89. },
  90. {"h1":100}
  91. ]`,
  92. expected: &dtos.Event{
  93. Id: "abc",
  94. DeviceName: "demo",
  95. ProfileName: "demoProfile",
  96. SourceName: "demoSource",
  97. Origin: 3,
  98. Tags: map[string]string{"auth": "admin"},
  99. Readings: []dtos.BaseReading{
  100. {
  101. ResourceName: "h1",
  102. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  103. DeviceName: "demo",
  104. ProfileName: "demoProfile",
  105. ValueType: v2.ValueTypeFloat64,
  106. },
  107. },
  108. },
  109. error: "",
  110. },
  111. {
  112. input: `[
  113. {"meta": 50},
  114. {"h1":100}
  115. ]`,
  116. expected: &dtos.Event{
  117. ProfileName: "kuiperProfile",
  118. Readings: []dtos.BaseReading{
  119. {
  120. ResourceName: "h1",
  121. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  122. ProfileName: "kuiperProfile",
  123. ValueType: v2.ValueTypeFloat64,
  124. },
  125. },
  126. },
  127. error: "",
  128. },
  129. {
  130. input: `[
  131. {"meta1": "newmeta"},
  132. {"h1":true},
  133. {"sa":["1","2","3","4"]},
  134. {"fa":[1.1,2.2,3.3,4.4]}
  135. ]`,
  136. expected: &dtos.Event{
  137. ProfileName: "kuiperProfile",
  138. Readings: []dtos.BaseReading{
  139. {
  140. ResourceName: "meta1",
  141. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  142. ProfileName: "kuiperProfile",
  143. ValueType: v2.ValueTypeString,
  144. },
  145. {
  146. ResourceName: "h1",
  147. SimpleReading: dtos.SimpleReading{Value: "true"},
  148. ProfileName: "kuiperProfile",
  149. ValueType: v2.ValueTypeBool,
  150. },
  151. {
  152. ResourceName: "sa",
  153. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  154. ProfileName: "kuiperProfile",
  155. ValueType: v2.ValueTypeStringArray,
  156. },
  157. {
  158. ResourceName: "fa",
  159. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  160. ProfileName: "kuiperProfile",
  161. ValueType: v2.ValueTypeFloat64Array,
  162. },
  163. },
  164. },
  165. error: "",
  166. },
  167. {
  168. input: `[]`,
  169. deviceName: "kuiper",
  170. profileName: "kp",
  171. topic: "demo",
  172. expected: &dtos.Event{
  173. ProfileName: "kp",
  174. DeviceName: "kuiper",
  175. SourceName: "demo",
  176. Origin: 0,
  177. Readings: nil,
  178. },
  179. error: "",
  180. },
  181. {
  182. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  183. expected: &dtos.Event{
  184. ProfileName: "kuiperProfile",
  185. Origin: 0,
  186. Readings: nil,
  187. },
  188. },
  189. {
  190. input: `[
  191. {"meta1": "newmeta"},
  192. {"sa":"SGVsbG8gV29ybGQ="},
  193. {"meta":{
  194. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  195. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  196. }}
  197. ]`,
  198. expected: &dtos.Event{
  199. DeviceName: "demo",
  200. ProfileName: "demoProfile",
  201. SourceName: "demoSource",
  202. Origin: 3,
  203. Tags: map[string]string{"auth": "admin"},
  204. Readings: []dtos.BaseReading{
  205. {
  206. DeviceName: "demo",
  207. ProfileName: "demoProfile",
  208. ResourceName: "meta1",
  209. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  210. ValueType: v2.ValueTypeString,
  211. },
  212. {
  213. ResourceName: "sa",
  214. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  215. ProfileName: "demoProfile",
  216. DeviceName: "test device name1",
  217. Id: "12",
  218. Origin: 14,
  219. ValueType: v2.ValueTypeBinary,
  220. },
  221. },
  222. },
  223. error: "",
  224. },
  225. }
  226. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  227. for i, t := range tests {
  228. ems := EdgexMsgBusSink{deviceName: t.deviceName, profileName: t.profileName, topic: t.topic, metadata: "meta"}
  229. result, err := ems.produceEvents(ctx, []byte(t.input))
  230. if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
  231. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  232. } else if t.error == "" && !compareEvent(t.expected, result) {
  233. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  234. }
  235. }
  236. }