123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- // Copyright 2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //go:build template || !core
- // +build template !core
- package node
- import (
- "fmt"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
- "github.com/lf-edge/ekuiper/internal/topo/transform"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "reflect"
- "testing"
- "time"
- )
- func TestSinkTemplate_Apply(t *testing.T) {
- conf.InitConf()
- transform.RegisterAdditionalFuncs()
- var tests = []struct {
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"content":{{toJson .}}}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"newab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"newab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
- result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
- result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
- result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
- result: [][]byte{[]byte(`{"result":2}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
- "sendSingle": true,
- },
- data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
- result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- time.Sleep(1 * time.Second)
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
- func TestOmitEmpty_Apply(t *testing.T) {
- conf.InitConf()
- var tests = []struct {
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- { // 0
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
- }, { // 1
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
- }, { // 2
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: [][]byte{[]byte(`[]`)},
- }, { // 3
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": false,
- },
- data: nil,
- result: [][]byte{[]byte(`null`)},
- }, { // 4
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 5
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 6
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: nil,
- result: nil,
- }, { // 7
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 8
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`)},
- }, { // 9
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- time.Sleep(100 * time.Millisecond)
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
|