common_test.go 23 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "os"
  12. "path"
  13. "reflect"
  14. "strings"
  15. "testing"
  16. "time"
  17. )
  18. const SOURCELEAP = 200 // Time change before sending a data
  19. const POSTLEAP = 1000 // Time change after all data sends out
  20. type ruleTest struct {
  21. name string
  22. sql string
  23. r interface{} // The result
  24. m map[string]interface{} // final metrics
  25. t *xstream.PrintableTopo // printable topo, an optional field
  26. }
  27. var DbDir = getDbDir()
  28. func getDbDir() string {
  29. common.InitConf()
  30. dbDir, err := common.GetDataLoc()
  31. if err != nil {
  32. log.Panic(err)
  33. }
  34. log.Infof("db location is %s", dbDir)
  35. return dbDir
  36. }
  37. func cleanStateData() {
  38. dbDir, err := common.GetDataLoc()
  39. if err != nil {
  40. log.Panic(err)
  41. }
  42. c := path.Join(dbDir, "checkpoints")
  43. err = os.RemoveAll(c)
  44. if err != nil {
  45. log.Errorf("%s", err)
  46. }
  47. s := path.Join(dbDir, "sink", "cache")
  48. err = os.RemoveAll(s)
  49. if err != nil {
  50. log.Errorf("%s", err)
  51. }
  52. }
  53. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
  54. keys, values := tp.GetMetrics()
  55. if common.Config.Basic.Debug == true {
  56. for i, k := range keys {
  57. log.Printf("%s:%v", k, values[i])
  58. }
  59. }
  60. for k, v := range m {
  61. var (
  62. index int
  63. key string
  64. matched bool
  65. )
  66. for index, key = range keys {
  67. if k == key {
  68. if strings.HasSuffix(k, "process_latency_ms") {
  69. if values[index].(int64) >= v.(int64) {
  70. matched = true
  71. continue
  72. } else {
  73. break
  74. }
  75. }
  76. if values[index] == v {
  77. matched = true
  78. }
  79. break
  80. }
  81. }
  82. if matched {
  83. continue
  84. }
  85. //do not find
  86. if index < len(values) {
  87. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  88. } else {
  89. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  90. }
  91. }
  92. return nil
  93. }
  94. func errstring(err error) string {
  95. if err != nil {
  96. return err.Error()
  97. }
  98. return ""
  99. }
  100. // The time diff must larger than timeleap
  101. var testData = map[string][]*xsql.Tuple{
  102. "demo": {
  103. {
  104. Emitter: "demo",
  105. Message: map[string]interface{}{
  106. "color": "red",
  107. "size": 3,
  108. "ts": 1541152486013,
  109. },
  110. Timestamp: 1541152486013,
  111. },
  112. {
  113. Emitter: "demo",
  114. Message: map[string]interface{}{
  115. "color": "blue",
  116. "size": 6,
  117. "ts": 1541152486822,
  118. },
  119. Timestamp: 1541152486822,
  120. },
  121. {
  122. Emitter: "demo",
  123. Message: map[string]interface{}{
  124. "color": "blue",
  125. "size": 2,
  126. "ts": 1541152487632,
  127. },
  128. Timestamp: 1541152487632,
  129. },
  130. {
  131. Emitter: "demo",
  132. Message: map[string]interface{}{
  133. "color": "yellow",
  134. "size": 4,
  135. "ts": 1541152488442,
  136. },
  137. Timestamp: 1541152488442,
  138. },
  139. {
  140. Emitter: "demo",
  141. Message: map[string]interface{}{
  142. "color": "red",
  143. "size": 1,
  144. "ts": 1541152489252,
  145. },
  146. Timestamp: 1541152489252,
  147. },
  148. },
  149. "demoError": {
  150. {
  151. Emitter: "demoError",
  152. Message: map[string]interface{}{
  153. "color": 3,
  154. "size": "red",
  155. "ts": 1541152486013,
  156. },
  157. Timestamp: 1541152486013,
  158. },
  159. {
  160. Emitter: "demoError",
  161. Message: map[string]interface{}{
  162. "color": "blue",
  163. "size": 6,
  164. "ts": "1541152486822",
  165. },
  166. Timestamp: 1541152486822,
  167. },
  168. {
  169. Emitter: "demoError",
  170. Message: map[string]interface{}{
  171. "color": "blue",
  172. "size": 2,
  173. "ts": 1541152487632,
  174. },
  175. Timestamp: 1541152487632,
  176. },
  177. {
  178. Emitter: "demoError",
  179. Message: map[string]interface{}{
  180. "color": 7,
  181. "size": 4,
  182. "ts": 1541152488442,
  183. },
  184. Timestamp: 1541152488442,
  185. },
  186. {
  187. Emitter: "demoError",
  188. Message: map[string]interface{}{
  189. "color": "red",
  190. "size": "blue",
  191. "ts": 1541152489252,
  192. },
  193. Timestamp: 1541152489252,
  194. },
  195. },
  196. "demo1": {
  197. {
  198. Emitter: "demo1",
  199. Message: map[string]interface{}{
  200. "temp": 25.5,
  201. "hum": 65,
  202. "from": "device1",
  203. "ts": 1541152486013,
  204. },
  205. Timestamp: 1541152486013,
  206. },
  207. {
  208. Emitter: "demo1",
  209. Message: map[string]interface{}{
  210. "temp": 27.5,
  211. "hum": 59,
  212. "from": "device2",
  213. "ts": 1541152486823,
  214. },
  215. Timestamp: 1541152486823,
  216. },
  217. {
  218. Emitter: "demo1",
  219. Message: map[string]interface{}{
  220. "temp": 28.1,
  221. "hum": 75,
  222. "from": "device3",
  223. "ts": 1541152487632,
  224. },
  225. Timestamp: 1541152487632,
  226. },
  227. {
  228. Emitter: "demo1",
  229. Message: map[string]interface{}{
  230. "temp": 27.4,
  231. "hum": 80,
  232. "from": "device1",
  233. "ts": 1541152488442,
  234. },
  235. Timestamp: 1541152488442,
  236. },
  237. {
  238. Emitter: "demo1",
  239. Message: map[string]interface{}{
  240. "temp": 25.5,
  241. "hum": 62,
  242. "from": "device3",
  243. "ts": 1541152489252,
  244. },
  245. Timestamp: 1541152489252,
  246. },
  247. },
  248. "sessionDemo": {
  249. {
  250. Emitter: "sessionDemo",
  251. Message: map[string]interface{}{
  252. "temp": 25.5,
  253. "hum": 65,
  254. "ts": 1541152486013,
  255. },
  256. Timestamp: 1541152486013,
  257. },
  258. {
  259. Emitter: "sessionDemo",
  260. Message: map[string]interface{}{
  261. "temp": 27.5,
  262. "hum": 59,
  263. "ts": 1541152486823,
  264. },
  265. Timestamp: 1541152486823,
  266. },
  267. {
  268. Emitter: "sessionDemo",
  269. Message: map[string]interface{}{
  270. "temp": 28.1,
  271. "hum": 75,
  272. "ts": 1541152487932,
  273. },
  274. Timestamp: 1541152487932,
  275. },
  276. {
  277. Emitter: "sessionDemo",
  278. Message: map[string]interface{}{
  279. "temp": 27.4,
  280. "hum": 80,
  281. "ts": 1541152488442,
  282. },
  283. Timestamp: 1541152488442,
  284. },
  285. {
  286. Emitter: "sessionDemo",
  287. Message: map[string]interface{}{
  288. "temp": 25.5,
  289. "hum": 62,
  290. "ts": 1541152489252,
  291. },
  292. Timestamp: 1541152489252,
  293. },
  294. {
  295. Emitter: "sessionDemo",
  296. Message: map[string]interface{}{
  297. "temp": 26.2,
  298. "hum": 63,
  299. "ts": 1541152490062,
  300. },
  301. Timestamp: 1541152490062,
  302. },
  303. {
  304. Emitter: "sessionDemo",
  305. Message: map[string]interface{}{
  306. "temp": 26.8,
  307. "hum": 71,
  308. "ts": 1541152490872,
  309. },
  310. Timestamp: 1541152490872,
  311. },
  312. {
  313. Emitter: "sessionDemo",
  314. Message: map[string]interface{}{
  315. "temp": 28.9,
  316. "hum": 85,
  317. "ts": 1541152491682,
  318. },
  319. Timestamp: 1541152491682,
  320. },
  321. {
  322. Emitter: "sessionDemo",
  323. Message: map[string]interface{}{
  324. "temp": 29.1,
  325. "hum": 92,
  326. "ts": 1541152492492,
  327. },
  328. Timestamp: 1541152492492,
  329. },
  330. {
  331. Emitter: "sessionDemo",
  332. Message: map[string]interface{}{
  333. "temp": 32.2,
  334. "hum": 99,
  335. "ts": 1541152493202,
  336. },
  337. Timestamp: 1541152493202,
  338. },
  339. {
  340. Emitter: "sessionDemo",
  341. Message: map[string]interface{}{
  342. "temp": 30.9,
  343. "hum": 87,
  344. "ts": 1541152494112,
  345. },
  346. Timestamp: 1541152494112,
  347. },
  348. },
  349. "demoE": {
  350. {
  351. Emitter: "demoE",
  352. Message: map[string]interface{}{
  353. "color": "red",
  354. "size": 3,
  355. "ts": 1541152486013,
  356. },
  357. Timestamp: 1541152486023,
  358. },
  359. {
  360. Emitter: "demoE",
  361. Message: map[string]interface{}{
  362. "color": "blue",
  363. "size": 2,
  364. "ts": 1541152487632,
  365. },
  366. Timestamp: 1541152487822,
  367. },
  368. {
  369. Emitter: "demoE",
  370. Message: map[string]interface{}{
  371. "color": "red",
  372. "size": 1,
  373. "ts": 1541152489252,
  374. },
  375. Timestamp: 1541152489632,
  376. },
  377. { //dropped item
  378. Emitter: "demoE",
  379. Message: map[string]interface{}{
  380. "color": "blue",
  381. "size": 6,
  382. "ts": 1541152486822,
  383. },
  384. Timestamp: 1541152489842,
  385. },
  386. {
  387. Emitter: "demoE",
  388. Message: map[string]interface{}{
  389. "color": "yellow",
  390. "size": 4,
  391. "ts": 1541152488442,
  392. },
  393. Timestamp: 1541152490052,
  394. },
  395. { //To lift the watermark and issue all windows
  396. Emitter: "demoE",
  397. Message: map[string]interface{}{
  398. "color": "yellow",
  399. "size": 4,
  400. "ts": 1541152492342,
  401. },
  402. Timestamp: 1541152498888,
  403. },
  404. },
  405. "demo1E": {
  406. {
  407. Emitter: "demo1E",
  408. Message: map[string]interface{}{
  409. "temp": 27.5,
  410. "hum": 59,
  411. "ts": 1541152486823,
  412. },
  413. Timestamp: 1541152487250,
  414. },
  415. {
  416. Emitter: "demo1E",
  417. Message: map[string]interface{}{
  418. "temp": 25.5,
  419. "hum": 65,
  420. "ts": 1541152486013,
  421. },
  422. Timestamp: 1541152487751,
  423. },
  424. {
  425. Emitter: "demo1E",
  426. Message: map[string]interface{}{
  427. "temp": 27.4,
  428. "hum": 80,
  429. "ts": 1541152488442,
  430. },
  431. Timestamp: 1541152489252,
  432. },
  433. {
  434. Emitter: "demo1E",
  435. Message: map[string]interface{}{
  436. "temp": 28.1,
  437. "hum": 75,
  438. "ts": 1541152487632,
  439. },
  440. Timestamp: 1541152489753,
  441. },
  442. {
  443. Emitter: "demo1E",
  444. Message: map[string]interface{}{
  445. "temp": 25.5,
  446. "hum": 62,
  447. "ts": 1541152489252,
  448. },
  449. Timestamp: 1541152489954,
  450. },
  451. {
  452. Emitter: "demo1E",
  453. Message: map[string]interface{}{
  454. "temp": 25.5,
  455. "hum": 62,
  456. "ts": 1541152499252,
  457. },
  458. Timestamp: 1541152499755,
  459. },
  460. },
  461. "sessionDemoE": {
  462. {
  463. Emitter: "sessionDemoE",
  464. Message: map[string]interface{}{
  465. "temp": 25.5,
  466. "hum": 65,
  467. "ts": 1541152486013,
  468. },
  469. Timestamp: 1541152486250,
  470. },
  471. {
  472. Emitter: "sessionDemoE",
  473. Message: map[string]interface{}{
  474. "temp": 28.1,
  475. "hum": 75,
  476. "ts": 1541152487932,
  477. },
  478. Timestamp: 1541152487951,
  479. },
  480. {
  481. Emitter: "sessionDemoE",
  482. Message: map[string]interface{}{
  483. "temp": 27.5,
  484. "hum": 59,
  485. "ts": 1541152486823,
  486. },
  487. Timestamp: 1541152488552,
  488. },
  489. {
  490. Emitter: "sessionDemoE",
  491. Message: map[string]interface{}{
  492. "temp": 25.5,
  493. "hum": 62,
  494. "ts": 1541152489252,
  495. },
  496. Timestamp: 1541152489353,
  497. },
  498. {
  499. Emitter: "sessionDemoE",
  500. Message: map[string]interface{}{
  501. "temp": 27.4,
  502. "hum": 80,
  503. "ts": 1541152488442,
  504. },
  505. Timestamp: 1541152489854,
  506. },
  507. {
  508. Emitter: "sessionDemoE",
  509. Message: map[string]interface{}{
  510. "temp": 26.2,
  511. "hum": 63,
  512. "ts": 1541152490062,
  513. },
  514. Timestamp: 1541152490155,
  515. },
  516. {
  517. Emitter: "sessionDemoE",
  518. Message: map[string]interface{}{
  519. "temp": 28.9,
  520. "hum": 85,
  521. "ts": 1541152491682,
  522. },
  523. Timestamp: 1541152491686,
  524. },
  525. {
  526. Emitter: "sessionDemoE",
  527. Message: map[string]interface{}{
  528. "temp": 26.8,
  529. "hum": 71,
  530. "ts": 1541152490872,
  531. },
  532. Timestamp: 1541152491972,
  533. },
  534. {
  535. Emitter: "sessionDemoE",
  536. Message: map[string]interface{}{
  537. "temp": 29.1,
  538. "hum": 92,
  539. "ts": 1541152492492,
  540. },
  541. Timestamp: 1541152492592,
  542. },
  543. {
  544. Emitter: "sessionDemoE",
  545. Message: map[string]interface{}{
  546. "temp": 30.9,
  547. "hum": 87,
  548. "ts": 1541152494112,
  549. },
  550. Timestamp: 1541152494212,
  551. },
  552. {
  553. Emitter: "sessionDemoE",
  554. Message: map[string]interface{}{
  555. "temp": 32.2,
  556. "hum": 99,
  557. "ts": 1541152493202,
  558. },
  559. Timestamp: 1541152495202,
  560. },
  561. {
  562. Emitter: "sessionDemoE",
  563. Message: map[string]interface{}{
  564. "temp": 32.2,
  565. "hum": 99,
  566. "ts": 1541152499202,
  567. },
  568. Timestamp: 1541152499402,
  569. },
  570. },
  571. "demoErr": {
  572. {
  573. Emitter: "demoErr",
  574. Message: map[string]interface{}{
  575. "color": "red",
  576. "size": 3,
  577. "ts": 1541152486013,
  578. },
  579. Timestamp: 1541152486221,
  580. },
  581. {
  582. Emitter: "demoErr",
  583. Message: map[string]interface{}{
  584. "color": 2,
  585. "size": "blue",
  586. "ts": 1541152487632,
  587. },
  588. Timestamp: 1541152487722,
  589. },
  590. {
  591. Emitter: "demoErr",
  592. Message: map[string]interface{}{
  593. "color": "red",
  594. "size": 1,
  595. "ts": 1541152489252,
  596. },
  597. Timestamp: 1541152489332,
  598. },
  599. { //dropped item
  600. Emitter: "demoErr",
  601. Message: map[string]interface{}{
  602. "color": "blue",
  603. "size": 6,
  604. "ts": 1541152486822,
  605. },
  606. Timestamp: 1541152489822,
  607. },
  608. {
  609. Emitter: "demoErr",
  610. Message: map[string]interface{}{
  611. "color": "yellow",
  612. "size": 4,
  613. "ts": 1541152488442,
  614. },
  615. Timestamp: 1541152490042,
  616. },
  617. { //To lift the watermark and issue all windows
  618. Emitter: "demoErr",
  619. Message: map[string]interface{}{
  620. "color": "yellow",
  621. "size": 4,
  622. "ts": 1541152492342,
  623. },
  624. Timestamp: 1541152493842,
  625. },
  626. },
  627. "ldemo": {
  628. {
  629. Emitter: "ldemo",
  630. Message: map[string]interface{}{
  631. "color": "red",
  632. "size": 3,
  633. "ts": 1541152486013,
  634. },
  635. Timestamp: 1541152486013,
  636. },
  637. {
  638. Emitter: "ldemo",
  639. Message: map[string]interface{}{
  640. "color": "blue",
  641. "size": "string",
  642. "ts": 1541152486822,
  643. },
  644. Timestamp: 1541152486822,
  645. },
  646. {
  647. Emitter: "ldemo",
  648. Message: map[string]interface{}{
  649. "size": 3,
  650. "ts": 1541152487632,
  651. },
  652. Timestamp: 1541152487632,
  653. },
  654. {
  655. Emitter: "ldemo",
  656. Message: map[string]interface{}{
  657. "color": 49,
  658. "size": 2,
  659. "ts": 1541152488442,
  660. },
  661. Timestamp: 1541152488442,
  662. },
  663. {
  664. Emitter: "ldemo",
  665. Message: map[string]interface{}{
  666. "color": "red",
  667. "ts": 1541152489252,
  668. },
  669. Timestamp: 1541152489252,
  670. },
  671. },
  672. "ldemo1": {
  673. {
  674. Emitter: "ldemo1",
  675. Message: map[string]interface{}{
  676. "temp": 25.5,
  677. "hum": 65,
  678. "ts": 1541152486013,
  679. },
  680. Timestamp: 1541152486013,
  681. },
  682. {
  683. Emitter: "ldemo1",
  684. Message: map[string]interface{}{
  685. "temp": 27.5,
  686. "hum": 59,
  687. "ts": 1541152486823,
  688. },
  689. Timestamp: 1541152486823,
  690. },
  691. {
  692. Emitter: "ldemo1",
  693. Message: map[string]interface{}{
  694. "temp": 28.1,
  695. "hum": 75,
  696. "ts": 1541152487632,
  697. },
  698. Timestamp: 1541152487632,
  699. },
  700. {
  701. Emitter: "ldemo1",
  702. Message: map[string]interface{}{
  703. "temp": 27.4,
  704. "hum": 80,
  705. "ts": "1541152488442",
  706. },
  707. Timestamp: 1541152488442,
  708. },
  709. {
  710. Emitter: "ldemo1",
  711. Message: map[string]interface{}{
  712. "temp": 25.5,
  713. "hum": 62,
  714. "ts": 1541152489252,
  715. },
  716. Timestamp: 1541152489252,
  717. },
  718. },
  719. "lsessionDemo": {
  720. {
  721. Emitter: "lsessionDemo",
  722. Message: map[string]interface{}{
  723. "temp": 25.5,
  724. "hum": 65,
  725. "ts": 1541152486013,
  726. },
  727. Timestamp: 1541152486013,
  728. },
  729. {
  730. Emitter: "lsessionDemo",
  731. Message: map[string]interface{}{
  732. "temp": 27.5,
  733. "hum": 59,
  734. "ts": 1541152486823,
  735. },
  736. Timestamp: 1541152486823,
  737. },
  738. {
  739. Emitter: "lsessionDemo",
  740. Message: map[string]interface{}{
  741. "temp": 28.1,
  742. "hum": 75,
  743. "ts": 1541152487932,
  744. },
  745. Timestamp: 1541152487932,
  746. },
  747. {
  748. Emitter: "lsessionDemo",
  749. Message: map[string]interface{}{
  750. "temp": 27.4,
  751. "hum": 80,
  752. "ts": 1541152488442,
  753. },
  754. Timestamp: 1541152488442,
  755. },
  756. {
  757. Emitter: "lsessionDemo",
  758. Message: map[string]interface{}{
  759. "temp": 25.5,
  760. "hum": 62,
  761. "ts": 1541152489252,
  762. },
  763. Timestamp: 1541152489252,
  764. },
  765. {
  766. Emitter: "lsessionDemo",
  767. Message: map[string]interface{}{
  768. "temp": 26.2,
  769. "hum": 63,
  770. "ts": 1541152490062,
  771. },
  772. Timestamp: 1541152490062,
  773. },
  774. {
  775. Emitter: "lsessionDemo",
  776. Message: map[string]interface{}{
  777. "temp": 26.8,
  778. "hum": 71,
  779. "ts": 1541152490872,
  780. },
  781. Timestamp: 1541152490872,
  782. },
  783. {
  784. Emitter: "lsessionDemo",
  785. Message: map[string]interface{}{
  786. "temp": 28.9,
  787. "hum": 85,
  788. "ts": 1541152491682,
  789. },
  790. Timestamp: 1541152491682,
  791. },
  792. {
  793. Emitter: "lsessionDemo",
  794. Message: map[string]interface{}{
  795. "temp": 29.1,
  796. "hum": 92,
  797. "ts": 1541152492492,
  798. },
  799. Timestamp: 1541152492492,
  800. },
  801. {
  802. Emitter: "lsessionDemo",
  803. Message: map[string]interface{}{
  804. "temp": 2.2,
  805. "hum": 99,
  806. "ts": 1541152493202,
  807. },
  808. Timestamp: 1541152493202,
  809. },
  810. {
  811. Emitter: "lsessionDemo",
  812. Message: map[string]interface{}{
  813. "temp": 30.9,
  814. "hum": 87,
  815. "ts": 1541152494112,
  816. },
  817. Timestamp: 1541152494112,
  818. },
  819. },
  820. "text": {
  821. {
  822. Emitter: "text",
  823. Message: map[string]interface{}{
  824. "slogan": "Impossible is nothing",
  825. "brand": "Adidas",
  826. },
  827. Timestamp: 1541152486500,
  828. },
  829. {
  830. Emitter: "text",
  831. Message: map[string]interface{}{
  832. "slogan": "Stronger than dirt",
  833. "brand": "Ajax",
  834. },
  835. Timestamp: 1541152487400,
  836. },
  837. {
  838. Emitter: "text",
  839. Message: map[string]interface{}{
  840. "slogan": "Belong anywhere",
  841. "brand": "Airbnb",
  842. },
  843. Timestamp: 1541152488300,
  844. },
  845. {
  846. Emitter: "text",
  847. Message: map[string]interface{}{
  848. "slogan": "I can't believe I ate the whole thing",
  849. "brand": "Alka Seltzer",
  850. },
  851. Timestamp: 1541152489200,
  852. },
  853. {
  854. Emitter: "text",
  855. Message: map[string]interface{}{
  856. "slogan": "You're in good hands",
  857. "brand": "Allstate",
  858. },
  859. Timestamp: 1541152490100,
  860. },
  861. {
  862. Emitter: "text",
  863. Message: map[string]interface{}{
  864. "slogan": "Don't leave home without it",
  865. "brand": "American Express",
  866. },
  867. Timestamp: 1541152491200,
  868. },
  869. {
  870. Emitter: "text",
  871. Message: map[string]interface{}{
  872. "slogan": "Think different",
  873. "brand": "Apple",
  874. },
  875. Timestamp: 1541152492300,
  876. },
  877. {
  878. Emitter: "text",
  879. Message: map[string]interface{}{
  880. "slogan": "We try harder",
  881. "brand": "Avis",
  882. },
  883. Timestamp: 1541152493400,
  884. },
  885. },
  886. }
  887. func commonResultFunc(result [][]byte) interface{} {
  888. var maps [][]map[string]interface{}
  889. for _, v := range result {
  890. var mapRes []map[string]interface{}
  891. err := json.Unmarshal(v, &mapRes)
  892. if err != nil {
  893. panic("Failed to parse the input into map")
  894. }
  895. maps = append(maps, mapRes)
  896. }
  897. return maps
  898. }
  899. func doRuleTest(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption) {
  900. doRuleTestBySinkProps(t, tests, j, opt, nil, commonResultFunc)
  901. }
  902. func doRuleTestBySinkProps(t *testing.T, tests []ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}, resultFunc func(result [][]byte) interface{}) {
  903. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  904. for i, tt := range tests {
  905. datas, dataLength, tp, mockSink, errCh := createStream(t, tt, j, opt, sinkProps)
  906. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP); err != nil {
  907. t.Errorf("send data error %s", err)
  908. break
  909. }
  910. compareResult(t, mockSink, resultFunc, tt, i, tp)
  911. }
  912. }
  913. func compareResult(t *testing.T, mockSink *test.MockSink, resultFunc func(result [][]byte) interface{}, tt ruleTest, i int, tp *xstream.TopologyNew) {
  914. // Check results
  915. results := mockSink.GetResults()
  916. maps := resultFunc(results)
  917. if !reflect.DeepEqual(tt.r, maps) {
  918. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  919. }
  920. if err := compareMetrics(tp, tt.m); err != nil {
  921. t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.sql, err)
  922. }
  923. if tt.t != nil {
  924. topo := tp.GetTopo()
  925. if !reflect.DeepEqual(tt.t, topo) {
  926. t.Errorf("%d. %q\n\ntopo mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.t, topo)
  927. }
  928. }
  929. tp.Cancel()
  930. }
  931. func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, datas [][]*xsql.Tuple, errCh <-chan error, tp *xstream.TopologyNew, postleap int) error {
  932. // Send data and move time
  933. mockClock := test.GetMockClock()
  934. // TODO assume multiple data source send the data in order and has the same length
  935. for i := 0; i < dataLength; i++ {
  936. mockClock.Add(SOURCELEAP * time.Millisecond)
  937. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  938. time.Sleep(1)
  939. for _, d := range datas {
  940. // Make sure time is going forward only
  941. if d[i].Timestamp > common.GetNowInMilli() {
  942. mockClock.Set(common.TimeFromUnixMilli(d[i].Timestamp))
  943. common.Log.Debugf("Clock set to %d", common.GetNowInMilli())
  944. time.Sleep(1)
  945. }
  946. select {
  947. case err := <-errCh:
  948. t.Log(err)
  949. tp.Cancel()
  950. return err
  951. default:
  952. }
  953. }
  954. }
  955. mockClock.Add(time.Duration(postleap) * time.Millisecond)
  956. common.Log.Debugf("Clock add to %d", common.GetNowInMilli())
  957. time.Sleep(1)
  958. // Check if stream done. Poll for metrics,
  959. for retry := 100; retry > 0; retry-- {
  960. time.Sleep(time.Duration(retry) * time.Millisecond)
  961. if err := compareMetrics(tp, metrics); err == nil {
  962. break
  963. } else {
  964. common.Log.Debugf("check metrics error at %d: %s", retry, err)
  965. }
  966. }
  967. return nil
  968. }
  969. func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *test.MockSink, <-chan error) {
  970. // Rest for each test
  971. cleanStateData()
  972. test.ResetClock(1541152485800)
  973. // Create stream
  974. var (
  975. sources []*nodes.SourceNode
  976. datas [][]*xsql.Tuple
  977. dataLength int
  978. )
  979. p := NewRuleProcessor(DbDir)
  980. parser := xsql.NewParser(strings.NewReader(tt.sql))
  981. if stmt, err := xsql.Language.Parse(parser); err != nil {
  982. t.Errorf("parse sql %s error: %s", tt.sql, err)
  983. } else {
  984. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  985. t.Errorf("sql %s is not a select statement", tt.sql)
  986. } else {
  987. streams := xsql.GetStreams(selectStmt)
  988. for _, stream := range streams {
  989. data := testData[stream]
  990. dataLength = len(data)
  991. datas = append(datas, data)
  992. source := nodes.NewSourceNodeWithSource(stream, test.NewMockSource(data), map[string]string{
  993. "DATASOURCE": stream,
  994. })
  995. sources = append(sources, source)
  996. }
  997. }
  998. }
  999. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, sources)
  1000. if err != nil {
  1001. t.Error(err)
  1002. }
  1003. mockSink := test.NewMockSink()
  1004. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
  1005. tp.AddSink(inputs, sink)
  1006. errCh := tp.Open()
  1007. return datas, dataLength, tp, mockSink, errCh
  1008. }
  1009. // Create or drop streams
  1010. func handleStream(createOrDrop bool, names []string, t *testing.T) {
  1011. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1012. for _, name := range names {
  1013. var sql string
  1014. if createOrDrop {
  1015. switch name {
  1016. case "demo":
  1017. sql = `CREATE STREAM demo (
  1018. color STRING,
  1019. size BIGINT,
  1020. ts BIGINT
  1021. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  1022. case "demoError":
  1023. sql = `CREATE STREAM demoError (
  1024. color STRING,
  1025. size BIGINT,
  1026. ts BIGINT
  1027. ) WITH (DATASOURCE="demoError", FORMAT="json", KEY="ts");`
  1028. case "demo1":
  1029. sql = `CREATE STREAM demo1 (
  1030. temp FLOAT,
  1031. hum BIGINT,` +
  1032. "`from`" + ` STRING,
  1033. ts BIGINT
  1034. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  1035. case "sessionDemo":
  1036. sql = `CREATE STREAM sessionDemo (
  1037. temp FLOAT,
  1038. hum BIGINT,
  1039. ts BIGINT
  1040. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  1041. case "demoE":
  1042. sql = `CREATE STREAM demoE (
  1043. color STRING,
  1044. size BIGINT,
  1045. ts BIGINT
  1046. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1047. case "demo1E":
  1048. sql = `CREATE STREAM demo1E (
  1049. temp FLOAT,
  1050. hum BIGINT,
  1051. ts BIGINT
  1052. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1053. case "sessionDemoE":
  1054. sql = `CREATE STREAM sessionDemoE (
  1055. temp FLOAT,
  1056. hum BIGINT,
  1057. ts BIGINT
  1058. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1059. case "demoErr":
  1060. sql = `CREATE STREAM demoErr (
  1061. color STRING,
  1062. size BIGINT,
  1063. ts BIGINT
  1064. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1065. case "ldemo":
  1066. sql = `CREATE STREAM ldemo (
  1067. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  1068. case "ldemo1":
  1069. sql = `CREATE STREAM ldemo1 (
  1070. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  1071. case "lsessionDemo":
  1072. sql = `CREATE STREAM lsessionDemo (
  1073. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  1074. case "ext":
  1075. sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  1076. case "ext2":
  1077. sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  1078. case "text":
  1079. sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  1080. default:
  1081. t.Errorf("create stream %s fail", name)
  1082. }
  1083. } else {
  1084. sql = `DROP STREAM ` + name
  1085. }
  1086. _, err := p.ExecStmt(sql)
  1087. if err != nil {
  1088. t.Log(err)
  1089. }
  1090. }
  1091. }