// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build template || !core
// +build template !core
package node
import (
"errors"
"fmt"
"github.com/lf-edge/ekuiper/internal/testx"
"os"
"path/filepath"
"reflect"
"testing"
"time"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/schema"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
"github.com/lf-edge/ekuiper/internal/topo/transform"
"github.com/lf-edge/ekuiper/internal/xsql"
)
func init() {
testx.InitEnv()
}
func TestSinkTemplate_Apply(t *testing.T) {
conf.InitConf()
transform.RegisterAdditionalFuncs()
var tests = []struct {
config map[string]interface{}
data []map[string]interface{}
result [][]byte
}{
{
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
}, {
config: map[string]interface{}{
"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
}, {
config: map[string]interface{}{
"dataTemplate": `
results
{{range .}}- {{.ab}}
{{end}}
`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`results
`)},
}, {
config: map[string]interface{}{
"dataTemplate": `{"content":{{toJson .}}}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
}, {
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"newab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
}, {
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"newab":"{{.ab}}"}`,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
}, {
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
}, {
config: map[string]interface{}{
"dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
}, {
config: map[string]interface{}{
"dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
}, {
config: map[string]interface{}{
"dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
},
data: []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
result: [][]byte{[]byte(`{"result":2}`)},
}, {
config: map[string]interface{}{
"dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
"sendSingle": true,
},
data: []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
contextLogger := conf.Log.WithField("rule", "TestSinkTemplate_Apply")
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
for i, tt := range tests {
mockSink := mocknode.NewMockSink()
s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
s.Open(ctx, make(chan error))
s.input <- tt.data
time.Sleep(1 * time.Second)
results := mockSink.GetResults()
if !reflect.DeepEqual(tt.result, results) {
t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
}
}
}
func TestOmitEmpty_Apply(t *testing.T) {
conf.InitConf()
var tests = []struct {
config map[string]interface{}
data []map[string]interface{}
result [][]byte
}{
{ // 0
config: map[string]interface{}{
"sendSingle": true,
"omitIfEmpty": true,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{"ab":"hello2"}`)},
}, { // 1
config: map[string]interface{}{
"sendSingle": false,
"omitIfEmpty": true,
},
data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
}, { // 2
config: map[string]interface{}{
"sendSingle": false,
"omitIfEmpty": false,
},
data: []map[string]interface{}{},
result: [][]byte{[]byte(`[]`)},
}, { // 3
config: map[string]interface{}{
"sendSingle": false,
"omitIfEmpty": false,
},
data: nil,
result: [][]byte{[]byte(`null`)},
}, { // 4
config: map[string]interface{}{
"sendSingle": true,
"omitIfEmpty": false,
},
data: []map[string]interface{}{},
result: nil,
}, { // 5
config: map[string]interface{}{
"sendSingle": false,
"omitIfEmpty": true,
},
data: []map[string]interface{}{},
result: nil,
}, { // 6
config: map[string]interface{}{
"sendSingle": false,
"omitIfEmpty": true,
},
data: nil,
result: nil,
}, { // 7
config: map[string]interface{}{
"sendSingle": true,
"omitIfEmpty": false,
},
data: []map[string]interface{}{},
result: nil,
}, { // 8
config: map[string]interface{}{
"sendSingle": true,
"omitIfEmpty": true,
},
data: []map[string]interface{}{{"ab": "hello1"}, {}},
result: [][]byte{[]byte(`{"ab":"hello1"}`)},
}, { // 9
config: map[string]interface{}{
"sendSingle": true,
"omitIfEmpty": false,
},
data: []map[string]interface{}{{"ab": "hello1"}, {}},
result: [][]byte{[]byte(`{"ab":"hello1"}`), []byte(`{}`)},
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
contextLogger := conf.Log.WithField("rule", "TestOmitEmpty_Apply")
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
for i, tt := range tests {
mockSink := mocknode.NewMockSink()
s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
s.Open(ctx, make(chan error))
s.input <- tt.data
time.Sleep(100 * time.Millisecond)
results := mockSink.GetResults()
if !reflect.DeepEqual(tt.result, results) {
t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
}
}
}
func TestFormat_Apply(t *testing.T) {
conf.InitConf()
etcDir, err := conf.GetDataLoc()
if err != nil {
t.Fatal(err)
}
etcDir = filepath.Join(etcDir, "schemas", "protobuf")
err = os.MkdirAll(etcDir, os.ModePerm)
if err != nil {
t.Fatal(err)
}
//Copy init.proto
bytesRead, err := os.ReadFile("../../schema/test/test1.proto")
if err != nil {
t.Fatal(err)
}
err = os.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0755)
if err != nil {
t.Fatal(err)
}
defer func() {
err = os.RemoveAll(etcDir)
if err != nil {
t.Fatal(err)
}
}()
err = schema.InitRegistry()
if err != nil {
t.Fatal(err)
}
transform.RegisterAdditionalFuncs()
var tests = []struct {
config map[string]interface{}
data []map[string]interface{}
result [][]byte
}{
{
config: map[string]interface{}{
"sendSingle": true,
"format": `protobuf`,
"schemaId": "test1.Person",
},
data: []map[string]interface{}{{
"name": "test",
"id": 1,
"email": "Dddd",
}},
result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
}, {
config: map[string]interface{}{
"sendSingle": true,
"dataTemplate": `{"name":"test","email":"{{.ab}}","id":1}`,
"format": `protobuf`,
"schemaId": "test1.Person",
},
data: []map[string]interface{}{{"ab": "Dddd"}},
result: [][]byte{{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74, 0x10, 0x01, 0x1a, 0x04, 0x44, 0x64, 0x64, 0x64}},
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
contextLogger := conf.Log.WithField("rule", "TestSinkFormat_Apply")
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
for i, tt := range tests {
mockSink := mocknode.NewMockSink()
s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
s.Open(ctx, make(chan error))
s.input <- tt.data
var results [][]byte
// try at most 5 seconds
for j := 0; j < 10; j++ {
time.Sleep(500 * time.Millisecond)
results = mockSink.GetResults()
if len(results) > 0 {
break
}
}
if !reflect.DeepEqual(tt.result, results) {
t.Errorf("%d \tresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.result, results)
}
}
}
func TestConfig(t *testing.T) {
var tests = []struct {
config map[string]interface{}
sconf *SinkConf
err error
}{
{
config: map[string]interface{}{
"sendSingle": true,
},
sconf: &SinkConf{
Concurrency: 1,
SendSingle: true,
Format: "json",
BufferLength: 1024,
SinkConf: conf.SinkConf{
MemoryCacheThreshold: 1024,
MaxDiskCache: 1024000,
BufferPageSize: 256,
EnableCache: false,
ResendInterval: 0,
CleanCacheAtStop: false,
},
},
}, {
config: map[string]interface{}{
"enableCache": true,
"memoryCacheThreshold": 2,
"bufferPageSize": 2,
"sendSingle": true,
"maxDiskCache": 6,
"resendInterval": 10,
},
sconf: &SinkConf{
Concurrency: 1,
SendSingle: true,
Format: "json",
BufferLength: 1024,
SinkConf: conf.SinkConf{
MemoryCacheThreshold: 2,
MaxDiskCache: 6,
BufferPageSize: 2,
EnableCache: true,
ResendInterval: 10,
CleanCacheAtStop: false,
},
},
}, {
config: map[string]interface{}{
"enableCache": true,
"memoryCacheThreshold": 2,
"bufferPageSize": 2,
"runAsync": true,
"maxDiskCache": 6,
"resendInterval": 10,
},
err: errors.New("cache is not supported for async sink, do not use enableCache and runAsync properties together"),
}, {
config: map[string]interface{}{
"enableCache": true,
"memoryCacheThreshold": 256,
"bufferLength": 10,
"maxDiskCache": 6,
"resendInterval": 10,
},
err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
}, {
config: map[string]interface{}{
"enableCache": true,
"memoryCacheThreshold": 7,
"bufferPageSize": 3,
"sendSingle": true,
"maxDiskCache": 21,
"resendInterval": 10,
},
err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
}, {
config: map[string]interface{}{
"enableCache": true,
"memoryCacheThreshold": 9,
"bufferPageSize": 3,
"sendSingle": true,
"maxDiskCache": 22,
"resendInterval": 10,
},
err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
contextLogger := conf.Log.WithField("rule", "TestConfig")
conf.InitConf()
for i, tt := range tests {
mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
sconf, err := mockSink.parseConf(contextLogger)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
} else if !reflect.DeepEqual(tt.sconf, sconf) {
t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
}
}
}