// mock_topo.go

package topotest

import (
	"encoding/json"
	"fmt"
	"path"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/processor"
	"github.com/lf-edge/ekuiper/internal/testx"
	"github.com/lf-edge/ekuiper/internal/topo"
	"github.com/lf-edge/ekuiper/internal/topo/node"
	"github.com/lf-edge/ekuiper/internal/topo/planner"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
	"github.com/lf-edge/ekuiper/internal/xsql"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/ast"
	"github.com/lf-edge/ekuiper/pkg/cast"
)
// POSTLEAP is the time (in milliseconds) to advance the mock clock after all data is sent out.
const POSTLEAP = 1000

// RuleTest defines a single rule test case.
type RuleTest struct {
	Name string
	Sql  string
	R    interface{}            // the expected result
	M    map[string]interface{} // the expected final metrics
	T    *topo.PrintableTopo    // the expected printable topo; optional
	W    int                    // wait time between each data send, in milliseconds
}
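// A hypothetical usage sketch for RuleTest and DoRuleTest. The expected rows
// and the metric key below are illustrative only, not taken from this file:
//
//	func TestDemoRule(t *testing.T) {
//		HandleStream(true, []string{"demo"}, t)
//		defer HandleStream(false, []string{"demo"}, t)
//		tests := []RuleTest{{
//			Name: "TestSelectAll",
//			Sql:  `SELECT * FROM demo`,
//			R: [][]map[string]interface{}{
//				{{"color": "red", "size": float64(3), "ts": float64(1541152486013)}},
//			},
//			M: map[string]interface{}{"sink_mockSink_0_records_in_total": int64(1)},
//		}}
//		DoRuleTest(t, tests, 0, &api.RuleOption{BufferLength: 100}, 0)
//	}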
var DbDir = testx.GetDbDir()
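// compareMetrics checks that every expected metric in m is reported by the topo.
// Metrics ending in "process_latency_us" are treated as lower bounds; all other
// metrics must match exactly.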
func compareMetrics(tp *topo.Topo, m map[string]interface{}) (err error) {
	keys, values := tp.GetMetrics()
	for k, v := range m {
		var (
			index   int
			key     string
			matched bool
		)
		for index, key = range keys {
			if k == key {
				if strings.HasSuffix(k, "process_latency_us") {
					// Latency is non-deterministic, so only require it to reach the expected floor.
					if values[index].(int64) >= v.(int64) {
						matched = true
						continue
					} else {
						break
					}
				}
				if values[index] == v {
					matched = true
				}
				break
			}
		}
		if matched {
			continue
		}
		if conf.Config.Basic.Debug {
			for i, k := range keys {
				conf.Log.Printf("%s:%v", k, values[i])
			}
		}
		// The expected metric was not found or did not match.
		if index < len(values) {
			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%T)\n\ngot=%#v(%T)\n\n", k, v, v, values[index], values[index])
		} else {
			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
		}
	}
	return nil
}
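// commonResultFunc parses each raw sink result as a JSON array of maps.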
func commonResultFunc(result [][]byte) interface{} {
	var maps [][]map[string]interface{}
	for _, v := range result {
		var mapRes []map[string]interface{}
		err := json.Unmarshal(v, &mapRes)
		if err != nil {
			panic("Failed to parse the input into map")
		}
		maps = append(maps, mapRes)
	}
	return maps
}
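// DoRuleTest runs the given test cases against rule option opt. j identifies
// the option bucket and wait is the default interval (in milliseconds) between
// data sends.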
func DoRuleTest(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, wait int) {
	doRuleTestBySinkProps(t, tests, j, opt, wait, nil, commonResultFunc)
}
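// doRuleTestBySinkProps is the worker behind DoRuleTest: it builds the topo for
// each test case, waits for the checkpoint coordinator when QoS requires it,
// sends the mock data, and compares results, metrics and topo.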
func doRuleTestBySinkProps(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, w int, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
	fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
	for i, tt := range tests {
		datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
		if tp == nil {
			t.Errorf("topo is not created successfully")
			break
		}
		wait := tt.W
		if wait == 0 {
			if w > 0 {
				wait = w
			} else {
				wait = 5
			}
		}
		// Higher QoS levels need more time per send for checkpointing and acknowledgement.
		switch opt.Qos {
		case api.ExactlyOnce:
			wait *= 10
		case api.AtLeastOnce:
			wait *= 3
		}
		var retry int
		if opt.Qos > api.AtMostOnce {
			// Wait for the checkpoint coordinator to become active.
			for retry = 3; retry > 0; retry-- {
				if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
					conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
					time.Sleep(10 * time.Millisecond)
				} else {
					break
				}
			}
			if retry == 0 {
				t.Error("coordinator timeout")
				t.FailNow()
			}
		}
		if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, wait); err != nil {
			t.Errorf("send data error %s", err)
			break
		}
		compareResult(t, mockSink, resultFunc, tt, i, tp)
	}
}
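// compareResult checks the sink results, the metrics and, if provided, the
// printable topo against the expectations of the test case, then cancels the topo.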
func compareResult(t *testing.T, mockSink *mocknode.MockSink, resultFunc func(result [][]byte) interface{}, tt RuleTest, i int, tp *topo.Topo) {
	// Check the results
	results := mockSink.GetResults()
	maps := resultFunc(results)
	if !reflect.DeepEqual(tt.R, maps) {
		t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.R, maps)
	}
	if err := compareMetrics(tp, tt.M); err != nil {
		t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.Sql, err)
	}
	if tt.T != nil {
		topo := tp.GetTopo()
		if !reflect.DeepEqual(tt.T, topo) {
			t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.T, topo)
		}
	}
	tp.Cancel()
}
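// sendData feeds the mock tuples into the topo while driving the mock clock
// forward, then polls the metrics until they match or retries are exhausted.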
func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *topo.Topo, postleap int, wait int) error {
	// Send data and move time forward
	mockClock := mockclock.GetMockClock()
	// Set the current time
	mockClock.Add(0)
	// TODO: assumes multiple data sources send the data in order and have the same length
	for i := 0; i < dataLength; i++ {
		for _, d := range datas {
			time.Sleep(time.Duration(wait) * time.Millisecond)
			// Make sure time only moves forward. Advance the clock gradually so
			// that checkpoints are triggered before the data is sent.
			for n := conf.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
				if d[i].Timestamp < n {
					n = d[i].Timestamp
				}
				mockClock.Set(cast.TimeFromUnixMilli(n))
				conf.Log.Debugf("Clock set to %d", conf.GetNowInMilli())
				time.Sleep(1) // 1ns sleep, effectively a scheduler yield
			}
			select {
			case err := <-errCh:
				t.Log(err)
				tp.Cancel()
				return err
			default:
			}
		}
	}
	mockClock.Add(time.Duration(postleap) * time.Millisecond)
	conf.Log.Debugf("Clock add to %d", conf.GetNowInMilli())
	// Check whether the stream is done by polling the metrics.
	time.Sleep(10 * time.Millisecond)
	var retry int
	for retry = 4; retry > 0; retry-- {
		if err := compareMetrics(tp, metrics); err == nil {
			break
		} else {
			conf.Log.Errorf("check metrics error at %d: %s", retry, err)
		}
		time.Sleep(1000 * time.Millisecond)
	}
	if retry == 0 {
		t.Error("send data timeout")
	} else if retry < 4 {
		conf.Log.Debugf("try %d for metric comparison\n", 5-retry)
	}
	return nil
}
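// createStream parses the test SQL, collects the registered mock data for each
// referenced stream, plans the topo with a mock sink, and opens it.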
func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *topo.Topo, *mocknode.MockSink, <-chan error) {
	// Reset the mock clock to a fixed start time so tests are deterministic.
	mockclock.ResetClock(1541152486000)
	// Create the stream
	var (
		sources    []*node.SourceNode
		datas      [][]*xsql.Tuple
		dataLength int
	)
	parser := xsql.NewParser(strings.NewReader(tt.Sql))
	if stmt, err := xsql.Language.Parse(parser); err != nil {
		t.Errorf("parse sql %s error: %s", tt.Sql, err)
	} else {
		if selectStmt, ok := stmt.(*ast.SelectStatement); !ok {
			t.Errorf("sql %s is not a select statement", tt.Sql)
		} else {
			// Collect the mock data registered for each stream referenced in the statement.
			streams := xsql.GetStreams(selectStmt)
			for _, stream := range streams {
				data, ok := mocknode.TestData[stream]
				if !ok {
					continue
				}
				dataLength = len(data)
				datas = append(datas, data)
			}
		}
	}
	mockSink := mocknode.NewMockSink()
	sink := node.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
	tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, DbDir, sources, []*node.SinkNode{sink})
	if err != nil {
		t.Error(err)
		return nil, 0, nil, nil, nil
	}
	errCh := tp.Open()
	return datas, dataLength, tp, mockSink, errCh
}
// HandleStream creates (createOrDrop=true) or drops (createOrDrop=false) the
// streams and tables with the given names.
func HandleStream(createOrDrop bool, names []string, t *testing.T) {
	p := processor.NewStreamProcessor(path.Join(DbDir, "stream"))
	for _, name := range names {
		var sql string
		if createOrDrop {
			switch name {
			case "demo":
				sql = `CREATE STREAM demo (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demo", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoError":
				sql = `CREATE STREAM demoError (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoError", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demo1":
				sql = `CREATE STREAM demo1 (
					temp FLOAT,
					hum BIGINT,` +
					"`from`" + ` STRING,
					ts BIGINT
				) WITH (DATASOURCE="demo1", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoTable":
				sql = `CREATE TABLE demoTable (
					device STRING,
					ts BIGINT
				) WITH (DATASOURCE="demoTable", TYPE="mock", RETAIN_SIZE="3");`
			case "sessionDemo":
				sql = `CREATE STREAM sessionDemo (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="sessionDemo", TYPE="mock", FORMAT="json", KEY="ts");`
			case "demoE":
				sql = `CREATE STREAM demoE (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "demo1E":
				sql = `CREATE STREAM demo1E (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demo1E", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "sessionDemoE":
				sql = `CREATE STREAM sessionDemoE (
					temp FLOAT,
					hum BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="sessionDemoE", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "demoErr":
				sql = `CREATE STREAM demoErr (
					color STRING,
					size BIGINT,
					ts BIGINT
				) WITH (DATASOURCE="demoErr", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
			case "ldemo":
				sql = `CREATE STREAM ldemo (
				) WITH (DATASOURCE="ldemo", TYPE="mock", FORMAT="json");`
			case "ldemo1":
				sql = `CREATE STREAM ldemo1 (
				) WITH (DATASOURCE="ldemo1", TYPE="mock", FORMAT="json");`
			case "lsessionDemo":
				sql = `CREATE STREAM lsessionDemo (
				) WITH (DATASOURCE="lsessionDemo", TYPE="mock", FORMAT="json");`
			case "ext":
				sql = `CREATE STREAM ext (count bigint) WITH (DATASOURCE="ext", FORMAT="JSON", TYPE="random", CONF_KEY="ext")`
			case "ext2":
				sql = `CREATE STREAM ext2 (count bigint) WITH (DATASOURCE="ext2", FORMAT="JSON", TYPE="random", CONF_KEY="dedup")`
			case "text":
				sql = `CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE="text", TYPE="mock", FORMAT="JSON")`
			case "binDemo":
				sql = `CREATE STREAM binDemo () WITH (DATASOURCE="binDemo", TYPE="mock", FORMAT="BINARY")`
			case "table1":
				sql = `CREATE TABLE table1 (
					name STRING,
					size BIGINT,
					id BIGINT
				) WITH (DATASOURCE="lookup.json", FORMAT="json", CONF_KEY="test");`
			case "helloStr":
				sql = `CREATE STREAM helloStr (name string) WITH (DATASOURCE="helloStr", TYPE="mock", FORMAT="JSON")`
			case "commands":
				sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON", TYPE="mock")`
			case "fakeBin":
				sql = `CREATE STREAM fakeBin () WITH (DATASOURCE="fakeBin", TYPE="mock", FORMAT="BINARY")`
			case "shelves":
				sql = `CREATE STREAM shelves (
					name string,
					size BIGINT,
					shelf STRUCT(theme STRING, id BIGINT, subfield STRING)
				) WITH (DATASOURCE="shelves", TYPE="mock", FORMAT="json");`
			case "mes":
				sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", TYPE="mock", FORMAT="JSON")`
			default:
				t.Errorf("fail to create stream %s: unknown name", name)
			}
		} else {
			if strings.HasPrefix(name, "table") {
				sql = `DROP TABLE ` + name
			} else {
				sql = `DROP STREAM ` + name
			}
		}
		_, err := p.ExecStmt(sql)
		if err != nil {
			t.Log(err)
		}
	}
}
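// RuleCheckpointTest extends RuleTest with a pause point to verify that a rule
// can resume from a checkpoint.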
type RuleCheckpointTest struct {
	RuleTest
	PauseSize   int                    // stop the stream after sending PauseSize tuples to test checkpoint resume
	Cc          int                    // expected checkpoint count when paused
	PauseMetric map[string]interface{} // the metrics to check when paused
}
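// DoCheckpointRuleTest runs each test case in two phases: it sends PauseSize
// tuples, verifies the checkpoint count, cancels the topo, then reopens it and
// sends the full data set to verify resume-from-checkpoint behavior.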
func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *api.RuleOption) {
	fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
	for i, tt := range tests {
		datas, dataLength, tp, mockSink, errCh := createStream(t, tt.RuleTest, j, opt, nil)
		if tp == nil {
			t.Errorf("topo is not created successfully")
			break
		}
		var retry int
		for retry = 10; retry > 0; retry-- {
			if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
				conf.Log.Debugf("waiting for coordinator ready %d\n", retry)
				time.Sleep(10 * time.Millisecond)
			} else {
				break
			}
		}
		if retry == 0 {
			t.Error("coordinator timeout")
			t.FailNow()
		}
		conf.Log.Debugf("Start sending first phase data at %d", conf.GetNowInMilli())
		if err := sendData(t, tt.PauseSize, tt.PauseMetric, datas, errCh, tp, 100, 100); err != nil {
			t.Errorf("first phase send data error %s", err)
			break
		}
		conf.Log.Debugf("Send first phase data done at %d", conf.GetNowInMilli())
		// Compare the checkpoint count
		time.Sleep(10 * time.Millisecond)
		for retry = 3; retry > 0; retry-- {
			actual := tp.GetCoordinator().GetCompleteCount()
			if tt.Cc == actual {
				break
			} else {
				conf.Log.Debugf("check checkpointCount error at %d: %d\n", retry, actual)
			}
			time.Sleep(200 * time.Millisecond)
		}
		cc := tp.GetCoordinator().GetCompleteCount()
		tp.Cancel()
		if retry == 0 {
			t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.Cc, cc)
			return
		} else if retry < 3 {
			conf.Log.Debugf("try %d for checkpoint count\n", 4-retry)
		}
		time.Sleep(10 * time.Millisecond)
		// Resume the stream from the checkpoint
		conf.Log.Debugf("Resume stream at %d", conf.GetNowInMilli())
		errCh = tp.Open()
		conf.Log.Debugf("After open stream at %d", conf.GetNowInMilli())
		if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, 10); err != nil {
			t.Errorf("second phase send data error %s", err)
			break
		}
		compareResult(t, mockSink, commonResultFunc, tt.RuleTest, i, tp)
	}
}
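// CreateRule drops any existing rule with the given name, then creates a new
// one from the SQL.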
func CreateRule(name, sql string) (*api.Rule, error) {
	p := processor.NewRuleProcessor(DbDir)
	p.ExecDrop(name)
	return p.ExecCreate(name, sql)
}