xsql_processor_test.go 28 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/nodes"
  8. "github.com/emqx/kuiper/xstream/test"
  9. "fmt"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var DbDir = getDbDir()
  17. func getDbDir() string{
  18. dbDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("db location is %s", dbDir)
  23. return dbDir
  24. }
  25. func TestStreamCreateProcessor(t *testing.T) {
  26. var tests = []struct {
  27. s string
  28. r []string
  29. err string
  30. }{
  31. {
  32. s: `SHOW STREAMS;`,
  33. r: []string{"No stream definitions are found."},
  34. },
  35. {
  36. s: `EXPLAIN STREAM topic1;`,
  37. err: "Stream topic1 is not found.",
  38. },
  39. {
  40. s: `CREATE STREAM topic1 (
  41. USERID BIGINT,
  42. FIRST_NAME STRING,
  43. LAST_NAME STRING,
  44. NICKNAMES ARRAY(STRING),
  45. Gender BOOLEAN,
  46. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  47. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  48. r: []string{"Stream topic1 is created."},
  49. },
  50. {
  51. s: `CREATE STREAM topic1 (
  52. USERID BIGINT,
  53. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  54. err: "Create stream fails: Item topic1 already exists.",
  55. },
  56. {
  57. s: `EXPLAIN STREAM topic1;`,
  58. r: []string{"TO BE SUPPORTED"},
  59. },
  60. {
  61. s: `DESCRIBE STREAM topic1;`,
  62. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  63. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  64. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  65. },
  66. {
  67. s: `SHOW STREAMS;`,
  68. r: []string{"topic1"},
  69. },
  70. {
  71. s: `DROP STREAM topic1;`,
  72. r: []string{"Stream topic1 is dropped."},
  73. },
  74. {
  75. s: `DESCRIBE STREAM topic1;`,
  76. err: "Stream topic1 is not found.",
  77. },
  78. {
  79. s: `DROP STREAM topic1;`,
  80. err: "Drop stream fails: topic1 is not found.",
  81. },
  82. }
  83. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  84. streamDB := path.Join(getDbDir(), "streamTest")
  85. for i, tt := range tests {
  86. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  87. if !reflect.DeepEqual(tt.err, errstring(err)) {
  88. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  89. } else if tt.err == "" {
  90. if !reflect.DeepEqual(tt.r, results) {
  91. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  92. }
  93. }
  94. }
  95. }
  96. func createStreams(t *testing.T){
  97. demo := `CREATE STREAM demo (
  98. color STRING,
  99. size BIGINT,
  100. ts BIGINT
  101. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  102. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  103. if err != nil{
  104. t.Log(err)
  105. }
  106. demo1 := `CREATE STREAM demo1 (
  107. temp FLOAT,
  108. hum BIGINT,
  109. ts BIGINT
  110. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  111. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  112. if err != nil{
  113. t.Log(err)
  114. }
  115. sessionDemo := `CREATE STREAM sessionDemo (
  116. temp FLOAT,
  117. hum BIGINT,
  118. ts BIGINT
  119. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  120. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  121. if err != nil{
  122. t.Log(err)
  123. }
  124. }
  125. func dropStreams(t *testing.T){
  126. demo := `DROP STREAM demo`
  127. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  128. if err != nil{
  129. t.Log(err)
  130. }
  131. demo1 := `DROP STREAM demo1`
  132. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  133. if err != nil{
  134. t.Log(err)
  135. }
  136. sessionDemo := `DROP STREAM sessionDemo`
  137. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  138. if err != nil{
  139. t.Log(err)
  140. }
  141. }
  142. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  143. var data []*xsql.Tuple
  144. switch name{
  145. case "demo":
  146. data = []*xsql.Tuple{
  147. {
  148. Emitter: name,
  149. Message: map[string]interface{}{
  150. "color": "red",
  151. "size": 3,
  152. "ts": 1541152486013,
  153. },
  154. Timestamp: 1541152486013,
  155. },
  156. {
  157. Emitter: name,
  158. Message: map[string]interface{}{
  159. "color": "blue",
  160. "size": 6,
  161. "ts": 1541152486822,
  162. },
  163. Timestamp: 1541152486822,
  164. },
  165. {
  166. Emitter: name,
  167. Message: map[string]interface{}{
  168. "color": "blue",
  169. "size": 2,
  170. "ts": 1541152487632,
  171. },
  172. Timestamp: 1541152487632,
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "color": "yellow",
  178. "size": 4,
  179. "ts": 1541152488442,
  180. },
  181. Timestamp: 1541152488442,
  182. },
  183. {
  184. Emitter: name,
  185. Message: map[string]interface{}{
  186. "color": "red",
  187. "size": 1,
  188. "ts": 1541152489252,
  189. },
  190. Timestamp: 1541152489252,
  191. },
  192. }
  193. case "demo1":
  194. data = []*xsql.Tuple{
  195. {
  196. Emitter: name,
  197. Message: map[string]interface{}{
  198. "temp": 25.5,
  199. "hum": 65,
  200. "ts": 1541152486013,
  201. },
  202. Timestamp: 1541152486013,
  203. },
  204. {
  205. Emitter: name,
  206. Message: map[string]interface{}{
  207. "temp": 27.5,
  208. "hum": 59,
  209. "ts": 1541152486823,
  210. },
  211. Timestamp: 1541152486823,
  212. },
  213. {
  214. Emitter: name,
  215. Message: map[string]interface{}{
  216. "temp": 28.1,
  217. "hum": 75,
  218. "ts": 1541152487632,
  219. },
  220. Timestamp: 1541152487632,
  221. },
  222. {
  223. Emitter: name,
  224. Message: map[string]interface{}{
  225. "temp": 27.4,
  226. "hum": 80,
  227. "ts": 1541152488442,
  228. },
  229. Timestamp: 1541152488442,
  230. },
  231. {
  232. Emitter: name,
  233. Message: map[string]interface{}{
  234. "temp": 25.5,
  235. "hum": 62,
  236. "ts": 1541152489252,
  237. },
  238. Timestamp: 1541152489252,
  239. },
  240. }
  241. case "sessionDemo":
  242. data = []*xsql.Tuple{
  243. {
  244. Emitter: name,
  245. Message: map[string]interface{}{
  246. "temp": 25.5,
  247. "hum": 65,
  248. "ts": 1541152486013,
  249. },
  250. Timestamp: 1541152486013,
  251. },
  252. {
  253. Emitter: name,
  254. Message: map[string]interface{}{
  255. "temp": 27.5,
  256. "hum": 59,
  257. "ts": 1541152486823,
  258. },
  259. Timestamp: 1541152486823,
  260. },
  261. {
  262. Emitter: name,
  263. Message: map[string]interface{}{
  264. "temp": 28.1,
  265. "hum": 75,
  266. "ts": 1541152487932,
  267. },
  268. Timestamp: 1541152487932,
  269. },
  270. {
  271. Emitter: name,
  272. Message: map[string]interface{}{
  273. "temp": 27.4,
  274. "hum": 80,
  275. "ts": 1541152488442,
  276. },
  277. Timestamp: 1541152488442,
  278. },
  279. {
  280. Emitter: name,
  281. Message: map[string]interface{}{
  282. "temp": 25.5,
  283. "hum": 62,
  284. "ts": 1541152489252,
  285. },
  286. Timestamp: 1541152489252,
  287. },
  288. {
  289. Emitter: name,
  290. Message: map[string]interface{}{
  291. "temp": 26.2,
  292. "hum": 63,
  293. "ts": 1541152490062,
  294. },
  295. Timestamp: 1541152490062,
  296. },
  297. {
  298. Emitter: name,
  299. Message: map[string]interface{}{
  300. "temp": 26.8,
  301. "hum": 71,
  302. "ts": 1541152490872,
  303. },
  304. Timestamp: 1541152490872,
  305. },
  306. {
  307. Emitter: name,
  308. Message: map[string]interface{}{
  309. "temp": 28.9,
  310. "hum": 85,
  311. "ts": 1541152491682,
  312. },
  313. Timestamp: 1541152491682,
  314. },
  315. {
  316. Emitter: name,
  317. Message: map[string]interface{}{
  318. "temp": 29.1,
  319. "hum": 92,
  320. "ts": 1541152492492,
  321. },
  322. Timestamp: 1541152492492,
  323. },
  324. {
  325. Emitter: name,
  326. Message: map[string]interface{}{
  327. "temp": 32.2,
  328. "hum": 99,
  329. "ts": 1541152493202,
  330. },
  331. Timestamp: 1541152493202,
  332. },
  333. {
  334. Emitter: name,
  335. Message: map[string]interface{}{
  336. "temp": 30.9,
  337. "hum": 87,
  338. "ts": 1541152494112,
  339. },
  340. Timestamp: 1541152494112,
  341. },
  342. }
  343. }
  344. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false), map[string]string{
  345. "DATASOURCE": name,
  346. })
  347. }
  348. func TestSingleSQL(t *testing.T) {
  349. var tests = []struct {
  350. name string
  351. sql string
  352. r [][]map[string]interface{}
  353. }{
  354. {
  355. name: `rule1`,
  356. sql: `SELECT * FROM demo`,
  357. r: [][]map[string]interface{}{
  358. {{
  359. "color": "red",
  360. "size": float64(3),
  361. "ts": float64(1541152486013),
  362. }},
  363. {{
  364. "color": "blue",
  365. "size": float64(6),
  366. "ts": float64(1541152486822),
  367. }},
  368. {{
  369. "color": "blue",
  370. "size": float64(2),
  371. "ts": float64(1541152487632),
  372. }},
  373. {{
  374. "color": "yellow",
  375. "size": float64(4),
  376. "ts": float64(1541152488442),
  377. }},
  378. {{
  379. "color": "red",
  380. "size": float64(1),
  381. "ts": float64(1541152489252),
  382. }},
  383. },
  384. }, {
  385. name: `rule2`,
  386. sql: `SELECT color, ts FROM demo where size > 3`,
  387. r: [][]map[string]interface{}{
  388. {{
  389. "color": "blue",
  390. "ts": float64(1541152486822),
  391. }},
  392. {{
  393. "color": "yellow",
  394. "ts": float64(1541152488442),
  395. }},
  396. },
  397. }, {
  398. name: `rule3`,
  399. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  400. r: [][]map[string]interface{}{
  401. {{
  402. "Int8": float64(6),
  403. "ts": float64(1541152486822),
  404. }},
  405. {{
  406. "Int8": float64(4),
  407. "ts": float64(1541152488442),
  408. }},
  409. },
  410. },
  411. }
  412. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  413. createStreams(t)
  414. defer dropStreams(t)
  415. done := make(chan struct{})
  416. defer close(done)
  417. for i, tt := range tests {
  418. p := NewRuleProcessor(DbDir)
  419. parser := xsql.NewParser(strings.NewReader(tt.sql))
  420. var sources []*nodes.SourceNode
  421. if stmt, err := xsql.Language.Parse(parser); err != nil{
  422. t.Errorf("parse sql %s error: %s", tt.sql , err)
  423. }else {
  424. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  425. t.Errorf("sql %s is not a select statement", tt.sql)
  426. } else {
  427. streams := xsql.GetStreams(selectStmt)
  428. for _, stream := range streams{
  429. source := getMockSource(stream, done, 5)
  430. sources = append(sources, source)
  431. }
  432. }
  433. }
  434. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  435. if err != nil{
  436. t.Error(err)
  437. }
  438. mockSink := test.NewMockSink()
  439. sink := nodes.NewSinkNode("MockSink", mockSink)
  440. tp.AddSink(inputs, sink)
  441. count := len(sources)
  442. errCh := tp.Open()
  443. func(){
  444. for{
  445. select{
  446. case err = <- errCh:
  447. t.Log(err)
  448. tp.Cancel()
  449. return
  450. case <- done:
  451. count--
  452. log.Infof("%d sources remaining", count)
  453. if count <= 0{
  454. log.Info("stream stopping")
  455. time.Sleep(1 * time.Second)
  456. tp.Cancel()
  457. return
  458. }
  459. default:
  460. }
  461. }
  462. }()
  463. results := mockSink.GetResults()
  464. var maps [][]map[string]interface{}
  465. for _, v := range results{
  466. var mapRes []map[string]interface{}
  467. err := json.Unmarshal(v, &mapRes)
  468. if err != nil {
  469. t.Errorf("Failed to parse the input into map")
  470. continue
  471. }
  472. maps = append(maps, mapRes)
  473. }
  474. if !reflect.DeepEqual(tt.r, maps) {
  475. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  476. }
  477. }
  478. }
  479. func TestWindow(t *testing.T) {
  480. common.IsTesting = true
  481. var tests = []struct {
  482. name string
  483. sql string
  484. size int
  485. r [][]map[string]interface{}
  486. }{
  487. {
  488. name: `rule1`,
  489. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  490. size: 5,
  491. r: [][]map[string]interface{}{
  492. {{
  493. "color": "red",
  494. "size": float64(3),
  495. "ts": float64(1541152486013),
  496. },{
  497. "color": "blue",
  498. "size": float64(6),
  499. "ts": float64(1541152486822),
  500. }},
  501. {{
  502. "color": "red",
  503. "size": float64(3),
  504. "ts": float64(1541152486013),
  505. },{
  506. "color": "blue",
  507. "size": float64(6),
  508. "ts": float64(1541152486822),
  509. },{
  510. "color": "blue",
  511. "size": float64(2),
  512. "ts": float64(1541152487632),
  513. }},
  514. {{
  515. "color": "blue",
  516. "size": float64(2),
  517. "ts": float64(1541152487632),
  518. },{
  519. "color": "yellow",
  520. "size": float64(4),
  521. "ts": float64(1541152488442),
  522. }},
  523. },
  524. }, {
  525. name: `rule2`,
  526. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  527. size: 5,
  528. r: [][]map[string]interface{}{
  529. {{
  530. "color": "red",
  531. "ts": float64(1541152486013),
  532. },{
  533. "color": "blue",
  534. "ts": float64(1541152486822),
  535. }},
  536. {{
  537. "color": "yellow",
  538. "ts": float64(1541152488442),
  539. }},
  540. },
  541. }, {
  542. name: `rule3`,
  543. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  544. size: 5,
  545. r: [][]map[string]interface{}{
  546. {{
  547. "color": "red",
  548. "temp": 25.5,
  549. "ts": float64(1541152486013),
  550. }},{{
  551. "color": "red",
  552. "temp": 25.5,
  553. "ts": float64(1541152486013),
  554. }},{{
  555. "color": "red",
  556. "temp": 25.5,
  557. "ts": float64(1541152486013),
  558. }},{{
  559. "color": "blue",
  560. "temp": 28.1,
  561. "ts": float64(1541152487632),
  562. }},{{
  563. "color": "blue",
  564. "temp": 28.1,
  565. "ts": float64(1541152487632),
  566. }},{{
  567. "color": "blue",
  568. "temp": 28.1,
  569. "ts": float64(1541152487632),
  570. },{
  571. "color": "yellow",
  572. "temp": 27.4,
  573. "ts": float64(1541152488442),
  574. }},{{
  575. "color": "yellow",
  576. "temp": 27.4,
  577. "ts": float64(1541152488442),
  578. }},{{
  579. "color": "yellow",
  580. "temp": 27.4,
  581. "ts": float64(1541152488442),
  582. },{
  583. "color": "red",
  584. "temp": 25.5,
  585. "ts": float64(1541152489252),
  586. }},
  587. },
  588. }, {
  589. name: `rule4`,
  590. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  591. size: 5,
  592. r: [][]map[string]interface{}{
  593. {{
  594. "color": "red",
  595. }},{{
  596. "color": "blue",
  597. },{
  598. "color": "red",
  599. }},{{
  600. "color": "blue",
  601. },{
  602. "color": "red",
  603. }},{{
  604. "color": "blue",
  605. },{
  606. "color": "yellow",
  607. }},{{
  608. "color": "blue",
  609. }, {
  610. "color": "red",
  611. },{
  612. "color": "yellow",
  613. }},
  614. },
  615. },{
  616. name: `rule5`,
  617. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  618. size: 11,
  619. r: [][]map[string]interface{}{
  620. {{
  621. "temp": 25.5,
  622. },{
  623. "temp": 27.5,
  624. }},{{
  625. "temp": 28.1,
  626. },{
  627. "temp": 27.4,
  628. },{
  629. "temp": 25.5,
  630. }},{{
  631. "temp": 26.2,
  632. },{
  633. "temp": 26.8,
  634. },{
  635. "temp": 28.9,
  636. },{
  637. "temp": 29.1,
  638. },{
  639. "temp": 32.2,
  640. }},
  641. },
  642. },{
  643. name: `rule6`,
  644. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  645. size: 5,
  646. r: [][]map[string]interface{}{
  647. {{
  648. "m": 25.5,
  649. "c": float64(1),
  650. }},{{
  651. "m": 25.5,
  652. "c": float64(1),
  653. }},{{
  654. "m": 25.5,
  655. "c": float64(1),
  656. }},{{
  657. "m": 28.1,
  658. "c": float64(1),
  659. }},{{
  660. "m": 28.1,
  661. "c": float64(1),
  662. }},{{
  663. "m": 28.1,
  664. "c": float64(2),
  665. }},{{
  666. "m": 27.4,
  667. "c": float64(1),
  668. }},{{
  669. "m": 27.4,
  670. "c": float64(2),
  671. }},
  672. },
  673. },
  674. }
  675. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  676. createStreams(t)
  677. defer dropStreams(t)
  678. done := make(chan struct{})
  679. defer close(done)
  680. common.ResetMockTicker()
  681. for i, tt := range tests {
  682. p := NewRuleProcessor(DbDir)
  683. parser := xsql.NewParser(strings.NewReader(tt.sql))
  684. var sources []*nodes.SourceNode
  685. if stmt, err := xsql.Language.Parse(parser); err != nil{
  686. t.Errorf("parse sql %s error: %s", tt.sql , err)
  687. }else {
  688. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  689. t.Errorf("sql %s is not a select statement", tt.sql)
  690. } else {
  691. streams := xsql.GetStreams(selectStmt)
  692. for _, stream := range streams{
  693. source := getMockSource(stream, done, tt.size)
  694. sources = append(sources, source)
  695. }
  696. }
  697. }
  698. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  699. if err != nil{
  700. t.Error(err)
  701. }
  702. mockSink := test.NewMockSink()
  703. sink := nodes.NewSinkNode("mockSink", mockSink)
  704. tp.AddSink(inputs, sink)
  705. count := len(sources)
  706. errCh := tp.Open()
  707. func(){
  708. for{
  709. select{
  710. case err = <- errCh:
  711. t.Log(err)
  712. tp.Cancel()
  713. return
  714. case <- done:
  715. count--
  716. log.Infof("%d sources remaining", count)
  717. if count <= 0{
  718. log.Info("stream stopping")
  719. time.Sleep(1 * time.Second)
  720. tp.Cancel()
  721. return
  722. }
  723. default:
  724. }
  725. }
  726. }()
  727. results := mockSink.GetResults()
  728. var maps [][]map[string]interface{}
  729. for _, v := range results{
  730. var mapRes []map[string]interface{}
  731. err := json.Unmarshal(v, &mapRes)
  732. if err != nil {
  733. t.Errorf("Failed to parse the input into map")
  734. continue
  735. }
  736. maps = append(maps, mapRes)
  737. }
  738. if !reflect.DeepEqual(tt.r, maps) {
  739. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  740. }
  741. }
  742. }
  743. func createEventStreams(t *testing.T){
  744. demo := `CREATE STREAM demoE (
  745. color STRING,
  746. size BIGINT,
  747. ts BIGINT
  748. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  749. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  750. if err != nil{
  751. t.Log(err)
  752. }
  753. demo1 := `CREATE STREAM demo1E (
  754. temp FLOAT,
  755. hum BIGINT,
  756. ts BIGINT
  757. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  758. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  759. if err != nil{
  760. t.Log(err)
  761. }
  762. sessionDemo := `CREATE STREAM sessionDemoE (
  763. temp FLOAT,
  764. hum BIGINT,
  765. ts BIGINT
  766. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  767. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  768. if err != nil{
  769. t.Log(err)
  770. }
  771. }
  772. func dropEventStreams(t *testing.T){
  773. demo := `DROP STREAM demoE`
  774. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  775. if err != nil{
  776. t.Log(err)
  777. }
  778. demo1 := `DROP STREAM demo1E`
  779. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  780. if err != nil{
  781. t.Log(err)
  782. }
  783. sessionDemo := `DROP STREAM sessionDemoE`
  784. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  785. if err != nil{
  786. t.Log(err)
  787. }
  788. }
  789. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  790. var data []*xsql.Tuple
  791. switch name{
  792. case "demoE":
  793. data = []*xsql.Tuple{
  794. {
  795. Emitter: name,
  796. Message: map[string]interface{}{
  797. "color": "red",
  798. "size": 3,
  799. "ts": 1541152486013,
  800. },
  801. Timestamp: 1541152486013,
  802. },
  803. {
  804. Emitter: name,
  805. Message: map[string]interface{}{
  806. "color": "blue",
  807. "size": 2,
  808. "ts": 1541152487632,
  809. },
  810. Timestamp: 1541152487632,
  811. },
  812. {
  813. Emitter: name,
  814. Message: map[string]interface{}{
  815. "color": "red",
  816. "size": 1,
  817. "ts": 1541152489252,
  818. },
  819. Timestamp: 1541152489252,
  820. },
  821. { //dropped item
  822. Emitter: name,
  823. Message: map[string]interface{}{
  824. "color": "blue",
  825. "size": 6,
  826. "ts": 1541152486822,
  827. },
  828. Timestamp: 1541152486822,
  829. },
  830. {
  831. Emitter: name,
  832. Message: map[string]interface{}{
  833. "color": "yellow",
  834. "size": 4,
  835. "ts": 1541152488442,
  836. },
  837. Timestamp: 1541152488442,
  838. },
  839. { //To lift the watermark and issue all windows
  840. Emitter: name,
  841. Message: map[string]interface{}{
  842. "color": "yellow",
  843. "size": 4,
  844. "ts": 1541152492342,
  845. },
  846. Timestamp: 1541152488442,
  847. },
  848. }
  849. case "demo1E":
  850. data = []*xsql.Tuple{
  851. {
  852. Emitter: name,
  853. Message: map[string]interface{}{
  854. "temp": 27.5,
  855. "hum": 59,
  856. "ts": 1541152486823,
  857. },
  858. Timestamp: 1541152486823,
  859. },
  860. {
  861. Emitter: name,
  862. Message: map[string]interface{}{
  863. "temp": 25.5,
  864. "hum": 65,
  865. "ts": 1541152486013,
  866. },
  867. Timestamp: 1541152486013,
  868. },
  869. {
  870. Emitter: name,
  871. Message: map[string]interface{}{
  872. "temp": 27.4,
  873. "hum": 80,
  874. "ts": 1541152488442,
  875. },
  876. Timestamp: 1541152488442,
  877. },
  878. {
  879. Emitter: name,
  880. Message: map[string]interface{}{
  881. "temp": 28.1,
  882. "hum": 75,
  883. "ts": 1541152487632,
  884. },
  885. Timestamp: 1541152487632,
  886. },
  887. {
  888. Emitter: name,
  889. Message: map[string]interface{}{
  890. "temp": 25.5,
  891. "hum": 62,
  892. "ts": 1541152489252,
  893. },
  894. Timestamp: 1541152489252,
  895. },
  896. {
  897. Emitter: name,
  898. Message: map[string]interface{}{
  899. "temp": 25.5,
  900. "hum": 62,
  901. "ts": 1541152499252,
  902. },
  903. Timestamp: 1541152499252,
  904. },
  905. }
  906. case "sessionDemoE":
  907. data = []*xsql.Tuple{
  908. {
  909. Emitter: name,
  910. Message: map[string]interface{}{
  911. "temp": 25.5,
  912. "hum": 65,
  913. "ts": 1541152486013,
  914. },
  915. Timestamp: 1541152486013,
  916. },
  917. {
  918. Emitter: name,
  919. Message: map[string]interface{}{
  920. "temp": 28.1,
  921. "hum": 75,
  922. "ts": 1541152487932,
  923. },
  924. Timestamp: 1541152487932,
  925. },
  926. {
  927. Emitter: name,
  928. Message: map[string]interface{}{
  929. "temp": 27.5,
  930. "hum": 59,
  931. "ts": 1541152486823,
  932. },
  933. Timestamp: 1541152486823,
  934. },
  935. {
  936. Emitter: name,
  937. Message: map[string]interface{}{
  938. "temp": 25.5,
  939. "hum": 62,
  940. "ts": 1541152489252,
  941. },
  942. Timestamp: 1541152489252,
  943. },
  944. {
  945. Emitter: name,
  946. Message: map[string]interface{}{
  947. "temp": 27.4,
  948. "hum": 80,
  949. "ts": 1541152488442,
  950. },
  951. Timestamp: 1541152488442,
  952. },
  953. {
  954. Emitter: name,
  955. Message: map[string]interface{}{
  956. "temp": 26.2,
  957. "hum": 63,
  958. "ts": 1541152490062,
  959. },
  960. Timestamp: 1541152490062,
  961. },
  962. {
  963. Emitter: name,
  964. Message: map[string]interface{}{
  965. "temp": 28.9,
  966. "hum": 85,
  967. "ts": 1541152491682,
  968. },
  969. Timestamp: 1541152491682,
  970. },
  971. {
  972. Emitter: name,
  973. Message: map[string]interface{}{
  974. "temp": 26.8,
  975. "hum": 71,
  976. "ts": 1541152490872,
  977. },
  978. Timestamp: 1541152490872,
  979. },
  980. {
  981. Emitter: name,
  982. Message: map[string]interface{}{
  983. "temp": 29.1,
  984. "hum": 92,
  985. "ts": 1541152492492,
  986. },
  987. Timestamp: 1541152492492,
  988. },
  989. {
  990. Emitter: name,
  991. Message: map[string]interface{}{
  992. "temp": 30.9,
  993. "hum": 87,
  994. "ts": 1541152494112,
  995. },
  996. Timestamp: 1541152494112,
  997. },
  998. {
  999. Emitter: name,
  1000. Message: map[string]interface{}{
  1001. "temp": 32.2,
  1002. "hum": 99,
  1003. "ts": 1541152493202,
  1004. },
  1005. Timestamp: 1541152493202,
  1006. },
  1007. {
  1008. Emitter: name,
  1009. Message: map[string]interface{}{
  1010. "temp": 32.2,
  1011. "hum": 99,
  1012. "ts": 1541152499202,
  1013. },
  1014. Timestamp: 1541152499202,
  1015. },
  1016. }
  1017. }
  1018. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true), map[string]string{
  1019. "DATASOURCE": name,
  1020. })
  1021. }
  1022. func TestEventWindow(t *testing.T) {
  1023. common.IsTesting = true
  1024. var tests = []struct {
  1025. name string
  1026. sql string
  1027. size int
  1028. r [][]map[string]interface{}
  1029. }{
  1030. {
  1031. name: `rule1`,
  1032. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1033. size: 6,
  1034. r: [][]map[string]interface{}{
  1035. {{
  1036. "color": "red",
  1037. "size": float64(3),
  1038. "ts": float64(1541152486013),
  1039. }},
  1040. {{
  1041. "color": "red",
  1042. "size": float64(3),
  1043. "ts": float64(1541152486013),
  1044. },{
  1045. "color": "blue",
  1046. "size": float64(2),
  1047. "ts": float64(1541152487632),
  1048. }},
  1049. {{
  1050. "color": "blue",
  1051. "size": float64(2),
  1052. "ts": float64(1541152487632),
  1053. },{
  1054. "color": "yellow",
  1055. "size": float64(4),
  1056. "ts": float64(1541152488442),
  1057. }},{{
  1058. "color": "yellow",
  1059. "size": float64(4),
  1060. "ts": float64(1541152488442),
  1061. },{
  1062. "color": "red",
  1063. "size": float64(1),
  1064. "ts": float64(1541152489252),
  1065. }},{{
  1066. "color": "red",
  1067. "size": float64(1),
  1068. "ts": float64(1541152489252),
  1069. }},
  1070. },
  1071. }, {
  1072. name: `rule2`,
  1073. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1074. size: 6,
  1075. r: [][]map[string]interface{}{
  1076. {{
  1077. "color": "red",
  1078. "ts": float64(1541152486013),
  1079. }},
  1080. {{
  1081. "color": "yellow",
  1082. "ts": float64(1541152488442),
  1083. }},
  1084. },
  1085. }, {
  1086. name: `rule3`,
  1087. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1088. size: 6,
  1089. r: [][]map[string]interface{}{
  1090. {{
  1091. "color": "red",
  1092. "temp": 25.5,
  1093. "ts": float64(1541152486013),
  1094. }},{{
  1095. "color": "red",
  1096. "temp": 25.5,
  1097. "ts": float64(1541152486013),
  1098. }},{{
  1099. "color": "blue",
  1100. "temp": 28.1,
  1101. "ts": float64(1541152487632),
  1102. }},{{
  1103. "color": "blue",
  1104. "temp": 28.1,
  1105. "ts": float64(1541152487632),
  1106. },{
  1107. "color": "yellow",
  1108. "temp": 27.4,
  1109. "ts": float64(1541152488442),
  1110. }},{{
  1111. "color": "yellow",
  1112. "temp": 27.4,
  1113. "ts": float64(1541152488442),
  1114. },{
  1115. "color": "red",
  1116. "temp": 25.5,
  1117. "ts": float64(1541152489252),
  1118. }},
  1119. },
  1120. }, {
  1121. name: `rule4`,
  1122. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1123. size: 6,
  1124. r: [][]map[string]interface{}{
  1125. {{
  1126. "color": "red",
  1127. }},{{
  1128. "color": "blue",
  1129. },{
  1130. "color": "red",
  1131. }},{{
  1132. "color": "blue",
  1133. },{
  1134. "color": "yellow",
  1135. }},{{
  1136. "color": "blue",
  1137. }, {
  1138. "color": "red",
  1139. },{
  1140. "color": "yellow",
  1141. }},
  1142. },
  1143. },{
  1144. name: `rule5`,
  1145. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1146. size: 12,
  1147. r: [][]map[string]interface{}{
  1148. {{
  1149. "temp": 25.5,
  1150. }},{{
  1151. "temp": 28.1,
  1152. },{
  1153. "temp": 27.4,
  1154. },{
  1155. "temp": 25.5,
  1156. }},{{
  1157. "temp": 26.2,
  1158. },{
  1159. "temp": 26.8,
  1160. },{
  1161. "temp": 28.9,
  1162. },{
  1163. "temp": 29.1,
  1164. },{
  1165. "temp": 32.2,
  1166. }},{{
  1167. "temp": 30.9,
  1168. }},
  1169. },
  1170. },{
  1171. name: `rule6`,
  1172. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1173. size: 6,
  1174. r: [][]map[string]interface{}{
  1175. {{
  1176. "m": 25.5,
  1177. "c": float64(1),
  1178. }},{{
  1179. "m": 25.5,
  1180. "c": float64(1),
  1181. }},{{
  1182. "m": 28.1,
  1183. "c": float64(1),
  1184. }},{{
  1185. "m": 28.1,
  1186. "c": float64(2),
  1187. }},{{
  1188. "m": 27.4,
  1189. "c": float64(2),
  1190. }},
  1191. },
  1192. },
  1193. }
  1194. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1195. createEventStreams(t)
  1196. defer dropEventStreams(t)
  1197. done := make(chan struct{})
  1198. defer close(done)
  1199. common.ResetMockTicker()
  1200. //mock ticker
  1201. realTicker := time.NewTicker(500 * time.Millisecond)
  1202. tickerDone := make(chan bool)
  1203. go func(){
  1204. ticker := common.GetTicker(1000).(*common.MockTicker)
  1205. timer := common.GetTimer(1000).(*common.MockTimer)
  1206. for {
  1207. select {
  1208. case <-tickerDone:
  1209. log.Infof("real ticker exiting...")
  1210. return
  1211. case t := <-realTicker.C:
  1212. ts := common.TimeToUnixMilli(t)
  1213. if ticker != nil {
  1214. go ticker.DoTick(ts)
  1215. }
  1216. if timer != nil {
  1217. go timer.DoTick(ts)
  1218. }
  1219. }
  1220. }
  1221. }()
  1222. for i, tt := range tests {
  1223. p := NewRuleProcessor(DbDir)
  1224. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1225. var sources []*nodes.SourceNode
  1226. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1227. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1228. }else {
  1229. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1230. t.Errorf("sql %s is not a select statement", tt.sql)
  1231. } else {
  1232. streams := xsql.GetStreams(selectStmt)
  1233. for _, stream := range streams{
  1234. source := getEventMockSource(stream, done, tt.size)
  1235. sources = append(sources, source)
  1236. }
  1237. }
  1238. }
  1239. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1240. Id:tt.name, Sql: tt.sql,
  1241. Options: map[string]interface{}{
  1242. "isEventTime": true,
  1243. "lateTolerance": float64(1000),
  1244. },
  1245. }, sources)
  1246. if err != nil{
  1247. t.Error(err)
  1248. }
  1249. mockSink := test.NewMockSink()
  1250. sink := nodes.NewSinkNode("MockSink", mockSink)
  1251. tp.AddSink(inputs, sink)
  1252. count := len(sources)
  1253. errCh := tp.Open()
  1254. func(){
  1255. for{
  1256. select{
  1257. case err = <- errCh:
  1258. t.Log(err)
  1259. tp.Cancel()
  1260. return
  1261. case <- done:
  1262. count--
  1263. log.Infof("%d sources remaining", count)
  1264. if count <= 0{
  1265. log.Info("stream stopping")
  1266. time.Sleep(1 * time.Second)
  1267. tp.Cancel()
  1268. return
  1269. }
  1270. default:
  1271. }
  1272. }
  1273. }()
  1274. results := mockSink.GetResults()
  1275. var maps [][]map[string]interface{}
  1276. for _, v := range results{
  1277. var mapRes []map[string]interface{}
  1278. err := json.Unmarshal(v, &mapRes)
  1279. if err != nil {
  1280. t.Errorf("Failed to parse the input into map")
  1281. continue
  1282. }
  1283. maps = append(maps, mapRes)
  1284. }
  1285. if !reflect.DeepEqual(tt.r, maps) {
  1286. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1287. }
  1288. }
  1289. realTicker.Stop()
  1290. tickerDone <- true
  1291. close(tickerDone)
  1292. }
  1293. func errstring(err error) string {
  1294. if err != nil {
  1295. return err.Error()
  1296. }
  1297. return ""
  1298. }