// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build template || !core
package node
import (
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/schema"
"github.com/lf-edge/ekuiper/internal/testx"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
"github.com/lf-edge/ekuiper/internal/topo/transform"
"github.com/lf-edge/ekuiper/internal/xsql"
)
// init runs when the package is loaded by the test binary and calls
// testx.InitEnv before any test function executes.
// NOTE(review): presumably this prepares the shared test environment used by
// conf.InitConf and friends — confirm against the testx package.
func init() {
testx.InitEnv()
}
// TestBatchSink pushes one three-row batch into a sink node for each
// configuration ("batchSize" or "lingerInterval") and compares what the mock
// sink collected, after stepping the mock clock, with the expected payloads.
func TestBatchSink(t *testing.T) {
	mockClock := conf.Clock.(*clock.Mock)
	conf.InitConf()
	transform.RegisterAdditionalFuncs()

	type batchCase struct {
		config map[string]interface{}
		data   []map[string]interface{}
		result [][]byte
	}
	cases := []batchCase{
		{
			config: map[string]interface{}{
				"batchSize": 2,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
			result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
		},
		{
			config: map[string]interface{}{
				"lingerInterval": 1000,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
			result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"},{"ab":"hello3"}]`)},
		},
	}
	fmt.Printf("The test bucket size is %d.\n\n", len(cases))

	logger := conf.Log.WithField("rule", "TestBatchSink")
	ctx := context.WithValue(context.Background(), context.LoggerKey, logger)
	for idx, tc := range cases {
		mockClock.Set(mockClock.Now())
		sink := mocknode.NewMockSink()
		node := NewSinkNodeWithSink("mockSink", sink, tc.config)
		node.Open(ctx, make(chan error))
		node.input <- tc.data

		// Step the mock clock one second at a time, at most ten times, and
		// stop as soon as the mock sink has collected something.
		for attempt := 0; attempt < 10; attempt++ {
			mockClock.Add(1 * time.Second)
			time.Sleep(10 * time.Millisecond)
			if len(sink.GetResults()) > 0 {
				break
			}
		}

		got := sink.GetResults()
		if reflect.DeepEqual(tc.result, got) {
			continue
		}
		t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", idx, tc.result, got)
	}
}
// TestSinkTemplate_Apply runs a table of "dataTemplate" configurations through
// a sink node backed by a mock sink. Each case sends one slice of rows into the
// node's input channel and compares the byte payloads collected by the mock
// sink with the expected output. Cases with "sendSingle": true expect one
// payload per row; the others expect a single payload for the whole slice.
func TestSinkTemplate_Apply(t *testing.T) {
conf.InitConf()
transform.RegisterAdditionalFuncs()
tests := []struct {
config map[string]interface{}
data []map[string]interface{}
result [][]byte
}{
{
// Per-row template using toJson on the row plus direct field access.
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
}, {
// Whole-slice template using json and index. The template text has no
// opening quote before ab0, and the expected output mirrors that.
config: map[string]interface{}{
"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
}, {
// Multi-line, non-JSON template that ranges over the rows.
// NOTE(review): the expected bytes below do not obviously match what this
// template would render for two rows — confirm neither literal was altered.
config: map[string]interface{}{
"dataTemplate": `
results
{{range .}}- {{.ab}}
{{end}}
`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`results
`)},
}, {
// Whole-slice template wrapping toJson of the full slice.
config: map[string]interface{}{
"dataTemplate": `{"content":{{toJson .}}}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
}, {
// Per-row template that renames field ab to newab.
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"newab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
}, {
// Same config, data and expectation as the previous case.
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"newab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
}, {
// Per-row template reading the __meta entry (an xsql.Metadata value).
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
}, {
// Whole-slice variant of the __meta case, addressing row 0 through index.
config: map[string]interface{}{
"dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
}, {
// Ranges over the rows and emits add(temperature, humidity) for each one;
// the input mixes int and float temperatures.
config: map[string]interface{}{
"dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
}, {
// Counts the rows whose humidity is not 90 using a template variable.
config: map[string]interface{}{
"dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
result: [][]byte{[]byte(`{"result":2}`)},
}, {
// base64 / b64enc applied to an int, a float, strings and a nested map.
config: map[string]interface{}{
"dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
"sendSingle": true,
},
data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
for i, tt := range tests {
// A fresh mock sink and node per case keeps collected results isolated.
mockSink := mocknode.NewMockSink()
s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
s.Open(ctx, make(chan error))
s.input <- tt.data
// Fixed wait before reading the results; presumably the node processes
// its input asynchronously — confirm against the sink node implementation.
time.Sleep(1 * time.Second)
results := mockSink.GetResults()
if !reflect.DeepEqual(tt.result, results) {
t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
}
}
}
// TestOmitEmpty_Apply exercises the combinations of "sendSingle" and
// "omitIfEmpty" against populated, empty and nil row slices, and checks which
// payloads (if any) the mock sink ends up collecting.
func TestOmitEmpty_Apply(t *testing.T) {
	conf.InitConf()

	type omitCase struct {
		config map[string]interface{}
		data   []map[string]interface{}
		result [][]byte
	}
	cases := []omitCase{
		{ // 0
			config: map[string]interface{}{
				"sendSingle":  true,
				"omitIfEmpty": true,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
			result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
		},
		{ // 1
			config: map[string]interface{}{
				"sendSingle":  false,
				"omitIfEmpty": true,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
			result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
		},
		{ // 2
			config: map[string]interface{}{
				"sendSingle":  false,
				"omitIfEmpty": false,
			},
			data:   []map[string]interface{}{},
			result: [][]byte{[]byte(`[]`)},
		},
		{ // 3
			config: map[string]interface{}{
				"sendSingle":  false,
				"omitIfEmpty": false,
			},
			data:   nil,
			result: [][]byte{[]byte(`null`)},
		},
		{ // 4
			config: map[string]interface{}{
				"sendSingle":  true,
				"omitIfEmpty": false,
			},
			data:   []map[string]interface{}{},
			result: nil,
		},
		{ // 5
			config: map[string]interface{}{
				"sendSingle":  false,
				"omitIfEmpty": true,
			},
			data:   []map[string]interface{}{},
			result: nil,
		},
		{ // 6
			config: map[string]interface{}{
				"sendSingle":  false,
				"omitIfEmpty": true,
			},
			data:   nil,
			result: nil,
		},
		{ // 7
			config: map[string]interface{}{
				"sendSingle":  true,
				"omitIfEmpty": false,
			},
			data:   []map[string]interface{}{},
			result: nil,
		},
		{ // 8
			config: map[string]interface{}{
				"sendSingle":  true,
				"omitIfEmpty": true,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {}},
			result: [][]byte{[]byte(`{"ab":"hello1"}`)},
		},
		{ // 9
			config: map[string]interface{}{
				"sendSingle":  true,
				"omitIfEmpty": false,
			},
			data:   []map[string]interface{}{{"ab": "hello1"}, {}},
			result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
		},
	}
	fmt.Printf("The test bucket size is %d.\n\n", len(cases))

	logger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
	ctx := context.WithValue(context.Background(), context.LoggerKey, logger)
	for idx, tc := range cases {
		sink := mocknode.NewMockSink()
		node := NewSinkNodeWithSink("mockSink", sink, tc.config)
		node.Open(ctx, make(chan error))
		node.input <- tc.data
		// Fixed wait before the collected payloads are read back.
		time.Sleep(100 * time.Millisecond)

		got := sink.GetResults()
		if reflect.DeepEqual(tc.result, got) {
			continue
		}
		t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", idx, tc.result, got)
	}
}
// TestFormat_Apply checks the "protobuf" output format of a sink node, both on
// raw rows and on rows first rendered through a "dataTemplate". It stages
// test1.proto under <data dir>/schemas/protobuf, initialises the schema
// registry, and removes the staged directory again when the test returns.
func TestFormat_Apply(t *testing.T) {
	conf.InitConf()
	dataDir, err := conf.GetDataLoc()
	if err != nil {
		t.Fatal(err)
	}
	protoDir := filepath.Join(dataDir, "schemas", "protobuf")
	if err := os.MkdirAll(protoDir, os.ModePerm); err != nil {
		t.Fatal(err)
	}

	// Copy test1.proto into the schema directory.
	protoSrc, err := os.ReadFile("../../schema/test/test1.proto")
	if err != nil {
		t.Fatal(err)
	}
	if err := os.WriteFile(filepath.Join(protoDir, "test1.proto"), protoSrc, 0o755); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if rmErr := os.RemoveAll(protoDir); rmErr != nil {
			t.Fatal(rmErr)
		}
	}()

	if err := schema.InitRegistry(); err != nil {
		t.Fatal(err)
	}
	transform.RegisterAdditionalFuncs()

	type formatCase struct {
		name   string
		config map[string]interface{}
		data   []map[string]interface{}
		result [][]byte
	}
	cases := []formatCase{
		{
			name: "test normal protobuf format",
			config: map[string]interface{}{
				"sendSingle": true,
				"format":     `protobuf`,
				"schemaId":   "test1.Person",
			},
			data: []map[string]interface{}{{
				"name":  "test",
				"id":    1,
				"email": "Dddd",
			}},
			result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
		},
		{
			name: "test dateTemplate + protobuf format",
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
				"format":       `protobuf`,
				"schemaId":     "test1.Person",
			},
			data:   []map[string]interface{}{{"ab": "Dddd"}},
			result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
		},
	}
	fmt.Printf("The test bucket size is %d.\n\n", len(cases))

	logger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
	ctx := context.WithValue(context.Background(), context.LoggerKey, logger)
	for idx, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			sink := mocknode.NewMockSink()
			node := NewSinkNodeWithSink("mockSink", sink, tc.config)
			node.Open(ctx, make(chan error))
			node.input <- tc.data
			// Fixed wait before the encoded payloads are read back.
			time.Sleep(100 * time.Millisecond)

			got := sink.GetResults()
			if reflect.DeepEqual(tc.result, got) {
				return
			}
			t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", idx, tc.result, got)
		})
	}
}
// TestConfig checks SinkNode.parseConf against a table of property maps. Each
// case expects either a fully populated *SinkConf (including the defaults the
// table lists for unset properties) or a specific cache-validation error.
func TestConfig(t *testing.T) {
	tests := []struct {
		config map[string]interface{}
		sconf  *SinkConf
		err    error
	}{
		{
			// Only sendSingle is set; every other field is expected at the
			// default value listed here.
			config: map[string]interface{}{
				"sendSingle": true,
			},
			sconf: &SinkConf{
				Concurrency:  1,
				SendSingle:   true,
				Format:       "json",
				BufferLength: 1024,
				SinkConf: conf.SinkConf{
					MemoryCacheThreshold: 1024,
					MaxDiskCache:         1024000,
					BufferPageSize:       256,
					EnableCache:          false,
					ResendInterval:       0,
					CleanCacheAtStop:     false,
				},
			},
		}, {
			// Explicit, mutually consistent cache properties are accepted.
			config: map[string]interface{}{
				"enableCache":          true,
				"memoryCacheThreshold": 2,
				"bufferPageSize":       2,
				"sendSingle":           true,
				"maxDiskCache":         6,
				"resendInterval":       10,
			},
			sconf: &SinkConf{
				Concurrency:  1,
				SendSingle:   true,
				Format:       "json",
				BufferLength: 1024,
				SinkConf: conf.SinkConf{
					MemoryCacheThreshold: 2,
					MaxDiskCache:         6,
					BufferPageSize:       2,
					EnableCache:          true,
					ResendInterval:       10,
					CleanCacheAtStop:     false,
				},
			},
		}, {
			// bufferPageSize is not set here, so maxDiskCache (6) is compared
			// with whatever page size parseConf falls back to.
			config: map[string]interface{}{
				"enableCache":          true,
				"memoryCacheThreshold": 256,
				"bufferLength":         10,
				"maxDiskCache":         6,
				"resendInterval":       10,
			},
			err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
		}, {
			// 7 is not a multiple of 3.
			config: map[string]interface{}{
				"enableCache":          true,
				"memoryCacheThreshold": 7,
				"bufferPageSize":       3,
				"sendSingle":           true,
				"maxDiskCache":         21,
				"resendInterval":       10,
			},
			err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
		}, {
			// 22 is not a multiple of 3.
			config: map[string]interface{}{
				"enableCache":          true,
				"memoryCacheThreshold": 9,
				"bufferPageSize":       3,
				"sendSingle":           true,
				"maxDiskCache":         22,
				"resendInterval":       10,
			},
			err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
		},
	}
	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
	contextLogger := conf.Log.WithField("rule", "TestConfig")
	conf.InitConf()
	for i, tt := range tests {
		mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
		sconf, err := mockSink.parseConf(contextLogger)
		if !reflect.DeepEqual(tt.err, err) {
			// %v rather than %s: either side may be a nil error, which %s
			// would render as "%!s(<nil>)".
			t.Errorf("%d \terror mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.err, err)
		} else if !reflect.DeepEqual(tt.sconf, sconf) {
			t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
		}
	}
}
func TestSinkNode_reset(t *testing.T) {
mockSink := mocknode.NewMockSink()
s := NewSinkNodeWithSink("mockSink", mockSink, nil)
s.reset()
if s.statManagers != nil {
t.Errorf("reset() failed")
}
}
// Test_getSink expects getSink to return an error when asked for the sink
// type "mock"; a nil error fails the test.
func Test_getSink(t *testing.T) {
	props := map[string]interface{}{"sendSingle": true, "omitIfEmpty": true}
	if _, err := getSink("mock", props); err == nil {
		t.Errorf("getSink() failed")
	}
}
// Test_itemToMap checks itemToMap's output for four kinds of input: an error
// value, a plain string, a single xsql.Row and an xsql.Collection.
func Test_itemToMap(t *testing.T) {
	cases := []struct {
		name string
		item interface{}
		want []map[string]interface{}
	}{
		{
			// An error is reported under the "error" key.
			name: "test1",
			item: errors.New("test"),
			want: []map[string]interface{}{
				{"error": "test"},
			},
		},
		{
			// An unsupported type yields a descriptive "error" entry.
			name: "test2",
			item: "test2",
			want: []map[string]interface{}{
				{"error": fmt.Sprintf("result is not a map slice but found %#v", "test2")},
			},
		},
		{
			// A single tuple row maps to its message.
			name: "test3",
			item: xsql.Row(&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil}),
			want: []map[string]interface{}{
				{"a": 1, "b": "2"},
			},
		},
		{
			// A window collection with one tuple maps to that tuple's message.
			name: "test4",
			item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.TupleRow{
				&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
			}}),
			want: []map[string]interface{}{
				{"a": 1, "b": "2"},
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := itemToMap(tc.item)
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("itemToMap() = %v, want %v", got, tc.want)
		})
	}
}
// TestSinkFields_Apply checks output transformation driven by the "fields"
// and "dataField" options, for the json and delimited formats, with and
// without a data template. Each case builds a transform with
// transform.GenTransform, stores it in the context under context.TransKey,
// and calls Collect on a mock sink directly (no sink node is involved).
func TestSinkFields_Apply(t *testing.T) {
	conf.InitConf()
	transform.RegisterAdditionalFuncs()
	tests := []struct {
		dt        string
		format    string
		schemaId  string
		delimiter string
		dataField string
		fields    []string
		data      interface{}
		result    [][]byte
	}{
		{
			format: "json",
			fields: []string{"a", "b"},
			data:   map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
		},
		{
			format: "json",
			fields: []string{"a", "b"},
			data:   []map[string]interface{}{{"a": "1", "b": "2", "c": "3"}},
			result: [][]byte{[]byte(`[{"a":"1","b":"2"}]`)},
		},
		{
			format:    "delimited",
			delimiter: ",",
			fields:    []string{"a", "b"},
			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result:    [][]byte{[]byte(`1,2`)},
		},
		{
			// Output column order follows the order of "fields".
			format:    "delimited",
			delimiter: ",",
			fields:    []string{"b", "c", "a"},
			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result:    [][]byte{[]byte(`2,3,1`)},
		},
		{
			// Fields are selected from the template output, not the input.
			format:   "json",
			schemaId: "",
			fields:   []string{"ax", "bx"},
			dt:       `{"ax": {{.a}}, "bx": {{.b}}}`,
			data:     map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result:   [][]byte{[]byte(`{"ax":1,"bx":2}`)},
		},
		{
			// Fields missing from the template output come out as null.
			format:   "json",
			schemaId: "",
			fields:   []string{"a", "b"},
			dt:       `{"ax": {{.a}}, "bx": {{.b}}}`,
			data:     map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result:   [][]byte{[]byte(`{"a":null,"b":null}`)},
		},
		{
			// dataField selects the nested map before fields are applied.
			format:    "json",
			dataField: "device",
			fields:    []string{"a", "b"},
			data:      map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
			result:    [][]byte{[]byte(`{"a":"1","b":"2"}`)},
		},
		{
			format:    "delimited",
			delimiter: ",",
			fields:    []string{"a", "b"},
			dataField: "device",
			data:      map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
			result:    [][]byte{[]byte(`1,2`)},
		},
		{
			// dataField applied to the template output.
			format:    "json",
			schemaId:  "",
			fields:    []string{"a", "b"},
			dt:        `{"device": {"a": {{.a}}}}`,
			dataField: "device",
			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
			result:    [][]byte{[]byte(`{"a":1,"b":null}`)},
		},
	}
	contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	for i, tt := range tests {
		tf, err := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.dataField, tt.fields)
		if err != nil {
			// Report a setup failure as such instead of letting it show up
			// later as an unexplained result mismatch.
			t.Errorf("%d \tfailed to create transform: %v", i, err)
			continue
		}
		vCtx := context.WithValue(ctx, context.TransKey, tf)
		mockSink := mocknode.NewMockSink()
		if err := mockSink.Collect(vCtx, tt.data); err != nil {
			t.Errorf("%d \tcollect failed: %v", i, err)
		}
		time.Sleep(1 * time.Second)
		results := mockSink.GetResults()
		if !reflect.DeepEqual(tt.result, results) {
			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
		}
	}
}
// TestSinkCache drives a sink node with caching enabled against a mock resend
// sink. For each configuration it feeds up to 20 single-row batches (cycling
// through a=1..10) and stops early once the mock sink reports the expected
// number of normal and, where applicable, resent payloads. It then compares
// the common prefix of the collected payloads with the expected ones.
func TestSinkCache(t *testing.T) {
	conf.InitConf()
	transform.RegisterAdditionalFuncs()
	contextLogger := conf.Log.WithField("rule", "TestSinkCache")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tests := []struct {
		name         string
		config       map[string]interface{}
		result       [][]byte
		resendResult [][]byte
	}{
		{
			name: "test cache",
			config: map[string]interface{}{
				"enableCache": true,
			},
			result: [][]byte{
				[]byte(`[{"a":1}]`),
				[]byte(`[{"a":2}]`),
				[]byte(`[{"a":3}]`),
				[]byte(`[{"a":4}]`),
				[]byte(`[{"a":5}]`),
				[]byte(`[{"a":6}]`),
				[]byte(`[{"a":7}]`),
				[]byte(`[{"a":8}]`),
				[]byte(`[{"a":9}]`),
				[]byte(`[{"a":10}]`),
			},
		},
		{
			name: "test resend",
			config: map[string]interface{}{
				"enableCache":      true,
				"resendAlterQueue": true,
			},
			result: [][]byte{
				[]byte(`[{"a":2}]`),
				[]byte(`[{"a":4}]`),
				[]byte(`[{"a":6}]`),
				[]byte(`[{"a":8}]`),
				[]byte(`[{"a":10}]`),
			},
			resendResult: [][]byte{
				[]byte(`[{"a":1}]`),
				[]byte(`[{"a":3}]`),
				[]byte(`[{"a":5}]`),
			},
		},
		{
			name: "test resend priority low",
			config: map[string]interface{}{
				"enableCache":          true,
				"resendAlterQueue":     true,
				"resendPriority":       -1,
				"resendIndicatorField": "isResend",
			},
			result: [][]byte{
				[]byte(`[{"a":2}]`),
				[]byte(`[{"a":4}]`),
				[]byte(`[{"a":6}]`),
				[]byte(`[{"a":8}]`),
				[]byte(`[{"a":10}]`),
			},
			resendResult: [][]byte{
				[]byte(`[{"a":1,"isResend":true}]`),
				[]byte(`[{"a":3,"isResend":true}]`),
				[]byte(`[{"a":5,"isResend":true}]`),
			},
		},
		{
			name: "test resend priority high",
			config: map[string]interface{}{
				"enableCache":      true,
				"resendAlterQueue": true,
				"resendPriority":   1,
				"batchSize":        1,
			},
			result: [][]byte{
				[]byte(`[{"a":2}]`),
				[]byte(`[{"a":4}]`),
				[]byte(`[{"a":6}]`),
				[]byte(`[{"a":8}]`),
				[]byte(`[{"a":10}]`),
			},
			resendResult: [][]byte{
				[]byte(`[{"a":1}]`),
				[]byte(`[{"a":3}]`),
				[]byte(`[{"a":5}]`),
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			data := [][]map[string]interface{}{
				{{"a": 1}},
				{{"a": 2}},
				{{"a": 3}},
				{{"a": 4}},
				{{"a": 5}},
				{{"a": 6}},
				{{"a": 7}},
				{{"a": 8}},
				{{"a": 9}},
				{{"a": 10}},
			}
			hitch := make(chan int, 10)
			mockSink := mocknode.NewMockResendSink(hitch)
			fmt.Printf("mockSink: %+v\n", tt.config)
			s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
			s.Open(ctx, make(chan error))
		feed:
			for i := 0; i < 20; i++ {
				s.input <- data[i%len(data)]
				select {
				case <-hitch:
					// A signal arrived from the mock sink: stop feeding once
					// the collected counts match the expected counts.
					done := len(mockSink.GetResults()) == len(tt.result)
					if done && tt.resendResult != nil {
						done = len(mockSink.GetResendResults()) == len(tt.resendResult)
					}
					if done {
						break feed
					}
				case <-time.After(1 * time.Second):
					// No signal within a second; send the next item.
				}
			}
			// Only the common prefix is compared, so extra or missing trailing
			// payloads do not fail the test by themselves.
			results := mockSink.GetResults()
			minLen := len(results)
			if len(tt.result) < minLen {
				minLen = len(tt.result)
			}
			// testify expects (expected, actual) in that order.
			assert.Equal(t, tt.result[:minLen], results[:minLen])
			if tt.resendResult != nil {
				resentResults := mockSink.GetResendResults()
				minLen = len(resentResults)
				if len(tt.resendResult) < minLen {
					minLen = len(tt.resendResult)
				}
				assert.Equal(t, tt.resendResult[:minLen], resentResults[:minLen])
			}
		})
	}
}