sink_node_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build template || !core
  15. // +build template !core
  16. package node
  17. import (
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/schema"
  21. "github.com/lf-edge/ekuiper/internal/topo/context"
  22. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  23. "github.com/lf-edge/ekuiper/internal/topo/transform"
  24. "github.com/lf-edge/ekuiper/internal/xsql"
  25. "io/ioutil"
  26. "os"
  27. "path/filepath"
  28. "reflect"
  29. "testing"
  30. "time"
  31. )
  32. func TestSinkTemplate_Apply(t *testing.T) {
  33. conf.InitConf()
  34. transform.RegisterAdditionalFuncs()
  35. var tests = []struct {
  36. config map[string]interface{}
  37. data []map[string]interface{}
  38. result [][]byte
  39. }{
  40. {
  41. config: map[string]interface{}{
  42. "sendSingle": true,
  43. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  44. },
  45. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  46. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  47. }, {
  48. config: map[string]interface{}{
  49. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  50. },
  51. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  52. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  53. }, {
  54. config: map[string]interface{}{
  55. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  56. },
  57. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  58. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  59. }, {
  60. config: map[string]interface{}{
  61. "dataTemplate": `{"content":{{toJson .}}}`,
  62. },
  63. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  64. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  65. }, {
  66. config: map[string]interface{}{
  67. "sendSingle": true,
  68. "dataTemplate": `{"newab":"{{.ab}}"}`,
  69. },
  70. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  71. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  72. }, {
  73. config: map[string]interface{}{
  74. "sendSingle": true,
  75. "dataTemplate": `{"newab":"{{.ab}}"}`,
  76. },
  77. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  78. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  79. }, {
  80. config: map[string]interface{}{
  81. "sendSingle": true,
  82. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  83. },
  84. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  85. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  86. }, {
  87. config: map[string]interface{}{
  88. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  89. },
  90. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  91. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  92. }, {
  93. config: map[string]interface{}{
  94. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  95. },
  96. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  97. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  98. }, {
  99. config: map[string]interface{}{
  100. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  101. },
  102. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  103. result: [][]byte{[]byte(`{"result":2}`)},
  104. }, {
  105. config: map[string]interface{}{
  106. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  107. "sendSingle": true,
  108. },
  109. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  110. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  111. },
  112. }
  113. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  114. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  115. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  116. for i, tt := range tests {
  117. mockSink := mocknode.NewMockSink()
  118. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  119. s.Open(ctx, make(chan error))
  120. s.input <- tt.data
  121. time.Sleep(1 * time.Second)
  122. results := mockSink.GetResults()
  123. if !reflect.DeepEqual(tt.result, results) {
  124. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  125. }
  126. }
  127. }
  128. func TestOmitEmpty_Apply(t *testing.T) {
  129. conf.InitConf()
  130. var tests = []struct {
  131. config map[string]interface{}
  132. data []map[string]interface{}
  133. result [][]byte
  134. }{
  135. { // 0
  136. config: map[string]interface{}{
  137. "sendSingle": true,
  138. "omitIfEmpty": true,
  139. },
  140. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  141. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
  142. }, { // 1
  143. config: map[string]interface{}{
  144. "sendSingle": false,
  145. "omitIfEmpty": true,
  146. },
  147. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  148. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  149. }, { // 2
  150. config: map[string]interface{}{
  151. "sendSingle": false,
  152. "omitIfEmpty": false,
  153. },
  154. data: []map[string]interface{}{},
  155. result: [][]byte{[]byte(`[]`)},
  156. }, { // 3
  157. config: map[string]interface{}{
  158. "sendSingle": false,
  159. "omitIfEmpty": false,
  160. },
  161. data: nil,
  162. result: [][]byte{[]byte(`null`)},
  163. }, { // 4
  164. config: map[string]interface{}{
  165. "sendSingle": true,
  166. "omitIfEmpty": false,
  167. },
  168. data: []map[string]interface{}{},
  169. result: nil,
  170. }, { // 5
  171. config: map[string]interface{}{
  172. "sendSingle": false,
  173. "omitIfEmpty": true,
  174. },
  175. data: []map[string]interface{}{},
  176. result: nil,
  177. }, { // 6
  178. config: map[string]interface{}{
  179. "sendSingle": false,
  180. "omitIfEmpty": true,
  181. },
  182. data: nil,
  183. result: nil,
  184. }, { // 7
  185. config: map[string]interface{}{
  186. "sendSingle": true,
  187. "omitIfEmpty": false,
  188. },
  189. data: []map[string]interface{}{},
  190. result: nil,
  191. }, { // 8
  192. config: map[string]interface{}{
  193. "sendSingle": true,
  194. "omitIfEmpty": true,
  195. },
  196. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  197. result: [][]byte{[]byte(`{"ab":"hello1"}`)},
  198. }, { // 9
  199. config: map[string]interface{}{
  200. "sendSingle": true,
  201. "omitIfEmpty": false,
  202. },
  203. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  204. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
  205. },
  206. }
  207. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  208. contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
  209. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  210. for i, tt := range tests {
  211. mockSink := mocknode.NewMockSink()
  212. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  213. s.Open(ctx, make(chan error))
  214. s.input <- tt.data
  215. time.Sleep(100 * time.Millisecond)
  216. results := mockSink.GetResults()
  217. if !reflect.DeepEqual(tt.result, results) {
  218. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  219. }
  220. }
  221. }
  222. func TestFormat_Apply(t *testing.T) {
  223. conf.InitConf()
  224. etcDir, err := conf.GetConfLoc()
  225. if err != nil {
  226. t.Fatal(err)
  227. }
  228. etcDir = filepath.Join(etcDir, "schemas", "protobuf")
  229. err = os.MkdirAll(etcDir, os.ModePerm)
  230. if err != nil {
  231. t.Fatal(err)
  232. }
  233. //Copy init.proto
  234. bytesRead, err := ioutil.ReadFile("../../schema/test/test1.proto")
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. err = ioutil.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0755)
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. defer func() {
  243. err = os.RemoveAll(etcDir)
  244. if err != nil {
  245. t.Fatal(err)
  246. }
  247. }()
  248. schema.InitRegistry()
  249. transform.RegisterAdditionalFuncs()
  250. var tests = []struct {
  251. config map[string]interface{}
  252. data []map[string]interface{}
  253. result [][]byte
  254. }{
  255. {
  256. config: map[string]interface{}{
  257. "sendSingle": true,
  258. "format": `protobuf`,
  259. "schemaId": "test1.Person",
  260. },
  261. data: []map[string]interface{}{{
  262. "name": "test",
  263. "id": 1,
  264. "email": "Dddd",
  265. }},
  266. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  267. }, {
  268. config: map[string]interface{}{
  269. "sendSingle": true,
  270. "dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
  271. "format": `protobuf`,
  272. "schemaId": "test1.Person",
  273. },
  274. data: []map[string]interface{}{{"ab": "Dddd"}},
  275. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  276. },
  277. }
  278. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  279. contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
  280. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  281. for i, tt := range tests {
  282. mockSink := mocknode.NewMockSink()
  283. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  284. s.Open(ctx, make(chan error))
  285. s.input <- tt.data
  286. time.Sleep(1 * time.Second)
  287. results := mockSink.GetResults()
  288. if !reflect.DeepEqual(tt.result, results) {
  289. t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
  290. }
  291. }
  292. }