// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package topotest

import (
    "encoding/json"
    "fmt"
    "reflect"
    "strings"
    "testing"
    "time"

    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/internal/processor"
    "github.com/lf-edge/ekuiper/internal/testx"
    "github.com/lf-edge/ekuiper/internal/topo"
    "github.com/lf-edge/ekuiper/internal/topo/node"
    "github.com/lf-edge/ekuiper/internal/topo/planner"
    "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
    "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
    "github.com/lf-edge/ekuiper/internal/xsql"
    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/ast"
    "github.com/lf-edge/ekuiper/pkg/cast"
)
func init() {
    testx.InitEnv()
}
const POSTLEAP = 1000 // Time to advance the mock clock after all data has been sent, in milliseconds
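
// RuleTest defines a single rule test case: the rule SQL to run, the expected
// sink results and the expected final metrics.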
type RuleTest struct {
    Name string
    Sql  string
    R    interface{}            // The expected result
    M    map[string]interface{} // The expected final metrics
    T    *api.PrintableTopo     // The expected printable topo; optional
    W    int                    // Wait time between each data send, in milliseconds
}
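
// CompareMetrics checks that every metric in m is reported by the topo.
// Latency metrics (suffix "process_latency_us") only need to reach the
// expected value; all other metrics must match exactly.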
func CompareMetrics(tp *topo.Topo, m map[string]interface{}) (err error) {
    keys, values := tp.GetMetrics()
    for k, v := range m {
        var (
            index   int
            key     string
            matched bool
        )
        for index, key = range keys {
            if k == key {
                if strings.HasSuffix(k, "process_latency_us") {
                    if values[index].(int64) >= v.(int64) {
                        matched = true
                        continue
                    }
                    break
                }
                if values[index] == v {
                    matched = true
                }
                break
            }
        }
        if matched {
            continue
        }
        if conf.Config.Basic.Debug {
            for i, k := range keys {
                conf.Log.Printf("%s:%v", k, values[i])
            }
        }
        // Not matched: report the mismatched or missing metric
        if index < len(values) {
            return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%T)\n\ngot=%#v(%T)\n\n", k, v, v, values[index], values[index])
        } else {
            return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
        }
    }
    return nil
}
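
// CommonResultFunc parses each raw sink output as a JSON array of maps so
// that the results can be compared with reflect.DeepEqual.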
func CommonResultFunc(result [][]byte) interface{} {
    var maps [][]map[string]interface{}
    for _, v := range result {
        var mapRes []map[string]interface{}
        err := json.Unmarshal(v, &mapRes)
        if err != nil {
            panic(fmt.Sprintf("Failed to parse the input %v into maps", string(v)))
        }
        maps = append(maps, mapRes)
    }
    return maps
}
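
// DoRuleTest runs the given rule test cases against option set j, using the
// default mock sink and CommonResultFunc to parse the sink output.
//
// A minimal, hypothetical usage sketch (the expected rows and metrics below
// are placeholders; real values must match the mock data registered for the
// stream in mocknode.TestData):
//
//	func TestDemoRule(t *testing.T) {
//	    HandleStream(true, []string{"demo"}, t)
//	    defer HandleStream(false, []string{"demo"}, t)
//	    tests := []RuleTest{{
//	        Name: "TestDemoRule0",
//	        Sql:  `SELECT color, size FROM demo`,
//	        // One sink message per emitted event, each a JSON array of
//	        // result rows (numbers decode as float64).
//	        R: [][]map[string]interface{}{
//	            {{"color": "red", "size": float64(3)}},
//	        },
//	        M: map[string]interface{}{}, // metrics to assert; an empty map checks nothing
//	    }}
//	    DoRuleTest(t, tests, 0, &api.RuleOption{BufferLength: 100}, 0)
//	}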
func DoRuleTest(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, wait int) {
    doRuleTestBySinkProps(t, tests, j, opt, wait, nil, CommonResultFunc)
}
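
// doRuleTestBySinkProps is the shared driver: it creates the topo for each
// test case, waits for the checkpoint coordinator when the QoS requires it,
// sends the mock data and compares the results and metrics.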
func doRuleTestBySinkProps(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, w int, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
    fmt.Printf("The test bucket size for option %d is %d.\n\n", j, len(tests))
    for i, tt := range tests {
        datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
        if tp == nil {
            t.Errorf("topo is not created successfully")
            break
        }
        wait := tt.W
        if wait == 0 {
            if w > 0 {
                wait = w
            } else {
                wait = 5
            }
        }
        switch opt.Qos {
        case api.ExactlyOnce:
            wait *= 10
        case api.AtLeastOnce:
            wait *= 3
        }
        var retry int
        if opt.Qos > api.AtMostOnce {
            for retry = 3; retry > 0; retry-- {
                if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
                    conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
                    time.Sleep(10 * time.Millisecond)
                } else {
                    break
                }
            }
            // The loop leaves retry at 0 when all attempts are used up
            if retry == 0 {
                t.Error("coordinator timeout")
                t.FailNow()
            }
        }
        if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, wait); err != nil {
            t.Errorf("send data error %s", err)
            break
        }
        compareResult(t, mockSink, resultFunc, tt, i, tp)
    }
}
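
// compareResult checks the sink results, the metrics and, optionally, the
// printable topo of a finished test case, then cancels the topo.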
func compareResult(t *testing.T, mockSink *mocknode.MockSink, resultFunc func(result [][]byte) interface{}, tt RuleTest, i int, tp *topo.Topo) {
    // Check results
    results := mockSink.GetResults()
    maps := resultFunc(results)
    if !reflect.DeepEqual(tt.R, maps) {
        t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.R, maps)
    }
    if err := CompareMetrics(tp, tt.M); err != nil {
        t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.Sql, err)
    }
    if tt.T != nil {
        printable := tp.GetTopo()
        if !reflect.DeepEqual(tt.T, printable) {
            t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.T, printable)
        }
    }
    tp.Cancel()
}
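
// sendData advances the mock clock in step with the mock sources so that
// window and checkpoint timers fire deterministically, then polls the
// metrics until they match or the retries are exhausted.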
func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *topo.Topo, postleap int, wait int) error {
    // Send data and move the mock time forward
    mockClock := mockclock.GetMockClock()
    // Set the current time
    mockClock.Add(0)
    // TODO: assumes that multiple data sources send their data in order and have the same length
    for i := 0; i < dataLength; i++ {
        // wait for the table to load
        time.Sleep(100 * time.Millisecond)
        for _, d := range datas {
            time.Sleep(time.Duration(wait) * time.Millisecond)
            // Make sure time only moves forward; advance the clock gradually
            // so that checkpoints are triggered before the data is sent
            for n := conf.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
                if d[i].Timestamp < n {
                    n = d[i].Timestamp
                }
                mockClock.Set(cast.TimeFromUnixMilli(n))
                conf.Log.Debugf("Clock set to %d", conf.GetNowInMilli())
                time.Sleep(1 * time.Millisecond)
            }
            select {
            case err := <-errCh:
                t.Log(err)
                tp.Cancel()
                return err
            default:
            }
        }
    }
    mockClock.Add(time.Duration(postleap) * time.Millisecond)
- conf.Log.Debugf("Clock add to %d", conf.GetNowInMilli())
- // Check if stream done. Poll for metrics,
- time.Sleep(10 * time.Millisecond)
- var retry int
- for retry = 4; retry > 0; retry-- {
- var err error
- if err = CompareMetrics(tp, metrics); err == nil {
- break
- }
- conf.Log.Errorf("check metrics error at %d: %s", retry, err)
- time.Sleep(1000 * time.Millisecond)
- }
- if retry == 0 {
- t.Error("send data timeout")
- } else if retry < 2 {
- conf.Log.Debugf("try %d for metric comparison\n", 2-retry)
- }
- return nil
- }
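
// createStream parses the rule SQL, collects the mock data for each stream
// it references from mocknode.TestData, plans the topo with a mock sink and
// opens it.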
func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *topo.Topo, *mocknode.MockSink, <-chan error) {
    mockclock.ResetClock(1541152486000)
    // Create the stream
    var (
        sources    []*node.SourceNode
        datas      [][]*xsql.Tuple
        dataLength int
    )
    parser := xsql.NewParser(strings.NewReader(tt.Sql))
    if stmt, err := xsql.Language.Parse(parser); err != nil {
        t.Errorf("parse sql %s error: %s", tt.Sql, err)
    } else {
        if selectStmt, ok := stmt.(*ast.SelectStatement); !ok {
            t.Errorf("sql %s is not a select statement", tt.Sql)
        } else {
            streams := xsql.GetStreams(selectStmt)
            for _, stream := range streams {
                data, ok := mocknode.TestData[stream]
                if !ok {
                    continue
                }
                dataLength = len(data)
                datas = append(datas, data)
            }
        }
    }
    mockSink := mocknode.NewMockSink()
    sink := node.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
    tp, err := planner.PlanSQLWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, sources, []*node.SinkNode{sink})
    if err != nil {
        t.Error(err)
        return nil, 0, nil, nil, nil
    }
    errCh := tp.Open()
    return datas, dataLength, tp, mockSink, errCh
}
// HandleStream creates (createOrDrop == true) or drops (createOrDrop == false)
// the streams and tables with the given names.
func HandleStream(createOrDrop bool, names []string, t *testing.T) {
    p := processor.NewStreamProcessor()
    for _, name := range names {
        var sql string
        if createOrDrop {
            switch name {
            case "demoArr":
                sql = `CREATE STREAM demoArr () WITH (DATASOURCE="demoArr", TYPE="mock", FORMAT="json", KEY="ts");`
            case "demo":
                sql = `CREATE STREAM demo (
                    color STRING,
                    size BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="demo", TYPE="mock", FORMAT="json", KEY="ts");`
            case "demoError":
                sql = `CREATE STREAM demoError (
                    color STRING,
                    size BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="demoError", TYPE="mock", FORMAT="json", KEY="ts", STRICT_VALIDATION="true");`
            case "demo1":
                sql = `CREATE STREAM demo1 (
                    temp FLOAT,
                    hum BIGINT,` +
                    "`from`" + ` STRING,
                    ts BIGINT
                ) WITH (DATASOURCE="demo1", TYPE="mock", FORMAT="json", KEY="ts");`
            case "demoTable":
                sql = `CREATE TABLE demoTable (
                    device STRING,
                    ts BIGINT
                ) WITH (DATASOURCE="demoTable", TYPE="mock", RETAIN_SIZE="3");`
            case "sessionDemo":
                sql = `CREATE STREAM sessionDemo (
                    temp FLOAT,
                    hum BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="sessionDemo", TYPE="mock", FORMAT="json", KEY="ts");`
            case "demoE":
                sql = `CREATE STREAM demoE (
                    color STRING,
                    size BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="demoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
            case "demo1E":
                sql = `CREATE STREAM demo1E (
                    temp FLOAT,
                    hum BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="demo1E", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
            case "sessionDemoE":
                sql = `CREATE STREAM sessionDemoE (
                    temp FLOAT,
                    hum BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="sessionDemoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
            case "demoErr":
                sql = `CREATE STREAM demoErr (
                    color STRING,
                    size BIGINT,
                    ts BIGINT
                ) WITH (DATASOURCE="demoErr", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts", STRICT_VALIDATION="true");`
            case "ldemo":
                sql = `CREATE STREAM ldemo (
                ) WITH (DATASOURCE="ldemo", TYPE="mock", FORMAT="json");`
            case "ldemo1":
                sql = `CREATE STREAM ldemo1 (
                ) WITH (DATASOURCE="ldemo1", TYPE="mock", FORMAT="json");`
            case "lsessionDemo":
                sql = `CREATE STREAM lsessionDemo (
                ) WITH (DATASOURCE="lsessionDemo", TYPE="mock", FORMAT="json");`
            case "ext":
                sql = `CREATE STREAM ext (count bigint) WITH (DATASOURCE="ext", FORMAT="JSON", TYPE="random", CONF_KEY="ext", STRICT_VALIDATION="true")`
            case "ext2":
                sql = `CREATE STREAM ext2 (count bigint) WITH (DATASOURCE="ext2", FORMAT="JSON", TYPE="random", CONF_KEY="dedup")`
            case "extpy":
                sql = `CREATE STREAM extpy (name string, value bigint) WITH (FORMAT="JSON", TYPE="pyjson", CONF_KEY="ext")`
            case "text":
                sql = `CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE="text", TYPE="mock", FORMAT="JSON")`
            case "binDemo":
                sql = `CREATE STREAM binDemo () WITH (DATASOURCE="binDemo", TYPE="mock", FORMAT="BINARY")`
            case "table1":
                sql = `CREATE TABLE table1 (
                    name STRING,
                    size BIGINT,
                    id BIGINT
                ) WITH (DATASOURCE="lookup.json", FORMAT="json", CONF_KEY="test");`
            case "helloStr":
                sql = `CREATE STREAM helloStr (name string) WITH (DATASOURCE="helloStr", TYPE="mock", FORMAT="JSON")`
            case "commands":
                sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON", TYPE="mock")`
            case "fakeBin":
                sql = `CREATE STREAM fakeBin () WITH (DATASOURCE="fakeBin", TYPE="mock", FORMAT="BINARY")`
            case "shelves":
                sql = `CREATE STREAM shelves (
                    name string,
                    size BIGINT,
                    shelf STRUCT(theme STRING, id BIGINT, subfield STRING)
                ) WITH (DATASOURCE="shelves", TYPE="mock", FORMAT="json");`
            case "mes":
                sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", TYPE="mock", FORMAT="JSON")`
            default:
                t.Errorf("create stream %s failed: no create statement defined for it", name)
            }
        } else {
            if strings.HasPrefix(name, "table") {
                sql = `DROP TABLE ` + name
            } else {
                sql = `DROP STREAM ` + name
            }
        }
        _, err := p.ExecStmt(sql)
        if err != nil {
            t.Log(err)
        }
    }
}
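
// RuleCheckpointTest extends RuleTest with a pause point so that a rule can
// be stopped mid-stream and resumed from its last checkpoint.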
type RuleCheckpointTest struct {
    RuleTest
    PauseSize   int                    // Stop the stream after sending PauseSize data items, to test checkpoint resume
    Cc          int                    // The expected checkpoint count when paused
    PauseMetric map[string]interface{} // The metrics to check when paused
}
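
// DoCheckpointRuleTest runs the checkpoint test cases: it sends PauseSize
// items, verifies the checkpoint count, cancels the topo, then reopens it
// and sends the full data set to verify the resumed results.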
func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *api.RuleOption) {
    fmt.Printf("The test bucket size for option %d is %d.\n\n", j, len(tests))
    for i, tt := range tests {
        datas, dataLength, tp, mockSink, errCh := createStream(t, tt.RuleTest, j, opt, nil)
        if tp == nil {
            t.Errorf("topo is not created successfully")
            break
        }
        var retry int
        for retry = 10; retry > 0; retry-- {
            if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
                conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
                time.Sleep(10 * time.Millisecond)
            } else {
                break
            }
        }
        if retry == 0 {
            t.Error("coordinator timeout")
            t.FailNow()
        }
- conf.Log.Debugf("Start sending first phase data done at %d", conf.GetNowInMilli())
- if err := sendData(t, tt.PauseSize, tt.PauseMetric, datas, errCh, tp, 100, 100); err != nil {
- t.Errorf("first phase send data error %s", err)
- break
- }
- conf.Log.Debugf("Send first phase data done at %d", conf.GetNowInMilli())
        // compare the checkpoint count
        time.Sleep(10 * time.Millisecond)
        for retry = 3; retry > 0; retry-- {
            actual := tp.GetCoordinator().GetCompleteCount()
            if tt.Cc == actual {
                break
            }
            conf.Log.Debugf("check checkpointCount error at %d: %d\n", retry, actual)
            time.Sleep(200 * time.Millisecond)
        }
        cc := tp.GetCoordinator().GetCompleteCount()
        tp.Cancel()
        if retry == 0 {
            t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.Cc, cc)
            return
        } else if retry < 3 {
            conf.Log.Debugf("try %d for checkpoint count\n", 4-retry)
        }
        time.Sleep(10 * time.Millisecond)
        // resume the stream
        conf.Log.Debugf("Resume stream at %d", conf.GetNowInMilli())
        errCh = tp.Open()
        conf.Log.Debugf("After open stream at %d", conf.GetNowInMilli())
        if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, 10); err != nil {
            t.Errorf("second phase send data error %s", err)
            break
        }
        compareResult(t, mockSink, CommonResultFunc, tt.RuleTest, i, tp)
    }
}
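
// CreateRule drops any existing rule with the same name and creates a new
// one from the given SQL.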
func CreateRule(name, sql string) (*api.Rule, error) {
    p := processor.NewRuleProcessor()
    p.ExecDrop(name)
    return p.ExecCreateWithValidation(name, sql)
}
|