// sink_node_test.go (4.5 KB)
  1. package node
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/conf"
  5. "github.com/emqx/kuiper/internal/topo/context"
  6. "github.com/emqx/kuiper/internal/topo/topotest/mocknode"
  7. "reflect"
  8. "testing"
  9. "time"
  10. )
  11. func TestSinkTemplate_Apply(t *testing.T) {
  12. conf.InitConf()
  13. var tests = []struct {
  14. config map[string]interface{}
  15. data []byte
  16. result [][]byte
  17. }{
  18. {
  19. config: map[string]interface{}{
  20. "sendSingle": true,
  21. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  22. },
  23. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  24. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  25. }, {
  26. config: map[string]interface{}{
  27. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  28. },
  29. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  30. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  31. }, {
  32. config: map[string]interface{}{
  33. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  34. },
  35. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  36. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  37. }, {
  38. config: map[string]interface{}{
  39. "dataTemplate": `{"content":{{toJson .}}}`,
  40. },
  41. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  42. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  43. }, {
  44. config: map[string]interface{}{
  45. "sendSingle": true,
  46. "dataTemplate": `{"newab":"{{.ab}}"}`,
  47. },
  48. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  49. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  50. }, {
  51. config: map[string]interface{}{
  52. "sendSingle": true,
  53. "dataTemplate": `{"newab":"{{.ab}}"}`,
  54. },
  55. data: []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
  56. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  57. }, {
  58. config: map[string]interface{}{
  59. "sendSingle": true,
  60. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  61. },
  62. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  63. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  64. }, {
  65. config: map[string]interface{}{
  66. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  67. },
  68. data: []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
  69. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  70. }, {
  71. config: map[string]interface{}{
  72. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  73. },
  74. data: []byte(`[{"temperature":33,"humidity":70},{"temperature":22.0,"humidity":50},{"temperature":11,"humidity":90}]`),
  75. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  76. }, {
  77. config: map[string]interface{}{
  78. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90.0 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  79. },
  80. data: []byte(`[{"temperature":33,"humidity":70},{"temperature":22,"humidity":50},{"temperature":11,"humidity":90}]`),
  81. result: [][]byte{[]byte(`{"result":2}`)},
  82. }, {
  83. config: map[string]interface{}{
  84. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  85. "sendSingle": true,
  86. },
  87. data: []byte(`[{"a":1,"b":3.1415,"c":"hello","d":"{\"hello\" : 3}","e":{"humidity":20,"temperature":30}}]`),
  88. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  89. },
  90. }
  91. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  92. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  93. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  94. for i, tt := range tests {
  95. mockSink := mocknode.NewMockSink()
  96. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  97. s.Open(ctx, make(chan error))
  98. s.input <- tt.data
  99. time.Sleep(1 * time.Second)
  100. s.close(ctx, contextLogger)
  101. results := mockSink.GetResults()
  102. if !reflect.DeepEqual(tt.result, results) {
  103. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  104. }
  105. }
  106. }