xsql_processor_test.go 27 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "engine/common"
  5. "engine/xsql"
  6. "engine/xstream/api"
  7. "engine/xstream/nodes"
  8. "engine/xstream/test"
  9. "fmt"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var BadgerDir string
  17. func init(){
  18. BadgerDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("badge location is %s", BadgerDir)
  23. }
  24. func TestStreamCreateProcessor(t *testing.T) {
  25. var tests = []struct {
  26. s string
  27. r []string
  28. err string
  29. }{
  30. {
  31. s: `SHOW STREAMS;`,
  32. r: []string{"No stream definition found."},
  33. },
  34. {
  35. s: `EXPLAIN STREAM topic1;`,
  36. err: "stream topic1 is not found.",
  37. },
  38. {
  39. s: `CREATE STREAM topic1 (
  40. USERID BIGINT,
  41. FIRST_NAME STRING,
  42. LAST_NAME STRING,
  43. NICKNAMES ARRAY(STRING),
  44. Gender BOOLEAN,
  45. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  46. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  47. r: []string{"Stream topic1 is created."},
  48. },
  49. {
  50. s: `EXPLAIN STREAM topic1;`,
  51. r: []string{"TO BE SUPPORTED"},
  52. },
  53. {
  54. s: `DESCRIBE STREAM topic1;`,
  55. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  56. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  57. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  58. },
  59. {
  60. s: `SHOW STREAMS;`,
  61. r: []string{"topic1"},
  62. },
  63. {
  64. s: `DROP STREAM topic1;`,
  65. r: []string{"stream topic1 dropped"},
  66. },
  67. {
  68. s: `DESCRIBE STREAM topic1;`,
  69. err: "Stream topic1 is not found.",
  70. },
  71. {
  72. s: `DROP STREAM topic1;`,
  73. err: "Key not found",
  74. },
  75. }
  76. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  77. streamDB := path.Join(BadgerDir, "streamTest")
  78. for i, tt := range tests {
  79. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  80. if !reflect.DeepEqual(tt.err, errstring(err)) {
  81. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  82. } else if tt.err == "" {
  83. if !reflect.DeepEqual(tt.r, results) {
  84. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  85. }
  86. }
  87. }
  88. }
  89. func createStreams(t *testing.T){
  90. demo := `CREATE STREAM demo (
  91. color STRING,
  92. size BIGINT,
  93. ts BIGINT
  94. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  95. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  96. if err != nil{
  97. t.Log(err)
  98. }
  99. demo1 := `CREATE STREAM demo1 (
  100. temp FLOAT,
  101. hum BIGINT,
  102. ts BIGINT
  103. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  104. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  105. if err != nil{
  106. t.Log(err)
  107. }
  108. sessionDemo := `CREATE STREAM sessionDemo (
  109. temp FLOAT,
  110. hum BIGINT,
  111. ts BIGINT
  112. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  113. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  114. if err != nil{
  115. t.Log(err)
  116. }
  117. }
  118. func dropStreams(t *testing.T){
  119. demo := `DROP STREAM demo`
  120. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  121. if err != nil{
  122. t.Log(err)
  123. }
  124. demo1 := `DROP STREAM demo1`
  125. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  126. if err != nil{
  127. t.Log(err)
  128. }
  129. sessionDemo := `DROP STREAM sessionDemo`
  130. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  131. if err != nil{
  132. t.Log(err)
  133. }
  134. }
  135. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  136. var data []*xsql.Tuple
  137. switch name{
  138. case "demo":
  139. data = []*xsql.Tuple{
  140. {
  141. Emitter: name,
  142. Message: map[string]interface{}{
  143. "color": "red",
  144. "size": 3,
  145. "ts": 1541152486013,
  146. },
  147. Timestamp: 1541152486013,
  148. },
  149. {
  150. Emitter: name,
  151. Message: map[string]interface{}{
  152. "color": "blue",
  153. "size": 6,
  154. "ts": 1541152486822,
  155. },
  156. Timestamp: 1541152486822,
  157. },
  158. {
  159. Emitter: name,
  160. Message: map[string]interface{}{
  161. "color": "blue",
  162. "size": 2,
  163. "ts": 1541152487632,
  164. },
  165. Timestamp: 1541152487632,
  166. },
  167. {
  168. Emitter: name,
  169. Message: map[string]interface{}{
  170. "color": "yellow",
  171. "size": 4,
  172. "ts": 1541152488442,
  173. },
  174. Timestamp: 1541152488442,
  175. },
  176. {
  177. Emitter: name,
  178. Message: map[string]interface{}{
  179. "color": "red",
  180. "size": 1,
  181. "ts": 1541152489252,
  182. },
  183. Timestamp: 1541152489252,
  184. },
  185. }
  186. case "demo1":
  187. data = []*xsql.Tuple{
  188. {
  189. Emitter: name,
  190. Message: map[string]interface{}{
  191. "temp": 25.5,
  192. "hum": 65,
  193. "ts": 1541152486013,
  194. },
  195. Timestamp: 1541152486013,
  196. },
  197. {
  198. Emitter: name,
  199. Message: map[string]interface{}{
  200. "temp": 27.5,
  201. "hum": 59,
  202. "ts": 1541152486823,
  203. },
  204. Timestamp: 1541152486823,
  205. },
  206. {
  207. Emitter: name,
  208. Message: map[string]interface{}{
  209. "temp": 28.1,
  210. "hum": 75,
  211. "ts": 1541152487632,
  212. },
  213. Timestamp: 1541152487632,
  214. },
  215. {
  216. Emitter: name,
  217. Message: map[string]interface{}{
  218. "temp": 27.4,
  219. "hum": 80,
  220. "ts": 1541152488442,
  221. },
  222. Timestamp: 1541152488442,
  223. },
  224. {
  225. Emitter: name,
  226. Message: map[string]interface{}{
  227. "temp": 25.5,
  228. "hum": 62,
  229. "ts": 1541152489252,
  230. },
  231. Timestamp: 1541152489252,
  232. },
  233. }
  234. case "sessionDemo":
  235. data = []*xsql.Tuple{
  236. {
  237. Emitter: name,
  238. Message: map[string]interface{}{
  239. "temp": 25.5,
  240. "hum": 65,
  241. "ts": 1541152486013,
  242. },
  243. Timestamp: 1541152486013,
  244. },
  245. {
  246. Emitter: name,
  247. Message: map[string]interface{}{
  248. "temp": 27.5,
  249. "hum": 59,
  250. "ts": 1541152486823,
  251. },
  252. Timestamp: 1541152486823,
  253. },
  254. {
  255. Emitter: name,
  256. Message: map[string]interface{}{
  257. "temp": 28.1,
  258. "hum": 75,
  259. "ts": 1541152487932,
  260. },
  261. Timestamp: 1541152487932,
  262. },
  263. {
  264. Emitter: name,
  265. Message: map[string]interface{}{
  266. "temp": 27.4,
  267. "hum": 80,
  268. "ts": 1541152488442,
  269. },
  270. Timestamp: 1541152488442,
  271. },
  272. {
  273. Emitter: name,
  274. Message: map[string]interface{}{
  275. "temp": 25.5,
  276. "hum": 62,
  277. "ts": 1541152489252,
  278. },
  279. Timestamp: 1541152489252,
  280. },
  281. {
  282. Emitter: name,
  283. Message: map[string]interface{}{
  284. "temp": 26.2,
  285. "hum": 63,
  286. "ts": 1541152490062,
  287. },
  288. Timestamp: 1541152490062,
  289. },
  290. {
  291. Emitter: name,
  292. Message: map[string]interface{}{
  293. "temp": 26.8,
  294. "hum": 71,
  295. "ts": 1541152490872,
  296. },
  297. Timestamp: 1541152490872,
  298. },
  299. {
  300. Emitter: name,
  301. Message: map[string]interface{}{
  302. "temp": 28.9,
  303. "hum": 85,
  304. "ts": 1541152491682,
  305. },
  306. Timestamp: 1541152491682,
  307. },
  308. {
  309. Emitter: name,
  310. Message: map[string]interface{}{
  311. "temp": 29.1,
  312. "hum": 92,
  313. "ts": 1541152492492,
  314. },
  315. Timestamp: 1541152492492,
  316. },
  317. {
  318. Emitter: name,
  319. Message: map[string]interface{}{
  320. "temp": 32.2,
  321. "hum": 99,
  322. "ts": 1541152493202,
  323. },
  324. Timestamp: 1541152493202,
  325. },
  326. {
  327. Emitter: name,
  328. Message: map[string]interface{}{
  329. "temp": 30.9,
  330. "hum": 87,
  331. "ts": 1541152494112,
  332. },
  333. Timestamp: 1541152494112,
  334. },
  335. }
  336. }
  337. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false), map[string]string{
  338. "DATASOURCE": name,
  339. })
  340. }
  341. func TestSingleSQL(t *testing.T) {
  342. var tests = []struct {
  343. name string
  344. sql string
  345. r [][]map[string]interface{}
  346. }{
  347. {
  348. name: `rule1`,
  349. sql: `SELECT * FROM demo`,
  350. r: [][]map[string]interface{}{
  351. {{
  352. "color": "red",
  353. "size": float64(3),
  354. "ts": float64(1541152486013),
  355. }},
  356. {{
  357. "color": "blue",
  358. "size": float64(6),
  359. "ts": float64(1541152486822),
  360. }},
  361. {{
  362. "color": "blue",
  363. "size": float64(2),
  364. "ts": float64(1541152487632),
  365. }},
  366. {{
  367. "color": "yellow",
  368. "size": float64(4),
  369. "ts": float64(1541152488442),
  370. }},
  371. {{
  372. "color": "red",
  373. "size": float64(1),
  374. "ts": float64(1541152489252),
  375. }},
  376. },
  377. }, {
  378. name: `rule2`,
  379. sql: `SELECT color, ts FROM demo where size > 3`,
  380. r: [][]map[string]interface{}{
  381. {{
  382. "color": "blue",
  383. "ts": float64(1541152486822),
  384. }},
  385. {{
  386. "color": "yellow",
  387. "ts": float64(1541152488442),
  388. }},
  389. },
  390. },
  391. }
  392. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  393. createStreams(t)
  394. defer dropStreams(t)
  395. done := make(chan struct{})
  396. defer close(done)
  397. for i, tt := range tests {
  398. p := NewRuleProcessor(BadgerDir)
  399. parser := xsql.NewParser(strings.NewReader(tt.sql))
  400. var sources []*nodes.SourceNode
  401. if stmt, err := xsql.Language.Parse(parser); err != nil{
  402. t.Errorf("parse sql %s error: %s", tt.sql , err)
  403. }else {
  404. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  405. t.Errorf("sql %s is not a select statement", tt.sql)
  406. } else {
  407. streams := xsql.GetStreams(selectStmt)
  408. for _, stream := range streams{
  409. source := getMockSource(stream, done, 5)
  410. sources = append(sources, source)
  411. }
  412. }
  413. }
  414. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  415. if err != nil{
  416. t.Error(err)
  417. }
  418. mockSink := test.NewMockSink()
  419. sink := nodes.NewSinkNode("MockSink", mockSink)
  420. tp.AddSink(inputs, sink)
  421. count := len(sources)
  422. errCh := tp.Open()
  423. func(){
  424. for{
  425. select{
  426. case err = <- errCh:
  427. t.Log(err)
  428. tp.Cancel()
  429. return
  430. case <- done:
  431. count--
  432. log.Infof("%d sources remaining", count)
  433. if count <= 0{
  434. log.Info("stream stopping")
  435. time.Sleep(1 * time.Second)
  436. tp.Cancel()
  437. return
  438. }
  439. default:
  440. }
  441. }
  442. }()
  443. results := mockSink.GetResults()
  444. var maps [][]map[string]interface{}
  445. for _, v := range results{
  446. var mapRes []map[string]interface{}
  447. err := json.Unmarshal(v, &mapRes)
  448. if err != nil {
  449. t.Errorf("Failed to parse the input into map")
  450. continue
  451. }
  452. maps = append(maps, mapRes)
  453. }
  454. if !reflect.DeepEqual(tt.r, maps) {
  455. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  456. }
  457. }
  458. }
  459. func TestWindow(t *testing.T) {
  460. common.IsTesting = true
  461. var tests = []struct {
  462. name string
  463. sql string
  464. size int
  465. r [][]map[string]interface{}
  466. }{
  467. {
  468. name: `rule1`,
  469. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  470. size: 5,
  471. r: [][]map[string]interface{}{
  472. {{
  473. "color": "red",
  474. "size": float64(3),
  475. "ts": float64(1541152486013),
  476. },{
  477. "color": "blue",
  478. "size": float64(6),
  479. "ts": float64(1541152486822),
  480. }},
  481. {{
  482. "color": "red",
  483. "size": float64(3),
  484. "ts": float64(1541152486013),
  485. },{
  486. "color": "blue",
  487. "size": float64(6),
  488. "ts": float64(1541152486822),
  489. },{
  490. "color": "blue",
  491. "size": float64(2),
  492. "ts": float64(1541152487632),
  493. }},
  494. {{
  495. "color": "blue",
  496. "size": float64(2),
  497. "ts": float64(1541152487632),
  498. },{
  499. "color": "yellow",
  500. "size": float64(4),
  501. "ts": float64(1541152488442),
  502. }},
  503. },
  504. }, {
  505. name: `rule2`,
  506. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  507. size: 5,
  508. r: [][]map[string]interface{}{
  509. {{
  510. "color": "red",
  511. "ts": float64(1541152486013),
  512. },{
  513. "color": "blue",
  514. "ts": float64(1541152486822),
  515. }},
  516. {{
  517. "color": "yellow",
  518. "ts": float64(1541152488442),
  519. }},
  520. },
  521. }, {
  522. name: `rule3`,
  523. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  524. size: 5,
  525. r: [][]map[string]interface{}{
  526. {{
  527. "color": "red",
  528. "temp": 25.5,
  529. "ts": float64(1541152486013),
  530. }},{{
  531. "color": "red",
  532. "temp": 25.5,
  533. "ts": float64(1541152486013),
  534. }},{{
  535. "color": "red",
  536. "temp": 25.5,
  537. "ts": float64(1541152486013),
  538. }},{{
  539. "color": "blue",
  540. "temp": 28.1,
  541. "ts": float64(1541152487632),
  542. }},{{
  543. "color": "blue",
  544. "temp": 28.1,
  545. "ts": float64(1541152487632),
  546. }},{{
  547. "color": "blue",
  548. "temp": 28.1,
  549. "ts": float64(1541152487632),
  550. },{
  551. "color": "yellow",
  552. "temp": 27.4,
  553. "ts": float64(1541152488442),
  554. }},{{
  555. "color": "yellow",
  556. "temp": 27.4,
  557. "ts": float64(1541152488442),
  558. }},{{
  559. "color": "yellow",
  560. "temp": 27.4,
  561. "ts": float64(1541152488442),
  562. },{
  563. "color": "red",
  564. "temp": 25.5,
  565. "ts": float64(1541152489252),
  566. }},
  567. },
  568. }, {
  569. name: `rule4`,
  570. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  571. size: 5,
  572. r: [][]map[string]interface{}{
  573. {{
  574. "color": "red",
  575. }},{{
  576. "color": "blue",
  577. },{
  578. "color": "red",
  579. }},{{
  580. "color": "blue",
  581. },{
  582. "color": "red",
  583. }},{{
  584. "color": "blue",
  585. },{
  586. "color": "yellow",
  587. }},{{
  588. "color": "blue",
  589. }, {
  590. "color": "red",
  591. },{
  592. "color": "yellow",
  593. }},
  594. },
  595. },{
  596. name: `rule5`,
  597. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  598. size: 11,
  599. r: [][]map[string]interface{}{
  600. {{
  601. "temp": 25.5,
  602. },{
  603. "temp": 27.5,
  604. }},{{
  605. "temp": 28.1,
  606. },{
  607. "temp": 27.4,
  608. },{
  609. "temp": 25.5,
  610. }},{{
  611. "temp": 26.2,
  612. },{
  613. "temp": 26.8,
  614. },{
  615. "temp": 28.9,
  616. },{
  617. "temp": 29.1,
  618. },{
  619. "temp": 32.2,
  620. }},
  621. },
  622. },{
  623. name: `rule6`,
  624. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  625. size: 5,
  626. r: [][]map[string]interface{}{
  627. {{
  628. "m": 25.5,
  629. "c": float64(1),
  630. }},{{
  631. "m": 25.5,
  632. "c": float64(1),
  633. }},{{
  634. "m": 25.5,
  635. "c": float64(1),
  636. }},{{
  637. "m": 28.1,
  638. "c": float64(1),
  639. }},{{
  640. "m": 28.1,
  641. "c": float64(1),
  642. }},{{
  643. "m": 28.1,
  644. "c": float64(2),
  645. }},{{
  646. "m": 27.4,
  647. "c": float64(1),
  648. }},{{
  649. "m": 27.4,
  650. "c": float64(2),
  651. }},
  652. },
  653. },
  654. }
  655. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  656. createStreams(t)
  657. defer dropStreams(t)
  658. done := make(chan struct{})
  659. defer close(done)
  660. common.ResetMockTicker()
  661. for i, tt := range tests {
  662. p := NewRuleProcessor(BadgerDir)
  663. parser := xsql.NewParser(strings.NewReader(tt.sql))
  664. var sources []*nodes.SourceNode
  665. if stmt, err := xsql.Language.Parse(parser); err != nil{
  666. t.Errorf("parse sql %s error: %s", tt.sql , err)
  667. }else {
  668. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  669. t.Errorf("sql %s is not a select statement", tt.sql)
  670. } else {
  671. streams := xsql.GetStreams(selectStmt)
  672. for _, stream := range streams{
  673. source := getMockSource(stream, done, tt.size)
  674. sources = append(sources, source)
  675. }
  676. }
  677. }
  678. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  679. if err != nil{
  680. t.Error(err)
  681. }
  682. mockSink := test.NewMockSink()
  683. sink := nodes.NewSinkNode("mockSink", mockSink)
  684. tp.AddSink(inputs, sink)
  685. count := len(sources)
  686. errCh := tp.Open()
  687. func(){
  688. for{
  689. select{
  690. case err = <- errCh:
  691. t.Log(err)
  692. tp.Cancel()
  693. return
  694. case <- done:
  695. count--
  696. log.Infof("%d sources remaining", count)
  697. if count <= 0{
  698. log.Info("stream stopping")
  699. time.Sleep(1 * time.Second)
  700. tp.Cancel()
  701. return
  702. }
  703. default:
  704. }
  705. }
  706. }()
  707. results := mockSink.GetResults()
  708. var maps [][]map[string]interface{}
  709. for _, v := range results{
  710. var mapRes []map[string]interface{}
  711. err := json.Unmarshal(v, &mapRes)
  712. if err != nil {
  713. t.Errorf("Failed to parse the input into map")
  714. continue
  715. }
  716. maps = append(maps, mapRes)
  717. }
  718. if !reflect.DeepEqual(tt.r, maps) {
  719. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  720. }
  721. }
  722. }
  723. func createEventStreams(t *testing.T){
  724. demo := `CREATE STREAM demoE (
  725. color STRING,
  726. size BIGINT,
  727. ts BIGINT
  728. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  729. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  730. if err != nil{
  731. t.Log(err)
  732. }
  733. demo1 := `CREATE STREAM demo1E (
  734. temp FLOAT,
  735. hum BIGINT,
  736. ts BIGINT
  737. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  738. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  739. if err != nil{
  740. t.Log(err)
  741. }
  742. sessionDemo := `CREATE STREAM sessionDemoE (
  743. temp FLOAT,
  744. hum BIGINT,
  745. ts BIGINT
  746. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  747. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  748. if err != nil{
  749. t.Log(err)
  750. }
  751. }
  752. func dropEventStreams(t *testing.T){
  753. demo := `DROP STREAM demoE`
  754. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  755. if err != nil{
  756. t.Log(err)
  757. }
  758. demo1 := `DROP STREAM demo1E`
  759. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  760. if err != nil{
  761. t.Log(err)
  762. }
  763. sessionDemo := `DROP STREAM sessionDemoE`
  764. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  765. if err != nil{
  766. t.Log(err)
  767. }
  768. }
  769. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  770. var data []*xsql.Tuple
  771. switch name{
  772. case "demoE":
  773. data = []*xsql.Tuple{
  774. {
  775. Emitter: name,
  776. Message: map[string]interface{}{
  777. "color": "red",
  778. "size": 3,
  779. "ts": 1541152486013,
  780. },
  781. Timestamp: 1541152486013,
  782. },
  783. {
  784. Emitter: name,
  785. Message: map[string]interface{}{
  786. "color": "blue",
  787. "size": 2,
  788. "ts": 1541152487632,
  789. },
  790. Timestamp: 1541152487632,
  791. },
  792. {
  793. Emitter: name,
  794. Message: map[string]interface{}{
  795. "color": "red",
  796. "size": 1,
  797. "ts": 1541152489252,
  798. },
  799. Timestamp: 1541152489252,
  800. },
  801. { //dropped item
  802. Emitter: name,
  803. Message: map[string]interface{}{
  804. "color": "blue",
  805. "size": 6,
  806. "ts": 1541152486822,
  807. },
  808. Timestamp: 1541152486822,
  809. },
  810. {
  811. Emitter: name,
  812. Message: map[string]interface{}{
  813. "color": "yellow",
  814. "size": 4,
  815. "ts": 1541152488442,
  816. },
  817. Timestamp: 1541152488442,
  818. },
  819. { //To lift the watermark and issue all windows
  820. Emitter: name,
  821. Message: map[string]interface{}{
  822. "color": "yellow",
  823. "size": 4,
  824. "ts": 1541152492342,
  825. },
  826. Timestamp: 1541152488442,
  827. },
  828. }
  829. case "demo1E":
  830. data = []*xsql.Tuple{
  831. {
  832. Emitter: name,
  833. Message: map[string]interface{}{
  834. "temp": 27.5,
  835. "hum": 59,
  836. "ts": 1541152486823,
  837. },
  838. Timestamp: 1541152486823,
  839. },
  840. {
  841. Emitter: name,
  842. Message: map[string]interface{}{
  843. "temp": 25.5,
  844. "hum": 65,
  845. "ts": 1541152486013,
  846. },
  847. Timestamp: 1541152486013,
  848. },
  849. {
  850. Emitter: name,
  851. Message: map[string]interface{}{
  852. "temp": 27.4,
  853. "hum": 80,
  854. "ts": 1541152488442,
  855. },
  856. Timestamp: 1541152488442,
  857. },
  858. {
  859. Emitter: name,
  860. Message: map[string]interface{}{
  861. "temp": 28.1,
  862. "hum": 75,
  863. "ts": 1541152487632,
  864. },
  865. Timestamp: 1541152487632,
  866. },
  867. {
  868. Emitter: name,
  869. Message: map[string]interface{}{
  870. "temp": 25.5,
  871. "hum": 62,
  872. "ts": 1541152489252,
  873. },
  874. Timestamp: 1541152489252,
  875. },
  876. {
  877. Emitter: name,
  878. Message: map[string]interface{}{
  879. "temp": 25.5,
  880. "hum": 62,
  881. "ts": 1541152499252,
  882. },
  883. Timestamp: 1541152499252,
  884. },
  885. }
  886. case "sessionDemoE":
  887. data = []*xsql.Tuple{
  888. {
  889. Emitter: name,
  890. Message: map[string]interface{}{
  891. "temp": 25.5,
  892. "hum": 65,
  893. "ts": 1541152486013,
  894. },
  895. Timestamp: 1541152486013,
  896. },
  897. {
  898. Emitter: name,
  899. Message: map[string]interface{}{
  900. "temp": 28.1,
  901. "hum": 75,
  902. "ts": 1541152487932,
  903. },
  904. Timestamp: 1541152487932,
  905. },
  906. {
  907. Emitter: name,
  908. Message: map[string]interface{}{
  909. "temp": 27.5,
  910. "hum": 59,
  911. "ts": 1541152486823,
  912. },
  913. Timestamp: 1541152486823,
  914. },
  915. {
  916. Emitter: name,
  917. Message: map[string]interface{}{
  918. "temp": 25.5,
  919. "hum": 62,
  920. "ts": 1541152489252,
  921. },
  922. Timestamp: 1541152489252,
  923. },
  924. {
  925. Emitter: name,
  926. Message: map[string]interface{}{
  927. "temp": 27.4,
  928. "hum": 80,
  929. "ts": 1541152488442,
  930. },
  931. Timestamp: 1541152488442,
  932. },
  933. {
  934. Emitter: name,
  935. Message: map[string]interface{}{
  936. "temp": 26.2,
  937. "hum": 63,
  938. "ts": 1541152490062,
  939. },
  940. Timestamp: 1541152490062,
  941. },
  942. {
  943. Emitter: name,
  944. Message: map[string]interface{}{
  945. "temp": 28.9,
  946. "hum": 85,
  947. "ts": 1541152491682,
  948. },
  949. Timestamp: 1541152491682,
  950. },
  951. {
  952. Emitter: name,
  953. Message: map[string]interface{}{
  954. "temp": 26.8,
  955. "hum": 71,
  956. "ts": 1541152490872,
  957. },
  958. Timestamp: 1541152490872,
  959. },
  960. {
  961. Emitter: name,
  962. Message: map[string]interface{}{
  963. "temp": 29.1,
  964. "hum": 92,
  965. "ts": 1541152492492,
  966. },
  967. Timestamp: 1541152492492,
  968. },
  969. {
  970. Emitter: name,
  971. Message: map[string]interface{}{
  972. "temp": 30.9,
  973. "hum": 87,
  974. "ts": 1541152494112,
  975. },
  976. Timestamp: 1541152494112,
  977. },
  978. {
  979. Emitter: name,
  980. Message: map[string]interface{}{
  981. "temp": 32.2,
  982. "hum": 99,
  983. "ts": 1541152493202,
  984. },
  985. Timestamp: 1541152493202,
  986. },
  987. {
  988. Emitter: name,
  989. Message: map[string]interface{}{
  990. "temp": 32.2,
  991. "hum": 99,
  992. "ts": 1541152499202,
  993. },
  994. Timestamp: 1541152499202,
  995. },
  996. }
  997. }
  998. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true), map[string]string{
  999. "DATASOURCE": name,
  1000. })
  1001. }
  1002. func TestEventWindow(t *testing.T) {
  1003. common.IsTesting = true
  1004. var tests = []struct {
  1005. name string
  1006. sql string
  1007. size int
  1008. r [][]map[string]interface{}
  1009. }{
  1010. {
  1011. name: `rule1`,
  1012. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1013. size: 6,
  1014. r: [][]map[string]interface{}{
  1015. {{
  1016. "color": "red",
  1017. "size": float64(3),
  1018. "ts": float64(1541152486013),
  1019. }},
  1020. {{
  1021. "color": "red",
  1022. "size": float64(3),
  1023. "ts": float64(1541152486013),
  1024. },{
  1025. "color": "blue",
  1026. "size": float64(2),
  1027. "ts": float64(1541152487632),
  1028. }},
  1029. {{
  1030. "color": "blue",
  1031. "size": float64(2),
  1032. "ts": float64(1541152487632),
  1033. },{
  1034. "color": "yellow",
  1035. "size": float64(4),
  1036. "ts": float64(1541152488442),
  1037. }},{{
  1038. "color": "yellow",
  1039. "size": float64(4),
  1040. "ts": float64(1541152488442),
  1041. },{
  1042. "color": "red",
  1043. "size": float64(1),
  1044. "ts": float64(1541152489252),
  1045. }},{{
  1046. "color": "red",
  1047. "size": float64(1),
  1048. "ts": float64(1541152489252),
  1049. }},
  1050. },
  1051. }, {
  1052. name: `rule2`,
  1053. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1054. size: 6,
  1055. r: [][]map[string]interface{}{
  1056. {{
  1057. "color": "red",
  1058. "ts": float64(1541152486013),
  1059. }},
  1060. {{
  1061. "color": "yellow",
  1062. "ts": float64(1541152488442),
  1063. }},
  1064. },
  1065. }, {
  1066. name: `rule3`,
  1067. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1068. size: 6,
  1069. r: [][]map[string]interface{}{
  1070. {{
  1071. "color": "red",
  1072. "temp": 25.5,
  1073. "ts": float64(1541152486013),
  1074. }},{{
  1075. "color": "red",
  1076. "temp": 25.5,
  1077. "ts": float64(1541152486013),
  1078. }},{{
  1079. "color": "blue",
  1080. "temp": 28.1,
  1081. "ts": float64(1541152487632),
  1082. }},{{
  1083. "color": "blue",
  1084. "temp": 28.1,
  1085. "ts": float64(1541152487632),
  1086. },{
  1087. "color": "yellow",
  1088. "temp": 27.4,
  1089. "ts": float64(1541152488442),
  1090. }},{{
  1091. "color": "yellow",
  1092. "temp": 27.4,
  1093. "ts": float64(1541152488442),
  1094. },{
  1095. "color": "red",
  1096. "temp": 25.5,
  1097. "ts": float64(1541152489252),
  1098. }},
  1099. },
  1100. }, {
  1101. name: `rule4`,
  1102. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1103. size: 6,
  1104. r: [][]map[string]interface{}{
  1105. {{
  1106. "color": "red",
  1107. }},{{
  1108. "color": "blue",
  1109. },{
  1110. "color": "red",
  1111. }},{{
  1112. "color": "blue",
  1113. },{
  1114. "color": "yellow",
  1115. }},{{
  1116. "color": "blue",
  1117. }, {
  1118. "color": "red",
  1119. },{
  1120. "color": "yellow",
  1121. }},
  1122. },
  1123. },{
  1124. name: `rule5`,
  1125. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1126. size: 12,
  1127. r: [][]map[string]interface{}{
  1128. {{
  1129. "temp": 25.5,
  1130. }},{{
  1131. "temp": 28.1,
  1132. },{
  1133. "temp": 27.4,
  1134. },{
  1135. "temp": 25.5,
  1136. }},{{
  1137. "temp": 26.2,
  1138. },{
  1139. "temp": 26.8,
  1140. },{
  1141. "temp": 28.9,
  1142. },{
  1143. "temp": 29.1,
  1144. },{
  1145. "temp": 32.2,
  1146. }},{{
  1147. "temp": 30.9,
  1148. }},
  1149. },
  1150. },{
  1151. name: `rule6`,
  1152. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1153. size: 6,
  1154. r: [][]map[string]interface{}{
  1155. {{
  1156. "m": 25.5,
  1157. "c": float64(1),
  1158. }},{{
  1159. "m": 25.5,
  1160. "c": float64(1),
  1161. }},{{
  1162. "m": 28.1,
  1163. "c": float64(1),
  1164. }},{{
  1165. "m": 28.1,
  1166. "c": float64(2),
  1167. }},{{
  1168. "m": 27.4,
  1169. "c": float64(2),
  1170. }},
  1171. },
  1172. },
  1173. }
  1174. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1175. createEventStreams(t)
  1176. defer dropEventStreams(t)
  1177. done := make(chan struct{})
  1178. defer close(done)
  1179. common.ResetMockTicker()
  1180. //mock ticker
  1181. realTicker := time.NewTicker(500 * time.Millisecond)
  1182. tickerDone := make(chan bool)
  1183. go func(){
  1184. ticker := common.GetTicker(1000).(*common.MockTicker)
  1185. timer := common.GetTimer(1000).(*common.MockTimer)
  1186. for {
  1187. select {
  1188. case <-tickerDone:
  1189. log.Infof("real ticker exiting...")
  1190. return
  1191. case t := <-realTicker.C:
  1192. ts := common.TimeToUnixMilli(t)
  1193. if ticker != nil {
  1194. go ticker.DoTick(ts)
  1195. }
  1196. if timer != nil {
  1197. go timer.DoTick(ts)
  1198. }
  1199. }
  1200. }
  1201. }()
  1202. for i, tt := range tests {
  1203. p := NewRuleProcessor(BadgerDir)
  1204. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1205. var sources []*nodes.SourceNode
  1206. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1207. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1208. }else {
  1209. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1210. t.Errorf("sql %s is not a select statement", tt.sql)
  1211. } else {
  1212. streams := xsql.GetStreams(selectStmt)
  1213. for _, stream := range streams{
  1214. source := getEventMockSource(stream, done, tt.size)
  1215. sources = append(sources, source)
  1216. }
  1217. }
  1218. }
  1219. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1220. Id:tt.name, Sql: tt.sql,
  1221. Options: map[string]interface{}{
  1222. "isEventTime": true,
  1223. "lateTolerance": float64(1000),
  1224. },
  1225. }, sources)
  1226. if err != nil{
  1227. t.Error(err)
  1228. }
  1229. mockSink := test.NewMockSink()
  1230. sink := nodes.NewSinkNode("MockSink", mockSink)
  1231. tp.AddSink(inputs, sink)
  1232. count := len(sources)
  1233. errCh := tp.Open()
  1234. func(){
  1235. for{
  1236. select{
  1237. case err = <- errCh:
  1238. t.Log(err)
  1239. tp.Cancel()
  1240. return
  1241. case <- done:
  1242. count--
  1243. log.Infof("%d sources remaining", count)
  1244. if count <= 0{
  1245. log.Info("stream stopping")
  1246. time.Sleep(1 * time.Second)
  1247. tp.Cancel()
  1248. return
  1249. }
  1250. default:
  1251. }
  1252. }
  1253. }()
  1254. results := mockSink.GetResults()
  1255. var maps [][]map[string]interface{}
  1256. for _, v := range results{
  1257. var mapRes []map[string]interface{}
  1258. err := json.Unmarshal(v, &mapRes)
  1259. if err != nil {
  1260. t.Errorf("Failed to parse the input into map")
  1261. continue
  1262. }
  1263. maps = append(maps, mapRes)
  1264. }
  1265. if !reflect.DeepEqual(tt.r, maps) {
  1266. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1267. }
  1268. }
  1269. realTicker.Stop()
  1270. tickerDone <- true
  1271. close(tickerDone)
  1272. }
  // errstring returns the message of err, or the empty string when err is nil,
// so that an expected "no error" can be written as "" in test tables.
func errstring(err error) string {
	if err == nil {
		return ""
	}
	return err.Error()
}