xsql_processor_test.go 27 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "engine/common"
  5. "engine/xsql"
  6. "engine/xstream/api"
  7. "engine/xstream/nodes"
  8. "engine/xstream/test"
  9. "fmt"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var BadgerDir string
  17. func init(){
  18. BadgerDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("badge location is %s", BadgerDir)
  23. }
  24. func TestStreamCreateProcessor(t *testing.T) {
  25. var tests = []struct {
  26. s string
  27. r []string
  28. err string
  29. }{
  30. {
  31. s: `SHOW STREAMS;`,
  32. r: []string{"no stream definition found"},
  33. },
  34. {
  35. s: `EXPLAIN STREAM topic1;`,
  36. err: "stream topic1 not found",
  37. },
  38. {
  39. s: `CREATE STREAM topic1 (
  40. USERID BIGINT,
  41. FIRST_NAME STRING,
  42. LAST_NAME STRING,
  43. NICKNAMES ARRAY(STRING),
  44. Gender BOOLEAN,
  45. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  46. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  47. r: []string{"stream topic1 created"},
  48. },
  49. {
  50. s: `CREATE STREAM topic1 (
  51. USERID BIGINT,
  52. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  53. err: "key topic1 already exist, delete it before creating a new one",
  54. },
  55. {
  56. s: `EXPLAIN STREAM topic1;`,
  57. r: []string{"TO BE SUPPORTED"},
  58. },
  59. {
  60. s: `DESCRIBE STREAM topic1;`,
  61. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  62. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  63. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  64. },
  65. {
  66. s: `SHOW STREAMS;`,
  67. r: []string{"topic1"},
  68. },
  69. {
  70. s: `DROP STREAM topic1;`,
  71. r: []string{"stream topic1 dropped"},
  72. },
  73. {
  74. s: `DESCRIBE STREAM topic1;`,
  75. err: "stream topic1 not found",
  76. },
  77. {
  78. s: `DROP STREAM topic1;`,
  79. err: "Key not found",
  80. },
  81. }
  82. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  83. streamDB := path.Join(BadgerDir, "streamTest")
  84. for i, tt := range tests {
  85. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  86. if !reflect.DeepEqual(tt.err, errstring(err)) {
  87. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  88. } else if tt.err == "" {
  89. if !reflect.DeepEqual(tt.r, results) {
  90. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  91. }
  92. }
  93. }
  94. }
  95. func createStreams(t *testing.T){
  96. demo := `CREATE STREAM demo (
  97. color STRING,
  98. size BIGINT,
  99. ts BIGINT
  100. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  101. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  102. if err != nil{
  103. t.Log(err)
  104. }
  105. demo1 := `CREATE STREAM demo1 (
  106. temp FLOAT,
  107. hum BIGINT,
  108. ts BIGINT
  109. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  110. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  111. if err != nil{
  112. t.Log(err)
  113. }
  114. sessionDemo := `CREATE STREAM sessionDemo (
  115. temp FLOAT,
  116. hum BIGINT,
  117. ts BIGINT
  118. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  119. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  120. if err != nil{
  121. t.Log(err)
  122. }
  123. }
  124. func dropStreams(t *testing.T){
  125. demo := `DROP STREAM demo`
  126. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  127. if err != nil{
  128. t.Log(err)
  129. }
  130. demo1 := `DROP STREAM demo1`
  131. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  132. if err != nil{
  133. t.Log(err)
  134. }
  135. sessionDemo := `DROP STREAM sessionDemo`
  136. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  137. if err != nil{
  138. t.Log(err)
  139. }
  140. }
  141. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  142. var data []*xsql.Tuple
  143. switch name{
  144. case "demo":
  145. data = []*xsql.Tuple{
  146. {
  147. Emitter: name,
  148. Message: map[string]interface{}{
  149. "color": "red",
  150. "size": 3,
  151. "ts": 1541152486013,
  152. },
  153. Timestamp: 1541152486013,
  154. },
  155. {
  156. Emitter: name,
  157. Message: map[string]interface{}{
  158. "color": "blue",
  159. "size": 6,
  160. "ts": 1541152486822,
  161. },
  162. Timestamp: 1541152486822,
  163. },
  164. {
  165. Emitter: name,
  166. Message: map[string]interface{}{
  167. "color": "blue",
  168. "size": 2,
  169. "ts": 1541152487632,
  170. },
  171. Timestamp: 1541152487632,
  172. },
  173. {
  174. Emitter: name,
  175. Message: map[string]interface{}{
  176. "color": "yellow",
  177. "size": 4,
  178. "ts": 1541152488442,
  179. },
  180. Timestamp: 1541152488442,
  181. },
  182. {
  183. Emitter: name,
  184. Message: map[string]interface{}{
  185. "color": "red",
  186. "size": 1,
  187. "ts": 1541152489252,
  188. },
  189. Timestamp: 1541152489252,
  190. },
  191. }
  192. case "demo1":
  193. data = []*xsql.Tuple{
  194. {
  195. Emitter: name,
  196. Message: map[string]interface{}{
  197. "temp": 25.5,
  198. "hum": 65,
  199. "ts": 1541152486013,
  200. },
  201. Timestamp: 1541152486013,
  202. },
  203. {
  204. Emitter: name,
  205. Message: map[string]interface{}{
  206. "temp": 27.5,
  207. "hum": 59,
  208. "ts": 1541152486823,
  209. },
  210. Timestamp: 1541152486823,
  211. },
  212. {
  213. Emitter: name,
  214. Message: map[string]interface{}{
  215. "temp": 28.1,
  216. "hum": 75,
  217. "ts": 1541152487632,
  218. },
  219. Timestamp: 1541152487632,
  220. },
  221. {
  222. Emitter: name,
  223. Message: map[string]interface{}{
  224. "temp": 27.4,
  225. "hum": 80,
  226. "ts": 1541152488442,
  227. },
  228. Timestamp: 1541152488442,
  229. },
  230. {
  231. Emitter: name,
  232. Message: map[string]interface{}{
  233. "temp": 25.5,
  234. "hum": 62,
  235. "ts": 1541152489252,
  236. },
  237. Timestamp: 1541152489252,
  238. },
  239. }
  240. case "sessionDemo":
  241. data = []*xsql.Tuple{
  242. {
  243. Emitter: name,
  244. Message: map[string]interface{}{
  245. "temp": 25.5,
  246. "hum": 65,
  247. "ts": 1541152486013,
  248. },
  249. Timestamp: 1541152486013,
  250. },
  251. {
  252. Emitter: name,
  253. Message: map[string]interface{}{
  254. "temp": 27.5,
  255. "hum": 59,
  256. "ts": 1541152486823,
  257. },
  258. Timestamp: 1541152486823,
  259. },
  260. {
  261. Emitter: name,
  262. Message: map[string]interface{}{
  263. "temp": 28.1,
  264. "hum": 75,
  265. "ts": 1541152487932,
  266. },
  267. Timestamp: 1541152487932,
  268. },
  269. {
  270. Emitter: name,
  271. Message: map[string]interface{}{
  272. "temp": 27.4,
  273. "hum": 80,
  274. "ts": 1541152488442,
  275. },
  276. Timestamp: 1541152488442,
  277. },
  278. {
  279. Emitter: name,
  280. Message: map[string]interface{}{
  281. "temp": 25.5,
  282. "hum": 62,
  283. "ts": 1541152489252,
  284. },
  285. Timestamp: 1541152489252,
  286. },
  287. {
  288. Emitter: name,
  289. Message: map[string]interface{}{
  290. "temp": 26.2,
  291. "hum": 63,
  292. "ts": 1541152490062,
  293. },
  294. Timestamp: 1541152490062,
  295. },
  296. {
  297. Emitter: name,
  298. Message: map[string]interface{}{
  299. "temp": 26.8,
  300. "hum": 71,
  301. "ts": 1541152490872,
  302. },
  303. Timestamp: 1541152490872,
  304. },
  305. {
  306. Emitter: name,
  307. Message: map[string]interface{}{
  308. "temp": 28.9,
  309. "hum": 85,
  310. "ts": 1541152491682,
  311. },
  312. Timestamp: 1541152491682,
  313. },
  314. {
  315. Emitter: name,
  316. Message: map[string]interface{}{
  317. "temp": 29.1,
  318. "hum": 92,
  319. "ts": 1541152492492,
  320. },
  321. Timestamp: 1541152492492,
  322. },
  323. {
  324. Emitter: name,
  325. Message: map[string]interface{}{
  326. "temp": 32.2,
  327. "hum": 99,
  328. "ts": 1541152493202,
  329. },
  330. Timestamp: 1541152493202,
  331. },
  332. {
  333. Emitter: name,
  334. Message: map[string]interface{}{
  335. "temp": 30.9,
  336. "hum": 87,
  337. "ts": 1541152494112,
  338. },
  339. Timestamp: 1541152494112,
  340. },
  341. }
  342. }
  343. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false))
  344. }
  345. func TestSingleSQL(t *testing.T) {
  346. var tests = []struct {
  347. name string
  348. sql string
  349. r [][]map[string]interface{}
  350. }{
  351. {
  352. name: `rule1`,
  353. sql: `SELECT * FROM demo`,
  354. r: [][]map[string]interface{}{
  355. {{
  356. "color": "red",
  357. "size": float64(3),
  358. "ts": float64(1541152486013),
  359. }},
  360. {{
  361. "color": "blue",
  362. "size": float64(6),
  363. "ts": float64(1541152486822),
  364. }},
  365. {{
  366. "color": "blue",
  367. "size": float64(2),
  368. "ts": float64(1541152487632),
  369. }},
  370. {{
  371. "color": "yellow",
  372. "size": float64(4),
  373. "ts": float64(1541152488442),
  374. }},
  375. {{
  376. "color": "red",
  377. "size": float64(1),
  378. "ts": float64(1541152489252),
  379. }},
  380. },
  381. }, {
  382. name: `rule2`,
  383. sql: `SELECT color, ts FROM demo where size > 3`,
  384. r: [][]map[string]interface{}{
  385. {{
  386. "color": "blue",
  387. "ts": float64(1541152486822),
  388. }},
  389. {{
  390. "color": "yellow",
  391. "ts": float64(1541152488442),
  392. }},
  393. },
  394. },
  395. }
  396. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  397. createStreams(t)
  398. defer dropStreams(t)
  399. done := make(chan struct{})
  400. defer close(done)
  401. for i, tt := range tests {
  402. p := NewRuleProcessor(BadgerDir)
  403. parser := xsql.NewParser(strings.NewReader(tt.sql))
  404. var sources []*nodes.SourceNode
  405. if stmt, err := xsql.Language.Parse(parser); err != nil{
  406. t.Errorf("parse sql %s error: %s", tt.sql , err)
  407. }else {
  408. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  409. t.Errorf("sql %s is not a select statement", tt.sql)
  410. } else {
  411. streams := xsql.GetStreams(selectStmt)
  412. for _, stream := range streams{
  413. source := getMockSource(stream, done, 5)
  414. sources = append(sources, source)
  415. }
  416. }
  417. }
  418. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  419. if err != nil{
  420. t.Error(err)
  421. }
  422. mockSink := test.NewMockSink()
  423. sink := nodes.NewSinkNode("MockSink", mockSink)
  424. tp.AddSink(inputs, sink)
  425. count := len(sources)
  426. errCh := tp.Open()
  427. func(){
  428. for{
  429. select{
  430. case err = <- errCh:
  431. t.Log(err)
  432. tp.Cancel()
  433. return
  434. case <- done:
  435. count--
  436. log.Infof("%d sources remaining", count)
  437. if count <= 0{
  438. log.Info("stream stopping")
  439. time.Sleep(1 * time.Second)
  440. tp.Cancel()
  441. return
  442. }
  443. default:
  444. }
  445. }
  446. }()
  447. results := mockSink.GetResults()
  448. var maps [][]map[string]interface{}
  449. for _, v := range results{
  450. var mapRes []map[string]interface{}
  451. err := json.Unmarshal(v, &mapRes)
  452. if err != nil {
  453. t.Errorf("Failed to parse the input into map")
  454. continue
  455. }
  456. maps = append(maps, mapRes)
  457. }
  458. if !reflect.DeepEqual(tt.r, maps) {
  459. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  460. }
  461. }
  462. }
  463. func TestWindow(t *testing.T) {
  464. common.IsTesting = true
  465. var tests = []struct {
  466. name string
  467. sql string
  468. size int
  469. r [][]map[string]interface{}
  470. }{
  471. {
  472. name: `rule1`,
  473. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  474. size: 5,
  475. r: [][]map[string]interface{}{
  476. {{
  477. "color": "red",
  478. "size": float64(3),
  479. "ts": float64(1541152486013),
  480. },{
  481. "color": "blue",
  482. "size": float64(6),
  483. "ts": float64(1541152486822),
  484. }},
  485. {{
  486. "color": "red",
  487. "size": float64(3),
  488. "ts": float64(1541152486013),
  489. },{
  490. "color": "blue",
  491. "size": float64(6),
  492. "ts": float64(1541152486822),
  493. },{
  494. "color": "blue",
  495. "size": float64(2),
  496. "ts": float64(1541152487632),
  497. }},
  498. {{
  499. "color": "blue",
  500. "size": float64(2),
  501. "ts": float64(1541152487632),
  502. },{
  503. "color": "yellow",
  504. "size": float64(4),
  505. "ts": float64(1541152488442),
  506. }},
  507. },
  508. }, {
  509. name: `rule2`,
  510. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  511. size: 5,
  512. r: [][]map[string]interface{}{
  513. {{
  514. "color": "red",
  515. "ts": float64(1541152486013),
  516. },{
  517. "color": "blue",
  518. "ts": float64(1541152486822),
  519. }},
  520. {{
  521. "color": "yellow",
  522. "ts": float64(1541152488442),
  523. }},
  524. },
  525. }, {
  526. name: `rule3`,
  527. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  528. size: 5,
  529. r: [][]map[string]interface{}{
  530. {{
  531. "color": "red",
  532. "temp": 25.5,
  533. "ts": float64(1541152486013),
  534. }},{{
  535. "color": "red",
  536. "temp": 25.5,
  537. "ts": float64(1541152486013),
  538. }},{{
  539. "color": "red",
  540. "temp": 25.5,
  541. "ts": float64(1541152486013),
  542. }},{{
  543. "color": "blue",
  544. "temp": 28.1,
  545. "ts": float64(1541152487632),
  546. }},{{
  547. "color": "blue",
  548. "temp": 28.1,
  549. "ts": float64(1541152487632),
  550. }},{{
  551. "color": "blue",
  552. "temp": 28.1,
  553. "ts": float64(1541152487632),
  554. },{
  555. "color": "yellow",
  556. "temp": 27.4,
  557. "ts": float64(1541152488442),
  558. }},{{
  559. "color": "yellow",
  560. "temp": 27.4,
  561. "ts": float64(1541152488442),
  562. }},{{
  563. "color": "yellow",
  564. "temp": 27.4,
  565. "ts": float64(1541152488442),
  566. },{
  567. "color": "red",
  568. "temp": 25.5,
  569. "ts": float64(1541152489252),
  570. }},
  571. },
  572. }, {
  573. name: `rule4`,
  574. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  575. size: 5,
  576. r: [][]map[string]interface{}{
  577. {{
  578. "color": "red",
  579. }},{{
  580. "color": "blue",
  581. },{
  582. "color": "red",
  583. }},{{
  584. "color": "blue",
  585. },{
  586. "color": "red",
  587. }},{{
  588. "color": "blue",
  589. },{
  590. "color": "yellow",
  591. }},{{
  592. "color": "blue",
  593. }, {
  594. "color": "red",
  595. },{
  596. "color": "yellow",
  597. }},
  598. },
  599. },{
  600. name: `rule5`,
  601. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  602. size: 11,
  603. r: [][]map[string]interface{}{
  604. {{
  605. "temp": 25.5,
  606. },{
  607. "temp": 27.5,
  608. }},{{
  609. "temp": 28.1,
  610. },{
  611. "temp": 27.4,
  612. },{
  613. "temp": 25.5,
  614. }},{{
  615. "temp": 26.2,
  616. },{
  617. "temp": 26.8,
  618. },{
  619. "temp": 28.9,
  620. },{
  621. "temp": 29.1,
  622. },{
  623. "temp": 32.2,
  624. }},
  625. },
  626. },{
  627. name: `rule6`,
  628. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  629. size: 5,
  630. r: [][]map[string]interface{}{
  631. {{
  632. "m": 25.5,
  633. "c": float64(1),
  634. }},{{
  635. "m": 25.5,
  636. "c": float64(1),
  637. }},{{
  638. "m": 25.5,
  639. "c": float64(1),
  640. }},{{
  641. "m": 28.1,
  642. "c": float64(1),
  643. }},{{
  644. "m": 28.1,
  645. "c": float64(1),
  646. }},{{
  647. "m": 28.1,
  648. "c": float64(2),
  649. }},{{
  650. "m": 27.4,
  651. "c": float64(1),
  652. }},{{
  653. "m": 27.4,
  654. "c": float64(2),
  655. }},
  656. },
  657. },
  658. }
  659. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  660. createStreams(t)
  661. defer dropStreams(t)
  662. done := make(chan struct{})
  663. defer close(done)
  664. common.ResetMockTicker()
  665. for i, tt := range tests {
  666. p := NewRuleProcessor(BadgerDir)
  667. parser := xsql.NewParser(strings.NewReader(tt.sql))
  668. var sources []*nodes.SourceNode
  669. if stmt, err := xsql.Language.Parse(parser); err != nil{
  670. t.Errorf("parse sql %s error: %s", tt.sql , err)
  671. }else {
  672. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  673. t.Errorf("sql %s is not a select statement", tt.sql)
  674. } else {
  675. streams := xsql.GetStreams(selectStmt)
  676. for _, stream := range streams{
  677. source := getMockSource(stream, done, tt.size)
  678. sources = append(sources, source)
  679. }
  680. }
  681. }
  682. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  683. if err != nil{
  684. t.Error(err)
  685. }
  686. mockSink := test.NewMockSink()
  687. sink := nodes.NewSinkNode("mockSink", mockSink)
  688. tp.AddSink(inputs, sink)
  689. count := len(sources)
  690. errCh := tp.Open()
  691. func(){
  692. for{
  693. select{
  694. case err = <- errCh:
  695. t.Log(err)
  696. tp.Cancel()
  697. return
  698. case <- done:
  699. count--
  700. log.Infof("%d sources remaining", count)
  701. if count <= 0{
  702. log.Info("stream stopping")
  703. time.Sleep(1 * time.Second)
  704. tp.Cancel()
  705. return
  706. }
  707. default:
  708. }
  709. }
  710. }()
  711. results := mockSink.GetResults()
  712. var maps [][]map[string]interface{}
  713. for _, v := range results{
  714. var mapRes []map[string]interface{}
  715. err := json.Unmarshal(v, &mapRes)
  716. if err != nil {
  717. t.Errorf("Failed to parse the input into map")
  718. continue
  719. }
  720. maps = append(maps, mapRes)
  721. }
  722. if !reflect.DeepEqual(tt.r, maps) {
  723. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  724. }
  725. }
  726. }
  727. func createEventStreams(t *testing.T){
  728. demo := `CREATE STREAM demoE (
  729. color STRING,
  730. size BIGINT,
  731. ts BIGINT
  732. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  733. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  734. if err != nil{
  735. t.Log(err)
  736. }
  737. demo1 := `CREATE STREAM demo1E (
  738. temp FLOAT,
  739. hum BIGINT,
  740. ts BIGINT
  741. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  742. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  743. if err != nil{
  744. t.Log(err)
  745. }
  746. sessionDemo := `CREATE STREAM sessionDemoE (
  747. temp FLOAT,
  748. hum BIGINT,
  749. ts BIGINT
  750. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  751. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  752. if err != nil{
  753. t.Log(err)
  754. }
  755. }
  756. func dropEventStreams(t *testing.T){
  757. demo := `DROP STREAM demoE`
  758. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  759. if err != nil{
  760. t.Log(err)
  761. }
  762. demo1 := `DROP STREAM demo1E`
  763. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  764. if err != nil{
  765. t.Log(err)
  766. }
  767. sessionDemo := `DROP STREAM sessionDemoE`
  768. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  769. if err != nil{
  770. t.Log(err)
  771. }
  772. }
  773. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  774. var data []*xsql.Tuple
  775. switch name{
  776. case "demoE":
  777. data = []*xsql.Tuple{
  778. {
  779. Emitter: name,
  780. Message: map[string]interface{}{
  781. "color": "red",
  782. "size": 3,
  783. "ts": 1541152486013,
  784. },
  785. Timestamp: 1541152486013,
  786. },
  787. {
  788. Emitter: name,
  789. Message: map[string]interface{}{
  790. "color": "blue",
  791. "size": 2,
  792. "ts": 1541152487632,
  793. },
  794. Timestamp: 1541152487632,
  795. },
  796. {
  797. Emitter: name,
  798. Message: map[string]interface{}{
  799. "color": "red",
  800. "size": 1,
  801. "ts": 1541152489252,
  802. },
  803. Timestamp: 1541152489252,
  804. },
  805. { //dropped item
  806. Emitter: name,
  807. Message: map[string]interface{}{
  808. "color": "blue",
  809. "size": 6,
  810. "ts": 1541152486822,
  811. },
  812. Timestamp: 1541152486822,
  813. },
  814. {
  815. Emitter: name,
  816. Message: map[string]interface{}{
  817. "color": "yellow",
  818. "size": 4,
  819. "ts": 1541152488442,
  820. },
  821. Timestamp: 1541152488442,
  822. },
  823. { //To lift the watermark and issue all windows
  824. Emitter: name,
  825. Message: map[string]interface{}{
  826. "color": "yellow",
  827. "size": 4,
  828. "ts": 1541152492342,
  829. },
  830. Timestamp: 1541152488442,
  831. },
  832. }
  833. case "demo1E":
  834. data = []*xsql.Tuple{
  835. {
  836. Emitter: name,
  837. Message: map[string]interface{}{
  838. "temp": 27.5,
  839. "hum": 59,
  840. "ts": 1541152486823,
  841. },
  842. Timestamp: 1541152486823,
  843. },
  844. {
  845. Emitter: name,
  846. Message: map[string]interface{}{
  847. "temp": 25.5,
  848. "hum": 65,
  849. "ts": 1541152486013,
  850. },
  851. Timestamp: 1541152486013,
  852. },
  853. {
  854. Emitter: name,
  855. Message: map[string]interface{}{
  856. "temp": 27.4,
  857. "hum": 80,
  858. "ts": 1541152488442,
  859. },
  860. Timestamp: 1541152488442,
  861. },
  862. {
  863. Emitter: name,
  864. Message: map[string]interface{}{
  865. "temp": 28.1,
  866. "hum": 75,
  867. "ts": 1541152487632,
  868. },
  869. Timestamp: 1541152487632,
  870. },
  871. {
  872. Emitter: name,
  873. Message: map[string]interface{}{
  874. "temp": 25.5,
  875. "hum": 62,
  876. "ts": 1541152489252,
  877. },
  878. Timestamp: 1541152489252,
  879. },
  880. {
  881. Emitter: name,
  882. Message: map[string]interface{}{
  883. "temp": 25.5,
  884. "hum": 62,
  885. "ts": 1541152499252,
  886. },
  887. Timestamp: 1541152499252,
  888. },
  889. }
  890. case "sessionDemoE":
  891. data = []*xsql.Tuple{
  892. {
  893. Emitter: name,
  894. Message: map[string]interface{}{
  895. "temp": 25.5,
  896. "hum": 65,
  897. "ts": 1541152486013,
  898. },
  899. Timestamp: 1541152486013,
  900. },
  901. {
  902. Emitter: name,
  903. Message: map[string]interface{}{
  904. "temp": 28.1,
  905. "hum": 75,
  906. "ts": 1541152487932,
  907. },
  908. Timestamp: 1541152487932,
  909. },
  910. {
  911. Emitter: name,
  912. Message: map[string]interface{}{
  913. "temp": 27.5,
  914. "hum": 59,
  915. "ts": 1541152486823,
  916. },
  917. Timestamp: 1541152486823,
  918. },
  919. {
  920. Emitter: name,
  921. Message: map[string]interface{}{
  922. "temp": 25.5,
  923. "hum": 62,
  924. "ts": 1541152489252,
  925. },
  926. Timestamp: 1541152489252,
  927. },
  928. {
  929. Emitter: name,
  930. Message: map[string]interface{}{
  931. "temp": 27.4,
  932. "hum": 80,
  933. "ts": 1541152488442,
  934. },
  935. Timestamp: 1541152488442,
  936. },
  937. {
  938. Emitter: name,
  939. Message: map[string]interface{}{
  940. "temp": 26.2,
  941. "hum": 63,
  942. "ts": 1541152490062,
  943. },
  944. Timestamp: 1541152490062,
  945. },
  946. {
  947. Emitter: name,
  948. Message: map[string]interface{}{
  949. "temp": 28.9,
  950. "hum": 85,
  951. "ts": 1541152491682,
  952. },
  953. Timestamp: 1541152491682,
  954. },
  955. {
  956. Emitter: name,
  957. Message: map[string]interface{}{
  958. "temp": 26.8,
  959. "hum": 71,
  960. "ts": 1541152490872,
  961. },
  962. Timestamp: 1541152490872,
  963. },
  964. {
  965. Emitter: name,
  966. Message: map[string]interface{}{
  967. "temp": 29.1,
  968. "hum": 92,
  969. "ts": 1541152492492,
  970. },
  971. Timestamp: 1541152492492,
  972. },
  973. {
  974. Emitter: name,
  975. Message: map[string]interface{}{
  976. "temp": 30.9,
  977. "hum": 87,
  978. "ts": 1541152494112,
  979. },
  980. Timestamp: 1541152494112,
  981. },
  982. {
  983. Emitter: name,
  984. Message: map[string]interface{}{
  985. "temp": 32.2,
  986. "hum": 99,
  987. "ts": 1541152493202,
  988. },
  989. Timestamp: 1541152493202,
  990. },
  991. {
  992. Emitter: name,
  993. Message: map[string]interface{}{
  994. "temp": 32.2,
  995. "hum": 99,
  996. "ts": 1541152499202,
  997. },
  998. Timestamp: 1541152499202,
  999. },
  1000. }
  1001. }
  1002. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true))
  1003. }
  1004. func TestEventWindow(t *testing.T) {
  1005. common.IsTesting = true
  1006. var tests = []struct {
  1007. name string
  1008. sql string
  1009. size int
  1010. r [][]map[string]interface{}
  1011. }{
  1012. {
  1013. name: `rule1`,
  1014. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1015. size: 6,
  1016. r: [][]map[string]interface{}{
  1017. {{
  1018. "color": "red",
  1019. "size": float64(3),
  1020. "ts": float64(1541152486013),
  1021. }},
  1022. {{
  1023. "color": "red",
  1024. "size": float64(3),
  1025. "ts": float64(1541152486013),
  1026. },{
  1027. "color": "blue",
  1028. "size": float64(2),
  1029. "ts": float64(1541152487632),
  1030. }},
  1031. {{
  1032. "color": "blue",
  1033. "size": float64(2),
  1034. "ts": float64(1541152487632),
  1035. },{
  1036. "color": "yellow",
  1037. "size": float64(4),
  1038. "ts": float64(1541152488442),
  1039. }},{{
  1040. "color": "yellow",
  1041. "size": float64(4),
  1042. "ts": float64(1541152488442),
  1043. },{
  1044. "color": "red",
  1045. "size": float64(1),
  1046. "ts": float64(1541152489252),
  1047. }},{{
  1048. "color": "red",
  1049. "size": float64(1),
  1050. "ts": float64(1541152489252),
  1051. }},
  1052. },
  1053. }, {
  1054. name: `rule2`,
  1055. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1056. size: 6,
  1057. r: [][]map[string]interface{}{
  1058. {{
  1059. "color": "red",
  1060. "ts": float64(1541152486013),
  1061. }},
  1062. {{
  1063. "color": "yellow",
  1064. "ts": float64(1541152488442),
  1065. }},
  1066. },
  1067. }, {
  1068. name: `rule3`,
  1069. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1070. size: 6,
  1071. r: [][]map[string]interface{}{
  1072. {{
  1073. "color": "red",
  1074. "temp": 25.5,
  1075. "ts": float64(1541152486013),
  1076. }},{{
  1077. "color": "red",
  1078. "temp": 25.5,
  1079. "ts": float64(1541152486013),
  1080. }},{{
  1081. "color": "blue",
  1082. "temp": 28.1,
  1083. "ts": float64(1541152487632),
  1084. }},{{
  1085. "color": "blue",
  1086. "temp": 28.1,
  1087. "ts": float64(1541152487632),
  1088. },{
  1089. "color": "yellow",
  1090. "temp": 27.4,
  1091. "ts": float64(1541152488442),
  1092. }},{{
  1093. "color": "yellow",
  1094. "temp": 27.4,
  1095. "ts": float64(1541152488442),
  1096. },{
  1097. "color": "red",
  1098. "temp": 25.5,
  1099. "ts": float64(1541152489252),
  1100. }},
  1101. },
  1102. }, {
  1103. name: `rule4`,
  1104. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1105. size: 6,
  1106. r: [][]map[string]interface{}{
  1107. {{
  1108. "color": "red",
  1109. }},{{
  1110. "color": "blue",
  1111. },{
  1112. "color": "red",
  1113. }},{{
  1114. "color": "blue",
  1115. },{
  1116. "color": "yellow",
  1117. }},{{
  1118. "color": "blue",
  1119. }, {
  1120. "color": "red",
  1121. },{
  1122. "color": "yellow",
  1123. }},
  1124. },
  1125. },{
  1126. name: `rule5`,
  1127. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1128. size: 12,
  1129. r: [][]map[string]interface{}{
  1130. {{
  1131. "temp": 25.5,
  1132. }},{{
  1133. "temp": 28.1,
  1134. },{
  1135. "temp": 27.4,
  1136. },{
  1137. "temp": 25.5,
  1138. }},{{
  1139. "temp": 26.2,
  1140. },{
  1141. "temp": 26.8,
  1142. },{
  1143. "temp": 28.9,
  1144. },{
  1145. "temp": 29.1,
  1146. },{
  1147. "temp": 32.2,
  1148. }},{{
  1149. "temp": 30.9,
  1150. }},
  1151. },
  1152. },{
  1153. name: `rule6`,
  1154. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1155. size: 6,
  1156. r: [][]map[string]interface{}{
  1157. {{
  1158. "m": 25.5,
  1159. "c": float64(1),
  1160. }},{{
  1161. "m": 25.5,
  1162. "c": float64(1),
  1163. }},{{
  1164. "m": 28.1,
  1165. "c": float64(1),
  1166. }},{{
  1167. "m": 28.1,
  1168. "c": float64(2),
  1169. }},{{
  1170. "m": 27.4,
  1171. "c": float64(2),
  1172. }},
  1173. },
  1174. },
  1175. }
  1176. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1177. createEventStreams(t)
  1178. defer dropEventStreams(t)
  1179. done := make(chan struct{})
  1180. defer close(done)
  1181. common.ResetMockTicker()
  1182. //mock ticker
  1183. realTicker := time.NewTicker(500 * time.Millisecond)
  1184. tickerDone := make(chan bool)
  1185. go func(){
  1186. ticker := common.GetTicker(1000).(*common.MockTicker)
  1187. timer := common.GetTimer(1000).(*common.MockTimer)
  1188. for {
  1189. select {
  1190. case <-tickerDone:
  1191. log.Infof("real ticker exiting...")
  1192. return
  1193. case t := <-realTicker.C:
  1194. ts := common.TimeToUnixMilli(t)
  1195. if ticker != nil {
  1196. go ticker.DoTick(ts)
  1197. }
  1198. if timer != nil {
  1199. go timer.DoTick(ts)
  1200. }
  1201. }
  1202. }
  1203. }()
  1204. for i, tt := range tests {
  1205. p := NewRuleProcessor(BadgerDir)
  1206. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1207. var sources []*nodes.SourceNode
  1208. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1209. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1210. }else {
  1211. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1212. t.Errorf("sql %s is not a select statement", tt.sql)
  1213. } else {
  1214. streams := xsql.GetStreams(selectStmt)
  1215. for _, stream := range streams{
  1216. source := getEventMockSource(stream, done, tt.size)
  1217. sources = append(sources, source)
  1218. }
  1219. }
  1220. }
  1221. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1222. Id:tt.name, Sql: tt.sql,
  1223. Options: map[string]interface{}{
  1224. "isEventTime": true,
  1225. "lateTolerance": float64(1000),
  1226. },
  1227. }, sources)
  1228. if err != nil{
  1229. t.Error(err)
  1230. }
  1231. mockSink := test.NewMockSink()
  1232. sink := nodes.NewSinkNode("MockSink", mockSink)
  1233. tp.AddSink(inputs, sink)
  1234. count := len(sources)
  1235. errCh := tp.Open()
  1236. func(){
  1237. for{
  1238. select{
  1239. case err = <- errCh:
  1240. t.Log(err)
  1241. tp.Cancel()
  1242. return
  1243. case <- done:
  1244. count--
  1245. log.Infof("%d sources remaining", count)
  1246. if count <= 0{
  1247. log.Info("stream stopping")
  1248. time.Sleep(1 * time.Second)
  1249. tp.Cancel()
  1250. return
  1251. }
  1252. default:
  1253. }
  1254. }
  1255. }()
  1256. results := mockSink.GetResults()
  1257. var maps [][]map[string]interface{}
  1258. for _, v := range results{
  1259. var mapRes []map[string]interface{}
  1260. err := json.Unmarshal(v, &mapRes)
  1261. if err != nil {
  1262. t.Errorf("Failed to parse the input into map")
  1263. continue
  1264. }
  1265. maps = append(maps, mapRes)
  1266. }
  1267. if !reflect.DeepEqual(tt.r, maps) {
  1268. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1269. }
  1270. }
  1271. realTicker.Stop()
  1272. tickerDone <- true
  1273. close(tickerDone)
  1274. }
  1275. func errstring(err error) string {
  1276. if err != nil {
  1277. return err.Error()
  1278. }
  1279. return ""
  1280. }