sink_node_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build template || !core
  15. // +build template !core
  16. package node
  17. import (
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/schema"
  22. "github.com/lf-edge/ekuiper/internal/topo/context"
  23. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  24. "github.com/lf-edge/ekuiper/internal/topo/transform"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "io/ioutil"
  27. "os"
  28. "path/filepath"
  29. "reflect"
  30. "testing"
  31. "time"
  32. )
  33. func TestSinkTemplate_Apply(t *testing.T) {
  34. conf.InitConf()
  35. transform.RegisterAdditionalFuncs()
  36. var tests = []struct {
  37. config map[string]interface{}
  38. data []map[string]interface{}
  39. result [][]byte
  40. }{
  41. {
  42. config: map[string]interface{}{
  43. "sendSingle": true,
  44. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  45. },
  46. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  47. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  48. }, {
  49. config: map[string]interface{}{
  50. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  51. },
  52. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  53. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  54. }, {
  55. config: map[string]interface{}{
  56. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  57. },
  58. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  59. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  60. }, {
  61. config: map[string]interface{}{
  62. "dataTemplate": `{"content":{{toJson .}}}`,
  63. },
  64. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  65. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  66. }, {
  67. config: map[string]interface{}{
  68. "sendSingle": true,
  69. "dataTemplate": `{"newab":"{{.ab}}"}`,
  70. },
  71. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  72. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  73. }, {
  74. config: map[string]interface{}{
  75. "sendSingle": true,
  76. "dataTemplate": `{"newab":"{{.ab}}"}`,
  77. },
  78. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  79. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  80. }, {
  81. config: map[string]interface{}{
  82. "sendSingle": true,
  83. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  84. },
  85. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  86. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  87. }, {
  88. config: map[string]interface{}{
  89. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  90. },
  91. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  92. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  93. }, {
  94. config: map[string]interface{}{
  95. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  96. },
  97. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  98. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  99. }, {
  100. config: map[string]interface{}{
  101. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  102. },
  103. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  104. result: [][]byte{[]byte(`{"result":2}`)},
  105. }, {
  106. config: map[string]interface{}{
  107. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  108. "sendSingle": true,
  109. },
  110. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  111. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  112. },
  113. }
  114. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  115. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  116. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  117. for i, tt := range tests {
  118. mockSink := mocknode.NewMockSink()
  119. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  120. s.Open(ctx, make(chan error))
  121. s.input <- tt.data
  122. time.Sleep(1 * time.Second)
  123. results := mockSink.GetResults()
  124. if !reflect.DeepEqual(tt.result, results) {
  125. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  126. }
  127. }
  128. }
  129. func TestOmitEmpty_Apply(t *testing.T) {
  130. conf.InitConf()
  131. var tests = []struct {
  132. config map[string]interface{}
  133. data []map[string]interface{}
  134. result [][]byte
  135. }{
  136. { // 0
  137. config: map[string]interface{}{
  138. "sendSingle": true,
  139. "omitIfEmpty": true,
  140. },
  141. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  142. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
  143. }, { // 1
  144. config: map[string]interface{}{
  145. "sendSingle": false,
  146. "omitIfEmpty": true,
  147. },
  148. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  149. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  150. }, { // 2
  151. config: map[string]interface{}{
  152. "sendSingle": false,
  153. "omitIfEmpty": false,
  154. },
  155. data: []map[string]interface{}{},
  156. result: [][]byte{[]byte(`[]`)},
  157. }, { // 3
  158. config: map[string]interface{}{
  159. "sendSingle": false,
  160. "omitIfEmpty": false,
  161. },
  162. data: nil,
  163. result: [][]byte{[]byte(`null`)},
  164. }, { // 4
  165. config: map[string]interface{}{
  166. "sendSingle": true,
  167. "omitIfEmpty": false,
  168. },
  169. data: []map[string]interface{}{},
  170. result: nil,
  171. }, { // 5
  172. config: map[string]interface{}{
  173. "sendSingle": false,
  174. "omitIfEmpty": true,
  175. },
  176. data: []map[string]interface{}{},
  177. result: nil,
  178. }, { // 6
  179. config: map[string]interface{}{
  180. "sendSingle": false,
  181. "omitIfEmpty": true,
  182. },
  183. data: nil,
  184. result: nil,
  185. }, { // 7
  186. config: map[string]interface{}{
  187. "sendSingle": true,
  188. "omitIfEmpty": false,
  189. },
  190. data: []map[string]interface{}{},
  191. result: nil,
  192. }, { // 8
  193. config: map[string]interface{}{
  194. "sendSingle": true,
  195. "omitIfEmpty": true,
  196. },
  197. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  198. result: [][]byte{[]byte(`{"ab":"hello1"}`)},
  199. }, { // 9
  200. config: map[string]interface{}{
  201. "sendSingle": true,
  202. "omitIfEmpty": false,
  203. },
  204. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  205. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
  206. },
  207. }
  208. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  209. contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
  210. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  211. for i, tt := range tests {
  212. mockSink := mocknode.NewMockSink()
  213. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  214. s.Open(ctx, make(chan error))
  215. s.input <- tt.data
  216. time.Sleep(100 * time.Millisecond)
  217. results := mockSink.GetResults()
  218. if !reflect.DeepEqual(tt.result, results) {
  219. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  220. }
  221. }
  222. }
  223. func TestFormat_Apply(t *testing.T) {
  224. conf.InitConf()
  225. etcDir, err := conf.GetConfLoc()
  226. if err != nil {
  227. t.Fatal(err)
  228. }
  229. etcDir = filepath.Join(etcDir, "schemas", "protobuf")
  230. err = os.MkdirAll(etcDir, os.ModePerm)
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. //Copy init.proto
  235. bytesRead, err := ioutil.ReadFile("../../schema/test/test1.proto")
  236. if err != nil {
  237. t.Fatal(err)
  238. }
  239. err = ioutil.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0755)
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. defer func() {
  244. err = os.RemoveAll(etcDir)
  245. if err != nil {
  246. t.Fatal(err)
  247. }
  248. }()
  249. schema.InitRegistry()
  250. transform.RegisterAdditionalFuncs()
  251. var tests = []struct {
  252. config map[string]interface{}
  253. data []map[string]interface{}
  254. result [][]byte
  255. }{
  256. {
  257. config: map[string]interface{}{
  258. "sendSingle": true,
  259. "format": `protobuf`,
  260. "schemaId": "test1.Person",
  261. },
  262. data: []map[string]interface{}{{
  263. "name": "test",
  264. "id": 1,
  265. "email": "Dddd",
  266. }},
  267. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  268. }, {
  269. config: map[string]interface{}{
  270. "sendSingle": true,
  271. "dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
  272. "format": `protobuf`,
  273. "schemaId": "test1.Person",
  274. },
  275. data: []map[string]interface{}{{"ab": "Dddd"}},
  276. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  277. },
  278. }
  279. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  280. contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
  281. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  282. for i, tt := range tests {
  283. mockSink := mocknode.NewMockSink()
  284. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  285. s.Open(ctx, make(chan error))
  286. s.input <- tt.data
  287. time.Sleep(1 * time.Second)
  288. results := mockSink.GetResults()
  289. if !reflect.DeepEqual(tt.result, results) {
  290. t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
  291. }
  292. }
  293. }
  294. func TestConfig(t *testing.T) {
  295. var tests = []struct {
  296. config map[string]interface{}
  297. sconf *SinkConf
  298. err error
  299. }{
  300. {
  301. config: map[string]interface{}{
  302. "sendSingle": true,
  303. },
  304. sconf: &SinkConf{
  305. Concurrency: 1,
  306. SendSingle: true,
  307. Format: "json",
  308. BufferLength: 1024,
  309. SinkConf: conf.SinkConf{
  310. MemoryCacheThreshold: 1024,
  311. MaxDiskCache: 1024000,
  312. BufferPageSize: 256,
  313. EnableCache: false,
  314. ResendInterval: 0,
  315. CleanCacheAtStop: false,
  316. },
  317. },
  318. }, {
  319. config: map[string]interface{}{
  320. "enableCache": true,
  321. "memoryCacheThreshold": 2,
  322. "bufferPageSize": 2,
  323. "sendSingle": true,
  324. "maxDiskCache": 6,
  325. "resendInterval": 10,
  326. },
  327. sconf: &SinkConf{
  328. Concurrency: 1,
  329. SendSingle: true,
  330. Format: "json",
  331. BufferLength: 1024,
  332. SinkConf: conf.SinkConf{
  333. MemoryCacheThreshold: 2,
  334. MaxDiskCache: 6,
  335. BufferPageSize: 2,
  336. EnableCache: true,
  337. ResendInterval: 10,
  338. CleanCacheAtStop: false,
  339. },
  340. },
  341. }, {
  342. config: map[string]interface{}{
  343. "enableCache": true,
  344. "memoryCacheThreshold": 2,
  345. "bufferPageSize": 2,
  346. "runAsync": true,
  347. "maxDiskCache": 6,
  348. "resendInterval": 10,
  349. },
  350. err: errors.New("cache is not supported for async sink, do not use enableCache and runAsync properties together"),
  351. }, {
  352. config: map[string]interface{}{
  353. "enableCache": true,
  354. "memoryCacheThreshold": 256,
  355. "bufferLength": 10,
  356. "maxDiskCache": 6,
  357. "resendInterval": 10,
  358. },
  359. err: errors.New("invalid cache properties: \nmaxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
  360. }, {
  361. config: map[string]interface{}{
  362. "enableCache": true,
  363. "memoryCacheThreshold": 7,
  364. "bufferPageSize": 3,
  365. "sendSingle": true,
  366. "maxDiskCache": 21,
  367. "resendInterval": 10,
  368. },
  369. err: errors.New("invalid cache properties: \nmemoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
  370. }, {
  371. config: map[string]interface{}{
  372. "enableCache": true,
  373. "memoryCacheThreshold": 9,
  374. "bufferPageSize": 3,
  375. "sendSingle": true,
  376. "maxDiskCache": 22,
  377. "resendInterval": 10,
  378. },
  379. err: errors.New("invalid cache properties: \nmaxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
  380. },
  381. }
  382. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  383. contextLogger := conf.Log.WithField("rule", "TestConfig")
  384. conf.InitConf()
  385. for i, tt := range tests {
  386. mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
  387. sconf, err := mockSink.parseConf(contextLogger)
  388. if !reflect.DeepEqual(tt.err, err) {
  389. t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
  390. } else if !reflect.DeepEqual(tt.sconf, sconf) {
  391. t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
  392. }
  393. }
  394. }