|
@@ -266,11 +266,13 @@ func TestFormat_Apply(t *testing.T) {
|
|
|
}
|
|
|
transform.RegisterAdditionalFuncs()
|
|
|
var tests = []struct {
|
|
|
+ name string
|
|
|
config map[string]interface{}
|
|
|
data []map[string]interface{}
|
|
|
result [][]byte
|
|
|
}{
|
|
|
{
|
|
|
+ name: "test normal protobuf format",
|
|
|
config: map[string]interface{}{
|
|
|
"sendSingle": true,
|
|
|
"format": `protobuf`,
|
|
@@ -283,6 +285,7 @@ func TestFormat_Apply(t *testing.T) {
|
|
|
}},
|
|
|
result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
|
|
|
}, {
|
|
|
+ name: "test dateTemplate + protobuf format",
|
|
|
config: map[string]interface{}{
|
|
|
"sendSingle": true,
|
|
|
"dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
|
|
@@ -298,22 +301,18 @@ func TestFormat_Apply(t *testing.T) {
|
|
|
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- mockSink := mocknode.NewMockSink()
|
|
|
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
|
|
|
- s.Open(ctx, make(chan error))
|
|
|
- s.input <- tt.data
|
|
|
- var results [][]byte
|
|
|
- // try at most 5 seconds
|
|
|
- for j := 0; j < 10; j++ {
|
|
|
- time.Sleep(500 * time.Millisecond)
|
|
|
+ t.Run(tt.name, func(t *testing.T) {
|
|
|
+ mockSink := mocknode.NewMockSink()
|
|
|
+ s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
|
|
|
+ s.Open(ctx, make(chan error))
|
|
|
+ s.input <- tt.data
|
|
|
+ var results [][]byte
|
|
|
+ time.Sleep(100 * time.Millisecond)
|
|
|
results = mockSink.GetResults()
|
|
|
- if len(results) > 0 {
|
|
|
- break
|
|
|
+ if !reflect.DeepEqual(tt.result, results) {
|
|
|
+ t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
|
|
|
}
|
|
|
- }
|
|
|
- if !reflect.DeepEqual(tt.result, results) {
|
|
|
- t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
|
|
|
- }
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|