package nodes

import (
	"fmt"
	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/xstream/contexts"
	"github.com/emqx/kuiper/xstream/test"
	"reflect"
	"testing"
	"time"
)

// TestSinkTemplate_Apply pushes JSON payloads through a sink node wrapped
// around a mock sink and checks what the mock sink collected against the
// expected rendered output for each "dataTemplate" / "sendSingle" config.
//
// For every table entry the test builds a fresh mock sink and sink node,
// opens the node, sends one payload on the node's input channel, waits up to
// one second, closes the node and compares the collected results with the
// expected byte slices using reflect.DeepEqual.
func TestSinkTemplate_Apply(t *testing.T) {
	var tests = []struct {
		config map[string]interface{} // sink properties handed to NewSinkNodeWithSink
		data   []byte                 // payload sent on the node's input channel
		result [][]byte               // messages the mock sink is expected to hold
	}{
		{
			// sendSingle with a template: one rendered message is expected
			// per element of the input array.
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
		},
		{
			// No sendSingle: a single message rendered from the whole array is
			// expected. The literal text `ab0"` (no opening quote) is part of
			// the template and is mirrored verbatim in the expected output.
			config: map[string]interface{}{
				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
		},
		{
			// Template without any actions: the expected output equals the
			// template text, including the surrounding newlines.
			config: map[string]interface{}{
				"dataTemplate": `
results
`,
			},
			data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`
results
`)},
		},
		{
			// Whole array embedded as JSON in a single message.
			config: map[string]interface{}{
				"dataTemplate": `{"content":{{json .}}}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
		},
		{
			// sendSingle with a field-renaming template.
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"newab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
		},
		{
			// NOTE(review): this entry is identical to the previous one; kept
			// as-is so the table contents are unchanged.
			config: map[string]interface{}{
				"sendSingle":   true,
				"dataTemplate": `{"newab":"{{.ab}}"}`,
			},
			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
		},
	}

	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
	contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
	for i, tt := range tests {
		mockSink := test.NewMockSink()
		s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)

		// Keep a handle on the channel given to Open so that anything sent on
		// it is reported by the test instead of being silently dropped.
		errCh := make(chan error)
		s.Open(ctx, errCh)
		s.input <- tt.data

		// Wait up to one second before closing the node (same delay as a
		// plain sleep when nothing arrives), but stop early and record the
		// failure if an error is delivered on the channel.
		select {
		case err := <-errCh:
			t.Errorf("%d \tsink node reported an error: %v", i, err)
		case <-time.After(1 * time.Second):
		}

		s.close(ctx, contextLogger)
		results := mockSink.GetResults()
		if !reflect.DeepEqual(tt.result, results) {
			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
		}
	}
}