sink_node_test.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "reflect"
  22. "testing"
  23. "time"
  24. )
  25. func TestSinkTemplate_Apply(t *testing.T) {
  26. conf.InitConf()
  27. var tests = []struct {
  28. config map[string]interface{}
  29. data []map[string]interface{}
  30. result [][]byte
  31. }{
  32. {
  33. config: map[string]interface{}{
  34. "sendSingle": true,
  35. "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
  36. },
  37. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  38. result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  39. }, {
  40. config: map[string]interface{}{
  41. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  42. },
  43. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  44. result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  45. }, {
  46. config: map[string]interface{}{
  47. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  48. },
  49. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  50. result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  51. }, {
  52. config: map[string]interface{}{
  53. "dataTemplate": `{"content":{{toJson .}}}`,
  54. },
  55. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  56. result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  57. }, {
  58. config: map[string]interface{}{
  59. "sendSingle": true,
  60. "dataTemplate": `{"newab":"{{.ab}}"}`,
  61. },
  62. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  63. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  64. }, {
  65. config: map[string]interface{}{
  66. "sendSingle": true,
  67. "dataTemplate": `{"newab":"{{.ab}}"}`,
  68. },
  69. data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
  70. result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  71. }, {
  72. config: map[string]interface{}{
  73. "sendSingle": true,
  74. "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
  75. },
  76. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  77. result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
  78. }, {
  79. config: map[string]interface{}{
  80. "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
  81. },
  82. data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
  83. result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
  84. }, {
  85. config: map[string]interface{}{
  86. "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
  87. },
  88. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  89. result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
  90. }, {
  91. config: map[string]interface{}{
  92. "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
  93. },
  94. data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
  95. result: [][]byte{[]byte(`{"result":2}`)},
  96. }, {
  97. config: map[string]interface{}{
  98. "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
  99. "sendSingle": true,
  100. },
  101. data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
  102. result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
  103. },
  104. }
  105. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  106. contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
  107. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  108. for i, tt := range tests {
  109. mockSink := mocknode.NewMockSink()
  110. s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
  111. s.Open(ctx, make(chan error))
  112. s.input <- tt.data
  113. time.Sleep(1 * time.Second)
  114. s.close(ctx, contextLogger)
  115. results := mockSink.GetResults()
  116. if !reflect.DeepEqual(tt.result, results) {
  117. t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
  118. }
  119. }
  120. }