// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package topotest

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/processor"
	"github.com/lf-edge/ekuiper/internal/testx"
	"github.com/lf-edge/ekuiper/internal/topo"
	"github.com/lf-edge/ekuiper/internal/topo/node"
	"github.com/lf-edge/ekuiper/internal/topo/planner"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
	"github.com/lf-edge/ekuiper/internal/xsql"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/ast"
	"github.com/lf-edge/ekuiper/pkg/cast"
)

func init() {
	testx.InitEnv()
}

const POSTLEAP = 1000 // Time to advance the mock clock after all data has been sent out, in milliseconds

type RuleTest struct {
	Name string
	Sql  string
	R    interface{}            // The expected result
	M    map[string]interface{} // The expected final metrics
	T    *api.PrintableTopo     // The expected printable topo; optional
	W    int                    // The wait time between each data sending, in milliseconds
}

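// CompareMetrics verifies that every metric in the expected map m is reported by the
// topology and matches. Metrics ending in "process_latency_us" only need to be at least
// the expected value, since exact latencies vary between runs. When conf.Config.Basic.Debug
// is enabled, all collected metrics are logged on a mismatch to ease debugging.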
func CompareMetrics(tp *topo.Topo, m map[string]interface{}) (err error) {
	keys, values := tp.GetMetrics()
	for k, v := range m {
		var (
			index   int
			key     string
			matched bool
		)
		for index, key = range keys {
			if k == key {
				if strings.HasSuffix(k, "process_latency_us") {
					if values[index].(int64) >= v.(int64) {
						matched = true
						continue
					}
					break
				}
				if values[index] == v {
					matched = true
				}
				break
			}
		}
		if matched {
			continue
		}
		if conf.Config.Basic.Debug {
			for i, k := range keys {
				conf.Log.Printf("%s:%v", k, values[i])
			}
		}
		// The expected metric is not found or does not match
		if index < len(values) {
			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%T)\n\ngot=%#v(%T)\n\n", k, v, v, values[index], values[index])
		} else {
			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
		}
	}
	return nil
}

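// CommonResultFunc decodes each raw sink payload as a JSON array of maps. It panics on
// malformed JSON so that a broken test expectation fails immediately and loudly.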
func CommonResultFunc(result [][]byte) interface{} {
	var maps [][]map[string]interface{}
	for _, v := range result {
		var mapRes []map[string]interface{}
		err := json.Unmarshal(v, &mapRes)
		if err != nil {
			panic(fmt.Sprintf("Failed to parse the input %v into map", string(v)))
		}
		maps = append(maps, mapRes)
	}
	return maps
}

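// DoRuleTest runs every case in tests against a freshly planned topology, decoding the
// mock sink output as JSON via CommonResultFunc. A rough usage sketch follows; the test
// name, rule SQL, expected rows and metric values are illustrative placeholders, not
// fixtures that ship with this package:
//
//	func TestSingleSQL(t *testing.T) {
//		streamList := []string{"demo"}
//		HandleStream(true, streamList, t)
//		defer HandleStream(false, streamList, t)
//		tests := []RuleTest{{
//			Name: "TestSingleSQLRule0",
//			Sql:  `SELECT color, size FROM demo`,
//			// Illustrative expectations; real values depend on mocknode.TestData.
//			R: [][]map[string]interface{}{ // one inner slice per sink invocation
//				{{"color": "red", "size": float64(3)}},
//			},
//			M: map[string]interface{}{ // expected final metrics
//				"sink_mockSink_0_records_in_total": int64(1),
//			},
//		}}
//		DoRuleTest(t, tests, 0, &api.RuleOption{Qos: api.AtMostOnce}, 0)
//	}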
func DoRuleTest(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, wait int) {
	doRuleTestBySinkProps(t, tests, j, opt, wait, nil, CommonResultFunc)
}

func doRuleTestBySinkProps(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, w int, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
	for i, tt := range tests {
		t.Run(tt.Name, func(t *testing.T) {
			datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
			if tp == nil {
				t.Errorf("topo is not created successfully")
				return
			}
			wait := tt.W
			if wait == 0 {
				if w > 0 {
					wait = w
				} else {
					wait = 5
				}
			}
			switch opt.Qos {
			case api.ExactlyOnce:
				wait *= 10
			case api.AtLeastOnce:
				wait *= 3
			}
			var retry int
			if opt.Qos > api.AtMostOnce {
				for retry = 3; retry > 0; retry-- {
					if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
						conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
						time.Sleep(10 * time.Millisecond)
					} else {
						break
					}
				}
				if retry == 0 {
					t.Error("coordinator timeout")
					t.FailNow()
				}
			}
			if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, wait); err != nil {
				t.Errorf("send data error %s", err)
				return
			}
			compareResult(t, mockSink, resultFunc, tt, i, tp)
		})
	}
}

func compareResult(t *testing.T, mockSink *mocknode.MockSink, resultFunc func(result [][]byte) interface{}, tt RuleTest, i int, tp *topo.Topo) {
	// Check results
	results := mockSink.GetResults()
	maps := resultFunc(results)
	assert.Equal(t, tt.R, maps)
	if err := CompareMetrics(tp, tt.M); err != nil {
		t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.Sql, err)
	}
	if tt.T != nil {
		topo := tp.GetTopo()
		if !reflect.DeepEqual(tt.T, topo) {
			t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.T, topo)
		}
	}
	tp.Cancel()
}

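// sendData feeds the mock tuples into the running topology while driving the mocked clock.
// Before each tuple it advances the clock in 100 ms steps up to the tuple's timestamp so
// that checkpoints and time windows fire before the data arrives, then it leaps postleap
// milliseconds past the last tuple and polls the metrics until they match or the retries
// are exhausted.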
func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *topo.Topo, postleap int, wait int) error {
	// Send data and move time
	mockClock := mockclock.GetMockClock()
	// Set the current time
	mockClock.Add(0)
	// TODO: assume multiple data sources send their data in order and have the same length
	for i := 0; i < dataLength; i++ {
		// wait for the table to load
		time.Sleep(100 * time.Millisecond)
		for _, d := range datas {
			time.Sleep(time.Duration(wait) * time.Millisecond)
			// Make sure time only moves forward.
			// Gradually add up time to ensure checkpoints are triggered before the data is sent.
			for n := conf.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
				if d[i].Timestamp < n {
					n = d[i].Timestamp
				}
				mockClock.Set(cast.TimeFromUnixMilli(n))
				conf.Log.Debugf("Clock set to %d", conf.GetNowInMilli())
				time.Sleep(1 * time.Millisecond)
			}
			select {
			case err := <-errCh:
				t.Log(err)
				tp.Cancel()
				return err
			default:
			}
		}
	}
	mockClock.Add(time.Duration(postleap) * time.Millisecond)
	conf.Log.Debugf("Clock add to %d", conf.GetNowInMilli())
	// Check if the stream is done by polling the metrics
	time.Sleep(10 * time.Millisecond)
	var retry int
	for retry = 4; retry > 0; retry-- {
		var err error
		if err = CompareMetrics(tp, metrics); err == nil {
			break
		}
		conf.Log.Errorf("check metrics error at %d: %s", retry, err)
		time.Sleep(1000 * time.Millisecond)
	}
	if retry == 0 {
		t.Error("send data timeout")
	} else if retry < 2 {
		conf.Log.Debugf("try %d for metric comparison\n", 2-retry)
	}
	return nil
}

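// createStream resets the mock clock to a fixed epoch, parses the test SQL to find the
// referenced streams, collects their mock data from mocknode.TestData, and plans a topology
// that writes into a MockSink. It returns the mock data, the data length, the topology, the
// sink and the topology's error channel.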
func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *topo.Topo, *mocknode.MockSink, <-chan error) {
	mockclock.ResetClock(1541152486000)
	// Create stream
	var (
		sources    []*node.SourceNode
		datas      [][]*xsql.Tuple
		dataLength int
	)
	parser := xsql.NewParser(strings.NewReader(tt.Sql))
	if stmt, err := xsql.Language.Parse(parser); err != nil {
		t.Errorf("parse sql %s error: %s", tt.Sql, err)
	} else {
		if selectStmt, ok := stmt.(*ast.SelectStatement); !ok {
			t.Errorf("sql %s is not a select statement", tt.Sql)
		} else {
			streams := xsql.GetStreams(selectStmt)
			for _, stream := range streams {
				data, ok := mocknode.TestData[stream]
				if !ok {
					continue
				}
				dataLength = len(data)
				datas = append(datas, data)
			}
		}
	}
	mockSink := mocknode.NewMockSink()
	sink := node.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
	tp, err := planner.PlanSQLWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, sources, []*node.SinkNode{sink})
	if err != nil {
		t.Error(err)
		return nil, 0, nil, nil, nil
	}
	errCh := tp.Open()
	return datas, dataLength, tp, mockSink, errCh
}

// Create or drop streams
func HandleStream(createOrDrop bool, names []string, t *testing.T) {
	p := processor.NewStreamProcessor()
	for _, name := range names {
		var sql string
		if createOrDrop {
			switch name {
			case "demoE2":
				sql = `CREATE STREAM demoE2 () WITH (DATASOURCE="demoE2", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "demoArr2":
				sql = `CREATE STREAM demoArr2 () WITH (DATASOURCE="demoArr2", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoArr":
				sql = `CREATE STREAM demoArr () WITH (DATASOURCE="demoArr", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demo":
				sql = `CREATE STREAM demo (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demo", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoError":
				sql = `CREATE STREAM demoError (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoError", TYPE="mock", FORMAT="json", KEY="ts",STRICT_VALIDATION="true");`
			case "demo1":
				sql = `CREATE STREAM demo1 (
					temp FLOAT,
					hum BIGINT,` +
					"`from`" + ` STRING,
					ts BIGINT
				) WITH (DATASOURCE="demo1", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoTable":
				sql = `CREATE TABLE demoTable (
					device STRING,
					ts BIGINT
				) WITH (DATASOURCE="demoTable", TYPE="mock", RETAIN_SIZE="3");`
			case "sessionDemo":
				sql = `CREATE STREAM sessionDemo (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="sessionDemo", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoE":
				sql = `CREATE STREAM demoE (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "demo1E":
				sql = `CREATE STREAM demo1E (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demo1E", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "sessionDemoE":
				sql = `CREATE STREAM sessionDemoE (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="sessionDemoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "demoErr":
				sql = `CREATE STREAM demoErr (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoErr", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts",STRICT_VALIDATION="true");`
			case "ldemo":
				sql = `CREATE STREAM ldemo (
				) WITH (DATASOURCE="ldemo", TYPE="mock", FORMAT="json");`
			case "ldemo1":
				sql = `CREATE STREAM ldemo1 (
				) WITH (DATASOURCE="ldemo1", TYPE="mock", FORMAT="json");`
			case "lsessionDemo":
				sql = `CREATE STREAM lsessionDemo (
				) WITH (DATASOURCE="lsessionDemo", TYPE="mock", FORMAT="json");`
			case "ext":
				sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"ext\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\",STRICT_VALIDATION=\"true\")"
			case "ext2":
				sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"ext2\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
			case "extpy":
				sql = "CREATE STREAM extpy (name string, value bigint) WITH (FORMAT=\"JSON\", TYPE=\"pyjson\", CONF_KEY=\"ext\")"
			case "text":
				sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"text\", TYPE=\"mock\", FORMAT=\"JSON\")"
			case "binDemo":
				sql = "CREATE STREAM binDemo () WITH (DATASOURCE=\"binDemo\", TYPE=\"mock\", FORMAT=\"BINARY\")"
			case "table1":
				sql = `CREATE TABLE table1 (
					name STRING,
					size BIGINT,
					id BIGINT
				) WITH (DATASOURCE="lookup.json", FORMAT="json", CONF_KEY="test");`
			case "helloStr":
				sql = `CREATE STREAM helloStr (name string) WITH (DATASOURCE="helloStr", TYPE="mock", FORMAT="JSON")`
			case "commands":
				sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON", TYPE="mock")`
			case "fakeBin":
				sql = "CREATE STREAM fakeBin () WITH (DATASOURCE=\"fakeBin\", TYPE=\"mock\", FORMAT=\"BINARY\")"
			case "shelves":
				sql = `CREATE STREAM shelves (
					name string,
					size BIGINT,
					shelf STRUCT(theme STRING,id BIGINT, subfield STRING)
				) WITH (DATASOURCE="shelves", TYPE="mock", FORMAT="json");`
			case "mes":
				sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", TYPE="mock", FORMAT="JSON")`
			default:
				t.Errorf("create stream %s fail", name)
			}
		} else {
			if strings.HasPrefix(name, "table") {
				sql = `DROP TABLE ` + name
			} else {
				sql = `DROP STREAM ` + name
			}
		}
		_, err := p.ExecStmt(sql)
		if err != nil {
			t.Log(err)
		}
	}
}

type RuleCheckpointTest struct {
	RuleTest
	PauseSize   int                    // Stop the stream after sending PauseSize tuples per source, to test checkpoint resume
	Cc          int                    // The expected checkpoint count when paused
	PauseMetric map[string]interface{} // The expected metrics when paused
}

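// DoCheckpointRuleTest runs each checkpoint test case in two phases: it sends the first
// PauseSize tuples, waits for the expected number of completed checkpoints, cancels the
// topology, then reopens it and replays the full data set to verify the final result and
// metrics. A rough caller sketch follows; the SQL, counts and metric values are
// illustrative placeholders, and CheckpointInterval is assumed to be the rule option that
// controls checkpoint frequency in milliseconds:
//
//	tests := []RuleCheckpointTest{{
//		RuleTest: RuleTest{
//			Name: "TestCheckpointRule0", // illustrative
//			Sql:  `SELECT * FROM demo GROUP BY COUNTWINDOW(3)`,
//			R:    [][]map[string]interface{}{ /* expected windows */ },
//			M:    map[string]interface{}{ /* expected final metrics */ },
//		},
//		PauseSize:   3,
//		Cc:          2,
//		PauseMetric: map[string]interface{}{ /* expected metrics at pause */ },
//	}}
//	DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
//		Qos:                api.AtLeastOnce,
//		CheckpointInterval: 600, // assumed option field, in milliseconds
//	})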
func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *api.RuleOption) {
	fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
	for i, tt := range tests {
		datas, dataLength, tp, mockSink, errCh := createStream(t, tt.RuleTest, j, opt, nil)
		if tp == nil {
			t.Errorf("topo is not created successfully")
			break
		}
		var retry int
		for retry = 10; retry > 0; retry-- {
			if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
				conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
				time.Sleep(10 * time.Millisecond)
			} else {
				break
			}
		}
		if retry == 0 {
			t.Error("coordinator timeout")
			t.FailNow()
		}
		conf.Log.Debugf("Start sending first phase data at %d", conf.GetNowInMilli())
		if err := sendData(t, tt.PauseSize, tt.PauseMetric, datas, errCh, tp, 100, 100); err != nil {
			t.Errorf("first phase send data error %s", err)
			break
		}
		conf.Log.Debugf("Send first phase data done at %d", conf.GetNowInMilli())
		// compare checkpoint count
		time.Sleep(10 * time.Millisecond)
		for retry = 3; retry > 0; retry-- {
			actual := tp.GetCoordinator().GetCompleteCount()
			if tt.Cc == actual {
				break
			}
			conf.Log.Debugf("check checkpointCount error at %d: %d\n", retry, actual)
			time.Sleep(200 * time.Millisecond)
		}
		cc := tp.GetCoordinator().GetCompleteCount()
		tp.Cancel()
		if retry == 0 {
			t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.Cc, cc)
			return
		} else if retry < 3 {
			conf.Log.Debugf("try %d for checkpoint count\n", 4-retry)
		}
		tp.Cancel()
		time.Sleep(10 * time.Millisecond)
		// resume stream
		conf.Log.Debugf("Resume stream at %d", conf.GetNowInMilli())
		errCh = tp.Open()
		conf.Log.Debugf("After open stream at %d", conf.GetNowInMilli())
		if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, 10); err != nil {
			t.Errorf("second phase send data error %s", err)
			break
		}
		compareResult(t, mockSink, CommonResultFunc, tt.RuleTest, i, tp)
	}
}

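// CreateRule drops any existing rule with the given name and recreates it from the given
// definition. A minimal, illustrative call, assuming the second argument is the usual
// eKuiper rule JSON (sql plus actions); the rule name and query below are placeholders:
//
//	rule, err := CreateRule("ruleTest", `{"sql":"SELECT * FROM demo","actions":[{"log":{}}]}`) // assumed rule JSON format
//	if err != nil {
//		t.Fatal(err)
//	}
//	_ = rule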
func CreateRule(name, sql string) (*api.Rule, error) {
	p := processor.NewRuleProcessor()
	p.ExecDrop(name)
	return p.ExecCreateWithValidation(name, sql)
}