sink_node_test.go 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. "github.com/emqx/kuiper/xstream/test"
  7. "reflect"
  8. "testing"
  9. "time"
  10. )
  11. func TestSinkTemplate_Apply(t *testing.T) {
  12. var tests = []struct {
  13. config map[string]interface{}
  14. data []byte
  15. result [][]byte
  16. }{
  17. {
  18. config: map[string]interface{}{
  19. "sendSingle": true,
  20. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  21. },
  22. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  23. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  24. }, {
  25. config: map[string]interface{}{
  26. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  27. },
  28. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  29. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  30. }, {
  31. config: map[string]interface{}{
  32. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  33. },
  34. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  35. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  36. }, {
  37. config: map[string]interface{}{
  38. "dataTemplate": `{"content":{{json .}}}`,
  39. },
  40. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  41. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  42. }, {
  43. config: map[string]interface{}{
  44. "sendSingle": true,
  45. "dataTemplate": `{"newab":"{{.ab}}"}`,
  46. },
  47. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  48. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  49. }, {
  50. config: map[string]interface{}{
  51. "sendSingle": true,
  52. "dataTemplate": `{"newab":"{{.ab}}"}`,
  53. },
  54. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  55. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  56. }, {
  57. config: map[string]interface{}{
  58. "sendSingle": true,
  59. "dataTemplate": `{"__meta":{{json .__meta}},"temp":{{.temperature}}}`,
  60. },
  61. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  62. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  63. }, {
  64. config: map[string]interface{}{
  65. "dataTemplate": `[{"__meta":{{json (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  66. },
  67. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  68. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  69. },
  70. }
  71. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  72. contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
  73. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  74. for i, tt := range tests {
  75. mockSink := test.NewMockSink()
  76. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  77. s.Open(ctx, make(chan error))
  78. s.input <- tt.data
  79. time.Sleep(1 * time.Second)
  80. s.close(ctx, contextLogger)
  81. results := mockSink.GetResults()
  82. if !reflect.DeepEqual(tt.result, results) {
  83. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  84. }
  85. }
  86. }