sink_node_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build template || !core
  15. // +build template !core
  16. package node
  17. import (
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "os"
  22. "path/filepath"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/schema"
  28. "github.com/lf-edge/ekuiper/internal/topo/context"
  29. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  30. "github.com/lf-edge/ekuiper/internal/topo/transform"
  31. "github.com/lf-edge/ekuiper/internal/xsql"
  32. )
  33. func init() {
  34. testx.InitEnv()
  35. }
  36. func TestSinkTemplate_Apply(t *testing.T) {
  37. conf.InitConf()
  38. transform.RegisterAdditionalFuncs()
  39. var tests = []struct {
  40. config map[string]interface{}
  41. data []map[string]interface{}
  42. result [][]byte
  43. }{
  44. {
  45. config: map[string]interface{}{
  46. "sendSingle": true,
  47. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  48. },
  49. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  50. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  51. }, {
  52. config: map[string]interface{}{
  53. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  54. },
  55. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  56. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  57. }, {
  58. config: map[string]interface{}{
  59. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  60. },
  61. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  62. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  63. }, {
  64. config: map[string]interface{}{
  65. "dataTemplate": `{"content":{{toJson .}}}`,
  66. },
  67. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  68. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  69. }, {
  70. config: map[string]interface{}{
  71. "sendSingle": true,
  72. "dataTemplate": `{"newab":"{{.ab}}"}`,
  73. },
  74. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  75. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  76. }, {
  77. config: map[string]interface{}{
  78. "sendSingle": true,
  79. "dataTemplate": `{"newab":"{{.ab}}"}`,
  80. },
  81. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  82. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  83. }, {
  84. config: map[string]interface{}{
  85. "sendSingle": true,
  86. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  87. },
  88. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  89. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  90. }, {
  91. config: map[string]interface{}{
  92. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  93. },
  94. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  95. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  96. }, {
  97. config: map[string]interface{}{
  98. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  99. },
  100. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  101. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  102. }, {
  103. config: map[string]interface{}{
  104. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  105. },
  106. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  107. result: [][]byte{[]byte(`{"result":2}`)},
  108. }, {
  109. config: map[string]interface{}{
  110. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  111. "sendSingle": true,
  112. },
  113. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  114. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  115. },
  116. }
  117. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  118. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  119. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  120. for i, tt := range tests {
  121. mockSink := mocknode.NewMockSink()
  122. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  123. s.Open(ctx, make(chan error))
  124. s.input <- tt.data
  125. time.Sleep(1 * time.Second)
  126. results := mockSink.GetResults()
  127. if !reflect.DeepEqual(tt.result, results) {
  128. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  129. }
  130. }
  131. }
  132. func TestOmitEmpty_Apply(t *testing.T) {
  133. conf.InitConf()
  134. var tests = []struct {
  135. config map[string]interface{}
  136. data []map[string]interface{}
  137. result [][]byte
  138. }{
  139. { // 0
  140. config: map[string]interface{}{
  141. "sendSingle": true,
  142. "omitIfEmpty": true,
  143. },
  144. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  145. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
  146. }, { // 1
  147. config: map[string]interface{}{
  148. "sendSingle": false,
  149. "omitIfEmpty": true,
  150. },
  151. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  152. result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
  153. }, { // 2
  154. config: map[string]interface{}{
  155. "sendSingle": false,
  156. "omitIfEmpty": false,
  157. },
  158. data: []map[string]interface{}{},
  159. result: [][]byte{[]byte(`[]`)},
  160. }, { // 3
  161. config: map[string]interface{}{
  162. "sendSingle": false,
  163. "omitIfEmpty": false,
  164. },
  165. data: nil,
  166. result: [][]byte{[]byte(`null`)},
  167. }, { // 4
  168. config: map[string]interface{}{
  169. "sendSingle": true,
  170. "omitIfEmpty": false,
  171. },
  172. data: []map[string]interface{}{},
  173. result: nil,
  174. }, { // 5
  175. config: map[string]interface{}{
  176. "sendSingle": false,
  177. "omitIfEmpty": true,
  178. },
  179. data: []map[string]interface{}{},
  180. result: nil,
  181. }, { // 6
  182. config: map[string]interface{}{
  183. "sendSingle": false,
  184. "omitIfEmpty": true,
  185. },
  186. data: nil,
  187. result: nil,
  188. }, { // 7
  189. config: map[string]interface{}{
  190. "sendSingle": true,
  191. "omitIfEmpty": false,
  192. },
  193. data: []map[string]interface{}{},
  194. result: nil,
  195. }, { // 8
  196. config: map[string]interface{}{
  197. "sendSingle": true,
  198. "omitIfEmpty": true,
  199. },
  200. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  201. result: [][]byte{[]byte(`{"ab":"hello1"}`)},
  202. }, { // 9
  203. config: map[string]interface{}{
  204. "sendSingle": true,
  205. "omitIfEmpty": false,
  206. },
  207. data: []map[string]interface{}{{"ab": "hello1"}, {}},
  208. result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
  209. },
  210. }
  211. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  212. contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
  213. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  214. for i, tt := range tests {
  215. mockSink := mocknode.NewMockSink()
  216. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  217. s.Open(ctx, make(chan error))
  218. s.input <- tt.data
  219. time.Sleep(100 * time.Millisecond)
  220. results := mockSink.GetResults()
  221. if !reflect.DeepEqual(tt.result, results) {
  222. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  223. }
  224. }
  225. }
  226. func TestFormat_Apply(t *testing.T) {
  227. conf.InitConf()
  228. etcDir, err := conf.GetDataLoc()
  229. if err != nil {
  230. t.Fatal(err)
  231. }
  232. etcDir = filepath.Join(etcDir, "schemas", "protobuf")
  233. err = os.MkdirAll(etcDir, os.ModePerm)
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. //Copy init.proto
  238. bytesRead, err := os.ReadFile("../../schema/test/test1.proto")
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. err = os.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0755)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. defer func() {
  247. err = os.RemoveAll(etcDir)
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. }()
  252. err = schema.InitRegistry()
  253. if err != nil {
  254. t.Fatal(err)
  255. }
  256. transform.RegisterAdditionalFuncs()
  257. var tests = []struct {
  258. name string
  259. config map[string]interface{}
  260. data []map[string]interface{}
  261. result [][]byte
  262. }{
  263. {
  264. name: "test normal protobuf format",
  265. config: map[string]interface{}{
  266. "sendSingle": true,
  267. "format": `protobuf`,
  268. "schemaId": "test1.Person",
  269. },
  270. data: []map[string]interface{}{{
  271. "name": "test",
  272. "id": 1,
  273. "email": "Dddd",
  274. }},
  275. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  276. }, {
  277. name: "test dateTemplate + protobuf format",
  278. config: map[string]interface{}{
  279. "sendSingle": true,
  280. "dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
  281. "format": `protobuf`,
  282. "schemaId": "test1.Person",
  283. },
  284. data: []map[string]interface{}{{"ab": "Dddd"}},
  285. result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
  286. },
  287. }
  288. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  289. contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
  290. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  291. for i, tt := range tests {
  292. t.Run(tt.name, func(t *testing.T) {
  293. mockSink := mocknode.NewMockSink()
  294. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  295. s.Open(ctx, make(chan error))
  296. s.input <- tt.data
  297. var results [][]byte
  298. time.Sleep(100 * time.Millisecond)
  299. results = mockSink.GetResults()
  300. if !reflect.DeepEqual(tt.result, results) {
  301. t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
  302. }
  303. })
  304. }
  305. }
  306. func TestConfig(t *testing.T) {
  307. var tests = []struct {
  308. config map[string]interface{}
  309. sconf *SinkConf
  310. err error
  311. }{
  312. {
  313. config: map[string]interface{}{
  314. "sendSingle": true,
  315. },
  316. sconf: &SinkConf{
  317. Concurrency: 1,
  318. SendSingle: true,
  319. Format: "json",
  320. BufferLength: 1024,
  321. SinkConf: conf.SinkConf{
  322. MemoryCacheThreshold: 1024,
  323. MaxDiskCache: 1024000,
  324. BufferPageSize: 256,
  325. EnableCache: false,
  326. ResendInterval: 0,
  327. CleanCacheAtStop: false,
  328. },
  329. },
  330. }, {
  331. config: map[string]interface{}{
  332. "enableCache": true,
  333. "memoryCacheThreshold": 2,
  334. "bufferPageSize": 2,
  335. "sendSingle": true,
  336. "maxDiskCache": 6,
  337. "resendInterval": 10,
  338. },
  339. sconf: &SinkConf{
  340. Concurrency: 1,
  341. SendSingle: true,
  342. Format: "json",
  343. BufferLength: 1024,
  344. SinkConf: conf.SinkConf{
  345. MemoryCacheThreshold: 2,
  346. MaxDiskCache: 6,
  347. BufferPageSize: 2,
  348. EnableCache: true,
  349. ResendInterval: 10,
  350. CleanCacheAtStop: false,
  351. },
  352. },
  353. }, {
  354. config: map[string]interface{}{
  355. "enableCache": true,
  356. "memoryCacheThreshold": 2,
  357. "bufferPageSize": 2,
  358. "runAsync": true,
  359. "maxDiskCache": 6,
  360. "resendInterval": 10,
  361. },
  362. err: errors.New("cache is not supported for async sink, do not use enableCache and runAsync properties together"),
  363. }, {
  364. config: map[string]interface{}{
  365. "enableCache": true,
  366. "memoryCacheThreshold": 256,
  367. "bufferLength": 10,
  368. "maxDiskCache": 6,
  369. "resendInterval": 10,
  370. },
  371. err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
  372. }, {
  373. config: map[string]interface{}{
  374. "enableCache": true,
  375. "memoryCacheThreshold": 7,
  376. "bufferPageSize": 3,
  377. "sendSingle": true,
  378. "maxDiskCache": 21,
  379. "resendInterval": 10,
  380. },
  381. err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
  382. }, {
  383. config: map[string]interface{}{
  384. "enableCache": true,
  385. "memoryCacheThreshold": 9,
  386. "bufferPageSize": 3,
  387. "sendSingle": true,
  388. "maxDiskCache": 22,
  389. "resendInterval": 10,
  390. },
  391. err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
  392. },
  393. }
  394. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  395. contextLogger := conf.Log.WithField("rule", "TestConfig")
  396. conf.InitConf()
  397. for i, tt := range tests {
  398. mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
  399. sconf, err := mockSink.parseConf(contextLogger)
  400. if !reflect.DeepEqual(tt.err, err) {
  401. t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
  402. } else if !reflect.DeepEqual(tt.sconf, sconf) {
  403. t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
  404. }
  405. }
  406. }
  407. func TestSinkNode_reset(t *testing.T) {
  408. mockSink := mocknode.NewMockSink()
  409. s := NewSinkNodeWithSink("mockSink", mockSink, nil)
  410. s.reset()
  411. if s.statManagers != nil {
  412. t.Errorf("reset() failed")
  413. }
  414. }
  415. func Test_getSink(t *testing.T) {
  416. _, err := getSink("mock", map[string]interface{}{"sendSingle": true, "omitIfEmpty": true})
  417. if err == nil {
  418. t.Errorf("getSink() failed")
  419. }
  420. }
  421. func Test_itemToMap(t *testing.T) {
  422. type args struct {
  423. item interface{}
  424. }
  425. tests := []struct {
  426. name string
  427. args args
  428. want []map[string]interface{}
  429. }{
  430. {
  431. name: "test1",
  432. args: args{
  433. item: errors.New("test"),
  434. },
  435. want: []map[string]interface{}{
  436. {"error": "test"},
  437. },
  438. },
  439. {
  440. name: "test2",
  441. args: args{
  442. item: "test2",
  443. },
  444. want: []map[string]interface{}{
  445. {"error": fmt.Sprintf("result is not a map slice but found %#v", "test2")},
  446. },
  447. },
  448. {
  449. name: "test3",
  450. args: args{
  451. item: xsql.Row(&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil}),
  452. },
  453. want: []map[string]interface{}{
  454. {"a": 1, "b": "2"},
  455. },
  456. },
  457. {
  458. name: "test4",
  459. args: args{
  460. item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.TupleRow{
  461. &xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
  462. }}),
  463. },
  464. want: []map[string]interface{}{
  465. {"a": 1, "b": "2"},
  466. },
  467. },
  468. }
  469. for _, tt := range tests {
  470. t.Run(tt.name, func(t *testing.T) {
  471. if got := itemToMap(tt.args.item); !reflect.DeepEqual(got, tt.want) {
  472. t.Errorf("itemToMap() = %v, want %v", got, tt.want)
  473. }
  474. })
  475. }
  476. }
  477. func TestSinkFields_Apply(t *testing.T) {
  478. conf.InitConf()
  479. transform.RegisterAdditionalFuncs()
  480. var tests = []struct {
  481. dt string
  482. format string
  483. schemaId string
  484. delimiter string
  485. fields []string
  486. data interface{}
  487. result [][]byte
  488. }{
  489. {
  490. format: "json",
  491. fields: []string{"a", "b"},
  492. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  493. result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
  494. },
  495. {
  496. format: "json",
  497. fields: []string{"a", "b"},
  498. data: []map[string]interface{}{{"a": "1", "b": "2", "c": "3"}},
  499. result: [][]byte{[]byte(`[{"a":"1","b":"2"}]`)},
  500. },
  501. {
  502. format: "delimited",
  503. delimiter: ",",
  504. fields: []string{"a", "b"},
  505. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  506. result: [][]byte{[]byte(`1,2`)},
  507. },
  508. {
  509. format: "json",
  510. schemaId: "",
  511. fields: []string{"ax", "bx"},
  512. dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
  513. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  514. result: [][]byte{[]byte(`{"ax":1,"bx":2}`)},
  515. },
  516. {
  517. format: "json",
  518. schemaId: "",
  519. fields: []string{"a", "b"},
  520. dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
  521. data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
  522. result: [][]byte{[]byte(`{"a":null,"b":null}`)},
  523. },
  524. }
  525. contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
  526. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  527. for i, tt := range tests {
  528. tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.fields)
  529. vCtx := context.WithValue(ctx, context.TransKey, tf)
  530. mockSink := mocknode.NewMockSink()
  531. mockSink.Collect(vCtx, tt.data)
  532. time.Sleep(1 * time.Second)
  533. results := mockSink.GetResults()
  534. if !reflect.DeepEqual(tt.result, results) {
  535. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  536. }
  537. }
  538. }