sink_node_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build template || !core
  15. package node
  16. import (
  17. "errors"
  18. "fmt"
  19. "os"
  20. "path/filepath"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/benbjohnson/clock"
  25. "github.com/stretchr/testify/assert"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/schema"
  28. "github.com/lf-edge/ekuiper/internal/testx"
  29. "github.com/lf-edge/ekuiper/internal/topo/context"
  30. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  31. "github.com/lf-edge/ekuiper/internal/topo/transform"
  32. "github.com/lf-edge/ekuiper/internal/xsql"
  33. )
  34. func init() {
  35. testx.InitEnv()
  36. }
  37. func TestBatchSink(t *testing.T) {
  38. mc := conf.Clock.(*clock.Mock)
  39. conf.InitConf()
  40. transform.RegisterAdditionalFuncs()
  41. tests := []struct {
  42. config map[string]interface{}
  43. data []map[string]interface{}
  44. result [][]byte
  45. }{
  46. {
  47. config: map[string]interface{}{
  48. "batchSize": 2,
  49. },
  50. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
  51. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  52. },
  53. {
  54. config: map[string]interface{}{
  55. "lingerInterval": 1000,
  56. },
  57. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
  58. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"},{"ab":"hello3"}]`)},
  59. },
  60. }
  61. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  62. contextLogger := conf.Log.WithField("rule", "TestBatchSink")
  63. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  64. for i, tt := range tests {
  65. mc.Set(mc.Now())
  66. mockSink := mocknode.NewMockSink()
  67. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  68. s.Open(ctx, make(chan error))
  69. s.input <- tt.data
  70. for i := 0; i < 10; i++ {
  71. mc.Add(1 * time.Second)
  72. time.Sleep(10 * time.Millisecond)
  73. // wait until mockSink get results
  74. if len(mockSink.GetResults()) > 0 {
  75. break
  76. }
  77. }
  78. results := mockSink.GetResults()
  79. if !reflect.DeepEqual(tt.result, results) {
  80. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  81. }
  82. }
  83. }
  84. func TestSinkTemplate_Apply(t *testing.T) {
  85. conf.InitConf()
  86. transform.RegisterAdditionalFuncs()
  87. tests := []struct {
  88. config map[string]interface{}
  89. data []map[string]interface{}
  90. result [][]byte
  91. }{
  92. {
  93. config: map[string]interface{}{
  94. "sendSingle": true,
  95. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  96. },
  97. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  98. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  99. }, {
  100. config: map[string]interface{}{
  101. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  102. },
  103. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  104. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  105. }, {
  106. config: map[string]interface{}{
  107. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  108. },
  109. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  110. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  111. }, {
  112. config: map[string]interface{}{
  113. "dataTemplate": `{"content":{{toJson .}}}`,
  114. },
  115. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  116. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  117. }, {
  118. config: map[string]interface{}{
  119. "sendSingle": true,
  120. "dataTemplate": `{"newab":"{{.ab}}"}`,
  121. },
  122. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  123. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  124. }, {
  125. config: map[string]interface{}{
  126. "sendSingle": true,
  127. "dataTemplate": `{"newab":"{{.ab}}"}`,
  128. },
  129. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  130. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  131. }, {
  132. config: map[string]interface{}{
  133. "sendSingle": true,
  134. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  135. },
  136. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  137. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  138. }, {
  139. config: map[string]interface{}{
  140. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  141. },
  142. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  143. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  144. }, {
  145. config: map[string]interface{}{
  146. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  147. },
  148. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  149. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  150. }, {
  151. config: map[string]interface{}{
  152. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  153. },
  154. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  155. result: [][]byte{[]byte(`{"result":2}`)},
  156. }, {
  157. config: map[string]interface{}{
  158. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  159. "sendSingle": true,
  160. },
  161. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  162. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  163. },
  164. }
  165. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  166. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  167. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  168. for i, tt := range tests {
  169. mockSink := mocknode.NewMockSink()
  170. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  171. s.Open(ctx, make(chan error))
  172. s.input <- tt.data
  173. time.Sleep(1 * time.Second)
  174. results := mockSink.GetResults()
  175. if !reflect.DeepEqual(tt.result, results) {
  176. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  177. }
  178. }
  179. }
  180. func TestOmitEmpty_Apply(t *testing.T) {
  181. conf.InitConf()
  182. tests := []struct {
  183. config map[string]interface{}
  184. data []map[string]interface{}
  185. result [][]byte
  186. }{
  187. { // 0
  188. config: map[string]interface{}{
  189. "sendSingle": true,
  190. "omitIfEmpty": true,
  191. },
  192. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  193. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
  194. }, { // 1
  195. config: map[string]interface{}{
  196. "sendSingle": false,
  197. "omitIfEmpty": true,
  198. },
  199. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  200. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  201. }, { // 2
  202. config: map[string]interface{}{
  203. "sendSingle": false,
  204. "omitIfEmpty": false,
  205. },
  206. data: []map[string]interface{}{},
  207. result: [][]byte{[]byte(`[]`)},
  208. }, { // 3
  209. config: map[string]interface{}{
  210. "sendSingle": false,
  211. "omitIfEmpty": false,
  212. },
  213. data: nil,
  214. result: [][]byte{[]byte(`null`)},
  215. }, { // 4
  216. config: map[string]interface{}{
  217. "sendSingle": true,
  218. "omitIfEmpty": false,
  219. },
  220. data: []map[string]interface{}{},
  221. result: nil,
  222. }, { // 5
  223. config: map[string]interface{}{
  224. "sendSingle": false,
  225. "omitIfEmpty": true,
  226. },
  227. data: []map[string]interface{}{},
  228. result: nil,
  229. }, { // 6
  230. config: map[string]interface{}{
  231. "sendSingle": false,
  232. "omitIfEmpty": true,
  233. },
  234. data: nil,
  235. result: nil,
  236. }, { // 7
  237. config: map[string]interface{}{
  238. "sendSingle": true,
  239. "omitIfEmpty": false,
  240. },
  241. data: []map[string]interface{}{},
  242. result: nil,
  243. }, { // 8
  244. config: map[string]interface{}{
  245. "sendSingle": true,
  246. "omitIfEmpty": true,
  247. },
  248. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  249. result: [][]byte{[]byte(`{"ab":"hello1"}`)},
  250. }, { // 9
  251. config: map[string]interface{}{
  252. "sendSingle": true,
  253. "omitIfEmpty": false,
  254. },
  255. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  256. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
  257. },
  258. }
  259. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  260. contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
  261. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  262. for i, tt := range tests {
  263. mockSink := mocknode.NewMockSink()
  264. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  265. s.Open(ctx, make(chan error))
  266. s.input <- tt.data
  267. time.Sleep(100 * time.Millisecond)
  268. results := mockSink.GetResults()
  269. if !reflect.DeepEqual(tt.result, results) {
  270. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  271. }
  272. }
  273. }
  274. func TestFormat_Apply(t *testing.T) {
  275. conf.InitConf()
  276. etcDir, err := conf.GetDataLoc()
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. etcDir = filepath.Join(etcDir, "schemas", "protobuf")
  281. err = os.MkdirAll(etcDir, os.ModePerm)
  282. if err != nil {
  283. t.Fatal(err)
  284. }
  285. // Copy init.proto
  286. bytesRead, err := os.ReadFile("../../schema/test/test1.proto")
  287. if err != nil {
  288. t.Fatal(err)
  289. }
  290. err = os.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0o755)
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. defer func() {
  295. err = os.RemoveAll(etcDir)
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. }()
  300. err = schema.InitRegistry()
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. transform.RegisterAdditionalFuncs()
  305. tests := []struct {
  306. name string
  307. config map[string]interface{}
  308. data []map[string]interface{}
  309. result [][]byte
  310. }{
  311. {
  312. name: "test normal protobuf format",
  313. config: map[string]interface{}{
  314. "sendSingle": true,
  315. "format": `protobuf`,
  316. "schemaId": "test1.Person",
  317. },
  318. data: []map[string]interface{}{{
  319. "name": "test",
  320. "id": 1,
  321. "email": "Dddd",
  322. }},
  323. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  324. }, {
  325. name: "test dateTemplate + protobuf format",
  326. config: map[string]interface{}{
  327. "sendSingle": true,
  328. "dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
  329. "format": `protobuf`,
  330. "schemaId": "test1.Person",
  331. },
  332. data: []map[string]interface{}{{"ab": "Dddd"}},
  333. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  334. },
  335. }
  336. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  337. contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
  338. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  339. for i, tt := range tests {
  340. t.Run(tt.name, func(t *testing.T) {
  341. mockSink := mocknode.NewMockSink()
  342. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  343. s.Open(ctx, make(chan error))
  344. s.input <- tt.data
  345. var results [][]byte
  346. time.Sleep(100 * time.Millisecond)
  347. results = mockSink.GetResults()
  348. if !reflect.DeepEqual(tt.result, results) {
  349. t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
  350. }
  351. })
  352. }
  353. }
  354. func TestConfig(t *testing.T) {
  355. tests := []struct {
  356. config map[string]interface{}
  357. sconf *SinkConf
  358. err error
  359. }{
  360. {
  361. config: map[string]interface{}{
  362. "sendSingle": true,
  363. },
  364. sconf: &SinkConf{
  365. Concurrency: 1,
  366. SendSingle: true,
  367. Format: "json",
  368. BufferLength: 1024,
  369. SinkConf: conf.SinkConf{
  370. MemoryCacheThreshold: 1024,
  371. MaxDiskCache: 1024000,
  372. BufferPageSize: 256,
  373. EnableCache: false,
  374. ResendInterval: 0,
  375. CleanCacheAtStop: false,
  376. },
  377. },
  378. }, {
  379. config: map[string]interface{}{
  380. "enableCache": true,
  381. "memoryCacheThreshold": 2,
  382. "bufferPageSize": 2,
  383. "sendSingle": true,
  384. "maxDiskCache": 6,
  385. "resendInterval": 10,
  386. },
  387. sconf: &SinkConf{
  388. Concurrency: 1,
  389. SendSingle: true,
  390. Format: "json",
  391. BufferLength: 1024,
  392. SinkConf: conf.SinkConf{
  393. MemoryCacheThreshold: 2,
  394. MaxDiskCache: 6,
  395. BufferPageSize: 2,
  396. EnableCache: true,
  397. ResendInterval: 10,
  398. CleanCacheAtStop: false,
  399. },
  400. },
  401. }, {
  402. config: map[string]interface{}{
  403. "enableCache": true,
  404. "memoryCacheThreshold": 256,
  405. "bufferLength": 10,
  406. "maxDiskCache": 6,
  407. "resendInterval": 10,
  408. },
  409. err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
  410. }, {
  411. config: map[string]interface{}{
  412. "enableCache": true,
  413. "memoryCacheThreshold": 7,
  414. "bufferPageSize": 3,
  415. "sendSingle": true,
  416. "maxDiskCache": 21,
  417. "resendInterval": 10,
  418. },
  419. err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
  420. }, {
  421. config: map[string]interface{}{
  422. "enableCache": true,
  423. "memoryCacheThreshold": 9,
  424. "bufferPageSize": 3,
  425. "sendSingle": true,
  426. "maxDiskCache": 22,
  427. "resendInterval": 10,
  428. },
  429. err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
  430. },
  431. }
  432. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  433. contextLogger := conf.Log.WithField("rule", "TestConfig")
  434. conf.InitConf()
  435. for i, tt := range tests {
  436. mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
  437. sconf, err := mockSink.parseConf(contextLogger)
  438. if !reflect.DeepEqual(tt.err, err) {
  439. t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
  440. } else if !reflect.DeepEqual(tt.sconf, sconf) {
  441. t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
  442. }
  443. }
  444. }
  445. func TestSinkNode_reset(t *testing.T) {
  446. mockSink := mocknode.NewMockSink()
  447. s := NewSinkNodeWithSink("mockSink", mockSink, nil)
  448. s.reset()
  449. if s.statManagers != nil {
  450. t.Errorf("reset() failed")
  451. }
  452. }
  453. func Test_getSink(t *testing.T) {
  454. _, err := getSink("mock", map[string]interface{}{"sendSingle": true, "omitIfEmpty": true})
  455. if err == nil {
  456. t.Errorf("getSink() failed")
  457. }
  458. }
  459. func Test_itemToMap(t *testing.T) {
  460. type args struct {
  461. item interface{}
  462. }
  463. tests := []struct {
  464. name string
  465. args args
  466. want []map[string]interface{}
  467. }{
  468. {
  469. name: "test1",
  470. args: args{
  471. item: errors.New("test"),
  472. },
  473. want: []map[string]interface{}{
  474. {"error": "test"},
  475. },
  476. },
  477. {
  478. name: "test2",
  479. args: args{
  480. item: "test2",
  481. },
  482. want: []map[string]interface{}{
  483. {"error": fmt.Sprintf("result is not a map slice but found %#v", "test2")},
  484. },
  485. },
  486. {
  487. name: "test3",
  488. args: args{
  489. item: xsql.Row(&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil}),
  490. },
  491. want: []map[string]interface{}{
  492. {"a": 1, "b": "2"},
  493. },
  494. },
  495. {
  496. name: "test4",
  497. args: args{
  498. item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.TupleRow{
  499. &xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
  500. }}),
  501. },
  502. want: []map[string]interface{}{
  503. {"a": 1, "b": "2"},
  504. },
  505. },
  506. }
  507. for _, tt := range tests {
  508. t.Run(tt.name, func(t *testing.T) {
  509. if got := itemToMap(tt.args.item); !reflect.DeepEqual(got, tt.want) {
  510. t.Errorf("itemToMap() = %v, want %v", got, tt.want)
  511. }
  512. })
  513. }
  514. }
  515. func TestSinkFields_Apply(t *testing.T) {
  516. conf.InitConf()
  517. transform.RegisterAdditionalFuncs()
  518. tests := []struct {
  519. dt string
  520. format string
  521. schemaId string
  522. delimiter string
  523. dataField string
  524. fields []string
  525. data interface{}
  526. result [][]byte
  527. }{
  528. {
  529. format: "json",
  530. fields: []string{"a", "b"},
  531. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  532. result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
  533. },
  534. {
  535. format: "json",
  536. fields: []string{"a", "b"},
  537. data: []map[string]interface{}{{"a": "1", "b": "2", "c": "3"}},
  538. result: [][]byte{[]byte(`[{"a":"1","b":"2"}]`)},
  539. },
  540. {
  541. format: "delimited",
  542. delimiter: ",",
  543. fields: []string{"a", "b"},
  544. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  545. result: [][]byte{[]byte(`1,2`)},
  546. },
  547. {
  548. format: "delimited",
  549. delimiter: ",",
  550. fields: []string{"b", "c", "a"},
  551. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  552. result: [][]byte{[]byte(`2,3,1`)},
  553. },
  554. {
  555. format: "json",
  556. schemaId: "",
  557. fields: []string{"ax", "bx"},
  558. dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
  559. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  560. result: [][]byte{[]byte(`{"ax":1,"bx":2}`)},
  561. },
  562. {
  563. format: "json",
  564. schemaId: "",
  565. fields: []string{"a", "b"},
  566. dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
  567. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  568. result: [][]byte{[]byte(`{"a":null,"b":null}`)},
  569. },
  570. {
  571. format: "json",
  572. dataField: "device",
  573. fields: []string{"a", "b"},
  574. data: map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
  575. result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
  576. },
  577. {
  578. format: "delimited",
  579. delimiter: ",",
  580. fields: []string{"a", "b"},
  581. dataField: "device",
  582. data: map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
  583. result: [][]byte{[]byte(`1,2`)},
  584. },
  585. {
  586. format: "json",
  587. schemaId: "",
  588. fields: []string{"a", "b"},
  589. dt: `{"device": {"a": {{.a}}}}`,
  590. dataField: "device",
  591. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  592. result: [][]byte{[]byte(`{"a":1,"b":null}`)},
  593. },
  594. }
  595. contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
  596. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  597. for i, tt := range tests {
  598. tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.dataField, tt.fields)
  599. vCtx := context.WithValue(ctx, context.TransKey, tf)
  600. mockSink := mocknode.NewMockSink()
  601. _ = mockSink.Collect(vCtx, tt.data)
  602. time.Sleep(1 * time.Second)
  603. results := mockSink.GetResults()
  604. if !reflect.DeepEqual(tt.result, results) {
  605. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  606. }
  607. }
  608. }
  609. func TestSinkCache(t *testing.T) {
  610. conf.InitConf()
  611. transform.RegisterAdditionalFuncs()
  612. contextLogger := conf.Log.WithField("rule", "TestSinkCache")
  613. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  614. tests := []struct {
  615. name string
  616. config map[string]interface{}
  617. result [][]byte
  618. resendResult [][]byte
  619. }{
  620. {
  621. name: "test cache",
  622. config: map[string]interface{}{
  623. "enableCache": true,
  624. },
  625. result: [][]byte{
  626. []byte(`[{"a":1}]`),
  627. []byte(`[{"a":2}]`),
  628. []byte(`[{"a":3}]`),
  629. []byte(`[{"a":4}]`),
  630. []byte(`[{"a":5}]`),
  631. []byte(`[{"a":6}]`),
  632. []byte(`[{"a":7}]`),
  633. []byte(`[{"a":8}]`),
  634. []byte(`[{"a":9}]`),
  635. []byte(`[{"a":10}]`),
  636. },
  637. },
  638. {
  639. name: "test resend",
  640. config: map[string]interface{}{
  641. "enableCache": true,
  642. "resendAlterQueue": true,
  643. },
  644. result: [][]byte{
  645. []byte(`[{"a":2}]`),
  646. []byte(`[{"a":4}]`),
  647. []byte(`[{"a":6}]`),
  648. []byte(`[{"a":8}]`),
  649. []byte(`[{"a":10}]`),
  650. },
  651. resendResult: [][]byte{
  652. []byte(`[{"a":1}]`),
  653. []byte(`[{"a":3}]`),
  654. []byte(`[{"a":5}]`),
  655. },
  656. },
  657. {
  658. name: "test resend priority low",
  659. config: map[string]interface{}{
  660. "enableCache": true,
  661. "resendAlterQueue": true,
  662. "resendPriority": -1,
  663. "resendIndicatorField": "isResend",
  664. },
  665. result: [][]byte{
  666. []byte(`[{"a":2}]`),
  667. []byte(`[{"a":4}]`),
  668. []byte(`[{"a":6}]`),
  669. []byte(`[{"a":8}]`),
  670. []byte(`[{"a":10}]`),
  671. },
  672. resendResult: [][]byte{
  673. []byte(`[{"a":1,"isResend":true}]`),
  674. []byte(`[{"a":3,"isResend":true}]`),
  675. []byte(`[{"a":5,"isResend":true}]`),
  676. },
  677. },
  678. {
  679. name: "test resend priority high",
  680. config: map[string]interface{}{
  681. "enableCache": true,
  682. "resendAlterQueue": true,
  683. "resendPriority": 1,
  684. "batchSize": 1,
  685. },
  686. result: [][]byte{
  687. []byte(`[{"a":2}]`),
  688. []byte(`[{"a":4}]`),
  689. []byte(`[{"a":6}]`),
  690. []byte(`[{"a":8}]`),
  691. []byte(`[{"a":10}]`),
  692. },
  693. resendResult: [][]byte{
  694. []byte(`[{"a":1}]`),
  695. []byte(`[{"a":3}]`),
  696. []byte(`[{"a":5}]`),
  697. },
  698. },
  699. }
  700. for _, tt := range tests {
  701. t.Run(tt.name, func(t *testing.T) {
  702. data := [][]map[string]interface{}{
  703. {{"a": 1}},
  704. {{"a": 2}},
  705. {{"a": 3}},
  706. {{"a": 4}},
  707. {{"a": 5}},
  708. {{"a": 6}},
  709. {{"a": 7}},
  710. {{"a": 8}},
  711. {{"a": 9}},
  712. {{"a": 10}},
  713. }
  714. hitch := make(chan int, 10)
  715. mockSink := mocknode.NewMockResendSink(hitch)
  716. fmt.Printf("mockSink: %+v\n", tt.config)
  717. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  718. s.Open(ctx, make(chan error))
  719. for i := 0; i < 20; i++ {
  720. s.input <- data[i%len(data)]
  721. select {
  722. case <-hitch:
  723. done := true
  724. results := mockSink.GetResults()
  725. if len(results) != len(tt.result) {
  726. done = false
  727. }
  728. if done && tt.resendResult != nil {
  729. resentResults := mockSink.GetResendResults()
  730. if len(resentResults) != len(tt.resendResult) {
  731. done = false
  732. }
  733. }
  734. if done {
  735. goto end
  736. }
  737. case <-time.After(1 * time.Second):
  738. }
  739. }
  740. end:
  741. results := mockSink.GetResults()
  742. minLen := len(results)
  743. if len(tt.result) < minLen {
  744. minLen = len(tt.result)
  745. }
  746. assert.Equal(t, results[:minLen], tt.result[:minLen])
  747. if tt.resendResult != nil {
  748. resentResults := mockSink.GetResendResults()
  749. minLen = len(resentResults)
  750. if len(tt.resendResult) < minLen {
  751. minLen = len(tt.resendResult)
  752. }
  753. assert.Equal(t, resentResults[:minLen], tt.resendResult[:minLen])
  754. }
  755. })
  756. }
  757. }