edgex_sink_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // +build edgex
  2. package sinks
  3. import (
  4. "fmt"
  5. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  6. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "testing"
  11. )
  12. var (
  13. contextLogger = common.Log.WithField("rule", "testEdgexSink")
  14. ctx = contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  15. )
  16. func compareEvent(expected, actual *dtos.Event) bool {
  17. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
  18. for i, r := range expected.Readings {
  19. if !compareReading(r, actual.Readings[i]) {
  20. break
  21. }
  22. }
  23. return true
  24. }
  25. return false
  26. }
  27. func compareReading(expected, actual dtos.BaseReading) bool {
  28. if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && expected.ResourceName == actual.ResourceName && expected.Value == actual.Value && expected.ValueType == actual.ValueType {
  29. return true
  30. }
  31. return false
  32. }
  33. func TestProduceEvents(t1 *testing.T) {
  34. var tests = []struct {
  35. input string
  36. deviceName string
  37. profileName string
  38. topic string
  39. expected *dtos.Event
  40. error string
  41. }{
  42. {
  43. input: `[
  44. {"meta":{
  45. "correlationid":"","deviceName":"demo","id":"","origin":3,
  46. "humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
  47. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  48. }
  49. },
  50. {"humidity":100},
  51. {"temperature":50}
  52. ]`,
  53. expected: &dtos.Event{
  54. Id: "",
  55. DeviceName: "demo",
  56. ProfileName: "kuiperProfile",
  57. Origin: 3,
  58. Readings: []dtos.BaseReading{
  59. {
  60. ResourceName: "humidity",
  61. DeviceName: "test device name1",
  62. ProfileName: "kuiperProfile",
  63. Id: "12",
  64. Origin: 14,
  65. ValueType: v2.ValueTypeInt64,
  66. SimpleReading: dtos.SimpleReading{Value: "100"},
  67. },
  68. {
  69. ResourceName: "temperature",
  70. DeviceName: "test device name2",
  71. ProfileName: "kuiperProfile",
  72. Id: "22",
  73. Origin: 24,
  74. ValueType: v2.ValueTypeFloat64,
  75. SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
  76. },
  77. },
  78. },
  79. error: "",
  80. },
  81. {
  82. input: `[
  83. {"meta":{
  84. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  85. "humidity":{"deviceName":"test device name1","id":"12","origin":14},
  86. "temperature":{"deviceName":"test device name2","id":"22","origin":24}
  87. }
  88. },
  89. {"h1":100}
  90. ]`,
  91. expected: &dtos.Event{
  92. Id: "abc",
  93. DeviceName: "demo",
  94. ProfileName: "demoProfile",
  95. SourceName: "demoSource",
  96. Origin: 3,
  97. Tags: map[string]string{"auth": "admin"},
  98. Readings: []dtos.BaseReading{
  99. {
  100. ResourceName: "h1",
  101. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  102. DeviceName: "demo",
  103. ProfileName: "demoProfile",
  104. ValueType: v2.ValueTypeFloat64,
  105. },
  106. },
  107. },
  108. error: "",
  109. },
  110. {
  111. input: `[
  112. {"meta": 50},
  113. {"h1":100}
  114. ]`,
  115. expected: &dtos.Event{
  116. ProfileName: "kuiperProfile",
  117. Readings: []dtos.BaseReading{
  118. {
  119. ResourceName: "h1",
  120. SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
  121. ProfileName: "kuiperProfile",
  122. ValueType: v2.ValueTypeFloat64,
  123. },
  124. },
  125. },
  126. error: "",
  127. },
  128. {
  129. input: `[
  130. {"meta1": "newmeta"},
  131. {"h1":true},
  132. {"sa":["1","2","3","4"]},
  133. {"fa":[1.1,2.2,3.3,4.4]}
  134. ]`,
  135. expected: &dtos.Event{
  136. ProfileName: "kuiperProfile",
  137. Readings: []dtos.BaseReading{
  138. {
  139. ResourceName: "meta1",
  140. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  141. ProfileName: "kuiperProfile",
  142. ValueType: v2.ValueTypeString,
  143. },
  144. {
  145. ResourceName: "h1",
  146. SimpleReading: dtos.SimpleReading{Value: "true"},
  147. ProfileName: "kuiperProfile",
  148. ValueType: v2.ValueTypeBool,
  149. },
  150. {
  151. ResourceName: "sa",
  152. SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
  153. ProfileName: "kuiperProfile",
  154. ValueType: v2.ValueTypeStringArray,
  155. },
  156. {
  157. ResourceName: "fa",
  158. SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
  159. ProfileName: "kuiperProfile",
  160. ValueType: v2.ValueTypeFloat64Array,
  161. },
  162. },
  163. },
  164. error: "",
  165. },
  166. {
  167. input: `[]`,
  168. deviceName: "kuiper",
  169. profileName: "kp",
  170. topic: "demo",
  171. expected: &dtos.Event{
  172. ProfileName: "kp",
  173. DeviceName: "kuiper",
  174. SourceName: "demo",
  175. Origin: 0,
  176. Readings: nil,
  177. },
  178. error: "",
  179. },
  180. {
  181. input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
  182. expected: &dtos.Event{
  183. ProfileName: "kuiperProfile",
  184. Origin: 0,
  185. Readings: nil,
  186. },
  187. },
  188. {
  189. input: `[
  190. {"meta1": "newmeta"},
  191. {"sa":"SGVsbG8gV29ybGQ="},
  192. {"meta":{
  193. "correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
  194. "sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
  195. }}
  196. ]`,
  197. expected: &dtos.Event{
  198. DeviceName: "demo",
  199. ProfileName: "demoProfile",
  200. SourceName: "demoSource",
  201. Origin: 3,
  202. Tags: map[string]string{"auth": "admin"},
  203. Readings: []dtos.BaseReading{
  204. {
  205. DeviceName: "demo",
  206. ProfileName: "demoProfile",
  207. ResourceName: "meta1",
  208. SimpleReading: dtos.SimpleReading{Value: "newmeta"},
  209. ValueType: v2.ValueTypeString,
  210. },
  211. {
  212. ResourceName: "sa",
  213. BinaryReading: dtos.BinaryReading{BinaryValue: []byte("Hello World"), MediaType: "application/css"},
  214. ProfileName: "demoProfile",
  215. DeviceName: "test device name1",
  216. Id: "12",
  217. Origin: 14,
  218. ValueType: v2.ValueTypeBinary,
  219. },
  220. },
  221. },
  222. error: "",
  223. },
  224. }
  225. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  226. for i, t := range tests {
  227. ems := EdgexMsgBusSink{deviceName: t.deviceName, profileName: t.profileName, topic: t.topic, metadata: "meta"}
  228. result, err := ems.produceEvents(ctx, []byte(t.input))
  229. if !reflect.DeepEqual(t.error, common.Errstring(err)) {
  230. t1.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, t.input, t.error, err)
  231. } else if t.error == "" && !compareEvent(t.expected, result) {
  232. t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
  233. }
  234. }
  235. }