package nodes
import (
"fmt"
"github.com/emqx/kuiper/common"
"github.com/emqx/kuiper/xstream/contexts"
"github.com/emqx/kuiper/xstream/test"
"reflect"
"testing"
"time"
)
// TestSinkTemplate_Apply drives a sink node built around a mock sink with a
// set of "dataTemplate" / "sendSingle" configurations. For every case it pushes
// one JSON payload into the node's input, waits for it to be processed, closes
// the node and compares what the mock sink collected with the expected output.
//
// In the cases below, "sendSingle": true is expected to yield one output per
// element of the input array, while the cases without it expect a single
// output rendered from the whole array.
func TestSinkTemplate_Apply(t *testing.T) {
	var tests = []struct {
		config map[string]interface{} // sink node properties under test
		data   []byte                 // raw payload pushed into the node input
		result [][]byte               // payloads the mock sink is expected to collect
	}{
		{
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
		}, {
			// NOTE(review): the `ab0"` key has no opening quote in both the
			// template and the expectation, so the two agree but the rendered
			// text is not valid JSON. Left untouched; confirm whether intended.
			config: map[string]interface{}{
				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
		}, {
			// NOTE(review): as written, this template would also render one
			// "- <ab>" line per element, yet the expectation only holds
			// "results\n". The literals may have lost markup at some point;
			// verify against the upstream file before relying on this case.
			config: map[string]interface{}{
				"dataTemplate": `
results
{{range .}}- {{.ab}}
{{end}}
`,
			},
			data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`results
`)},
		}, {
			config: map[string]interface{}{
				"dataTemplate": `{"content":{{json .}}}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
		}, {
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"newab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
		}, {
			// NOTE(review): identical to the previous case; kept as is.
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"newab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
		},
	}

	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
	contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
	for i, tt := range tests {
		mockSink := test.NewMockSink()
		s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)

		// Keep a handle on the error channel (buffered so that a reporter is
		// not left blocked) instead of handing over an anonymous channel
		// nobody ever reads: otherwise an error raised by the node is lost and
		// only shows up as an unexplained result mismatch.
		errCh := make(chan error, 1)
		s.Open(ctx, errCh)
		s.input <- tt.data

		// The node processes its input asynchronously; give it time to
		// render the template and hand the output to the mock sink.
		time.Sleep(1 * time.Second)

		// Non-blocking check: surface an error if one was reported, but do
		// not wait when the node ran cleanly.
		select {
		case err := <-errCh:
			if err != nil {
				t.Errorf("%d \tsink node reported error: %v", i, err)
			}
		default:
		}

		s.close(ctx, contextLogger)
		results := mockSink.GetResults()
		if !reflect.DeepEqual(tt.result, results) {
			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
		}
	}
}