common_test.go 24 KB


  1. package processors
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "github.com/emqx/kuiper/xstream/nodes"
  11. "github.com/emqx/kuiper/xstream/planner"
  12. "github.com/emqx/kuiper/xstream/test"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "reflect"
  17. "strings"
  18. "testing"
  19. "time"
  20. )
  21. const POSTLEAP = 1000 // Time change after all data sends out
  22. type ruleTest struct {
  23. name string
  24. sql string
  25. r interface{} // The result
  26. m map[string]interface{} // final metrics
  27. t *xstream.PrintableTopo // printable topo, an optional field
  28. w int // wait time for each data sending, in milli
  29. }
  30. var (
  31. DbDir = getDbDir()
  32. image, b64img = getImg()
  33. )
  34. func getDbDir() string {
  35. common.InitConf()
  36. dbDir, err := common.GetDataLoc()
  37. if err != nil {
  38. log.Panic(err)
  39. }
  40. log.Infof("db location is %s", dbDir)
  41. return dbDir
  42. }
  43. func getImg() ([]byte, string) {
  44. docsFolder, err := common.GetLoc("/docs/")
  45. if err != nil {
  46. log.Fatalf("Cannot find docs folder: %v", err)
  47. }
  48. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  49. if err != nil {
  50. log.Fatalf("Cannot read image: %v", err)
  51. }
  52. b64img := base64.StdEncoding.EncodeToString(image)
  53. return image, b64img
  54. }
  55. func cleanStateData() {
  56. dbDir, err := common.GetDataLoc()
  57. if err != nil {
  58. log.Panic(err)
  59. }
  60. c := path.Join(dbDir, "checkpoints")
  61. err = os.RemoveAll(c)
  62. if err != nil {
  63. log.Errorf("%s", err)
  64. }
  65. s := path.Join(dbDir, "sink", "cache")
  66. err = os.RemoveAll(s)
  67. if err != nil {
  68. log.Errorf("%s", err)
  69. }
  70. }
  71. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
  72. keys, values := tp.GetMetrics()
  73. for k, v := range m {
  74. var (
  75. index int
  76. key string
  77. matched bool
  78. )
  79. for index, key = range keys {
  80. if k == key {
  81. if strings.HasSuffix(k, "process_latency_us") {
  82. if values[index].(int64) >= v.(int64) {
  83. matched = true
  84. continue
  85. } else {
  86. break
  87. }
  88. }
  89. if values[index] == v {
  90. matched = true
  91. }
  92. break
  93. }
  94. }
  95. if matched {
  96. continue
  97. }
  98. if common.Config.Basic.Debug == true {
  99. for i, k := range keys {
  100. log.Printf("%s:%v", k, values[i])
  101. }
  102. }
  103. //do not find
  104. if index < len(values) {
  105. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  106. } else {
  107. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  108. }
  109. }
  110. return nil
  111. }
  112. func errstring(err error) string {
  113. if err != nil {
  114. return err.Error()
  115. }
  116. return ""
  117. }
  118. // The time diff must larger than timeleap
  119. var testData = map[string][]*xsql.Tuple{
  120. "demo": {
  121. {
  122. Emitter: "demo",
  123. Message: map[string]interface{}{
  124. "color": "red",
  125. "size": 3,
  126. "ts": 1541152486013,
  127. },
  128. Timestamp: 1541152486013,
  129. },
  130. {
  131. Emitter: "demo",
  132. Message: map[string]interface{}{
  133. "color": "blue",
  134. "size": 6,
  135. "ts": 1541152486822,
  136. },
  137. Timestamp: 1541152486822,
  138. },
  139. {
  140. Emitter: "demo",
  141. Message: map[string]interface{}{
  142. "color": "blue",
  143. "size": 2,
  144. "ts": 1541152487632,
  145. },
  146. Timestamp: 1541152487632,
  147. },
  148. {
  149. Emitter: "demo",
  150. Message: map[string]interface{}{
  151. "color": "yellow",
  152. "size": 4,
  153. "ts": 1541152488442,
  154. },
  155. Timestamp: 1541152488442,
  156. },
  157. {
  158. Emitter: "demo",
  159. Message: map[string]interface{}{
  160. "color": "red",
  161. "size": 1,
  162. "ts": 1541152489252,
  163. },
  164. Timestamp: 1541152489252,
  165. },
  166. },
  167. "demoError": {
  168. {
  169. Emitter: "demoError",
  170. Message: map[string]interface{}{
  171. "color": 3,
  172. "size": "red",
  173. "ts": 1541152486013,
  174. },
  175. Timestamp: 1541152486013,
  176. },
  177. {
  178. Emitter: "demoError",
  179. Message: map[string]interface{}{
  180. "color": "blue",
  181. "size": 6,
  182. "ts": "1541152486822",
  183. },
  184. Timestamp: 1541152486822,
  185. },
  186. {
  187. Emitter: "demoError",
  188. Message: map[string]interface{}{
  189. "color": "blue",
  190. "size": 2,
  191. "ts": 1541152487632,
  192. },
  193. Timestamp: 1541152487632,
  194. },
  195. {
  196. Emitter: "demoError",
  197. Message: map[string]interface{}{
  198. "color": 7,
  199. "size": 4,
  200. "ts": 1541152488442,
  201. },
  202. Timestamp: 1541152488442,
  203. },
  204. {
  205. Emitter: "demoError",
  206. Message: map[string]interface{}{
  207. "color": "red",
  208. "size": "blue",
  209. "ts": 1541152489252,
  210. },
  211. Timestamp: 1541152489252,
  212. },
  213. },
  214. "demo1": {
  215. {
  216. Emitter: "demo1",
  217. Message: map[string]interface{}{
  218. "temp": 25.5,
  219. "hum": 65,
  220. "from": "device1",
  221. "ts": 1541152486013,
  222. },
  223. Timestamp: 1541152486115,
  224. },
  225. {
  226. Emitter: "demo1",
  227. Message: map[string]interface{}{
  228. "temp": 27.5,
  229. "hum": 59,
  230. "from": "device2",
  231. "ts": 1541152486823,
  232. },
  233. Timestamp: 1541152486903,
  234. },
  235. {
  236. Emitter: "demo1",
  237. Message: map[string]interface{}{
  238. "temp": 28.1,
  239. "hum": 75,
  240. "from": "device3",
  241. "ts": 1541152487632,
  242. },
  243. Timestamp: 1541152487702,
  244. },
  245. {
  246. Emitter: "demo1",
  247. Message: map[string]interface{}{
  248. "temp": 27.4,
  249. "hum": 80,
  250. "from": "device1",
  251. "ts": 1541152488442,
  252. },
  253. Timestamp: 1541152488605,
  254. },
  255. {
  256. Emitter: "demo1",
  257. Message: map[string]interface{}{
  258. "temp": 25.5,
  259. "hum": 62,
  260. "from": "device3",
  261. "ts": 1541152489252,
  262. },
  263. Timestamp: 1541152489305,
  264. },
  265. },
  266. "sessionDemo": {
  267. {
  268. Emitter: "sessionDemo",
  269. Message: map[string]interface{}{
  270. "temp": 25.5,
  271. "hum": 65,
  272. "ts": 1541152486013,
  273. },
  274. Timestamp: 1541152486013,
  275. },
  276. {
  277. Emitter: "sessionDemo",
  278. Message: map[string]interface{}{
  279. "temp": 27.5,
  280. "hum": 59,
  281. "ts": 1541152486823,
  282. },
  283. Timestamp: 1541152486823,
  284. },
  285. {
  286. Emitter: "sessionDemo",
  287. Message: map[string]interface{}{
  288. "temp": 28.1,
  289. "hum": 75,
  290. "ts": 1541152487932,
  291. },
  292. Timestamp: 1541152487932,
  293. },
  294. {
  295. Emitter: "sessionDemo",
  296. Message: map[string]interface{}{
  297. "temp": 27.4,
  298. "hum": 80,
  299. "ts": 1541152488442,
  300. },
  301. Timestamp: 1541152488442,
  302. },
  303. {
  304. Emitter: "sessionDemo",
  305. Message: map[string]interface{}{
  306. "temp": 25.5,
  307. "hum": 62,
  308. "ts": 1541152489252,
  309. },
  310. Timestamp: 1541152489252,
  311. },
  312. {
  313. Emitter: "sessionDemo",
  314. Message: map[string]interface{}{
  315. "temp": 26.2,
  316. "hum": 63,
  317. "ts": 1541152490062,
  318. },
  319. Timestamp: 1541152490062,
  320. },
  321. {
  322. Emitter: "sessionDemo",
  323. Message: map[string]interface{}{
  324. "temp": 26.8,
  325. "hum": 71,
  326. "ts": 1541152490872,
  327. },
  328. Timestamp: 1541152490872,
  329. },
  330. {
  331. Emitter: "sessionDemo",
  332. Message: map[string]interface{}{
  333. "temp": 28.9,
  334. "hum": 85,
  335. "ts": 1541152491682,
  336. },
  337. Timestamp: 1541152491682,
  338. },
  339. {
  340. Emitter: "sessionDemo",
  341. Message: map[string]interface{}{
  342. "temp": 29.1,
  343. "hum": 92,
  344. "ts": 1541152492492,
  345. },
  346. Timestamp: 1541152492492,
  347. },
  348. {
  349. Emitter: "sessionDemo",
  350. Message: map[string]interface{}{
  351. "temp": 32.2,
  352. "hum": 99,
  353. "ts": 1541152493202,
  354. },
  355. Timestamp: 1541152493202,
  356. },
  357. {
  358. Emitter: "sessionDemo",
  359. Message: map[string]interface{}{
  360. "temp": 30.9,
  361. "hum": 87,
  362. "ts": 1541152494112,
  363. },
  364. Timestamp: 1541152494112,
  365. },
  366. },
  367. "demoE": {
  368. {
  369. Emitter: "demoE",
  370. Message: map[string]interface{}{
  371. "color": "red",
  372. "size": 3,
  373. "ts": 1541152486013,
  374. },
  375. Timestamp: 1541152486023,
  376. },
  377. {
  378. Emitter: "demoE",
  379. Message: map[string]interface{}{
  380. "color": "blue",
  381. "size": 2,
  382. "ts": 1541152487632,
  383. },
  384. Timestamp: 1541152487822,
  385. },
  386. {
  387. Emitter: "demoE",
  388. Message: map[string]interface{}{
  389. "color": "red",
  390. "size": 1,
  391. "ts": 1541152489252,
  392. },
  393. Timestamp: 1541152489632,
  394. },
  395. { //dropped item
  396. Emitter: "demoE",
  397. Message: map[string]interface{}{
  398. "color": "blue",
  399. "size": 6,
  400. "ts": 1541152486822,
  401. },
  402. Timestamp: 1541152489842,
  403. },
  404. {
  405. Emitter: "demoE",
  406. Message: map[string]interface{}{
  407. "color": "yellow",
  408. "size": 4,
  409. "ts": 1541152488442,
  410. },
  411. Timestamp: 1541152490052,
  412. },
  413. { //To lift the watermark and issue all windows
  414. Emitter: "demoE",
  415. Message: map[string]interface{}{
  416. "color": "yellow",
  417. "size": 4,
  418. "ts": 1541152492342,
  419. },
  420. Timestamp: 1541152498888,
  421. },
  422. },
  423. "demo1E": {
  424. {
  425. Emitter: "demo1E",
  426. Message: map[string]interface{}{
  427. "temp": 27.5,
  428. "hum": 59,
  429. "ts": 1541152486823,
  430. },
  431. Timestamp: 1541152487250,
  432. },
  433. {
  434. Emitter: "demo1E",
  435. Message: map[string]interface{}{
  436. "temp": 25.5,
  437. "hum": 65,
  438. "ts": 1541152486013,
  439. },
  440. Timestamp: 1541152487751,
  441. },
  442. {
  443. Emitter: "demo1E",
  444. Message: map[string]interface{}{
  445. "temp": 27.4,
  446. "hum": 80,
  447. "ts": 1541152488442,
  448. },
  449. Timestamp: 1541152489252,
  450. },
  451. {
  452. Emitter: "demo1E",
  453. Message: map[string]interface{}{
  454. "temp": 28.1,
  455. "hum": 75,
  456. "ts": 1541152487632,
  457. },
  458. Timestamp: 1541152489753,
  459. },
  460. {
  461. Emitter: "demo1E",
  462. Message: map[string]interface{}{
  463. "temp": 25.5,
  464. "hum": 62,
  465. "ts": 1541152489252,
  466. },
  467. Timestamp: 1541152489954,
  468. },
  469. {
  470. Emitter: "demo1E",
  471. Message: map[string]interface{}{
  472. "temp": 25.5,
  473. "hum": 62,
  474. "ts": 1541152499252,
  475. },
  476. Timestamp: 1541152499755,
  477. },
  478. },
  479. "sessionDemoE": {
  480. {
  481. Emitter: "sessionDemoE",
  482. Message: map[string]interface{}{
  483. "temp": 25.5,
  484. "hum": 65,
  485. "ts": 1541152486013,
  486. },
  487. Timestamp: 1541152486250,
  488. },
  489. {
  490. Emitter: "sessionDemoE",
  491. Message: map[string]interface{}{
  492. "temp": 28.1,
  493. "hum": 75,
  494. "ts": 1541152487932,
  495. },
  496. Timestamp: 1541152487951,
  497. },
  498. {
  499. Emitter: "sessionDemoE",
  500. Message: map[string]interface{}{
  501. "temp": 27.5,
  502. "hum": 59,
  503. "ts": 1541152486823,
  504. },
  505. Timestamp: 1541152488552,
  506. },
  507. {
  508. Emitter: "sessionDemoE",
  509. Message: map[string]interface{}{
  510. "temp": 25.5,
  511. "hum": 62,
  512. "ts": 1541152489252,
  513. },
  514. Timestamp: 1541152489353,
  515. },
  516. {
  517. Emitter: "sessionDemoE",
  518. Message: map[string]interface{}{
  519. "temp": 27.4,
  520. "hum": 80,
  521. "ts": 1541152488442,
  522. },
  523. Timestamp: 1541152489854,
  524. },
  525. {
  526. Emitter: "sessionDemoE",
  527. Message: map[string]interface{}{
  528. "temp": 26.2,
  529. "hum": 63,
  530. "ts": 1541152490062,
  531. },
  532. Timestamp: 1541152490155,
  533. },
  534. {
  535. Emitter: "sessionDemoE",
  536. Message: map[string]interface{}{
  537. "temp": 28.9,
  538. "hum": 85,
  539. "ts": 1541152491682,
  540. },
  541. Timestamp: 1541152491686,
  542. },
  543. {
  544. Emitter: "sessionDemoE",
  545. Message: map[string]interface{}{
  546. "temp": 26.8,
  547. "hum": 71,
  548. "ts": 1541152490872,
  549. },
  550. Timestamp: 1541152491972,
  551. },
  552. {
  553. Emitter: "sessionDemoE",
  554. Message: map[string]interface{}{
  555. "temp": 29.1,
  556. "hum": 92,
  557. "ts": 1541152492492,
  558. },
  559. Timestamp: 1541152492592,
  560. },
  561. {
  562. Emitter: "sessionDemoE",
  563. Message: map[string]interface{}{
  564. "temp": 30.9,
  565. "hum": 87,
  566. "ts": 1541152494112,
  567. },
  568. Timestamp: 1541152494212,
  569. },
  570. {
  571. Emitter: "sessionDemoE",
  572. Message: map[string]interface{}{
  573. "temp": 32.2,
  574. "hum": 99,
  575. "ts": 1541152493202,
  576. },
  577. Timestamp: 1541152495202,
  578. },
  579. {
  580. Emitter: "sessionDemoE",
  581. Message: map[string]interface{}{
  582. "temp": 32.2,
  583. "hum": 99,
  584. "ts": 1541152499202,
  585. },
  586. Timestamp: 1541152499402,
  587. },
  588. },
  589. "demoErr": {
  590. {
  591. Emitter: "demoErr",
  592. Message: map[string]interface{}{
  593. "color": "red",
  594. "size": 3,
  595. "ts": 1541152486013,
  596. },
  597. Timestamp: 1541152486221,
  598. },
  599. {
  600. Emitter: "demoErr",
  601. Message: map[string]interface{}{
  602. "color": 2,
  603. "size": "blue",
  604. "ts": 1541152487632,
  605. },
  606. Timestamp: 1541152487722,
  607. },
  608. {
  609. Emitter: "demoErr",
  610. Message: map[string]interface{}{
  611. "color": "red",
  612. "size": 1,
  613. "ts": 1541152489252,
  614. },
  615. Timestamp: 1541152489332,
  616. },
  617. { //dropped item
  618. Emitter: "demoErr",
  619. Message: map[string]interface{}{
  620. "color": "blue",
  621. "size": 6,
  622. "ts": 1541152486822,
  623. },
  624. Timestamp: 1541152489822,
  625. },
  626. {
  627. Emitter: "demoErr",
  628. Message: map[string]interface{}{
  629. "color": "yellow",
  630. "size": 4,
  631. "ts": 1541152488442,
  632. },
  633. Timestamp: 1541152490042,
  634. },
  635. { //To lift the watermark and issue all windows
  636. Emitter: "demoErr",
  637. Message: map[string]interface{}{
  638. "color": "yellow",
  639. "size": 4,
  640. "ts": 1541152492342,
  641. },
  642. Timestamp: 1541152493842,
  643. },
  644. },
  645. "ldemo": {
  646. {
  647. Emitter: "ldemo",
  648. Message: map[string]interface{}{
  649. "color": "red",
  650. "size": 3,
  651. "ts": 1541152486013,
  652. },
  653. Timestamp: 1541152486013,
  654. },
  655. {
  656. Emitter: "ldemo",
  657. Message: map[string]interface{}{
  658. "color": "blue",
  659. "size": "string",
  660. "ts": 1541152486822,
  661. },
  662. Timestamp: 1541152486822,
  663. },
  664. {
  665. Emitter: "ldemo",
  666. Message: map[string]interface{}{
  667. "size": 3,
  668. "ts": 1541152487632,
  669. },
  670. Timestamp: 1541152487632,
  671. },
  672. {
  673. Emitter: "ldemo",
  674. Message: map[string]interface{}{
  675. "color": 49,
  676. "size": 2,
  677. "ts": 1541152488442,
  678. },
  679. Timestamp: 1541152488442,
  680. },
  681. {
  682. Emitter: "ldemo",
  683. Message: map[string]interface{}{
  684. "color": "red",
  685. "ts": 1541152489252,
  686. },
  687. Timestamp: 1541152489252,
  688. },
  689. },
  690. "ldemo1": {
  691. {
  692. Emitter: "ldemo1",
  693. Message: map[string]interface{}{
  694. "temp": 25.5,
  695. "hum": 65,
  696. "ts": 1541152486013,
  697. },
  698. Timestamp: 1541152486013,
  699. },
  700. {
  701. Emitter: "ldemo1",
  702. Message: map[string]interface{}{
  703. "temp": 27.5,
  704. "hum": 59,
  705. "ts": 1541152486823,
  706. },
  707. Timestamp: 1541152486823,
  708. },
  709. {
  710. Emitter: "ldemo1",
  711. Message: map[string]interface{}{
  712. "temp": 28.1,
  713. "hum": 75,
  714. "ts": 1541152487632,
  715. },
  716. Timestamp: 1541152487632,
  717. },
  718. {
  719. Emitter: "ldemo1",
  720. Message: map[string]interface{}{
  721. "temp": 27.4,
  722. "hum": 80,
  723. "ts": "1541152488442",
  724. },
  725. Timestamp: 1541152488442,
  726. },
  727. {
  728. Emitter: "ldemo1",
  729. Message: map[string]interface{}{
  730. "temp": 25.5,
  731. "hum": 62,
  732. "ts": 1541152489252,
  733. },
  734. Timestamp: 1541152489252,
  735. },
  736. },
  737. "lsessionDemo": {
  738. {
  739. Emitter: "lsessionDemo",
  740. Message: map[string]interface{}{
  741. "temp": 25.5,
  742. "hum": 65,
  743. "ts": 1541152486013,
  744. },
  745. Timestamp: 1541152486013,
  746. },
  747. {
  748. Emitter: "lsessionDemo",
  749. Message: map[string]interface{}{
  750. "temp": 27.5,
  751. "hum": 59,
  752. "ts": 1541152486823,
  753. },
  754. Timestamp: 1541152486823,
  755. },
  756. {
  757. Emitter: "lsessionDemo",
  758. Message: map[string]interface{}{
  759. "temp": 28.1,
  760. "hum": 75,
  761. "ts": 1541152487932,
  762. },
  763. Timestamp: 1541152487932,
  764. },
  765. {
  766. Emitter: "lsessionDemo",
  767. Message: map[string]interface{}{
  768. "temp": 27.4,
  769. "hum": 80,
  770. "ts": 1541152488442,
  771. },
  772. Timestamp: 1541152488442,
  773. },
  774. {
  775. Emitter: "lsessionDemo",
  776. Message: map[string]interface{}{
  777. "temp": 25.5,
  778. "hum": 62,
  779. "ts": 1541152489252,
  780. },
  781. Timestamp: 1541152489252,
  782. },
  783. {
  784. Emitter: "lsessionDemo",
  785. Message: map[string]interface{}{
  786. "temp": 26.2,
  787. "hum": 63,
  788. "ts": 1541152490062,
  789. },
  790. Timestamp: 1541152490062,
  791. },
  792. {
  793. Emitter: "lsessionDemo",
  794. Message: map[string]interface{}{
  795. "temp": 26.8,
  796. "hum": 71,
  797. "ts": 1541152490872,
  798. },
  799. Timestamp: 1541152490872,
  800. },
  801. {
  802. Emitter: "lsessionDemo",
  803. Message: map[string]interface{}{
  804. "temp": 28.9,
  805. "hum": 85,
  806. "ts": 1541152491682,
  807. },
  808. Timestamp: 1541152491682,
  809. },
  810. {
  811. Emitter: "lsessionDemo",
  812. Message: map[string]interface{}{
  813. "temp": 29.1,
  814. "hum": 92,
  815. "ts": 1541152492492,
  816. },
  817. Timestamp: 1541152492492,
  818. },
  819. {
  820. Emitter: "lsessionDemo",
  821. Message: map[string]interface{}{
  822. "temp": 2.2,
  823. "hum": 99,
  824. "ts": 1541152493202,
  825. },
  826. Timestamp: 1541152493202,
  827. },
  828. {
  829. Emitter: "lsessionDemo",
  830. Message: map[string]interface{}{
  831. "temp": 30.9,
  832. "hum": 87,
  833. "ts": 1541152494112,
  834. },
  835. Timestamp: 1541152494112,
  836. },
  837. },
  838. "text": {
  839. {
  840. Emitter: "text",
  841. Message: map[string]interface{}{
  842. "slogan": "Impossible is nothing",
  843. "brand": "Adidas",
  844. },
  845. Timestamp: 1541152486500,
  846. },
  847. {
  848. Emitter: "text",
  849. Message: map[string]interface{}{
  850. "slogan": "Stronger than dirt",
  851. "brand": "Ajax",
  852. },
  853. Timestamp: 1541152487400,
  854. },
  855. {
  856. Emitter: "text",
  857. Message: map[string]interface{}{
  858. "slogan": "Belong anywhere",
  859. "brand": "Airbnb",
  860. },
  861. Timestamp: 1541152488300,
  862. },
  863. {
  864. Emitter: "text",
  865. Message: map[string]interface{}{
  866. "slogan": "I can't believe I ate the whole thing",
  867. "brand": "Alka Seltzer",
  868. },
  869. Timestamp: 1541152489200,
  870. },
  871. {
  872. Emitter: "text",
  873. Message: map[string]interface{}{
  874. "slogan": "You're in good hands",
  875. "brand": "Allstate",
  876. },
  877. Timestamp: 1541152490100,
  878. },
  879. {
  880. Emitter: "text",
  881. Message: map[string]interface{}{
  882. "slogan": "Don't leave home without it",
  883. "brand": "American Express",
  884. },
  885. Timestamp: 1541152491200,
  886. },
  887. {
  888. Emitter: "text",
  889. Message: map[string]interface{}{
  890. "slogan": "Think different",
  891. "brand": "Apple",
  892. },
  893. Timestamp: 1541152492300,
  894. },
  895. {
  896. Emitter: "text",
  897. Message: map[string]interface{}{
  898. "slogan": "We try harder",
  899. "brand": "Avis",
  900. },
  901. Timestamp: 1541152493400,
  902. },
  903. },
  904. "binDemo": {
  905. {
  906. Emitter: "binDemo",
  907. Message: map[string]interface{}{
  908. "self": image,
  909. },
  910. Timestamp: 1541152486013,
  911. },
  912. },
  913. }
  914. func commonResultFunc(result [][]byte) interface{} {
  915. var maps [][]map[string]interface{}
  916. for _, v := range result {
  917. var mapRes []map[string]interface{}
  918. err := json.Unmarshal(v, &mapRes)
  919. if err != nil {
  920. panic("Failed to parse the input into map")
  921. }
  922. maps = append(maps, mapRes)
  923. }
  924. return maps
  925. }
  926. func doRuleTest(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption) {
  927. doRuleTestBySinkProps(t, tests, j, opt, nil, commonResultFunc)
  928. }
  929. func doRuleTestBySinkProps(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
  930. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  931. for i, tt := range tests {
  932. datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
  933. wait := tt.w
  934. if wait == 0 {
  935. wait = 5
  936. if opt.Qos == api.ExactlyOnce {
  937. wait = 10
  938. }
  939. }
  940. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP, wait); err != nil {
  941. t.Errorf("send data error %s", err)
  942. break
  943. }
  944. compareResult(t, mockSink, resultFunc, tt, i, tp)
  945. }
  946. }
  947. func compareResult(t *testing.T, mockSink *test.MockSink, resultFunc func(result [][]byte) interface{}, tt ruleTest, i int, tp *xstream.TopologyNew) {
  948. // Check results
  949. results := mockSink.GetResults()
  950. maps := resultFunc(results)
  951. if !reflect.DeepEqual(tt.r, maps) {
  952. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  953. }
  954. if err := compareMetrics(tp, tt.m); err != nil {
  955. t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.sql, err)
  956. }
  957. if tt.t != nil {
  958. topo := tp.GetTopo()
  959. if !reflect.DeepEqual(tt.t, topo) {
  960. t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.t, topo)
  961. }
  962. }
  963. tp.Cancel()
  964. }
  965. func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *xstream.TopologyNew, postleap int, wait int) error {
  966. // Send data and move time
  967. mockClock := test.GetMockClock()
  968. // Set the current time
  969. mockClock.Add(0)
  970. // TODO assume multiple data source send the data in order and has the same length
  971. for i := 0; i < dataLength; i++ {
  972. for _, d := range datas {
  973. // Make sure time is going forward only
  974. // gradually add up time to ensure checkpoint is triggered before the data send
  975. for n := common.GetNowInMilli() + 100; d[i].Timestamp+100 > n; n += 100 {
  976. if d[i].Timestamp < n {
  977. n = d[i].Timestamp
  978. }
  979. mockClock.Set(common.TimeFromUnixMilli(n))
  980. common.Log.Debugf("Clock set to %d", common.GetNowInMilli())
  981. time.Sleep(1)
  982. }
  983. select {
  984. case err := <-errCh:
  985. t.Log(err)
  986. tp.Cancel()
  987. return err
  988. default:
  989. }
  990. time.Sleep(time.Duration(wait) * time.Millisecond)
  991. }
  992. }
  993. mockClock.Add(time.Duration(postleap) * time.Millisecond)
  994. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  995. // Check if stream done. Poll for metrics,
  996. time.Sleep(10 * time.Millisecond)
  997. var retry int
  998. for retry = 2; retry > 0; retry-- {
  999. if err := compareMetrics(tp, metrics); err == nil {
  1000. break
  1001. } else {
  1002. common.Log.Errorf("check metrics error at %d: %s", retry, err)
  1003. }
  1004. time.Sleep(1000 * time.Millisecond)
  1005. }
  1006. if retry == 0 {
  1007. fmt.Printf("send data timeout\n")
  1008. } else if retry < 2 {
  1009. fmt.Printf("try %d for metric comparison\n", 2-retry)
  1010. }
  1011. return nil
  1012. }
  1013. func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *test.MockSink, <-chan error) {
  1014. // Rest for each test
  1015. cleanStateData()
  1016. test.ResetClock(1541152486000)
  1017. // Create stream
  1018. var (
  1019. sources []*nodes.SourceNode
  1020. datas [][]*xsql.Tuple
  1021. dataLength int
  1022. )
  1023. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1024. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1025. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1026. } else {
  1027. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1028. t.Errorf("sql %s is not a select statement", tt.sql)
  1029. } else {
  1030. streams := xsql.GetStreams(selectStmt)
  1031. for _, stream := range streams {
  1032. data := testData[stream]
  1033. dataLength = len(data)
  1034. datas = append(datas, data)
  1035. source := nodes.NewSourceNodeWithSource(stream, test.NewMockSource(data), map[string]string{
  1036. "DATASOURCE": stream,
  1037. })
  1038. sources = append(sources, source)
  1039. }
  1040. }
  1041. }
  1042. mockSink := test.NewMockSink()
  1043. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
  1044. tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, DbDir, sources, []*nodes.SinkNode{sink})
  1045. if err != nil {
  1046. t.Error(err)
  1047. return nil, 0, nil, nil, nil
  1048. }
  1049. errCh := tp.Open()
  1050. return datas, dataLength, tp, mockSink, errCh
  1051. }
  1052. // Create or drop streams
  1053. func handleStream(createOrDrop bool, names []string, t *testing.T) {
  1054. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1055. for _, name := range names {
  1056. var sql string
  1057. if createOrDrop {
  1058. switch name {
  1059. case "demo":
  1060. sql = `CREATE STREAM demo (
  1061. color STRING,
  1062. size BIGINT,
  1063. ts BIGINT
  1064. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  1065. case "demoError":
  1066. sql = `CREATE STREAM demoError (
  1067. color STRING,
  1068. size BIGINT,
  1069. ts BIGINT
  1070. ) WITH (DATASOURCE="demoError", FORMAT="json", KEY="ts");`
  1071. case "demo1":
  1072. sql = `CREATE STREAM demo1 (
  1073. temp FLOAT,
  1074. hum BIGINT,` +
  1075. "`from`" + ` STRING,
  1076. ts BIGINT
  1077. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  1078. case "sessionDemo":
  1079. sql = `CREATE STREAM sessionDemo (
  1080. temp FLOAT,
  1081. hum BIGINT,
  1082. ts BIGINT
  1083. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  1084. case "demoE":
  1085. sql = `CREATE STREAM demoE (
  1086. color STRING,
  1087. size BIGINT,
  1088. ts BIGINT
  1089. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1090. case "demo1E":
  1091. sql = `CREATE STREAM demo1E (
  1092. temp FLOAT,
  1093. hum BIGINT,
  1094. ts BIGINT
  1095. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1096. case "sessionDemoE":
  1097. sql = `CREATE STREAM sessionDemoE (
  1098. temp FLOAT,
  1099. hum BIGINT,
  1100. ts BIGINT
  1101. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1102. case "demoErr":
  1103. sql = `CREATE STREAM demoErr (
  1104. color STRING,
  1105. size BIGINT,
  1106. ts BIGINT
  1107. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1108. case "ldemo":
  1109. sql = `CREATE STREAM ldemo (
  1110. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  1111. case "ldemo1":
  1112. sql = `CREATE STREAM ldemo1 (
  1113. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  1114. case "lsessionDemo":
  1115. sql = `CREATE STREAM lsessionDemo (
  1116. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  1117. case "ext":
  1118. sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  1119. case "ext2":
  1120. sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  1121. case "text":
  1122. sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  1123. case "binDemo":
  1124. sql = "CREATE STREAM binDemo () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
  1125. default:
  1126. t.Errorf("create stream %s fail", name)
  1127. }
  1128. } else {
  1129. sql = `DROP STREAM ` + name
  1130. }
  1131. _, err := p.ExecStmt(sql)
  1132. if err != nil {
  1133. t.Log(err)
  1134. }
  1135. }
  1136. }