sink_node_test.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. "github.com/emqx/kuiper/xstream/test"
  7. "reflect"
  8. "testing"
  9. "time"
  10. )
  11. func TestSinkTemplate_Apply(t *testing.T) {
  12. var tests = []struct {
  13. config map[string]interface{}
  14. data []byte
  15. result [][]byte
  16. }{
  17. {
  18. config: map[string]interface{}{
  19. "sendSingle": true,
  20. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  21. },
  22. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  23. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  24. }, {
  25. config: map[string]interface{}{
  26. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  27. },
  28. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  29. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  30. }, {
  31. config: map[string]interface{}{
  32. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  33. },
  34. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  35. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  36. }, {
  37. config: map[string]interface{}{
  38. "dataTemplate": `{"content":{{json .}}}`,
  39. },
  40. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  41. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  42. }, {
  43. config: map[string]interface{}{
  44. "sendSingle": true,
  45. "dataTemplate": `{"newab":"{{.ab}}"}`,
  46. },
  47. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  48. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  49. }, {
  50. config: map[string]interface{}{
  51. "sendSingle": true,
  52. "dataTemplate": `{"newab":"{{.ab}}"}`,
  53. },
  54. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  55. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  56. },
  57. }
  58. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  59. contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
  60. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  61. for i, tt := range tests {
  62. mockSink := test.NewMockSink()
  63. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  64. s.Open(ctx, make(chan error))
  65. s.input <- tt.data
  66. time.Sleep(1 * time.Second)
  67. s.close(ctx, contextLogger)
  68. results := mockSink.GetResults()
  69. if !reflect.DeepEqual(tt.result, results) {
  70. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  71. }
  72. }
  73. }