mock_topo.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435
  1. package topotest
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xsql/processors"
  9. "github.com/emqx/kuiper/xstream"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "github.com/emqx/kuiper/xstream/nodes"
  12. "github.com/emqx/kuiper/xstream/planner"
  13. "github.com/emqx/kuiper/xstream/topotest/mockclock"
  14. "github.com/emqx/kuiper/xstream/topotest/mocknodes"
  15. "io/ioutil"
  16. "path"
  17. "reflect"
  18. "strings"
  19. "testing"
  20. "time"
  21. )
  // POSTLEAP is the time change, in milliseconds, applied after all the test
  // data has been sent out.
  const (
  	POSTLEAP = 1000
  )
  23. type RuleTest struct {
  24. Name string
  25. Sql string
  26. R interface{} // The result
  27. M map[string]interface{} // final metrics
  28. T *xstream.PrintableTopo // printable topo, an optional field
  29. W int // wait time for each data sending, in milli
  30. }
  31. var (
  32. DbDir = common.GetDbDir()
  33. image, _ = getImg()
  34. )
  35. func getImg() ([]byte, string) {
  36. docsFolder, err := common.GetLoc("/docs/")
  37. if err != nil {
  38. common.Log.Fatalf("Cannot find docs folder: %v", err)
  39. }
  40. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  41. if err != nil {
  42. common.Log.Fatalf("Cannot read image: %v", err)
  43. }
  44. b64img := base64.StdEncoding.EncodeToString(image)
  45. return image, b64img
  46. }
  47. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
  48. keys, values := tp.GetMetrics()
  49. for k, v := range m {
  50. var (
  51. index int
  52. key string
  53. matched bool
  54. )
  55. for index, key = range keys {
  56. if k == key {
  57. if strings.HasSuffix(k, "process_latency_us") {
  58. if values[index].(int64) >= v.(int64) {
  59. matched = true
  60. continue
  61. } else {
  62. break
  63. }
  64. }
  65. if values[index] == v {
  66. matched = true
  67. }
  68. break
  69. }
  70. }
  71. if matched {
  72. continue
  73. }
  74. if common.Config.Basic.Debug == true {
  75. for i, k := range keys {
  76. common.Log.Printf("%s:%v", k, values[i])
  77. }
  78. }
  79. //do not find
  80. if index < len(values) {
  81. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%T)\n\ngot=%#v(%T)\n\n", k, v, v, values[index], values[index])
  82. } else {
  83. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  84. }
  85. }
  86. return nil
  87. }
  88. // The time diff must be larger than timeleap
  89. var testData = map[string][]*xsql.Tuple{
  90. "demo": {
  91. {
  92. Emitter: "demo",
  93. Message: map[string]interface{}{
  94. "color": "red",
  95. "size": 3,
  96. "ts": 1541152486013,
  97. },
  98. Timestamp: 1541152486013,
  99. },
  100. {
  101. Emitter: "demo",
  102. Message: map[string]interface{}{
  103. "color": "blue",
  104. "size": 6,
  105. "ts": 1541152486822,
  106. },
  107. Timestamp: 1541152486822,
  108. },
  109. {
  110. Emitter: "demo",
  111. Message: map[string]interface{}{
  112. "color": "blue",
  113. "size": 2,
  114. "ts": 1541152487632,
  115. },
  116. Timestamp: 1541152487632,
  117. },
  118. {
  119. Emitter: "demo",
  120. Message: map[string]interface{}{
  121. "color": "yellow",
  122. "size": 4,
  123. "ts": 1541152488442,
  124. },
  125. Timestamp: 1541152488442,
  126. },
  127. {
  128. Emitter: "demo",
  129. Message: map[string]interface{}{
  130. "color": "red",
  131. "size": 1,
  132. "ts": 1541152489252,
  133. },
  134. Timestamp: 1541152489252,
  135. },
  136. },
  137. "demoError": {
  138. {
  139. Emitter: "demoError",
  140. Message: map[string]interface{}{
  141. "color": 3,
  142. "size": "red",
  143. "ts": 1541152486013,
  144. },
  145. Timestamp: 1541152486013,
  146. },
  147. {
  148. Emitter: "demoError",
  149. Message: map[string]interface{}{
  150. "color": "blue",
  151. "size": 6,
  152. "ts": "1541152486822",
  153. },
  154. Timestamp: 1541152486822,
  155. },
  156. {
  157. Emitter: "demoError",
  158. Message: map[string]interface{}{
  159. "color": "blue",
  160. "size": 2,
  161. "ts": 1541152487632,
  162. },
  163. Timestamp: 1541152487632,
  164. },
  165. {
  166. Emitter: "demoError",
  167. Message: map[string]interface{}{
  168. "color": 7,
  169. "size": 4,
  170. "ts": 1541152488442,
  171. },
  172. Timestamp: 1541152488442,
  173. },
  174. {
  175. Emitter: "demoError",
  176. Message: map[string]interface{}{
  177. "color": "red",
  178. "size": "blue",
  179. "ts": 1541152489252,
  180. },
  181. Timestamp: 1541152489252,
  182. },
  183. },
  184. "demo1": {
  185. {
  186. Emitter: "demo1",
  187. Message: map[string]interface{}{
  188. "temp": 25.5,
  189. "hum": 65,
  190. "from": "device1",
  191. "ts": 1541152486013,
  192. },
  193. Timestamp: 1541152486115,
  194. },
  195. {
  196. Emitter: "demo1",
  197. Message: map[string]interface{}{
  198. "temp": 27.5,
  199. "hum": 59,
  200. "from": "device2",
  201. "ts": 1541152486823,
  202. },
  203. Timestamp: 1541152486903,
  204. },
  205. {
  206. Emitter: "demo1",
  207. Message: map[string]interface{}{
  208. "temp": 28.1,
  209. "hum": 75,
  210. "from": "device3",
  211. "ts": 1541152487632,
  212. },
  213. Timestamp: 1541152487702,
  214. },
  215. {
  216. Emitter: "demo1",
  217. Message: map[string]interface{}{
  218. "temp": 27.4,
  219. "hum": 80,
  220. "from": "device1",
  221. "ts": 1541152488442,
  222. },
  223. Timestamp: 1541152488605,
  224. },
  225. {
  226. Emitter: "demo1",
  227. Message: map[string]interface{}{
  228. "temp": 25.5,
  229. "hum": 62,
  230. "from": "device3",
  231. "ts": 1541152489252,
  232. },
  233. Timestamp: 1541152489305,
  234. },
  235. },
  236. "sessionDemo": {
  237. {
  238. Emitter: "sessionDemo",
  239. Message: map[string]interface{}{
  240. "temp": 25.5,
  241. "hum": 65,
  242. "ts": 1541152486013,
  243. },
  244. Timestamp: 1541152486013,
  245. },
  246. {
  247. Emitter: "sessionDemo",
  248. Message: map[string]interface{}{
  249. "temp": 27.5,
  250. "hum": 59,
  251. "ts": 1541152486823,
  252. },
  253. Timestamp: 1541152486823,
  254. },
  255. {
  256. Emitter: "sessionDemo",
  257. Message: map[string]interface{}{
  258. "temp": 28.1,
  259. "hum": 75,
  260. "ts": 1541152487932,
  261. },
  262. Timestamp: 1541152487932,
  263. },
  264. {
  265. Emitter: "sessionDemo",
  266. Message: map[string]interface{}{
  267. "temp": 27.4,
  268. "hum": 80,
  269. "ts": 1541152488442,
  270. },
  271. Timestamp: 1541152488442,
  272. },
  273. {
  274. Emitter: "sessionDemo",
  275. Message: map[string]interface{}{
  276. "temp": 25.5,
  277. "hum": 62,
  278. "ts": 1541152489252,
  279. },
  280. Timestamp: 1541152489252,
  281. },
  282. {
  283. Emitter: "sessionDemo",
  284. Message: map[string]interface{}{
  285. "temp": 26.2,
  286. "hum": 63,
  287. "ts": 1541152490062,
  288. },
  289. Timestamp: 1541152490062,
  290. },
  291. {
  292. Emitter: "sessionDemo",
  293. Message: map[string]interface{}{
  294. "temp": 26.8,
  295. "hum": 71,
  296. "ts": 1541152490872,
  297. },
  298. Timestamp: 1541152490872,
  299. },
  300. {
  301. Emitter: "sessionDemo",
  302. Message: map[string]interface{}{
  303. "temp": 28.9,
  304. "hum": 85,
  305. "ts": 1541152491682,
  306. },
  307. Timestamp: 1541152491682,
  308. },
  309. {
  310. Emitter: "sessionDemo",
  311. Message: map[string]interface{}{
  312. "temp": 29.1,
  313. "hum": 92,
  314. "ts": 1541152492492,
  315. },
  316. Timestamp: 1541152492492,
  317. },
  318. {
  319. Emitter: "sessionDemo",
  320. Message: map[string]interface{}{
  321. "temp": 32.2,
  322. "hum": 99,
  323. "ts": 1541152493202,
  324. },
  325. Timestamp: 1541152493202,
  326. },
  327. {
  328. Emitter: "sessionDemo",
  329. Message: map[string]interface{}{
  330. "temp": 30.9,
  331. "hum": 87,
  332. "ts": 1541152494112,
  333. },
  334. Timestamp: 1541152494112,
  335. },
  336. },
  337. "demoE": {
  338. {
  339. Emitter: "demoE",
  340. Message: map[string]interface{}{
  341. "color": "red",
  342. "size": 3,
  343. "ts": 1541152486013,
  344. },
  345. Timestamp: 1541152486023,
  346. },
  347. {
  348. Emitter: "demoE",
  349. Message: map[string]interface{}{
  350. "color": "blue",
  351. "size": 2,
  352. "ts": 1541152487632,
  353. },
  354. Timestamp: 1541152487822,
  355. },
  356. {
  357. Emitter: "demoE",
  358. Message: map[string]interface{}{
  359. "color": "red",
  360. "size": 1,
  361. "ts": 1541152489252,
  362. },
  363. Timestamp: 1541152489632,
  364. },
  365. { //dropped item
  366. Emitter: "demoE",
  367. Message: map[string]interface{}{
  368. "color": "blue",
  369. "size": 6,
  370. "ts": 1541152486822,
  371. },
  372. Timestamp: 1541152489842,
  373. },
  374. {
  375. Emitter: "demoE",
  376. Message: map[string]interface{}{
  377. "color": "yellow",
  378. "size": 4,
  379. "ts": 1541152488442,
  380. },
  381. Timestamp: 1541152490052,
  382. },
  383. { //To lift the watermark and issue all windows
  384. Emitter: "demoE",
  385. Message: map[string]interface{}{
  386. "color": "yellow",
  387. "size": 4,
  388. "ts": 1541152492342,
  389. },
  390. Timestamp: 1541152498888,
  391. },
  392. },
  393. "demo1E": {
  394. {
  395. Emitter: "demo1E",
  396. Message: map[string]interface{}{
  397. "temp": 27.5,
  398. "hum": 59,
  399. "ts": 1541152486823,
  400. },
  401. Timestamp: 1541152487250,
  402. },
  403. {
  404. Emitter: "demo1E",
  405. Message: map[string]interface{}{
  406. "temp": 25.5,
  407. "hum": 65,
  408. "ts": 1541152486013,
  409. },
  410. Timestamp: 1541152487751,
  411. },
  412. {
  413. Emitter: "demo1E",
  414. Message: map[string]interface{}{
  415. "temp": 27.4,
  416. "hum": 80,
  417. "ts": 1541152488442,
  418. },
  419. Timestamp: 1541152489252,
  420. },
  421. {
  422. Emitter: "demo1E",
  423. Message: map[string]interface{}{
  424. "temp": 28.1,
  425. "hum": 75,
  426. "ts": 1541152487632,
  427. },
  428. Timestamp: 1541152489753,
  429. },
  430. {
  431. Emitter: "demo1E",
  432. Message: map[string]interface{}{
  433. "temp": 25.5,
  434. "hum": 62,
  435. "ts": 1541152489252,
  436. },
  437. Timestamp: 1541152489954,
  438. },
  439. {
  440. Emitter: "demo1E",
  441. Message: map[string]interface{}{
  442. "temp": 25.5,
  443. "hum": 62,
  444. "ts": 1541152499252,
  445. },
  446. Timestamp: 1541152499755,
  447. },
  448. },
  449. "sessionDemoE": {
  450. {
  451. Emitter: "sessionDemoE",
  452. Message: map[string]interface{}{
  453. "temp": 25.5,
  454. "hum": 65,
  455. "ts": 1541152486013,
  456. },
  457. Timestamp: 1541152486250,
  458. },
  459. {
  460. Emitter: "sessionDemoE",
  461. Message: map[string]interface{}{
  462. "temp": 28.1,
  463. "hum": 75,
  464. "ts": 1541152487932,
  465. },
  466. Timestamp: 1541152487951,
  467. },
  468. {
  469. Emitter: "sessionDemoE",
  470. Message: map[string]interface{}{
  471. "temp": 27.5,
  472. "hum": 59,
  473. "ts": 1541152486823,
  474. },
  475. Timestamp: 1541152488552,
  476. },
  477. {
  478. Emitter: "sessionDemoE",
  479. Message: map[string]interface{}{
  480. "temp": 25.5,
  481. "hum": 62,
  482. "ts": 1541152489252,
  483. },
  484. Timestamp: 1541152489353,
  485. },
  486. {
  487. Emitter: "sessionDemoE",
  488. Message: map[string]interface{}{
  489. "temp": 27.4,
  490. "hum": 80,
  491. "ts": 1541152488442,
  492. },
  493. Timestamp: 1541152489854,
  494. },
  495. {
  496. Emitter: "sessionDemoE",
  497. Message: map[string]interface{}{
  498. "temp": 26.2,
  499. "hum": 63,
  500. "ts": 1541152490062,
  501. },
  502. Timestamp: 1541152490155,
  503. },
  504. {
  505. Emitter: "sessionDemoE",
  506. Message: map[string]interface{}{
  507. "temp": 28.9,
  508. "hum": 85,
  509. "ts": 1541152491682,
  510. },
  511. Timestamp: 1541152491686,
  512. },
  513. {
  514. Emitter: "sessionDemoE",
  515. Message: map[string]interface{}{
  516. "temp": 26.8,
  517. "hum": 71,
  518. "ts": 1541152490872,
  519. },
  520. Timestamp: 1541152491972,
  521. },
  522. {
  523. Emitter: "sessionDemoE",
  524. Message: map[string]interface{}{
  525. "temp": 29.1,
  526. "hum": 92,
  527. "ts": 1541152492492,
  528. },
  529. Timestamp: 1541152492592,
  530. },
  531. {
  532. Emitter: "sessionDemoE",
  533. Message: map[string]interface{}{
  534. "temp": 30.9,
  535. "hum": 87,
  536. "ts": 1541152494112,
  537. },
  538. Timestamp: 1541152494212,
  539. },
  540. {
  541. Emitter: "sessionDemoE",
  542. Message: map[string]interface{}{
  543. "temp": 32.2,
  544. "hum": 99,
  545. "ts": 1541152493202,
  546. },
  547. Timestamp: 1541152495202,
  548. },
  549. {
  550. Emitter: "sessionDemoE",
  551. Message: map[string]interface{}{
  552. "temp": 32.2,
  553. "hum": 99,
  554. "ts": 1541152499202,
  555. },
  556. Timestamp: 1541152499402,
  557. },
  558. },
  559. "demoErr": {
  560. {
  561. Emitter: "demoErr",
  562. Message: map[string]interface{}{
  563. "color": "red",
  564. "size": 3,
  565. "ts": 1541152486013,
  566. },
  567. Timestamp: 1541152486221,
  568. },
  569. {
  570. Emitter: "demoErr",
  571. Message: map[string]interface{}{
  572. "color": 2,
  573. "size": "blue",
  574. "ts": 1541152487632,
  575. },
  576. Timestamp: 1541152487722,
  577. },
  578. {
  579. Emitter: "demoErr",
  580. Message: map[string]interface{}{
  581. "color": "red",
  582. "size": 1,
  583. "ts": 1541152489252,
  584. },
  585. Timestamp: 1541152489332,
  586. },
  587. { //dropped item
  588. Emitter: "demoErr",
  589. Message: map[string]interface{}{
  590. "color": "blue",
  591. "size": 6,
  592. "ts": 1541152486822,
  593. },
  594. Timestamp: 1541152489822,
  595. },
  596. {
  597. Emitter: "demoErr",
  598. Message: map[string]interface{}{
  599. "color": "yellow",
  600. "size": 4,
  601. "ts": 1541152488442,
  602. },
  603. Timestamp: 1541152490042,
  604. },
  605. { //To lift the watermark and issue all windows
  606. Emitter: "demoErr",
  607. Message: map[string]interface{}{
  608. "color": "yellow",
  609. "size": 4,
  610. "ts": 1541152492342,
  611. },
  612. Timestamp: 1541152493842,
  613. },
  614. },
  615. "ldemo": {
  616. {
  617. Emitter: "ldemo",
  618. Message: map[string]interface{}{
  619. "color": "red",
  620. "size": 3,
  621. "ts": 1541152486013,
  622. },
  623. Timestamp: 1541152486013,
  624. },
  625. {
  626. Emitter: "ldemo",
  627. Message: map[string]interface{}{
  628. "color": "blue",
  629. "size": "string",
  630. "ts": 1541152486822,
  631. },
  632. Timestamp: 1541152486822,
  633. },
  634. {
  635. Emitter: "ldemo",
  636. Message: map[string]interface{}{
  637. "size": 3,
  638. "ts": 1541152487632,
  639. },
  640. Timestamp: 1541152487632,
  641. },
  642. {
  643. Emitter: "ldemo",
  644. Message: map[string]interface{}{
  645. "color": 49,
  646. "size": 2,
  647. "ts": 1541152488442,
  648. },
  649. Timestamp: 1541152488442,
  650. },
  651. {
  652. Emitter: "ldemo",
  653. Message: map[string]interface{}{
  654. "color": "red",
  655. "ts": 1541152489252,
  656. },
  657. Timestamp: 1541152489252,
  658. },
  659. },
  660. "ldemo1": {
  661. {
  662. Emitter: "ldemo1",
  663. Message: map[string]interface{}{
  664. "temp": 25.5,
  665. "hum": 65,
  666. "ts": 1541152486013,
  667. },
  668. Timestamp: 1541152486013,
  669. },
  670. {
  671. Emitter: "ldemo1",
  672. Message: map[string]interface{}{
  673. "temp": 27.5,
  674. "hum": 59,
  675. "ts": 1541152486823,
  676. },
  677. Timestamp: 1541152486823,
  678. },
  679. {
  680. Emitter: "ldemo1",
  681. Message: map[string]interface{}{
  682. "temp": 28.1,
  683. "hum": 75,
  684. "ts": 1541152487632,
  685. },
  686. Timestamp: 1541152487632,
  687. },
  688. {
  689. Emitter: "ldemo1",
  690. Message: map[string]interface{}{
  691. "temp": 27.4,
  692. "hum": 80,
  693. "ts": "1541152488442",
  694. },
  695. Timestamp: 1541152488442,
  696. },
  697. {
  698. Emitter: "ldemo1",
  699. Message: map[string]interface{}{
  700. "temp": 25.5,
  701. "hum": 62,
  702. "ts": 1541152489252,
  703. },
  704. Timestamp: 1541152489252,
  705. },
  706. },
  707. "lsessionDemo": {
  708. {
  709. Emitter: "lsessionDemo",
  710. Message: map[string]interface{}{
  711. "temp": 25.5,
  712. "hum": 65,
  713. "ts": 1541152486013,
  714. },
  715. Timestamp: 1541152486013,
  716. },
  717. {
  718. Emitter: "lsessionDemo",
  719. Message: map[string]interface{}{
  720. "temp": 27.5,
  721. "hum": 59,
  722. "ts": 1541152486823,
  723. },
  724. Timestamp: 1541152486823,
  725. },
  726. {
  727. Emitter: "lsessionDemo",
  728. Message: map[string]interface{}{
  729. "temp": 28.1,
  730. "hum": 75,
  731. "ts": 1541152487932,
  732. },
  733. Timestamp: 1541152487932,
  734. },
  735. {
  736. Emitter: "lsessionDemo",
  737. Message: map[string]interface{}{
  738. "temp": 27.4,
  739. "hum": 80,
  740. "ts": 1541152488442,
  741. },
  742. Timestamp: 1541152488442,
  743. },
  744. {
  745. Emitter: "lsessionDemo",
  746. Message: map[string]interface{}{
  747. "temp": 25.5,
  748. "hum": 62,
  749. "ts": 1541152489252,
  750. },
  751. Timestamp: 1541152489252,
  752. },
  753. {
  754. Emitter: "lsessionDemo",
  755. Message: map[string]interface{}{
  756. "temp": 26.2,
  757. "hum": 63,
  758. "ts": 1541152490062,
  759. },
  760. Timestamp: 1541152490062,
  761. },
  762. {
  763. Emitter: "lsessionDemo",
  764. Message: map[string]interface{}{
  765. "temp": 26.8,
  766. "hum": 71,
  767. "ts": 1541152490872,
  768. },
  769. Timestamp: 1541152490872,
  770. },
  771. {
  772. Emitter: "lsessionDemo",
  773. Message: map[string]interface{}{
  774. "temp": 28.9,
  775. "hum": 85,
  776. "ts": 1541152491682,
  777. },
  778. Timestamp: 1541152491682,
  779. },
  780. {
  781. Emitter: "lsessionDemo",
  782. Message: map[string]interface{}{
  783. "temp": 29.1,
  784. "hum": 92,
  785. "ts": 1541152492492,
  786. },
  787. Timestamp: 1541152492492,
  788. },
  789. {
  790. Emitter: "lsessionDemo",
  791. Message: map[string]interface{}{
  792. "temp": 2.2,
  793. "hum": 99,
  794. "ts": 1541152493202,
  795. },
  796. Timestamp: 1541152493202,
  797. },
  798. {
  799. Emitter: "lsessionDemo",
  800. Message: map[string]interface{}{
  801. "temp": 30.9,
  802. "hum": 87,
  803. "ts": 1541152494112,
  804. },
  805. Timestamp: 1541152494112,
  806. },
  807. },
  808. "text": {
  809. {
  810. Emitter: "text",
  811. Message: map[string]interface{}{
  812. "slogan": "Impossible is nothing",
  813. "brand": "Adidas",
  814. },
  815. Timestamp: 1541152486500,
  816. },
  817. {
  818. Emitter: "text",
  819. Message: map[string]interface{}{
  820. "slogan": "Stronger than dirt",
  821. "brand": "Ajax",
  822. },
  823. Timestamp: 1541152487400,
  824. },
  825. {
  826. Emitter: "text",
  827. Message: map[string]interface{}{
  828. "slogan": "Belong anywhere",
  829. "brand": "Airbnb",
  830. },
  831. Timestamp: 1541152488300,
  832. },
  833. {
  834. Emitter: "text",
  835. Message: map[string]interface{}{
  836. "slogan": "I can'T believe I ate the whole thing",
  837. "brand": "Alka Seltzer",
  838. },
  839. Timestamp: 1541152489200,
  840. },
  841. {
  842. Emitter: "text",
  843. Message: map[string]interface{}{
  844. "slogan": "You're in good hands",
  845. "brand": "Allstate",
  846. },
  847. Timestamp: 1541152490100,
  848. },
  849. {
  850. Emitter: "text",
  851. Message: map[string]interface{}{
  852. "slogan": "Don'T leave home without it",
  853. "brand": "American Express",
  854. },
  855. Timestamp: 1541152491200,
  856. },
  857. {
  858. Emitter: "text",
  859. Message: map[string]interface{}{
  860. "slogan": "Think different",
  861. "brand": "Apple",
  862. },
  863. Timestamp: 1541152492300,
  864. },
  865. {
  866. Emitter: "text",
  867. Message: map[string]interface{}{
  868. "slogan": "We try harder",
  869. "brand": "Avis",
  870. },
  871. Timestamp: 1541152493400,
  872. },
  873. },
  874. "binDemo": {
  875. {
  876. Emitter: "binDemo",
  877. Message: map[string]interface{}{
  878. "self": image,
  879. },
  880. Timestamp: 1541152486013,
  881. },
  882. },
  883. "fakeBin": {
  884. {
  885. Emitter: "fakeBin",
  886. Message: map[string]interface{}{
  887. "self": []byte("golang"),
  888. },
  889. Timestamp: 1541152486013,
  890. },
  891. {
  892. Emitter: "fakeBin",
  893. Message: map[string]interface{}{
  894. "self": []byte("peacock"),
  895. },
  896. Timestamp: 1541152487013,
  897. },
  898. {
  899. Emitter: "fakeBin",
  900. Message: map[string]interface{}{
  901. "self": []byte("bullfrog"),
  902. },
  903. Timestamp: 1541152488013,
  904. },
  905. },
  906. "helloStr": {
  907. {
  908. Emitter: "helloStr",
  909. Message: map[string]interface{}{
  910. "Name": "world",
  911. },
  912. Timestamp: 1541152486013,
  913. },
  914. {
  915. Emitter: "helloStr",
  916. Message: map[string]interface{}{
  917. "Name": "golang",
  918. },
  919. Timestamp: 1541152487013,
  920. },
  921. {
  922. Emitter: "helloStr",
  923. Message: map[string]interface{}{
  924. "Name": "peacock",
  925. },
  926. Timestamp: 1541152488013,
  927. },
  928. },
  929. "commands": {
  930. {
  931. Emitter: "commands",
  932. Message: map[string]interface{}{
  933. "cmd": "get",
  934. "base64_img": "my image",
  935. "encoded_json": "{\"name\": \"name1\",\"size\": 22}",
  936. },
  937. Timestamp: 1541152486013,
  938. },
  939. {
  940. Emitter: "commands",
  941. Message: map[string]interface{}{
  942. "cmd": "detect",
  943. "base64_img": "my image",
  944. "encoded_json": "{\"name\": \"name2\",\"size\": 33}",
  945. },
  946. Timestamp: 1541152487013,
  947. },
  948. {
  949. Emitter: "commands",
  950. Message: map[string]interface{}{
  951. "cmd": "delete",
  952. "base64_img": "my image",
  953. "encoded_json": "{\"name\": \"name3\",\"size\": 11}",
  954. },
  955. Timestamp: 1541152488013,
  956. },
  957. },
  958. "demoTable": {
  959. {
  960. Emitter: "demoTable",
  961. Message: map[string]interface{}{
  962. "ts": 1541152486013,
  963. "device": "device1",
  964. },
  965. Timestamp: 1541152486501,
  966. },
  967. {
  968. Emitter: "demoTable",
  969. Message: map[string]interface{}{
  970. "ts": 1541152486822,
  971. "device": "device2",
  972. },
  973. Timestamp: 1541152486502,
  974. },
  975. {
  976. Emitter: "demoTable",
  977. Message: map[string]interface{}{
  978. "ts": 1541152487632,
  979. "device": "device3",
  980. },
  981. Timestamp: 1541152488001,
  982. },
  983. {
  984. Emitter: "demoTable",
  985. Message: map[string]interface{}{
  986. "ts": 1541152488442,
  987. "device": "device4",
  988. },
  989. Timestamp: 1541152488002,
  990. },
  991. {
  992. Emitter: "demoTable",
  993. Message: map[string]interface{}{
  994. "ts": 1541152489252,
  995. "device": "device5",
  996. },
  997. Timestamp: 1541152488003,
  998. },
  999. },
  1000. "shelves": {
  1001. {
  1002. Emitter: "shelves",
  1003. Message: map[string]interface{}{
  1004. "name": "name1",
  1005. "size": 2,
  1006. "shelf": map[string]interface{}{
  1007. "id": 1541152486013,
  1008. "theme": "tandra",
  1009. "subfield": "sub1",
  1010. },
  1011. },
  1012. Timestamp: 1541152486501,
  1013. },
  1014. {
  1015. Emitter: "shelves",
  1016. Message: map[string]interface{}{
  1017. "name": "name2",
  1018. "size": 3,
  1019. "shelf": map[string]interface{}{
  1020. "id": 1541152486822,
  1021. "theme": "claro",
  1022. "subfield": "sub2",
  1023. },
  1024. },
  1025. Timestamp: 1541152486502,
  1026. },
  1027. {
  1028. Emitter: "shelves",
  1029. Message: map[string]interface{}{
  1030. "name": "name3",
  1031. "size": 4,
  1032. "shelf": map[string]interface{}{
  1033. "id": 1541152487632,
  1034. "theme": "dark",
  1035. "subfield": "sub3",
  1036. },
  1037. },
  1038. Timestamp: 1541152488001,
  1039. },
  1040. },
  1041. "mes": {
  1042. {
  1043. Emitter: "mes",
  1044. Message: map[string]interface{}{
  1045. "message_id": "1541152486013",
  1046. "text": "message1",
  1047. },
  1048. Timestamp: 1541152486501,
  1049. },
  1050. {
  1051. Emitter: "mes",
  1052. Message: map[string]interface{}{
  1053. "message_id": "1541152486501",
  1054. "text": "message2",
  1055. },
  1056. Timestamp: 1541152486501,
  1057. },
  1058. {
  1059. Emitter: "mes",
  1060. Message: map[string]interface{}{
  1061. "message_id": "1541152487013",
  1062. "text": "message3",
  1063. },
  1064. Timestamp: 1541152487501,
  1065. },
  1066. },
  1067. }
  // commonResultFunc decodes every sink output in result as a JSON array of
  // objects and returns the decoded values as a [][]map[string]interface{},
  // one entry per output and in the same order.
  //
  // For an empty or nil result the returned slice is nil (not an empty
  // slice), which matters to callers that compare with reflect.DeepEqual.
  //
  // It panics when an output cannot be decoded; the panic message names the
  // index of the offending output and carries the decoding error.
  func commonResultFunc(result [][]byte) interface{} {
  	var maps [][]map[string]interface{}
  	for i, v := range result {
  		var mapRes []map[string]interface{}
  		if err := json.Unmarshal(v, &mapRes); err != nil {
  			panic(fmt.Sprintf("Failed to parse the input into map: result %d: %v", i, err))
  		}
  		maps = append(maps, mapRes)
  	}
  	return maps
  }
  1080. func DoRuleTest(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, wait int) {
  1081. doRuleTestBySinkProps(t, tests, j, opt, wait, nil, commonResultFunc)
  1082. }
  1083. func doRuleTestBySinkProps(t *testing.T, tests []RuleTest, j int, opt *api.RuleOption, w int, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
  1084. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  1085. for i, tt := range tests {
  1086. datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
  1087. if tp == nil {
  1088. t.Errorf("topo is not created successfully")
  1089. break
  1090. }
  1091. wait := tt.W
  1092. if wait == 0 {
  1093. if w > 0 {
  1094. wait = w
  1095. } else {
  1096. wait = 5
  1097. }
  1098. }
  1099. switch opt.Qos {
  1100. case api.ExactlyOnce:
  1101. wait *= 10
  1102. case api.AtLeastOnce:
  1103. wait *= 3
  1104. }
  1105. var retry int
  1106. if opt.Qos > api.AtMostOnce {
  1107. for retry = 3; retry > 0; retry-- {
  1108. if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
  1109. common.Log.Debugf("waiting for coordinator ready %d\n", retry)
  1110. time.Sleep(10 * time.Millisecond)
  1111. } else {
  1112. break
  1113. }
  1114. }
  1115. if retry < 0 {
  1116. t.Error("coordinator timeout")
  1117. t.FailNow()
  1118. }
  1119. }
  1120. if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, wait); err != nil {
  1121. t.Errorf("send data error %s", err)
  1122. break
  1123. }
  1124. compareResult(t, mockSink, resultFunc, tt, i, tp)
  1125. }
  1126. }
  1127. func compareResult(t *testing.T, mockSink *mocknodes.MockSink, resultFunc func(result [][]byte) interface{}, tt RuleTest, i int, tp *xstream.TopologyNew) {
  1128. // Check results
  1129. results := mockSink.GetResults()
  1130. maps := resultFunc(results)
  1131. if !reflect.DeepEqual(tt.R, maps) {
  1132. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.R, maps)
  1133. }
  1134. if err := compareMetrics(tp, tt.M); err != nil {
  1135. t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.Sql, err)
  1136. }
  1137. if tt.T != nil {
  1138. topo := tp.GetTopo()
  1139. if !reflect.DeepEqual(tt.T, topo) {
  1140. t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.T, topo)
  1141. }
  1142. }
  1143. tp.Cancel()
  1144. }
  1145. func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *xstream.TopologyNew, postleap int, wait int) error {
  1146. // Send data and move time
  1147. mockClock := mockclock.GetMockClock()
  1148. // Set the current time
  1149. mockClock.Add(0)
  1150. // TODO assume multiple data source send the data in order and has the same length
  1151. for i := 0; i < dataLength; i++ {
  1152. for _, d := range datas {
  1153. time.Sleep(time.Duration(wait) * time.Millisecond)
  1154. // Make sure time is going forward only
  1155. // gradually add up time to ensure checkpoint is triggered before the data send
  1156. for n := common.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
  1157. if d[i].Timestamp < n {
  1158. n = d[i].Timestamp
  1159. }
  1160. mockClock.Set(common.TimeFromUnixMilli(n))
  1161. common.Log.Debugf("Clock set to %d", common.GetNowInMilli())
  1162. time.Sleep(1)
  1163. }
  1164. select {
  1165. case err := <-errCh:
  1166. t.Log(err)
  1167. tp.Cancel()
  1168. return err
  1169. default:
  1170. }
  1171. }
  1172. }
  1173. mockClock.Add(time.Duration(postleap) * time.Millisecond)
  1174. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  1175. // Check if stream done. Poll for metrics,
  1176. time.Sleep(10 * time.Millisecond)
  1177. var retry int
  1178. for retry = 4; retry > 0; retry-- {
  1179. if err := compareMetrics(tp, metrics); err == nil {
  1180. break
  1181. } else {
  1182. common.Log.Errorf("check metrics error at %d: %s", retry, err)
  1183. }
  1184. time.Sleep(1000 * time.Millisecond)
  1185. }
  1186. if retry == 0 {
  1187. t.Error("send data timeout")
  1188. } else if retry < 2 {
  1189. common.Log.Debugf("try %d for metric comparison\n", 2-retry)
  1190. }
  1191. return nil
  1192. }
  1193. func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *mocknodes.MockSink, <-chan error) {
  1194. mockclock.ResetClock(1541152486000)
  1195. // Create stream
  1196. var (
  1197. sources []*nodes.SourceNode
  1198. datas [][]*xsql.Tuple
  1199. dataLength int
  1200. )
  1201. parser := xsql.NewParser(strings.NewReader(tt.Sql))
  1202. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1203. t.Errorf("parse sql %s error: %s", tt.Sql, err)
  1204. } else {
  1205. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1206. t.Errorf("sql %s is not a select statement", tt.Sql)
  1207. } else {
  1208. streams := xsql.GetStreams(selectStmt)
  1209. for _, stream := range streams {
  1210. data, ok := testData[stream]
  1211. if !ok {
  1212. continue
  1213. }
  1214. dataLength = len(data)
  1215. datas = append(datas, data)
  1216. source := nodes.NewSourceNodeWithSource(stream, mocknodes.NewMockSource(data), &xsql.Options{
  1217. DATASOURCE: stream,
  1218. })
  1219. sources = append(sources, source)
  1220. }
  1221. }
  1222. }
  1223. mockSink := mocknodes.NewMockSink()
  1224. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
  1225. tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, DbDir, sources, []*nodes.SinkNode{sink})
  1226. if err != nil {
  1227. t.Error(err)
  1228. return nil, 0, nil, nil, nil
  1229. }
  1230. errCh := tp.Open()
  1231. return datas, dataLength, tp, mockSink, errCh
  1232. }
  1233. // Create or drop streams
  1234. func HandleStream(createOrDrop bool, names []string, t *testing.T) {
  1235. p := processors.NewStreamProcessor(path.Join(DbDir, "stream"))
  1236. for _, name := range names {
  1237. var sql string
  1238. if createOrDrop {
  1239. switch name {
  1240. case "demo":
  1241. sql = `CREATE STREAM demo (
  1242. color STRING,
  1243. size BIGINT,
  1244. ts BIGINT
  1245. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  1246. case "demoError":
  1247. sql = `CREATE STREAM demoError (
  1248. color STRING,
  1249. size BIGINT,
  1250. ts BIGINT
  1251. ) WITH (DATASOURCE="demoError", FORMAT="json", KEY="ts");`
  1252. case "demo1":
  1253. sql = `CREATE STREAM demo1 (
  1254. temp FLOAT,
  1255. hum BIGINT,` +
  1256. "`from`" + ` STRING,
  1257. ts BIGINT
  1258. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  1259. case "demoTable":
  1260. sql = `CREATE TABLE demoTable (
  1261. device STRING,
  1262. ts BIGINT
  1263. ) WITH (DATASOURCE="demoTable", TYPE="mqtt", RETAIN_SIZE="3");`
  1264. case "sessionDemo":
  1265. sql = `CREATE STREAM sessionDemo (
  1266. temp FLOAT,
  1267. hum BIGINT,
  1268. ts BIGINT
  1269. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  1270. case "demoE":
  1271. sql = `CREATE STREAM demoE (
  1272. color STRING,
  1273. size BIGINT,
  1274. ts BIGINT
  1275. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1276. case "demo1E":
  1277. sql = `CREATE STREAM demo1E (
  1278. temp FLOAT,
  1279. hum BIGINT,
  1280. ts BIGINT
  1281. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1282. case "sessionDemoE":
  1283. sql = `CREATE STREAM sessionDemoE (
  1284. temp FLOAT,
  1285. hum BIGINT,
  1286. ts BIGINT
  1287. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1288. case "demoErr":
  1289. sql = `CREATE STREAM demoErr (
  1290. color STRING,
  1291. size BIGINT,
  1292. ts BIGINT
  1293. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1294. case "ldemo":
  1295. sql = `CREATE STREAM ldemo (
  1296. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  1297. case "ldemo1":
  1298. sql = `CREATE STREAM ldemo1 (
  1299. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  1300. case "lsessionDemo":
  1301. sql = `CREATE STREAM lsessionDemo (
  1302. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  1303. case "ext":
  1304. sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  1305. case "ext2":
  1306. sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  1307. case "text":
  1308. sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  1309. case "binDemo":
  1310. sql = "CREATE STREAM binDemo () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
  1311. case "table1":
  1312. sql = `CREATE TABLE table1 (
  1313. name STRING,
  1314. size BIGINT,
  1315. id BIGINT
  1316. ) WITH (DATASOURCE="lookup.json", FORMAT="json", CONF_KEY="test");`
  1317. case "helloStr":
  1318. sql = `CREATE STREAM helloStr (name string) WITH (DATASOURCE="hello", FORMAT="JSON")`
  1319. case "commands":
  1320. sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON")`
  1321. case "fakeBin":
  1322. sql = "CREATE STREAM fakeBin () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
  1323. case "shelves":
  1324. sql = `CREATE STREAM shelves (
  1325. name string,
  1326. size BIGINT,
  1327. shelf STRUCT(theme STRING,id BIGINT, subfield STRING)
  1328. ) WITH (DATASOURCE="shelves", FORMAT="json");`
  1329. case "mes":
  1330. sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", FORMAT="JSON")`
  1331. default:
  1332. t.Errorf("create stream %s fail", name)
  1333. }
  1334. } else {
  1335. if strings.Index(name, "table") == 0 {
  1336. sql = `DROP TABLE ` + name
  1337. } else {
  1338. sql = `DROP STREAM ` + name
  1339. }
  1340. }
  1341. _, err := p.ExecStmt(sql)
  1342. if err != nil {
  1343. t.Log(err)
  1344. }
  1345. }
  1346. }
  1347. type RuleCheckpointTest struct {
  1348. RuleTest
  1349. PauseSize int // Stop stream after sending pauseSize source to test checkpoint resume
  1350. Cc int // checkpoint count when paused
  1351. PauseMetric map[string]interface{} // The metric to check when paused
  1352. }
  1353. func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *api.RuleOption) {
  1354. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  1355. for i, tt := range tests {
  1356. datas, dataLength, tp, mockSink, errCh := createStream(t, tt.RuleTest, j, opt, nil)
  1357. if tp == nil {
  1358. t.Errorf("topo is not created successfully")
  1359. break
  1360. }
  1361. var retry int
  1362. for retry = 10; retry > 0; retry-- {
  1363. if tp.GetCoordinator() == nil || !tp.GetCoordinator().IsActivated() {
  1364. common.Log.Debugf("waiting for coordinator ready %d\n", retry)
  1365. time.Sleep(10 * time.Millisecond)
  1366. } else {
  1367. break
  1368. }
  1369. }
  1370. if retry == 0 {
  1371. t.Error("coordinator timeout")
  1372. t.FailNow()
  1373. }
  1374. common.Log.Debugf("Start sending first phase data done at %d", common.GetNowInMilli())
  1375. if err := sendData(t, tt.PauseSize, tt.PauseMetric, datas, errCh, tp, 100, 100); err != nil {
  1376. t.Errorf("first phase send data error %s", err)
  1377. break
  1378. }
  1379. common.Log.Debugf("Send first phase data done at %d", common.GetNowInMilli())
  1380. // compare checkpoint count
  1381. time.Sleep(10 * time.Millisecond)
  1382. for retry = 3; retry > 0; retry-- {
  1383. actual := tp.GetCoordinator().GetCompleteCount()
  1384. if tt.Cc == actual {
  1385. break
  1386. } else {
  1387. common.Log.Debugf("check checkpointCount error at %d: %d\n", retry, actual)
  1388. }
  1389. time.Sleep(200 * time.Millisecond)
  1390. }
  1391. cc := tp.GetCoordinator().GetCompleteCount()
  1392. tp.Cancel()
  1393. if retry == 0 {
  1394. t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.Cc, cc)
  1395. return
  1396. } else if retry < 3 {
  1397. common.Log.Debugf("try %d for checkpoint count\n", 4-retry)
  1398. }
  1399. tp.Cancel()
  1400. time.Sleep(10 * time.Millisecond)
  1401. // resume stream
  1402. common.Log.Debugf("Resume stream at %d", common.GetNowInMilli())
  1403. errCh = tp.Open()
  1404. common.Log.Debugf("After open stream at %d", common.GetNowInMilli())
  1405. if err := sendData(t, dataLength, tt.M, datas, errCh, tp, POSTLEAP, 10); err != nil {
  1406. t.Errorf("second phase send data error %s", err)
  1407. break
  1408. }
  1409. compareResult(t, mockSink, commonResultFunc, tt.RuleTest, i, tp)
  1410. }
  1411. }
  1412. func CreateRule(name, sql string) (*api.Rule, error) {
  1413. p := processors.NewRuleProcessor(DbDir)
  1414. p.ExecDrop(name)
  1415. return p.ExecCreate(name, sql)
  1416. }