common_test.go 24 KB


  1. package processors
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "github.com/emqx/kuiper/xstream/nodes"
  11. "github.com/emqx/kuiper/xstream/planner"
  12. "github.com/emqx/kuiper/xstream/test"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "reflect"
  17. "strings"
  18. "testing"
  19. "time"
  20. )
  21. const POSTLEAP = 1000 // Time change after all data sends out
  22. type ruleTest struct {
  23. name string
  24. sql string
  25. r interface{} // The result
  26. m map[string]interface{} // final metrics
  27. t *xstream.PrintableTopo // printable topo, an optional field
  28. }
  29. var (
  30. DbDir = getDbDir()
  31. image, b64img = getImg()
  32. )
  33. func getDbDir() string {
  34. common.InitConf()
  35. dbDir, err := common.GetDataLoc()
  36. if err != nil {
  37. log.Panic(err)
  38. }
  39. log.Infof("db location is %s", dbDir)
  40. return dbDir
  41. }
  42. func getImg() ([]byte, string) {
  43. docsFolder, err := common.GetLoc("/docs/")
  44. if err != nil {
  45. log.Fatalf("Cannot find docs folder: %v", err)
  46. }
  47. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  48. if err != nil {
  49. log.Fatalf("Cannot read image: %v", err)
  50. }
  51. b64img := base64.StdEncoding.EncodeToString(image)
  52. return image, b64img
  53. }
  54. func cleanStateData() {
  55. dbDir, err := common.GetDataLoc()
  56. if err != nil {
  57. log.Panic(err)
  58. }
  59. c := path.Join(dbDir, "checkpoints")
  60. err = os.RemoveAll(c)
  61. if err != nil {
  62. log.Errorf("%s", err)
  63. }
  64. s := path.Join(dbDir, "sink", "cache")
  65. err = os.RemoveAll(s)
  66. if err != nil {
  67. log.Errorf("%s", err)
  68. }
  69. }
  70. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
  71. keys, values := tp.GetMetrics()
  72. for k, v := range m {
  73. var (
  74. index int
  75. key string
  76. matched bool
  77. )
  78. for index, key = range keys {
  79. if k == key {
  80. if strings.HasSuffix(k, "process_latency_us") {
  81. if values[index].(int64) >= v.(int64) {
  82. matched = true
  83. continue
  84. } else {
  85. break
  86. }
  87. }
  88. if values[index] == v {
  89. matched = true
  90. }
  91. break
  92. }
  93. }
  94. if matched {
  95. continue
  96. }
  97. //do not find
  98. if index < len(values) {
  99. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  100. } else {
  101. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  102. }
  103. }
  104. return nil
  105. }
  106. func errstring(err error) string {
  107. if err != nil {
  108. return err.Error()
  109. }
  110. return ""
  111. }
  112. // The time diff must larger than timeleap
  113. var testData = map[string][]*xsql.Tuple{
  114. "demo": {
  115. {
  116. Emitter: "demo",
  117. Message: map[string]interface{}{
  118. "color": "red",
  119. "size": 3,
  120. "ts": 1541152486013,
  121. },
  122. Timestamp: 1541152486013,
  123. },
  124. {
  125. Emitter: "demo",
  126. Message: map[string]interface{}{
  127. "color": "blue",
  128. "size": 6,
  129. "ts": 1541152486822,
  130. },
  131. Timestamp: 1541152486822,
  132. },
  133. {
  134. Emitter: "demo",
  135. Message: map[string]interface{}{
  136. "color": "blue",
  137. "size": 2,
  138. "ts": 1541152487632,
  139. },
  140. Timestamp: 1541152487632,
  141. },
  142. {
  143. Emitter: "demo",
  144. Message: map[string]interface{}{
  145. "color": "yellow",
  146. "size": 4,
  147. "ts": 1541152488442,
  148. },
  149. Timestamp: 1541152488442,
  150. },
  151. {
  152. Emitter: "demo",
  153. Message: map[string]interface{}{
  154. "color": "red",
  155. "size": 1,
  156. "ts": 1541152489252,
  157. },
  158. Timestamp: 1541152489252,
  159. },
  160. },
  161. "demoError": {
  162. {
  163. Emitter: "demoError",
  164. Message: map[string]interface{}{
  165. "color": 3,
  166. "size": "red",
  167. "ts": 1541152486013,
  168. },
  169. Timestamp: 1541152486013,
  170. },
  171. {
  172. Emitter: "demoError",
  173. Message: map[string]interface{}{
  174. "color": "blue",
  175. "size": 6,
  176. "ts": "1541152486822",
  177. },
  178. Timestamp: 1541152486822,
  179. },
  180. {
  181. Emitter: "demoError",
  182. Message: map[string]interface{}{
  183. "color": "blue",
  184. "size": 2,
  185. "ts": 1541152487632,
  186. },
  187. Timestamp: 1541152487632,
  188. },
  189. {
  190. Emitter: "demoError",
  191. Message: map[string]interface{}{
  192. "color": 7,
  193. "size": 4,
  194. "ts": 1541152488442,
  195. },
  196. Timestamp: 1541152488442,
  197. },
  198. {
  199. Emitter: "demoError",
  200. Message: map[string]interface{}{
  201. "color": "red",
  202. "size": "blue",
  203. "ts": 1541152489252,
  204. },
  205. Timestamp: 1541152489252,
  206. },
  207. },
  208. "demo1": {
  209. {
  210. Emitter: "demo1",
  211. Message: map[string]interface{}{
  212. "temp": 25.5,
  213. "hum": 65,
  214. "from": "device1",
  215. "ts": 1541152486013,
  216. },
  217. Timestamp: 1541152486013,
  218. },
  219. {
  220. Emitter: "demo1",
  221. Message: map[string]interface{}{
  222. "temp": 27.5,
  223. "hum": 59,
  224. "from": "device2",
  225. "ts": 1541152486823,
  226. },
  227. Timestamp: 1541152486823,
  228. },
  229. {
  230. Emitter: "demo1",
  231. Message: map[string]interface{}{
  232. "temp": 28.1,
  233. "hum": 75,
  234. "from": "device3",
  235. "ts": 1541152487632,
  236. },
  237. Timestamp: 1541152487632,
  238. },
  239. {
  240. Emitter: "demo1",
  241. Message: map[string]interface{}{
  242. "temp": 27.4,
  243. "hum": 80,
  244. "from": "device1",
  245. "ts": 1541152488442,
  246. },
  247. Timestamp: 1541152488442,
  248. },
  249. {
  250. Emitter: "demo1",
  251. Message: map[string]interface{}{
  252. "temp": 25.5,
  253. "hum": 62,
  254. "from": "device3",
  255. "ts": 1541152489252,
  256. },
  257. Timestamp: 1541152489252,
  258. },
  259. },
  260. "sessionDemo": {
  261. {
  262. Emitter: "sessionDemo",
  263. Message: map[string]interface{}{
  264. "temp": 25.5,
  265. "hum": 65,
  266. "ts": 1541152486013,
  267. },
  268. Timestamp: 1541152486013,
  269. },
  270. {
  271. Emitter: "sessionDemo",
  272. Message: map[string]interface{}{
  273. "temp": 27.5,
  274. "hum": 59,
  275. "ts": 1541152486823,
  276. },
  277. Timestamp: 1541152486823,
  278. },
  279. {
  280. Emitter: "sessionDemo",
  281. Message: map[string]interface{}{
  282. "temp": 28.1,
  283. "hum": 75,
  284. "ts": 1541152487932,
  285. },
  286. Timestamp: 1541152487932,
  287. },
  288. {
  289. Emitter: "sessionDemo",
  290. Message: map[string]interface{}{
  291. "temp": 27.4,
  292. "hum": 80,
  293. "ts": 1541152488442,
  294. },
  295. Timestamp: 1541152488442,
  296. },
  297. {
  298. Emitter: "sessionDemo",
  299. Message: map[string]interface{}{
  300. "temp": 25.5,
  301. "hum": 62,
  302. "ts": 1541152489252,
  303. },
  304. Timestamp: 1541152489252,
  305. },
  306. {
  307. Emitter: "sessionDemo",
  308. Message: map[string]interface{}{
  309. "temp": 26.2,
  310. "hum": 63,
  311. "ts": 1541152490062,
  312. },
  313. Timestamp: 1541152490062,
  314. },
  315. {
  316. Emitter: "sessionDemo",
  317. Message: map[string]interface{}{
  318. "temp": 26.8,
  319. "hum": 71,
  320. "ts": 1541152490872,
  321. },
  322. Timestamp: 1541152490872,
  323. },
  324. {
  325. Emitter: "sessionDemo",
  326. Message: map[string]interface{}{
  327. "temp": 28.9,
  328. "hum": 85,
  329. "ts": 1541152491682,
  330. },
  331. Timestamp: 1541152491682,
  332. },
  333. {
  334. Emitter: "sessionDemo",
  335. Message: map[string]interface{}{
  336. "temp": 29.1,
  337. "hum": 92,
  338. "ts": 1541152492492,
  339. },
  340. Timestamp: 1541152492492,
  341. },
  342. {
  343. Emitter: "sessionDemo",
  344. Message: map[string]interface{}{
  345. "temp": 32.2,
  346. "hum": 99,
  347. "ts": 1541152493202,
  348. },
  349. Timestamp: 1541152493202,
  350. },
  351. {
  352. Emitter: "sessionDemo",
  353. Message: map[string]interface{}{
  354. "temp": 30.9,
  355. "hum": 87,
  356. "ts": 1541152494112,
  357. },
  358. Timestamp: 1541152494112,
  359. },
  360. },
  361. "demoE": {
  362. {
  363. Emitter: "demoE",
  364. Message: map[string]interface{}{
  365. "color": "red",
  366. "size": 3,
  367. "ts": 1541152486013,
  368. },
  369. Timestamp: 1541152486023,
  370. },
  371. {
  372. Emitter: "demoE",
  373. Message: map[string]interface{}{
  374. "color": "blue",
  375. "size": 2,
  376. "ts": 1541152487632,
  377. },
  378. Timestamp: 1541152487822,
  379. },
  380. {
  381. Emitter: "demoE",
  382. Message: map[string]interface{}{
  383. "color": "red",
  384. "size": 1,
  385. "ts": 1541152489252,
  386. },
  387. Timestamp: 1541152489632,
  388. },
  389. { //dropped item
  390. Emitter: "demoE",
  391. Message: map[string]interface{}{
  392. "color": "blue",
  393. "size": 6,
  394. "ts": 1541152486822,
  395. },
  396. Timestamp: 1541152489842,
  397. },
  398. {
  399. Emitter: "demoE",
  400. Message: map[string]interface{}{
  401. "color": "yellow",
  402. "size": 4,
  403. "ts": 1541152488442,
  404. },
  405. Timestamp: 1541152490052,
  406. },
  407. { //To lift the watermark and issue all windows
  408. Emitter: "demoE",
  409. Message: map[string]interface{}{
  410. "color": "yellow",
  411. "size": 4,
  412. "ts": 1541152492342,
  413. },
  414. Timestamp: 1541152498888,
  415. },
  416. },
  417. "demo1E": {
  418. {
  419. Emitter: "demo1E",
  420. Message: map[string]interface{}{
  421. "temp": 27.5,
  422. "hum": 59,
  423. "ts": 1541152486823,
  424. },
  425. Timestamp: 1541152487250,
  426. },
  427. {
  428. Emitter: "demo1E",
  429. Message: map[string]interface{}{
  430. "temp": 25.5,
  431. "hum": 65,
  432. "ts": 1541152486013,
  433. },
  434. Timestamp: 1541152487751,
  435. },
  436. {
  437. Emitter: "demo1E",
  438. Message: map[string]interface{}{
  439. "temp": 27.4,
  440. "hum": 80,
  441. "ts": 1541152488442,
  442. },
  443. Timestamp: 1541152489252,
  444. },
  445. {
  446. Emitter: "demo1E",
  447. Message: map[string]interface{}{
  448. "temp": 28.1,
  449. "hum": 75,
  450. "ts": 1541152487632,
  451. },
  452. Timestamp: 1541152489753,
  453. },
  454. {
  455. Emitter: "demo1E",
  456. Message: map[string]interface{}{
  457. "temp": 25.5,
  458. "hum": 62,
  459. "ts": 1541152489252,
  460. },
  461. Timestamp: 1541152489954,
  462. },
  463. {
  464. Emitter: "demo1E",
  465. Message: map[string]interface{}{
  466. "temp": 25.5,
  467. "hum": 62,
  468. "ts": 1541152499252,
  469. },
  470. Timestamp: 1541152499755,
  471. },
  472. },
  473. "sessionDemoE": {
  474. {
  475. Emitter: "sessionDemoE",
  476. Message: map[string]interface{}{
  477. "temp": 25.5,
  478. "hum": 65,
  479. "ts": 1541152486013,
  480. },
  481. Timestamp: 1541152486250,
  482. },
  483. {
  484. Emitter: "sessionDemoE",
  485. Message: map[string]interface{}{
  486. "temp": 28.1,
  487. "hum": 75,
  488. "ts": 1541152487932,
  489. },
  490. Timestamp: 1541152487951,
  491. },
  492. {
  493. Emitter: "sessionDemoE",
  494. Message: map[string]interface{}{
  495. "temp": 27.5,
  496. "hum": 59,
  497. "ts": 1541152486823,
  498. },
  499. Timestamp: 1541152488552,
  500. },
  501. {
  502. Emitter: "sessionDemoE",
  503. Message: map[string]interface{}{
  504. "temp": 25.5,
  505. "hum": 62,
  506. "ts": 1541152489252,
  507. },
  508. Timestamp: 1541152489353,
  509. },
  510. {
  511. Emitter: "sessionDemoE",
  512. Message: map[string]interface{}{
  513. "temp": 27.4,
  514. "hum": 80,
  515. "ts": 1541152488442,
  516. },
  517. Timestamp: 1541152489854,
  518. },
  519. {
  520. Emitter: "sessionDemoE",
  521. Message: map[string]interface{}{
  522. "temp": 26.2,
  523. "hum": 63,
  524. "ts": 1541152490062,
  525. },
  526. Timestamp: 1541152490155,
  527. },
  528. {
  529. Emitter: "sessionDemoE",
  530. Message: map[string]interface{}{
  531. "temp": 28.9,
  532. "hum": 85,
  533. "ts": 1541152491682,
  534. },
  535. Timestamp: 1541152491686,
  536. },
  537. {
  538. Emitter: "sessionDemoE",
  539. Message: map[string]interface{}{
  540. "temp": 26.8,
  541. "hum": 71,
  542. "ts": 1541152490872,
  543. },
  544. Timestamp: 1541152491972,
  545. },
  546. {
  547. Emitter: "sessionDemoE",
  548. Message: map[string]interface{}{
  549. "temp": 29.1,
  550. "hum": 92,
  551. "ts": 1541152492492,
  552. },
  553. Timestamp: 1541152492592,
  554. },
  555. {
  556. Emitter: "sessionDemoE",
  557. Message: map[string]interface{}{
  558. "temp": 30.9,
  559. "hum": 87,
  560. "ts": 1541152494112,
  561. },
  562. Timestamp: 1541152494212,
  563. },
  564. {
  565. Emitter: "sessionDemoE",
  566. Message: map[string]interface{}{
  567. "temp": 32.2,
  568. "hum": 99,
  569. "ts": 1541152493202,
  570. },
  571. Timestamp: 1541152495202,
  572. },
  573. {
  574. Emitter: "sessionDemoE",
  575. Message: map[string]interface{}{
  576. "temp": 32.2,
  577. "hum": 99,
  578. "ts": 1541152499202,
  579. },
  580. Timestamp: 1541152499402,
  581. },
  582. },
  583. "demoErr": {
  584. {
  585. Emitter: "demoErr",
  586. Message: map[string]interface{}{
  587. "color": "red",
  588. "size": 3,
  589. "ts": 1541152486013,
  590. },
  591. Timestamp: 1541152486221,
  592. },
  593. {
  594. Emitter: "demoErr",
  595. Message: map[string]interface{}{
  596. "color": 2,
  597. "size": "blue",
  598. "ts": 1541152487632,
  599. },
  600. Timestamp: 1541152487722,
  601. },
  602. {
  603. Emitter: "demoErr",
  604. Message: map[string]interface{}{
  605. "color": "red",
  606. "size": 1,
  607. "ts": 1541152489252,
  608. },
  609. Timestamp: 1541152489332,
  610. },
  611. { //dropped item
  612. Emitter: "demoErr",
  613. Message: map[string]interface{}{
  614. "color": "blue",
  615. "size": 6,
  616. "ts": 1541152486822,
  617. },
  618. Timestamp: 1541152489822,
  619. },
  620. {
  621. Emitter: "demoErr",
  622. Message: map[string]interface{}{
  623. "color": "yellow",
  624. "size": 4,
  625. "ts": 1541152488442,
  626. },
  627. Timestamp: 1541152490042,
  628. },
  629. { //To lift the watermark and issue all windows
  630. Emitter: "demoErr",
  631. Message: map[string]interface{}{
  632. "color": "yellow",
  633. "size": 4,
  634. "ts": 1541152492342,
  635. },
  636. Timestamp: 1541152493842,
  637. },
  638. },
  639. "ldemo": {
  640. {
  641. Emitter: "ldemo",
  642. Message: map[string]interface{}{
  643. "color": "red",
  644. "size": 3,
  645. "ts": 1541152486013,
  646. },
  647. Timestamp: 1541152486013,
  648. },
  649. {
  650. Emitter: "ldemo",
  651. Message: map[string]interface{}{
  652. "color": "blue",
  653. "size": "string",
  654. "ts": 1541152486822,
  655. },
  656. Timestamp: 1541152486822,
  657. },
  658. {
  659. Emitter: "ldemo",
  660. Message: map[string]interface{}{
  661. "size": 3,
  662. "ts": 1541152487632,
  663. },
  664. Timestamp: 1541152487632,
  665. },
  666. {
  667. Emitter: "ldemo",
  668. Message: map[string]interface{}{
  669. "color": 49,
  670. "size": 2,
  671. "ts": 1541152488442,
  672. },
  673. Timestamp: 1541152488442,
  674. },
  675. {
  676. Emitter: "ldemo",
  677. Message: map[string]interface{}{
  678. "color": "red",
  679. "ts": 1541152489252,
  680. },
  681. Timestamp: 1541152489252,
  682. },
  683. },
  684. "ldemo1": {
  685. {
  686. Emitter: "ldemo1",
  687. Message: map[string]interface{}{
  688. "temp": 25.5,
  689. "hum": 65,
  690. "ts": 1541152486013,
  691. },
  692. Timestamp: 1541152486013,
  693. },
  694. {
  695. Emitter: "ldemo1",
  696. Message: map[string]interface{}{
  697. "temp": 27.5,
  698. "hum": 59,
  699. "ts": 1541152486823,
  700. },
  701. Timestamp: 1541152486823,
  702. },
  703. {
  704. Emitter: "ldemo1",
  705. Message: map[string]interface{}{
  706. "temp": 28.1,
  707. "hum": 75,
  708. "ts": 1541152487632,
  709. },
  710. Timestamp: 1541152487632,
  711. },
  712. {
  713. Emitter: "ldemo1",
  714. Message: map[string]interface{}{
  715. "temp": 27.4,
  716. "hum": 80,
  717. "ts": "1541152488442",
  718. },
  719. Timestamp: 1541152488442,
  720. },
  721. {
  722. Emitter: "ldemo1",
  723. Message: map[string]interface{}{
  724. "temp": 25.5,
  725. "hum": 62,
  726. "ts": 1541152489252,
  727. },
  728. Timestamp: 1541152489252,
  729. },
  730. },
  731. "lsessionDemo": {
  732. {
  733. Emitter: "lsessionDemo",
  734. Message: map[string]interface{}{
  735. "temp": 25.5,
  736. "hum": 65,
  737. "ts": 1541152486013,
  738. },
  739. Timestamp: 1541152486013,
  740. },
  741. {
  742. Emitter: "lsessionDemo",
  743. Message: map[string]interface{}{
  744. "temp": 27.5,
  745. "hum": 59,
  746. "ts": 1541152486823,
  747. },
  748. Timestamp: 1541152486823,
  749. },
  750. {
  751. Emitter: "lsessionDemo",
  752. Message: map[string]interface{}{
  753. "temp": 28.1,
  754. "hum": 75,
  755. "ts": 1541152487932,
  756. },
  757. Timestamp: 1541152487932,
  758. },
  759. {
  760. Emitter: "lsessionDemo",
  761. Message: map[string]interface{}{
  762. "temp": 27.4,
  763. "hum": 80,
  764. "ts": 1541152488442,
  765. },
  766. Timestamp: 1541152488442,
  767. },
  768. {
  769. Emitter: "lsessionDemo",
  770. Message: map[string]interface{}{
  771. "temp": 25.5,
  772. "hum": 62,
  773. "ts": 1541152489252,
  774. },
  775. Timestamp: 1541152489252,
  776. },
  777. {
  778. Emitter: "lsessionDemo",
  779. Message: map[string]interface{}{
  780. "temp": 26.2,
  781. "hum": 63,
  782. "ts": 1541152490062,
  783. },
  784. Timestamp: 1541152490062,
  785. },
  786. {
  787. Emitter: "lsessionDemo",
  788. Message: map[string]interface{}{
  789. "temp": 26.8,
  790. "hum": 71,
  791. "ts": 1541152490872,
  792. },
  793. Timestamp: 1541152490872,
  794. },
  795. {
  796. Emitter: "lsessionDemo",
  797. Message: map[string]interface{}{
  798. "temp": 28.9,
  799. "hum": 85,
  800. "ts": 1541152491682,
  801. },
  802. Timestamp: 1541152491682,
  803. },
  804. {
  805. Emitter: "lsessionDemo",
  806. Message: map[string]interface{}{
  807. "temp": 29.1,
  808. "hum": 92,
  809. "ts": 1541152492492,
  810. },
  811. Timestamp: 1541152492492,
  812. },
  813. {
  814. Emitter: "lsessionDemo",
  815. Message: map[string]interface{}{
  816. "temp": 2.2,
  817. "hum": 99,
  818. "ts": 1541152493202,
  819. },
  820. Timestamp: 1541152493202,
  821. },
  822. {
  823. Emitter: "lsessionDemo",
  824. Message: map[string]interface{}{
  825. "temp": 30.9,
  826. "hum": 87,
  827. "ts": 1541152494112,
  828. },
  829. Timestamp: 1541152494112,
  830. },
  831. },
  832. "text": {
  833. {
  834. Emitter: "text",
  835. Message: map[string]interface{}{
  836. "slogan": "Impossible is nothing",
  837. "brand": "Adidas",
  838. },
  839. Timestamp: 1541152486500,
  840. },
  841. {
  842. Emitter: "text",
  843. Message: map[string]interface{}{
  844. "slogan": "Stronger than dirt",
  845. "brand": "Ajax",
  846. },
  847. Timestamp: 1541152487400,
  848. },
  849. {
  850. Emitter: "text",
  851. Message: map[string]interface{}{
  852. "slogan": "Belong anywhere",
  853. "brand": "Airbnb",
  854. },
  855. Timestamp: 1541152488300,
  856. },
  857. {
  858. Emitter: "text",
  859. Message: map[string]interface{}{
  860. "slogan": "I can't believe I ate the whole thing",
  861. "brand": "Alka Seltzer",
  862. },
  863. Timestamp: 1541152489200,
  864. },
  865. {
  866. Emitter: "text",
  867. Message: map[string]interface{}{
  868. "slogan": "You're in good hands",
  869. "brand": "Allstate",
  870. },
  871. Timestamp: 1541152490100,
  872. },
  873. {
  874. Emitter: "text",
  875. Message: map[string]interface{}{
  876. "slogan": "Don't leave home without it",
  877. "brand": "American Express",
  878. },
  879. Timestamp: 1541152491200,
  880. },
  881. {
  882. Emitter: "text",
  883. Message: map[string]interface{}{
  884. "slogan": "Think different",
  885. "brand": "Apple",
  886. },
  887. Timestamp: 1541152492300,
  888. },
  889. {
  890. Emitter: "text",
  891. Message: map[string]interface{}{
  892. "slogan": "We try harder",
  893. "brand": "Avis",
  894. },
  895. Timestamp: 1541152493400,
  896. },
  897. },
  898. "binDemo": {
  899. {
  900. Emitter: "binDemo",
  901. Message: map[string]interface{}{
  902. "self": image,
  903. },
  904. Timestamp: 1541152486013,
  905. },
  906. },
  907. }
  908. func commonResultFunc(result [][]byte) interface{} {
  909. var maps [][]map[string]interface{}
  910. for _, v := range result {
  911. var mapRes []map[string]interface{}
  912. err := json.Unmarshal(v, &mapRes)
  913. if err != nil {
  914. panic("Failed to parse the input into map")
  915. }
  916. maps = append(maps, mapRes)
  917. }
  918. return maps
  919. }
  920. func doRuleTest(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption) {
  921. doRuleTestBySinkProps(t, tests, j, opt, nil, commonResultFunc)
  922. }
  923. func doRuleTestBySinkProps(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
  924. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  925. for i, tt := range tests {
  926. datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
  927. wait := 20
  928. if opt.Qos == api.ExactlyOnce {
  929. wait = 30
  930. }
  931. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP, wait); err != nil {
  932. t.Errorf("send data error %s", err)
  933. break
  934. }
  935. compareResult(t, mockSink, resultFunc, tt, i, tp)
  936. }
  937. }
  938. func compareResult(t *testing.T, mockSink *test.MockSink, resultFunc func(result [][]byte) interface{}, tt ruleTest, i int, tp *xstream.TopologyNew) {
  939. // Check results
  940. results := mockSink.GetResults()
  941. maps := resultFunc(results)
  942. if !reflect.DeepEqual(tt.r, maps) {
  943. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  944. }
  945. if err := compareMetrics(tp, tt.m); err != nil {
  946. t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.sql, err)
  947. }
  948. if tt.t != nil {
  949. topo := tp.GetTopo()
  950. if !reflect.DeepEqual(tt.t, topo) {
  951. t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.t, topo)
  952. }
  953. }
  954. tp.Cancel()
  955. }
  956. func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *xstream.TopologyNew, postleap int, wait int) error {
  957. // Send data and move time
  958. mockClock := test.GetMockClock()
  959. // Set the current time
  960. mockClock.Add(0)
  961. // TODO assume multiple data source send the data in order and has the same length
  962. for i := 0; i < dataLength; i++ {
  963. for _, d := range datas {
  964. // Make sure time is going forward only
  965. // gradually add up time to ensure checkpoint is triggered before the data send
  966. for n := common.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
  967. if d[i].Timestamp < n {
  968. n = d[i].Timestamp
  969. }
  970. mockClock.Set(common.TimeFromUnixMilli(n))
  971. common.Log.Debugf("Clock set to %d", common.GetNowInMilli())
  972. time.Sleep(1)
  973. }
  974. select {
  975. case err := <-errCh:
  976. t.Log(err)
  977. tp.Cancel()
  978. return err
  979. default:
  980. }
  981. }
  982. time.Sleep(time.Duration(wait) * time.Millisecond)
  983. }
  984. mockClock.Add(time.Duration(postleap) * time.Millisecond)
  985. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  986. // Check if stream done. Poll for metrics,
  987. time.Sleep(10 * time.Millisecond)
  988. var retry int
  989. for retry = 2; retry > 0; retry-- {
  990. if err := compareMetrics(tp, metrics); err == nil {
  991. break
  992. } else {
  993. common.Log.Errorf("check metrics error at %d: %s", retry, err)
  994. }
  995. time.Sleep(1000 * time.Millisecond)
  996. }
  997. if retry == 0 {
  998. fmt.Printf("send data timeout\n")
  999. } else if retry < 2 {
  1000. fmt.Printf("try %d for metric comparison\n", 2-retry)
  1001. }
  1002. return nil
  1003. }
  1004. func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *test.MockSink, <-chan error) {
  1005. // Rest for each test
  1006. cleanStateData()
  1007. test.ResetClock(1541152486000)
  1008. // Create stream
  1009. var (
  1010. sources []*nodes.SourceNode
  1011. datas [][]*xsql.Tuple
  1012. dataLength int
  1013. )
  1014. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1015. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1016. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1017. } else {
  1018. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1019. t.Errorf("sql %s is not a select statement", tt.sql)
  1020. } else {
  1021. streams := xsql.GetStreams(selectStmt)
  1022. for _, stream := range streams {
  1023. data := testData[stream]
  1024. dataLength = len(data)
  1025. datas = append(datas, data)
  1026. source := nodes.NewSourceNodeWithSource(stream, test.NewMockSource(data), map[string]string{
  1027. "DATASOURCE": stream,
  1028. })
  1029. sources = append(sources, source)
  1030. }
  1031. }
  1032. }
  1033. mockSink := test.NewMockSink()
  1034. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
  1035. tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, DbDir, sources, []*nodes.SinkNode{sink})
  1036. if err != nil {
  1037. t.Error(err)
  1038. return nil, 0, nil, nil, nil
  1039. }
  1040. errCh := tp.Open()
  1041. return datas, dataLength, tp, mockSink, errCh
  1042. }
  1043. // Create or drop streams
  1044. func handleStream(createOrDrop bool, names []string, t *testing.T) {
  1045. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1046. for _, name := range names {
  1047. var sql string
  1048. if createOrDrop {
  1049. switch name {
  1050. case "demo":
  1051. sql = `CREATE STREAM demo (
  1052. color STRING,
  1053. size BIGINT,
  1054. ts BIGINT
  1055. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  1056. case "demoError":
  1057. sql = `CREATE STREAM demoError (
  1058. color STRING,
  1059. size BIGINT,
  1060. ts BIGINT
  1061. ) WITH (DATASOURCE="demoError", FORMAT="json", KEY="ts");`
  1062. case "demo1":
  1063. sql = `CREATE STREAM demo1 (
  1064. temp FLOAT,
  1065. hum BIGINT,` +
  1066. "`from`" + ` STRING,
  1067. ts BIGINT
  1068. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  1069. case "sessionDemo":
  1070. sql = `CREATE STREAM sessionDemo (
  1071. temp FLOAT,
  1072. hum BIGINT,
  1073. ts BIGINT
  1074. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  1075. case "demoE":
  1076. sql = `CREATE STREAM demoE (
  1077. color STRING,
  1078. size BIGINT,
  1079. ts BIGINT
  1080. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1081. case "demo1E":
  1082. sql = `CREATE STREAM demo1E (
  1083. temp FLOAT,
  1084. hum BIGINT,
  1085. ts BIGINT
  1086. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1087. case "sessionDemoE":
  1088. sql = `CREATE STREAM sessionDemoE (
  1089. temp FLOAT,
  1090. hum BIGINT,
  1091. ts BIGINT
  1092. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1093. case "demoErr":
  1094. sql = `CREATE STREAM demoErr (
  1095. color STRING,
  1096. size BIGINT,
  1097. ts BIGINT
  1098. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1099. case "ldemo":
  1100. sql = `CREATE STREAM ldemo (
  1101. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  1102. case "ldemo1":
  1103. sql = `CREATE STREAM ldemo1 (
  1104. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  1105. case "lsessionDemo":
  1106. sql = `CREATE STREAM lsessionDemo (
  1107. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  1108. case "ext":
  1109. sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  1110. case "ext2":
  1111. sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  1112. case "text":
  1113. sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  1114. case "binDemo":
  1115. sql = "CREATE STREAM binDemo () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
  1116. default:
  1117. t.Errorf("create stream %s fail", name)
  1118. }
  1119. } else {
  1120. sql = `DROP STREAM ` + name
  1121. }
  1122. _, err := p.ExecStmt(sql)
  1123. if err != nil {
  1124. t.Log(err)
  1125. }
  1126. }
  1127. }