common_test.go 24 KB


  1. package processors
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "github.com/emqx/kuiper/xstream/nodes"
  11. "github.com/emqx/kuiper/xstream/planner"
  12. "github.com/emqx/kuiper/xstream/test"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "reflect"
  17. "strings"
  18. "testing"
  19. "time"
  20. )
  21. const POSTLEAP = 1000 // Time change after all data sends out
  22. type ruleTest struct {
  23. name string
  24. sql string
  25. r interface{} // The result
  26. m map[string]interface{} // final metrics
  27. t *xstream.PrintableTopo // printable topo, an optional field
  28. w int // wait time for each data sending, in milli
  29. }
  30. var (
  31. DbDir = getDbDir()
  32. image, b64img = getImg()
  33. )
  34. func getDbDir() string {
  35. common.InitConf()
  36. dbDir, err := common.GetDataLoc()
  37. if err != nil {
  38. log.Panic(err)
  39. }
  40. log.Infof("db location is %s", dbDir)
  41. return dbDir
  42. }
  43. func getImg() ([]byte, string) {
  44. docsFolder, err := common.GetLoc("/docs/")
  45. if err != nil {
  46. log.Fatalf("Cannot find docs folder: %v", err)
  47. }
  48. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  49. if err != nil {
  50. log.Fatalf("Cannot read image: %v", err)
  51. }
  52. b64img := base64.StdEncoding.EncodeToString(image)
  53. return image, b64img
  54. }
  55. func cleanStateData() {
  56. dbDir, err := common.GetDataLoc()
  57. if err != nil {
  58. log.Panic(err)
  59. }
  60. c := path.Join(dbDir, "checkpoints")
  61. err = os.RemoveAll(c)
  62. if err != nil {
  63. log.Errorf("%s", err)
  64. }
  65. s := path.Join(dbDir, "sink", "cache")
  66. err = os.RemoveAll(s)
  67. if err != nil {
  68. log.Errorf("%s", err)
  69. }
  70. }
  71. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
  72. keys, values := tp.GetMetrics()
  73. for k, v := range m {
  74. var (
  75. index int
  76. key string
  77. matched bool
  78. )
  79. for index, key = range keys {
  80. if k == key {
  81. if strings.HasSuffix(k, "process_latency_us") {
  82. if values[index].(int64) >= v.(int64) {
  83. matched = true
  84. continue
  85. } else {
  86. break
  87. }
  88. }
  89. if values[index] == v {
  90. matched = true
  91. }
  92. break
  93. }
  94. }
  95. if matched {
  96. continue
  97. }
  98. //do not find
  99. if index < len(values) {
  100. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  101. } else {
  102. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  103. }
  104. }
  105. return nil
  106. }
  107. func errstring(err error) string {
  108. if err != nil {
  109. return err.Error()
  110. }
  111. return ""
  112. }
  113. // The time diff must larger than timeleap
  114. var testData = map[string][]*xsql.Tuple{
  115. "demo": {
  116. {
  117. Emitter: "demo",
  118. Message: map[string]interface{}{
  119. "color": "red",
  120. "size": 3,
  121. "ts": 1541152486013,
  122. },
  123. Timestamp: 1541152486013,
  124. },
  125. {
  126. Emitter: "demo",
  127. Message: map[string]interface{}{
  128. "color": "blue",
  129. "size": 6,
  130. "ts": 1541152486822,
  131. },
  132. Timestamp: 1541152486822,
  133. },
  134. {
  135. Emitter: "demo",
  136. Message: map[string]interface{}{
  137. "color": "blue",
  138. "size": 2,
  139. "ts": 1541152487632,
  140. },
  141. Timestamp: 1541152487632,
  142. },
  143. {
  144. Emitter: "demo",
  145. Message: map[string]interface{}{
  146. "color": "yellow",
  147. "size": 4,
  148. "ts": 1541152488442,
  149. },
  150. Timestamp: 1541152488442,
  151. },
  152. {
  153. Emitter: "demo",
  154. Message: map[string]interface{}{
  155. "color": "red",
  156. "size": 1,
  157. "ts": 1541152489252,
  158. },
  159. Timestamp: 1541152489252,
  160. },
  161. },
  162. "demoError": {
  163. {
  164. Emitter: "demoError",
  165. Message: map[string]interface{}{
  166. "color": 3,
  167. "size": "red",
  168. "ts": 1541152486013,
  169. },
  170. Timestamp: 1541152486013,
  171. },
  172. {
  173. Emitter: "demoError",
  174. Message: map[string]interface{}{
  175. "color": "blue",
  176. "size": 6,
  177. "ts": "1541152486822",
  178. },
  179. Timestamp: 1541152486822,
  180. },
  181. {
  182. Emitter: "demoError",
  183. Message: map[string]interface{}{
  184. "color": "blue",
  185. "size": 2,
  186. "ts": 1541152487632,
  187. },
  188. Timestamp: 1541152487632,
  189. },
  190. {
  191. Emitter: "demoError",
  192. Message: map[string]interface{}{
  193. "color": 7,
  194. "size": 4,
  195. "ts": 1541152488442,
  196. },
  197. Timestamp: 1541152488442,
  198. },
  199. {
  200. Emitter: "demoError",
  201. Message: map[string]interface{}{
  202. "color": "red",
  203. "size": "blue",
  204. "ts": 1541152489252,
  205. },
  206. Timestamp: 1541152489252,
  207. },
  208. },
  209. "demo1": {
  210. {
  211. Emitter: "demo1",
  212. Message: map[string]interface{}{
  213. "temp": 25.5,
  214. "hum": 65,
  215. "from": "device1",
  216. "ts": 1541152486013,
  217. },
  218. Timestamp: 1541152486115,
  219. },
  220. {
  221. Emitter: "demo1",
  222. Message: map[string]interface{}{
  223. "temp": 27.5,
  224. "hum": 59,
  225. "from": "device2",
  226. "ts": 1541152486823,
  227. },
  228. Timestamp: 1541152486903,
  229. },
  230. {
  231. Emitter: "demo1",
  232. Message: map[string]interface{}{
  233. "temp": 28.1,
  234. "hum": 75,
  235. "from": "device3",
  236. "ts": 1541152487632,
  237. },
  238. Timestamp: 1541152487702,
  239. },
  240. {
  241. Emitter: "demo1",
  242. Message: map[string]interface{}{
  243. "temp": 27.4,
  244. "hum": 80,
  245. "from": "device1",
  246. "ts": 1541152488442,
  247. },
  248. Timestamp: 1541152488605,
  249. },
  250. {
  251. Emitter: "demo1",
  252. Message: map[string]interface{}{
  253. "temp": 25.5,
  254. "hum": 62,
  255. "from": "device3",
  256. "ts": 1541152489252,
  257. },
  258. Timestamp: 1541152489305,
  259. },
  260. },
  261. "sessionDemo": {
  262. {
  263. Emitter: "sessionDemo",
  264. Message: map[string]interface{}{
  265. "temp": 25.5,
  266. "hum": 65,
  267. "ts": 1541152486013,
  268. },
  269. Timestamp: 1541152486013,
  270. },
  271. {
  272. Emitter: "sessionDemo",
  273. Message: map[string]interface{}{
  274. "temp": 27.5,
  275. "hum": 59,
  276. "ts": 1541152486823,
  277. },
  278. Timestamp: 1541152486823,
  279. },
  280. {
  281. Emitter: "sessionDemo",
  282. Message: map[string]interface{}{
  283. "temp": 28.1,
  284. "hum": 75,
  285. "ts": 1541152487932,
  286. },
  287. Timestamp: 1541152487932,
  288. },
  289. {
  290. Emitter: "sessionDemo",
  291. Message: map[string]interface{}{
  292. "temp": 27.4,
  293. "hum": 80,
  294. "ts": 1541152488442,
  295. },
  296. Timestamp: 1541152488442,
  297. },
  298. {
  299. Emitter: "sessionDemo",
  300. Message: map[string]interface{}{
  301. "temp": 25.5,
  302. "hum": 62,
  303. "ts": 1541152489252,
  304. },
  305. Timestamp: 1541152489252,
  306. },
  307. {
  308. Emitter: "sessionDemo",
  309. Message: map[string]interface{}{
  310. "temp": 26.2,
  311. "hum": 63,
  312. "ts": 1541152490062,
  313. },
  314. Timestamp: 1541152490062,
  315. },
  316. {
  317. Emitter: "sessionDemo",
  318. Message: map[string]interface{}{
  319. "temp": 26.8,
  320. "hum": 71,
  321. "ts": 1541152490872,
  322. },
  323. Timestamp: 1541152490872,
  324. },
  325. {
  326. Emitter: "sessionDemo",
  327. Message: map[string]interface{}{
  328. "temp": 28.9,
  329. "hum": 85,
  330. "ts": 1541152491682,
  331. },
  332. Timestamp: 1541152491682,
  333. },
  334. {
  335. Emitter: "sessionDemo",
  336. Message: map[string]interface{}{
  337. "temp": 29.1,
  338. "hum": 92,
  339. "ts": 1541152492492,
  340. },
  341. Timestamp: 1541152492492,
  342. },
  343. {
  344. Emitter: "sessionDemo",
  345. Message: map[string]interface{}{
  346. "temp": 32.2,
  347. "hum": 99,
  348. "ts": 1541152493202,
  349. },
  350. Timestamp: 1541152493202,
  351. },
  352. {
  353. Emitter: "sessionDemo",
  354. Message: map[string]interface{}{
  355. "temp": 30.9,
  356. "hum": 87,
  357. "ts": 1541152494112,
  358. },
  359. Timestamp: 1541152494112,
  360. },
  361. },
  362. "demoE": {
  363. {
  364. Emitter: "demoE",
  365. Message: map[string]interface{}{
  366. "color": "red",
  367. "size": 3,
  368. "ts": 1541152486013,
  369. },
  370. Timestamp: 1541152486023,
  371. },
  372. {
  373. Emitter: "demoE",
  374. Message: map[string]interface{}{
  375. "color": "blue",
  376. "size": 2,
  377. "ts": 1541152487632,
  378. },
  379. Timestamp: 1541152487822,
  380. },
  381. {
  382. Emitter: "demoE",
  383. Message: map[string]interface{}{
  384. "color": "red",
  385. "size": 1,
  386. "ts": 1541152489252,
  387. },
  388. Timestamp: 1541152489632,
  389. },
  390. { //dropped item
  391. Emitter: "demoE",
  392. Message: map[string]interface{}{
  393. "color": "blue",
  394. "size": 6,
  395. "ts": 1541152486822,
  396. },
  397. Timestamp: 1541152489842,
  398. },
  399. {
  400. Emitter: "demoE",
  401. Message: map[string]interface{}{
  402. "color": "yellow",
  403. "size": 4,
  404. "ts": 1541152488442,
  405. },
  406. Timestamp: 1541152490052,
  407. },
  408. { //To lift the watermark and issue all windows
  409. Emitter: "demoE",
  410. Message: map[string]interface{}{
  411. "color": "yellow",
  412. "size": 4,
  413. "ts": 1541152492342,
  414. },
  415. Timestamp: 1541152498888,
  416. },
  417. },
  418. "demo1E": {
  419. {
  420. Emitter: "demo1E",
  421. Message: map[string]interface{}{
  422. "temp": 27.5,
  423. "hum": 59,
  424. "ts": 1541152486823,
  425. },
  426. Timestamp: 1541152487250,
  427. },
  428. {
  429. Emitter: "demo1E",
  430. Message: map[string]interface{}{
  431. "temp": 25.5,
  432. "hum": 65,
  433. "ts": 1541152486013,
  434. },
  435. Timestamp: 1541152487751,
  436. },
  437. {
  438. Emitter: "demo1E",
  439. Message: map[string]interface{}{
  440. "temp": 27.4,
  441. "hum": 80,
  442. "ts": 1541152488442,
  443. },
  444. Timestamp: 1541152489252,
  445. },
  446. {
  447. Emitter: "demo1E",
  448. Message: map[string]interface{}{
  449. "temp": 28.1,
  450. "hum": 75,
  451. "ts": 1541152487632,
  452. },
  453. Timestamp: 1541152489753,
  454. },
  455. {
  456. Emitter: "demo1E",
  457. Message: map[string]interface{}{
  458. "temp": 25.5,
  459. "hum": 62,
  460. "ts": 1541152489252,
  461. },
  462. Timestamp: 1541152489954,
  463. },
  464. {
  465. Emitter: "demo1E",
  466. Message: map[string]interface{}{
  467. "temp": 25.5,
  468. "hum": 62,
  469. "ts": 1541152499252,
  470. },
  471. Timestamp: 1541152499755,
  472. },
  473. },
  474. "sessionDemoE": {
  475. {
  476. Emitter: "sessionDemoE",
  477. Message: map[string]interface{}{
  478. "temp": 25.5,
  479. "hum": 65,
  480. "ts": 1541152486013,
  481. },
  482. Timestamp: 1541152486250,
  483. },
  484. {
  485. Emitter: "sessionDemoE",
  486. Message: map[string]interface{}{
  487. "temp": 28.1,
  488. "hum": 75,
  489. "ts": 1541152487932,
  490. },
  491. Timestamp: 1541152487951,
  492. },
  493. {
  494. Emitter: "sessionDemoE",
  495. Message: map[string]interface{}{
  496. "temp": 27.5,
  497. "hum": 59,
  498. "ts": 1541152486823,
  499. },
  500. Timestamp: 1541152488552,
  501. },
  502. {
  503. Emitter: "sessionDemoE",
  504. Message: map[string]interface{}{
  505. "temp": 25.5,
  506. "hum": 62,
  507. "ts": 1541152489252,
  508. },
  509. Timestamp: 1541152489353,
  510. },
  511. {
  512. Emitter: "sessionDemoE",
  513. Message: map[string]interface{}{
  514. "temp": 27.4,
  515. "hum": 80,
  516. "ts": 1541152488442,
  517. },
  518. Timestamp: 1541152489854,
  519. },
  520. {
  521. Emitter: "sessionDemoE",
  522. Message: map[string]interface{}{
  523. "temp": 26.2,
  524. "hum": 63,
  525. "ts": 1541152490062,
  526. },
  527. Timestamp: 1541152490155,
  528. },
  529. {
  530. Emitter: "sessionDemoE",
  531. Message: map[string]interface{}{
  532. "temp": 28.9,
  533. "hum": 85,
  534. "ts": 1541152491682,
  535. },
  536. Timestamp: 1541152491686,
  537. },
  538. {
  539. Emitter: "sessionDemoE",
  540. Message: map[string]interface{}{
  541. "temp": 26.8,
  542. "hum": 71,
  543. "ts": 1541152490872,
  544. },
  545. Timestamp: 1541152491972,
  546. },
  547. {
  548. Emitter: "sessionDemoE",
  549. Message: map[string]interface{}{
  550. "temp": 29.1,
  551. "hum": 92,
  552. "ts": 1541152492492,
  553. },
  554. Timestamp: 1541152492592,
  555. },
  556. {
  557. Emitter: "sessionDemoE",
  558. Message: map[string]interface{}{
  559. "temp": 30.9,
  560. "hum": 87,
  561. "ts": 1541152494112,
  562. },
  563. Timestamp: 1541152494212,
  564. },
  565. {
  566. Emitter: "sessionDemoE",
  567. Message: map[string]interface{}{
  568. "temp": 32.2,
  569. "hum": 99,
  570. "ts": 1541152493202,
  571. },
  572. Timestamp: 1541152495202,
  573. },
  574. {
  575. Emitter: "sessionDemoE",
  576. Message: map[string]interface{}{
  577. "temp": 32.2,
  578. "hum": 99,
  579. "ts": 1541152499202,
  580. },
  581. Timestamp: 1541152499402,
  582. },
  583. },
  584. "demoErr": {
  585. {
  586. Emitter: "demoErr",
  587. Message: map[string]interface{}{
  588. "color": "red",
  589. "size": 3,
  590. "ts": 1541152486013,
  591. },
  592. Timestamp: 1541152486221,
  593. },
  594. {
  595. Emitter: "demoErr",
  596. Message: map[string]interface{}{
  597. "color": 2,
  598. "size": "blue",
  599. "ts": 1541152487632,
  600. },
  601. Timestamp: 1541152487722,
  602. },
  603. {
  604. Emitter: "demoErr",
  605. Message: map[string]interface{}{
  606. "color": "red",
  607. "size": 1,
  608. "ts": 1541152489252,
  609. },
  610. Timestamp: 1541152489332,
  611. },
  612. { //dropped item
  613. Emitter: "demoErr",
  614. Message: map[string]interface{}{
  615. "color": "blue",
  616. "size": 6,
  617. "ts": 1541152486822,
  618. },
  619. Timestamp: 1541152489822,
  620. },
  621. {
  622. Emitter: "demoErr",
  623. Message: map[string]interface{}{
  624. "color": "yellow",
  625. "size": 4,
  626. "ts": 1541152488442,
  627. },
  628. Timestamp: 1541152490042,
  629. },
  630. { //To lift the watermark and issue all windows
  631. Emitter: "demoErr",
  632. Message: map[string]interface{}{
  633. "color": "yellow",
  634. "size": 4,
  635. "ts": 1541152492342,
  636. },
  637. Timestamp: 1541152493842,
  638. },
  639. },
  640. "ldemo": {
  641. {
  642. Emitter: "ldemo",
  643. Message: map[string]interface{}{
  644. "color": "red",
  645. "size": 3,
  646. "ts": 1541152486013,
  647. },
  648. Timestamp: 1541152486013,
  649. },
  650. {
  651. Emitter: "ldemo",
  652. Message: map[string]interface{}{
  653. "color": "blue",
  654. "size": "string",
  655. "ts": 1541152486822,
  656. },
  657. Timestamp: 1541152486822,
  658. },
  659. {
  660. Emitter: "ldemo",
  661. Message: map[string]interface{}{
  662. "size": 3,
  663. "ts": 1541152487632,
  664. },
  665. Timestamp: 1541152487632,
  666. },
  667. {
  668. Emitter: "ldemo",
  669. Message: map[string]interface{}{
  670. "color": 49,
  671. "size": 2,
  672. "ts": 1541152488442,
  673. },
  674. Timestamp: 1541152488442,
  675. },
  676. {
  677. Emitter: "ldemo",
  678. Message: map[string]interface{}{
  679. "color": "red",
  680. "ts": 1541152489252,
  681. },
  682. Timestamp: 1541152489252,
  683. },
  684. },
  685. "ldemo1": {
  686. {
  687. Emitter: "ldemo1",
  688. Message: map[string]interface{}{
  689. "temp": 25.5,
  690. "hum": 65,
  691. "ts": 1541152486013,
  692. },
  693. Timestamp: 1541152486013,
  694. },
  695. {
  696. Emitter: "ldemo1",
  697. Message: map[string]interface{}{
  698. "temp": 27.5,
  699. "hum": 59,
  700. "ts": 1541152486823,
  701. },
  702. Timestamp: 1541152486823,
  703. },
  704. {
  705. Emitter: "ldemo1",
  706. Message: map[string]interface{}{
  707. "temp": 28.1,
  708. "hum": 75,
  709. "ts": 1541152487632,
  710. },
  711. Timestamp: 1541152487632,
  712. },
  713. {
  714. Emitter: "ldemo1",
  715. Message: map[string]interface{}{
  716. "temp": 27.4,
  717. "hum": 80,
  718. "ts": "1541152488442",
  719. },
  720. Timestamp: 1541152488442,
  721. },
  722. {
  723. Emitter: "ldemo1",
  724. Message: map[string]interface{}{
  725. "temp": 25.5,
  726. "hum": 62,
  727. "ts": 1541152489252,
  728. },
  729. Timestamp: 1541152489252,
  730. },
  731. },
  732. "lsessionDemo": {
  733. {
  734. Emitter: "lsessionDemo",
  735. Message: map[string]interface{}{
  736. "temp": 25.5,
  737. "hum": 65,
  738. "ts": 1541152486013,
  739. },
  740. Timestamp: 1541152486013,
  741. },
  742. {
  743. Emitter: "lsessionDemo",
  744. Message: map[string]interface{}{
  745. "temp": 27.5,
  746. "hum": 59,
  747. "ts": 1541152486823,
  748. },
  749. Timestamp: 1541152486823,
  750. },
  751. {
  752. Emitter: "lsessionDemo",
  753. Message: map[string]interface{}{
  754. "temp": 28.1,
  755. "hum": 75,
  756. "ts": 1541152487932,
  757. },
  758. Timestamp: 1541152487932,
  759. },
  760. {
  761. Emitter: "lsessionDemo",
  762. Message: map[string]interface{}{
  763. "temp": 27.4,
  764. "hum": 80,
  765. "ts": 1541152488442,
  766. },
  767. Timestamp: 1541152488442,
  768. },
  769. {
  770. Emitter: "lsessionDemo",
  771. Message: map[string]interface{}{
  772. "temp": 25.5,
  773. "hum": 62,
  774. "ts": 1541152489252,
  775. },
  776. Timestamp: 1541152489252,
  777. },
  778. {
  779. Emitter: "lsessionDemo",
  780. Message: map[string]interface{}{
  781. "temp": 26.2,
  782. "hum": 63,
  783. "ts": 1541152490062,
  784. },
  785. Timestamp: 1541152490062,
  786. },
  787. {
  788. Emitter: "lsessionDemo",
  789. Message: map[string]interface{}{
  790. "temp": 26.8,
  791. "hum": 71,
  792. "ts": 1541152490872,
  793. },
  794. Timestamp: 1541152490872,
  795. },
  796. {
  797. Emitter: "lsessionDemo",
  798. Message: map[string]interface{}{
  799. "temp": 28.9,
  800. "hum": 85,
  801. "ts": 1541152491682,
  802. },
  803. Timestamp: 1541152491682,
  804. },
  805. {
  806. Emitter: "lsessionDemo",
  807. Message: map[string]interface{}{
  808. "temp": 29.1,
  809. "hum": 92,
  810. "ts": 1541152492492,
  811. },
  812. Timestamp: 1541152492492,
  813. },
  814. {
  815. Emitter: "lsessionDemo",
  816. Message: map[string]interface{}{
  817. "temp": 2.2,
  818. "hum": 99,
  819. "ts": 1541152493202,
  820. },
  821. Timestamp: 1541152493202,
  822. },
  823. {
  824. Emitter: "lsessionDemo",
  825. Message: map[string]interface{}{
  826. "temp": 30.9,
  827. "hum": 87,
  828. "ts": 1541152494112,
  829. },
  830. Timestamp: 1541152494112,
  831. },
  832. },
  833. "text": {
  834. {
  835. Emitter: "text",
  836. Message: map[string]interface{}{
  837. "slogan": "Impossible is nothing",
  838. "brand": "Adidas",
  839. },
  840. Timestamp: 1541152486500,
  841. },
  842. {
  843. Emitter: "text",
  844. Message: map[string]interface{}{
  845. "slogan": "Stronger than dirt",
  846. "brand": "Ajax",
  847. },
  848. Timestamp: 1541152487400,
  849. },
  850. {
  851. Emitter: "text",
  852. Message: map[string]interface{}{
  853. "slogan": "Belong anywhere",
  854. "brand": "Airbnb",
  855. },
  856. Timestamp: 1541152488300,
  857. },
  858. {
  859. Emitter: "text",
  860. Message: map[string]interface{}{
  861. "slogan": "I can't believe I ate the whole thing",
  862. "brand": "Alka Seltzer",
  863. },
  864. Timestamp: 1541152489200,
  865. },
  866. {
  867. Emitter: "text",
  868. Message: map[string]interface{}{
  869. "slogan": "You're in good hands",
  870. "brand": "Allstate",
  871. },
  872. Timestamp: 1541152490100,
  873. },
  874. {
  875. Emitter: "text",
  876. Message: map[string]interface{}{
  877. "slogan": "Don't leave home without it",
  878. "brand": "American Express",
  879. },
  880. Timestamp: 1541152491200,
  881. },
  882. {
  883. Emitter: "text",
  884. Message: map[string]interface{}{
  885. "slogan": "Think different",
  886. "brand": "Apple",
  887. },
  888. Timestamp: 1541152492300,
  889. },
  890. {
  891. Emitter: "text",
  892. Message: map[string]interface{}{
  893. "slogan": "We try harder",
  894. "brand": "Avis",
  895. },
  896. Timestamp: 1541152493400,
  897. },
  898. },
  899. "binDemo": {
  900. {
  901. Emitter: "binDemo",
  902. Message: map[string]interface{}{
  903. "self": image,
  904. },
  905. Timestamp: 1541152486013,
  906. },
  907. },
  908. }
  909. func commonResultFunc(result [][]byte) interface{} {
  910. var maps [][]map[string]interface{}
  911. for _, v := range result {
  912. var mapRes []map[string]interface{}
  913. err := json.Unmarshal(v, &mapRes)
  914. if err != nil {
  915. panic("Failed to parse the input into map")
  916. }
  917. maps = append(maps, mapRes)
  918. }
  919. return maps
  920. }
  921. func doRuleTest(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption) {
  922. doRuleTestBySinkProps(t, tests, j, opt, nil, commonResultFunc)
  923. }
  924. func doRuleTestBySinkProps(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
  925. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  926. for i, tt := range tests {
  927. datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
  928. wait := tt.w
  929. if wait == 0 {
  930. wait = 5
  931. if opt.Qos == api.ExactlyOnce {
  932. wait = 10
  933. }
  934. }
  935. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP, wait); err != nil {
  936. t.Errorf("send data error %s", err)
  937. break
  938. }
  939. compareResult(t, mockSink, resultFunc, tt, i, tp)
  940. }
  941. }
  942. func compareResult(t *testing.T, mockSink *test.MockSink, resultFunc func(result [][]byte) interface{}, tt ruleTest, i int, tp *xstream.TopologyNew) {
  943. // Check results
  944. results := mockSink.GetResults()
  945. maps := resultFunc(results)
  946. if !reflect.DeepEqual(tt.r, maps) {
  947. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  948. }
  949. if err := compareMetrics(tp, tt.m); err != nil {
  950. t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.sql, err)
  951. }
  952. if tt.t != nil {
  953. topo := tp.GetTopo()
  954. if !reflect.DeepEqual(tt.t, topo) {
  955. t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.t, topo)
  956. }
  957. }
  958. tp.Cancel()
  959. }
  960. func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *xstream.TopologyNew, postleap int, wait int) error {
  961. // Send data and move time
  962. mockClock := test.GetMockClock()
  963. // Set the current time
  964. mockClock.Add(0)
  965. // TODO assume multiple data source send the data in order and has the same length
  966. for i := 0; i < dataLength; i++ {
  967. for _, d := range datas {
  968. // Make sure time is going forward only
  969. // gradually add up time to ensure checkpoint is triggered before the data send
  970. for n := common.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
  971. if d[i].Timestamp < n {
  972. n = d[i].Timestamp
  973. }
  974. mockClock.Set(common.TimeFromUnixMilli(n))
  975. common.Log.Debugf("Clock set to %d", common.GetNowInMilli())
  976. time.Sleep(1)
  977. }
  978. select {
  979. case err := <-errCh:
  980. t.Log(err)
  981. tp.Cancel()
  982. return err
  983. default:
  984. }
  985. time.Sleep(time.Duration(wait) * time.Millisecond)
  986. }
  987. }
  988. mockClock.Add(time.Duration(postleap) * time.Millisecond)
  989. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  990. // Check if stream done. Poll for metrics,
  991. time.Sleep(10 * time.Millisecond)
  992. var retry int
  993. for retry = 2; retry > 0; retry-- {
  994. if err := compareMetrics(tp, metrics); err == nil {
  995. break
  996. } else {
  997. common.Log.Errorf("check metrics error at %d: %s", retry, err)
  998. }
  999. time.Sleep(1000 * time.Millisecond)
  1000. }
  1001. if retry == 0 {
  1002. fmt.Printf("send data timeout\n")
  1003. } else if retry < 2 {
  1004. fmt.Printf("try %d for metric comparison\n", 2-retry)
  1005. }
  1006. return nil
  1007. }
  1008. func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *test.MockSink, <-chan error) {
  1009. // Rest for each test
  1010. cleanStateData()
  1011. test.ResetClock(1541152486000)
  1012. // Create stream
  1013. var (
  1014. sources []*nodes.SourceNode
  1015. datas [][]*xsql.Tuple
  1016. dataLength int
  1017. )
  1018. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1019. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1020. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1021. } else {
  1022. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1023. t.Errorf("sql %s is not a select statement", tt.sql)
  1024. } else {
  1025. streams := xsql.GetStreams(selectStmt)
  1026. for _, stream := range streams {
  1027. data := testData[stream]
  1028. dataLength = len(data)
  1029. datas = append(datas, data)
  1030. source := nodes.NewSourceNodeWithSource(stream, test.NewMockSource(data), map[string]string{
  1031. "DATASOURCE": stream,
  1032. })
  1033. sources = append(sources, source)
  1034. }
  1035. }
  1036. }
  1037. mockSink := test.NewMockSink()
  1038. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
  1039. tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, DbDir, sources, []*nodes.SinkNode{sink})
  1040. if err != nil {
  1041. t.Error(err)
  1042. return nil, 0, nil, nil, nil
  1043. }
  1044. errCh := tp.Open()
  1045. return datas, dataLength, tp, mockSink, errCh
  1046. }
  1047. // Create or drop streams
  1048. func handleStream(createOrDrop bool, names []string, t *testing.T) {
  1049. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1050. for _, name := range names {
  1051. var sql string
  1052. if createOrDrop {
  1053. switch name {
  1054. case "demo":
  1055. sql = `CREATE STREAM demo (
  1056. color STRING,
  1057. size BIGINT,
  1058. ts BIGINT
  1059. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  1060. case "demoError":
  1061. sql = `CREATE STREAM demoError (
  1062. color STRING,
  1063. size BIGINT,
  1064. ts BIGINT
  1065. ) WITH (DATASOURCE="demoError", FORMAT="json", KEY="ts");`
  1066. case "demo1":
  1067. sql = `CREATE STREAM demo1 (
  1068. temp FLOAT,
  1069. hum BIGINT,` +
  1070. "`from`" + ` STRING,
  1071. ts BIGINT
  1072. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  1073. case "sessionDemo":
  1074. sql = `CREATE STREAM sessionDemo (
  1075. temp FLOAT,
  1076. hum BIGINT,
  1077. ts BIGINT
  1078. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  1079. case "demoE":
  1080. sql = `CREATE STREAM demoE (
  1081. color STRING,
  1082. size BIGINT,
  1083. ts BIGINT
  1084. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1085. case "demo1E":
  1086. sql = `CREATE STREAM demo1E (
  1087. temp FLOAT,
  1088. hum BIGINT,
  1089. ts BIGINT
  1090. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1091. case "sessionDemoE":
  1092. sql = `CREATE STREAM sessionDemoE (
  1093. temp FLOAT,
  1094. hum BIGINT,
  1095. ts BIGINT
  1096. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1097. case "demoErr":
  1098. sql = `CREATE STREAM demoErr (
  1099. color STRING,
  1100. size BIGINT,
  1101. ts BIGINT
  1102. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1103. case "ldemo":
  1104. sql = `CREATE STREAM ldemo (
  1105. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  1106. case "ldemo1":
  1107. sql = `CREATE STREAM ldemo1 (
  1108. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  1109. case "lsessionDemo":
  1110. sql = `CREATE STREAM lsessionDemo (
  1111. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  1112. case "ext":
  1113. sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  1114. case "ext2":
  1115. sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  1116. case "text":
  1117. sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  1118. case "binDemo":
  1119. sql = "CREATE STREAM binDemo () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
  1120. default:
  1121. t.Errorf("create stream %s fail", name)
  1122. }
  1123. } else {
  1124. sql = `DROP STREAM ` + name
  1125. }
  1126. _, err := p.ExecStmt(sql)
  1127. if err != nil {
  1128. t.Log(err)
  1129. }
  1130. }
  1131. }