sink_node_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build template || !core
  15. // +build template !core
  16. package node
  17. import (
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  22. "github.com/lf-edge/ekuiper/internal/topo/transform"
  23. "github.com/lf-edge/ekuiper/internal/xsql"
  24. "reflect"
  25. "testing"
  26. "time"
  27. )
  28. func TestSinkTemplate_Apply(t *testing.T) {
  29. conf.InitConf()
  30. transform.RegisterAdditionalFuncs()
  31. var tests = []struct {
  32. config map[string]interface{}
  33. data []map[string]interface{}
  34. result [][]byte
  35. }{
  36. {
  37. config: map[string]interface{}{
  38. "sendSingle": true,
  39. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  40. },
  41. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  42. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  43. }, {
  44. config: map[string]interface{}{
  45. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  46. },
  47. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  48. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  49. }, {
  50. config: map[string]interface{}{
  51. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  52. },
  53. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  54. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  55. }, {
  56. config: map[string]interface{}{
  57. "dataTemplate": `{"content":{{toJson .}}}`,
  58. },
  59. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  60. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  61. }, {
  62. config: map[string]interface{}{
  63. "sendSingle": true,
  64. "dataTemplate": `{"newab":"{{.ab}}"}`,
  65. },
  66. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  67. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  68. }, {
  69. config: map[string]interface{}{
  70. "sendSingle": true,
  71. "dataTemplate": `{"newab":"{{.ab}}"}`,
  72. },
  73. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  74. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  75. }, {
  76. config: map[string]interface{}{
  77. "sendSingle": true,
  78. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  79. },
  80. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  81. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  82. }, {
  83. config: map[string]interface{}{
  84. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  85. },
  86. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  87. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  88. }, {
  89. config: map[string]interface{}{
  90. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  91. },
  92. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  93. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  94. }, {
  95. config: map[string]interface{}{
  96. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  97. },
  98. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  99. result: [][]byte{[]byte(`{"result":2}`)},
  100. }, {
  101. config: map[string]interface{}{
  102. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  103. "sendSingle": true,
  104. },
  105. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  106. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  107. },
  108. }
  109. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  110. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  111. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  112. for i, tt := range tests {
  113. mockSink := mocknode.NewMockSink()
  114. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  115. s.Open(ctx, make(chan error))
  116. s.input <- tt.data
  117. time.Sleep(1 * time.Second)
  118. s.close(ctx, contextLogger)
  119. results := mockSink.GetResults()
  120. if !reflect.DeepEqual(tt.result, results) {
  121. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  122. }
  123. }
  124. }
  125. func TestOmitEmpty_Apply(t *testing.T) {
  126. conf.InitConf()
  127. var tests = []struct {
  128. config map[string]interface{}
  129. data []map[string]interface{}
  130. result [][]byte
  131. }{
  132. { // 0
  133. config: map[string]interface{}{
  134. "sendSingle": true,
  135. "omitIfEmpty": true,
  136. },
  137. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  138. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
  139. }, { // 1
  140. config: map[string]interface{}{
  141. "sendSingle": false,
  142. "omitIfEmpty": true,
  143. },
  144. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  145. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  146. }, { // 2
  147. config: map[string]interface{}{
  148. "sendSingle": false,
  149. "omitIfEmpty": false,
  150. },
  151. data: []map[string]interface{}{},
  152. result: [][]byte{[]byte(`[]`)},
  153. }, { // 3
  154. config: map[string]interface{}{
  155. "sendSingle": false,
  156. "omitIfEmpty": false,
  157. },
  158. data: nil,
  159. result: [][]byte{[]byte(`null`)},
  160. }, { // 4
  161. config: map[string]interface{}{
  162. "sendSingle": true,
  163. "omitIfEmpty": false,
  164. },
  165. data: []map[string]interface{}{},
  166. result: nil,
  167. }, { // 5
  168. config: map[string]interface{}{
  169. "sendSingle": false,
  170. "omitIfEmpty": true,
  171. },
  172. data: []map[string]interface{}{},
  173. result: nil,
  174. }, { // 6
  175. config: map[string]interface{}{
  176. "sendSingle": false,
  177. "omitIfEmpty": true,
  178. },
  179. data: nil,
  180. result: nil,
  181. }, { // 7
  182. config: map[string]interface{}{
  183. "sendSingle": true,
  184. "omitIfEmpty": false,
  185. },
  186. data: []map[string]interface{}{},
  187. result: nil,
  188. }, { // 8
  189. config: map[string]interface{}{
  190. "sendSingle": true,
  191. "omitIfEmpty": true,
  192. },
  193. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  194. result: [][]byte{[]byte(`{"ab":"hello1"}`)},
  195. }, { // 9
  196. config: map[string]interface{}{
  197. "sendSingle": true,
  198. "omitIfEmpty": false,
  199. },
  200. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  201. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
  202. },
  203. }
  204. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  205. contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
  206. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  207. for i, tt := range tests {
  208. mockSink := mocknode.NewMockSink()
  209. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  210. s.Open(ctx, make(chan error))
  211. s.input <- tt.data
  212. time.Sleep(100 * time.Millisecond)
  213. s.close(ctx, contextLogger)
  214. results := mockSink.GetResults()
  215. if !reflect.DeepEqual(tt.result, results) {
  216. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  217. }
  218. }
  219. }