sink_node_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. "github.com/emqx/kuiper/xstream/test"
  7. "reflect"
  8. "testing"
  9. "time"
  10. )
  11. func TestSinkTemplate_Apply(t *testing.T) {
  12. common.InitConf()
  13. var tests = []struct {
  14. config map[string]interface{}
  15. data []byte
  16. result [][]byte
  17. }{
  18. {
  19. config: map[string]interface{}{
  20. "sendSingle": true,
  21. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  22. },
  23. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  24. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  25. }, {
  26. config: map[string]interface{}{
  27. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  28. },
  29. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  30. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  31. }, {
  32. config: map[string]interface{}{
  33. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  34. },
  35. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  36. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  37. }, {
  38. config: map[string]interface{}{
  39. "dataTemplate": `{"content":{{json .}}}`,
  40. },
  41. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  42. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  43. }, {
  44. config: map[string]interface{}{
  45. "sendSingle": true,
  46. "dataTemplate": `{"newab":"{{.ab}}"}`,
  47. },
  48. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  49. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  50. }, {
  51. config: map[string]interface{}{
  52. "sendSingle": true,
  53. "dataTemplate": `{"newab":"{{.ab}}"}`,
  54. },
  55. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  56. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  57. }, {
  58. config: map[string]interface{}{
  59. "sendSingle": true,
  60. "dataTemplate": `{"__meta":{{json .__meta}},"temp":{{.temperature}}}`,
  61. },
  62. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  63. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  64. }, {
  65. config: map[string]interface{}{
  66. "dataTemplate": `[{"__meta":{{json (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  67. },
  68. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  69. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  70. },
  71. }
  72. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  73. contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
  74. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  75. for i, tt := range tests {
  76. mockSink := test.NewMockSink()
  77. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  78. s.Open(ctx, make(chan error))
  79. s.input <- tt.data
  80. time.Sleep(1 * time.Second)
  81. s.close(ctx, contextLogger)
  82. results := mockSink.GetResults()
  83. if !reflect.DeepEqual(tt.result, results) {
  84. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  85. }
  86. }
  87. }