Explorar o código

refactor(test): optimize how to wait for sink format result

Avoid frequent timeout problem.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang %!s(int64=2) %!d(string=hai) anos
pai
achega
cd94a5298e

+ 10 - 3
internal/topo/node/sink_node_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -302,8 +302,15 @@ func TestFormat_Apply(t *testing.T) {
 		s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
 		s.Open(ctx, make(chan error))
 		s.input <- tt.data
-		time.Sleep(1 * time.Second)
-		results := mockSink.GetResults()
+		var results [][]byte
+		// try at most 5 seconds
+		for j := 0; j < 10; j++ {
+			time.Sleep(500 * time.Millisecond)
+			results = mockSink.GetResults()
+			if len(results) > 0 {
+				break
+			}
+		}
 		if !reflect.DeepEqual(tt.result, results) {
 			t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
 		}

+ 2 - 0
internal/topo/topotest/mocknode/mock_sink.go

@@ -15,6 +15,7 @@
 package mocknode
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
@@ -36,6 +37,7 @@ func (m *MockSink) Open(ctx api.StreamContext) error {
 
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
+	fmt.Println("mock sink receive ", item)
 	if v, _, err := ctx.TransformOutput(item); err == nil {
 		logger.Debugf("mock sink receive %s", item)
 		m.results = append(m.results, v)