123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //go:build template || !core
- package node
- import (
- "errors"
- "fmt"
- "os"
- "path/filepath"
- "reflect"
- "testing"
- "time"
- "github.com/benbjohnson/clock"
- "github.com/stretchr/testify/assert"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/schema"
- "github.com/lf-edge/ekuiper/internal/testx"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
- "github.com/lf-edge/ekuiper/internal/topo/transform"
- "github.com/lf-edge/ekuiper/internal/xsql"
- )
- func init() {
- testx.InitEnv()
- }
- func TestBatchSink(t *testing.T) {
- mc := conf.Clock.(*clock.Mock)
- conf.InitConf()
- transform.RegisterAdditionalFuncs()
- tests := []struct {
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- {
- config: map[string]interface{}{
- "batchSize": 2,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
- result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
- },
- {
- config: map[string]interface{}{
- "lingerInterval": 1000,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
- result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"},{"ab":"hello3"}]`)},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestBatchSink")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- mc.Set(mc.Now())
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- for i := 0; i < 10; i++ {
- mc.Add(1 * time.Second)
- time.Sleep(10 * time.Millisecond)
- // wait until mockSink get results
- if len(mockSink.GetResults()) > 0 {
- break
- }
- }
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
- func TestSinkTemplate_Apply(t *testing.T) {
- conf.InitConf()
- transform.RegisterAdditionalFuncs()
- tests := []struct {
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"content":{{toJson .}}}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"newab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"newab":"{{.ab}}"}`,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
- }, {
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
- result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
- result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
- result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
- },
- data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
- result: [][]byte{[]byte(`{"result":2}`)},
- }, {
- config: map[string]interface{}{
- "dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
- "sendSingle": true,
- },
- data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
- result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- time.Sleep(1 * time.Second)
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
- func TestOmitEmpty_Apply(t *testing.T) {
- conf.InitConf()
- tests := []struct {
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- { // 0
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
- }, { // 1
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
- result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
- }, { // 2
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: [][]byte{[]byte(`[]`)},
- }, { // 3
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": false,
- },
- data: nil,
- result: [][]byte{[]byte(`null`)},
- }, { // 4
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 5
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 6
- config: map[string]interface{}{
- "sendSingle": false,
- "omitIfEmpty": true,
- },
- data: nil,
- result: nil,
- }, { // 7
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{},
- result: nil,
- }, { // 8
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": true,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`)},
- }, { // 9
- config: map[string]interface{}{
- "sendSingle": true,
- "omitIfEmpty": false,
- },
- data: []map[string]interface{}{{"ab": "hello1"}, {}},
- result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- time.Sleep(100 * time.Millisecond)
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
- func TestFormat_Apply(t *testing.T) {
- conf.InitConf()
- etcDir, err := conf.GetDataLoc()
- if err != nil {
- t.Fatal(err)
- }
- etcDir = filepath.Join(etcDir, "schemas", "protobuf")
- err = os.MkdirAll(etcDir, os.ModePerm)
- if err != nil {
- t.Fatal(err)
- }
- // Copy init.proto
- bytesRead, err := os.ReadFile("../../schema/test/test1.proto")
- if err != nil {
- t.Fatal(err)
- }
- err = os.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0o755)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = os.RemoveAll(etcDir)
- if err != nil {
- t.Fatal(err)
- }
- }()
- err = schema.InitRegistry()
- if err != nil {
- t.Fatal(err)
- }
- transform.RegisterAdditionalFuncs()
- tests := []struct {
- name string
- config map[string]interface{}
- data []map[string]interface{}
- result [][]byte
- }{
- {
- name: "test normal protobuf format",
- config: map[string]interface{}{
- "sendSingle": true,
- "format": `protobuf`,
- "schemaId": "test1.Person",
- },
- data: []map[string]interface{}{{
- "name": "test",
- "id": 1,
- "email": "Dddd",
- }},
- result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
- }, {
- name: "test dateTemplate + protobuf format",
- config: map[string]interface{}{
- "sendSingle": true,
- "dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
- "format": `protobuf`,
- "schemaId": "test1.Person",
- },
- data: []map[string]interface{}{{"ab": "Dddd"}},
- result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- s.input <- tt.data
- var results [][]byte
- time.Sleep(100 * time.Millisecond)
- results = mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
- }
- })
- }
- }
- func TestConfig(t *testing.T) {
- tests := []struct {
- config map[string]interface{}
- sconf *SinkConf
- err error
- }{
- {
- config: map[string]interface{}{
- "sendSingle": true,
- },
- sconf: &SinkConf{
- Concurrency: 1,
- SendSingle: true,
- Format: "json",
- BufferLength: 1024,
- SinkConf: conf.SinkConf{
- MemoryCacheThreshold: 1024,
- MaxDiskCache: 1024000,
- BufferPageSize: 256,
- EnableCache: false,
- ResendInterval: 0,
- CleanCacheAtStop: false,
- },
- },
- }, {
- config: map[string]interface{}{
- "enableCache": true,
- "memoryCacheThreshold": 2,
- "bufferPageSize": 2,
- "sendSingle": true,
- "maxDiskCache": 6,
- "resendInterval": 10,
- },
- sconf: &SinkConf{
- Concurrency: 1,
- SendSingle: true,
- Format: "json",
- BufferLength: 1024,
- SinkConf: conf.SinkConf{
- MemoryCacheThreshold: 2,
- MaxDiskCache: 6,
- BufferPageSize: 2,
- EnableCache: true,
- ResendInterval: 10,
- CleanCacheAtStop: false,
- },
- },
- }, {
- config: map[string]interface{}{
- "enableCache": true,
- "memoryCacheThreshold": 256,
- "bufferLength": 10,
- "maxDiskCache": 6,
- "resendInterval": 10,
- },
- err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
- }, {
- config: map[string]interface{}{
- "enableCache": true,
- "memoryCacheThreshold": 7,
- "bufferPageSize": 3,
- "sendSingle": true,
- "maxDiskCache": 21,
- "resendInterval": 10,
- },
- err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
- }, {
- config: map[string]interface{}{
- "enableCache": true,
- "memoryCacheThreshold": 9,
- "bufferPageSize": 3,
- "sendSingle": true,
- "maxDiskCache": 22,
- "resendInterval": 10,
- },
- err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
- },
- }
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- contextLogger := conf.Log.WithField("rule", "TestConfig")
- conf.InitConf()
- for i, tt := range tests {
- mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
- sconf, err := mockSink.parseConf(contextLogger)
- if !reflect.DeepEqual(tt.err, err) {
- t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
- } else if !reflect.DeepEqual(tt.sconf, sconf) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
- }
- }
- }
- func TestSinkNode_reset(t *testing.T) {
- mockSink := mocknode.NewMockSink()
- s := NewSinkNodeWithSink("mockSink", mockSink, nil)
- s.reset()
- if s.statManagers != nil {
- t.Errorf("reset() failed")
- }
- }
- func Test_getSink(t *testing.T) {
- _, err := getSink("mock", map[string]interface{}{"sendSingle": true, "omitIfEmpty": true})
- if err == nil {
- t.Errorf("getSink() failed")
- }
- }
- func Test_itemToMap(t *testing.T) {
- type args struct {
- item interface{}
- }
- tests := []struct {
- name string
- args args
- want []map[string]interface{}
- }{
- {
- name: "test1",
- args: args{
- item: errors.New("test"),
- },
- want: []map[string]interface{}{
- {"error": "test"},
- },
- },
- {
- name: "test2",
- args: args{
- item: "test2",
- },
- want: []map[string]interface{}{
- {"error": fmt.Sprintf("result is not a map slice but found %#v", "test2")},
- },
- },
- {
- name: "test3",
- args: args{
- item: xsql.Row(&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil}),
- },
- want: []map[string]interface{}{
- {"a": 1, "b": "2"},
- },
- },
- {
- name: "test4",
- args: args{
- item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.TupleRow{
- &xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
- }}),
- },
- want: []map[string]interface{}{
- {"a": 1, "b": "2"},
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if got := itemToMap(tt.args.item); !reflect.DeepEqual(got, tt.want) {
- t.Errorf("itemToMap() = %v, want %v", got, tt.want)
- }
- })
- }
- }
- func TestSinkFields_Apply(t *testing.T) {
- conf.InitConf()
- transform.RegisterAdditionalFuncs()
- tests := []struct {
- dt string
- format string
- schemaId string
- delimiter string
- dataField string
- fields []string
- data interface{}
- result [][]byte
- }{
- {
- format: "json",
- fields: []string{"a", "b"},
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
- },
- {
- format: "json",
- fields: []string{"a", "b"},
- data: []map[string]interface{}{{"a": "1", "b": "2", "c": "3"}},
- result: [][]byte{[]byte(`[{"a":"1","b":"2"}]`)},
- },
- {
- format: "delimited",
- delimiter: ",",
- fields: []string{"a", "b"},
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`1,2`)},
- },
- {
- format: "delimited",
- delimiter: ",",
- fields: []string{"b", "c", "a"},
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`2,3,1`)},
- },
- {
- format: "json",
- schemaId: "",
- fields: []string{"ax", "bx"},
- dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`{"ax":1,"bx":2}`)},
- },
- {
- format: "json",
- schemaId: "",
- fields: []string{"a", "b"},
- dt: `{"ax": {{.a}}, "bx": {{.b}}}`,
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`{"a":null,"b":null}`)},
- },
- {
- format: "json",
- dataField: "device",
- fields: []string{"a", "b"},
- data: map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
- result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
- },
- {
- format: "delimited",
- delimiter: ",",
- fields: []string{"a", "b"},
- dataField: "device",
- data: map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
- result: [][]byte{[]byte(`1,2`)},
- },
- {
- format: "json",
- schemaId: "",
- fields: []string{"a", "b"},
- dt: `{"device": {"a": {{.a}}}}`,
- dataField: "device",
- data: map[string]interface{}{"a": "1", "b": "2", "c": "3"},
- result: [][]byte{[]byte(`{"a":1,"b":null}`)},
- },
- }
- contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- for i, tt := range tests {
- tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.dataField, tt.fields)
- vCtx := context.WithValue(ctx, context.TransKey, tf)
- mockSink := mocknode.NewMockSink()
- _ = mockSink.Collect(vCtx, tt.data)
- time.Sleep(1 * time.Second)
- results := mockSink.GetResults()
- if !reflect.DeepEqual(tt.result, results) {
- t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
- }
- }
- }
- func TestSinkCache(t *testing.T) {
- conf.InitConf()
- transform.RegisterAdditionalFuncs()
- contextLogger := conf.Log.WithField("rule", "TestSinkCache")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- tests := []struct {
- name string
- config map[string]interface{}
- result [][]byte
- resendResult [][]byte
- }{
- {
- name: "test cache",
- config: map[string]interface{}{
- "enableCache": true,
- },
- result: [][]byte{
- []byte(`[{"a":1}]`),
- []byte(`[{"a":2}]`),
- []byte(`[{"a":3}]`),
- []byte(`[{"a":4}]`),
- []byte(`[{"a":5}]`),
- []byte(`[{"a":6}]`),
- []byte(`[{"a":7}]`),
- []byte(`[{"a":8}]`),
- []byte(`[{"a":9}]`),
- []byte(`[{"a":10}]`),
- },
- },
- {
- name: "test resend",
- config: map[string]interface{}{
- "enableCache": true,
- "resendAlterQueue": true,
- },
- result: [][]byte{
- []byte(`[{"a":2}]`),
- []byte(`[{"a":4}]`),
- []byte(`[{"a":6}]`),
- []byte(`[{"a":8}]`),
- []byte(`[{"a":10}]`),
- },
- resendResult: [][]byte{
- []byte(`[{"a":1}]`),
- []byte(`[{"a":3}]`),
- []byte(`[{"a":5}]`),
- },
- },
- {
- name: "test resend priority low",
- config: map[string]interface{}{
- "enableCache": true,
- "resendAlterQueue": true,
- "resendPriority": -1,
- "resendIndicatorField": "isResend",
- },
- result: [][]byte{
- []byte(`[{"a":2}]`),
- []byte(`[{"a":4}]`),
- []byte(`[{"a":6}]`),
- []byte(`[{"a":8}]`),
- []byte(`[{"a":10}]`),
- },
- resendResult: [][]byte{
- []byte(`[{"a":1,"isResend":true}]`),
- []byte(`[{"a":3,"isResend":true}]`),
- []byte(`[{"a":5,"isResend":true}]`),
- },
- },
- {
- name: "test resend priority high",
- config: map[string]interface{}{
- "enableCache": true,
- "resendAlterQueue": true,
- "resendPriority": 1,
- "batchSize": 1,
- },
- result: [][]byte{
- []byte(`[{"a":2}]`),
- []byte(`[{"a":4}]`),
- []byte(`[{"a":6}]`),
- []byte(`[{"a":8}]`),
- []byte(`[{"a":10}]`),
- },
- resendResult: [][]byte{
- []byte(`[{"a":1}]`),
- []byte(`[{"a":3}]`),
- []byte(`[{"a":5}]`),
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- data := [][]map[string]interface{}{
- {{"a": 1}},
- {{"a": 2}},
- {{"a": 3}},
- {{"a": 4}},
- {{"a": 5}},
- {{"a": 6}},
- {{"a": 7}},
- {{"a": 8}},
- {{"a": 9}},
- {{"a": 10}},
- }
- hitch := make(chan int, 10)
- mockSink := mocknode.NewMockResendSink(hitch)
- fmt.Printf("mockSink: %+v\n", tt.config)
- s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
- s.Open(ctx, make(chan error))
- for i := 0; i < 20; i++ {
- s.input <- data[i%len(data)]
- select {
- case <-hitch:
- done := true
- results := mockSink.GetResults()
- if len(results) != len(tt.result) {
- done = false
- }
- if done && tt.resendResult != nil {
- resentResults := mockSink.GetResendResults()
- if len(resentResults) != len(tt.resendResult) {
- done = false
- }
- }
- if done {
- goto end
- }
- case <-time.After(1 * time.Second):
- }
- }
- end:
- results := mockSink.GetResults()
- minLen := len(results)
- if len(tt.result) < minLen {
- minLen = len(tt.result)
- }
- assert.Equal(t, results[:minLen], tt.result[:minLen])
- if tt.resendResult != nil {
- resentResults := mockSink.GetResendResults()
- minLen = len(resentResults)
- if len(tt.resendResult) < minLen {
- minLen = len(tt.resendResult)
- }
- assert.Equal(t, resentResults[:minLen], tt.resendResult[:minLen])
- }
- })
- }
- }
|