xsql_processor_test.go 27 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/nodes"
  8. "github.com/emqx/kuiper/xstream/test"
  9. "fmt"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var DbDir = getDbDir()
  17. func getDbDir() string{
  18. dbDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("db location is %s", dbDir)
  23. return dbDir
  24. }
  25. func TestStreamCreateProcessor(t *testing.T) {
  26. var tests = []struct {
  27. s string
  28. r []string
  29. err string
  30. }{
  31. {
  32. s: `SHOW STREAMS;`,
  33. r: []string{"No stream definitions are found."},
  34. },
  35. {
  36. s: `EXPLAIN STREAM topic1;`,
  37. err: "Stream topic1 is not found.",
  38. },
  39. {
  40. s: `CREATE STREAM topic1 (
  41. USERID BIGINT,
  42. FIRST_NAME STRING,
  43. LAST_NAME STRING,
  44. NICKNAMES ARRAY(STRING),
  45. Gender BOOLEAN,
  46. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  47. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  48. r: []string{"Stream topic1 is created."},
  49. },
  50. {
  51. s: `CREATE STREAM topic1 (
  52. USERID BIGINT,
  53. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  54. err: "Create stream fails: Item topic1 already exists.",
  55. },
  56. {
  57. s: `EXPLAIN STREAM topic1;`,
  58. r: []string{"TO BE SUPPORTED"},
  59. },
  60. {
  61. s: `DESCRIBE STREAM topic1;`,
  62. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  63. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  64. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  65. },
  66. {
  67. s: `SHOW STREAMS;`,
  68. r: []string{"topic1"},
  69. },
  70. {
  71. s: `DROP STREAM topic1;`,
  72. r: []string{"Stream topic1 is dropped."},
  73. },
  74. {
  75. s: `DESCRIBE STREAM topic1;`,
  76. err: "Stream topic1 is not found.",
  77. },
  78. {
  79. s: `DROP STREAM topic1;`,
  80. err: "Drop stream fails: topic1 is not found.",
  81. },
  82. }
  83. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  84. streamDB := path.Join(getDbDir(), "streamTest")
  85. for i, tt := range tests {
  86. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  87. if !reflect.DeepEqual(tt.err, errstring(err)) {
  88. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  89. } else if tt.err == "" {
  90. if !reflect.DeepEqual(tt.r, results) {
  91. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  92. }
  93. }
  94. }
  95. }
  96. func createStreams(t *testing.T){
  97. demo := `CREATE STREAM demo (
  98. color STRING,
  99. size BIGINT,
  100. ts BIGINT
  101. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  102. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  103. if err != nil{
  104. t.Log(err)
  105. }
  106. demo1 := `CREATE STREAM demo1 (
  107. temp FLOAT,
  108. hum BIGINT,
  109. ts BIGINT
  110. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  111. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  112. if err != nil{
  113. t.Log(err)
  114. }
  115. sessionDemo := `CREATE STREAM sessionDemo (
  116. temp FLOAT,
  117. hum BIGINT,
  118. ts BIGINT
  119. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  120. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  121. if err != nil{
  122. t.Log(err)
  123. }
  124. }
  125. func dropStreams(t *testing.T){
  126. demo := `DROP STREAM demo`
  127. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  128. if err != nil{
  129. t.Log(err)
  130. }
  131. demo1 := `DROP STREAM demo1`
  132. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  133. if err != nil{
  134. t.Log(err)
  135. }
  136. sessionDemo := `DROP STREAM sessionDemo`
  137. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  138. if err != nil{
  139. t.Log(err)
  140. }
  141. }
  142. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  143. var data []*xsql.Tuple
  144. switch name{
  145. case "demo":
  146. data = []*xsql.Tuple{
  147. {
  148. Emitter: name,
  149. Message: map[string]interface{}{
  150. "color": "red",
  151. "size": 3,
  152. "ts": 1541152486013,
  153. },
  154. Timestamp: 1541152486013,
  155. },
  156. {
  157. Emitter: name,
  158. Message: map[string]interface{}{
  159. "color": "blue",
  160. "size": 6,
  161. "ts": 1541152486822,
  162. },
  163. Timestamp: 1541152486822,
  164. },
  165. {
  166. Emitter: name,
  167. Message: map[string]interface{}{
  168. "color": "blue",
  169. "size": 2,
  170. "ts": 1541152487632,
  171. },
  172. Timestamp: 1541152487632,
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "color": "yellow",
  178. "size": 4,
  179. "ts": 1541152488442,
  180. },
  181. Timestamp: 1541152488442,
  182. },
  183. {
  184. Emitter: name,
  185. Message: map[string]interface{}{
  186. "color": "red",
  187. "size": 1,
  188. "ts": 1541152489252,
  189. },
  190. Timestamp: 1541152489252,
  191. },
  192. }
  193. case "demo1":
  194. data = []*xsql.Tuple{
  195. {
  196. Emitter: name,
  197. Message: map[string]interface{}{
  198. "temp": 25.5,
  199. "hum": 65,
  200. "ts": 1541152486013,
  201. },
  202. Timestamp: 1541152486013,
  203. },
  204. {
  205. Emitter: name,
  206. Message: map[string]interface{}{
  207. "temp": 27.5,
  208. "hum": 59,
  209. "ts": 1541152486823,
  210. },
  211. Timestamp: 1541152486823,
  212. },
  213. {
  214. Emitter: name,
  215. Message: map[string]interface{}{
  216. "temp": 28.1,
  217. "hum": 75,
  218. "ts": 1541152487632,
  219. },
  220. Timestamp: 1541152487632,
  221. },
  222. {
  223. Emitter: name,
  224. Message: map[string]interface{}{
  225. "temp": 27.4,
  226. "hum": 80,
  227. "ts": 1541152488442,
  228. },
  229. Timestamp: 1541152488442,
  230. },
  231. {
  232. Emitter: name,
  233. Message: map[string]interface{}{
  234. "temp": 25.5,
  235. "hum": 62,
  236. "ts": 1541152489252,
  237. },
  238. Timestamp: 1541152489252,
  239. },
  240. }
  241. case "sessionDemo":
  242. data = []*xsql.Tuple{
  243. {
  244. Emitter: name,
  245. Message: map[string]interface{}{
  246. "temp": 25.5,
  247. "hum": 65,
  248. "ts": 1541152486013,
  249. },
  250. Timestamp: 1541152486013,
  251. },
  252. {
  253. Emitter: name,
  254. Message: map[string]interface{}{
  255. "temp": 27.5,
  256. "hum": 59,
  257. "ts": 1541152486823,
  258. },
  259. Timestamp: 1541152486823,
  260. },
  261. {
  262. Emitter: name,
  263. Message: map[string]interface{}{
  264. "temp": 28.1,
  265. "hum": 75,
  266. "ts": 1541152487932,
  267. },
  268. Timestamp: 1541152487932,
  269. },
  270. {
  271. Emitter: name,
  272. Message: map[string]interface{}{
  273. "temp": 27.4,
  274. "hum": 80,
  275. "ts": 1541152488442,
  276. },
  277. Timestamp: 1541152488442,
  278. },
  279. {
  280. Emitter: name,
  281. Message: map[string]interface{}{
  282. "temp": 25.5,
  283. "hum": 62,
  284. "ts": 1541152489252,
  285. },
  286. Timestamp: 1541152489252,
  287. },
  288. {
  289. Emitter: name,
  290. Message: map[string]interface{}{
  291. "temp": 26.2,
  292. "hum": 63,
  293. "ts": 1541152490062,
  294. },
  295. Timestamp: 1541152490062,
  296. },
  297. {
  298. Emitter: name,
  299. Message: map[string]interface{}{
  300. "temp": 26.8,
  301. "hum": 71,
  302. "ts": 1541152490872,
  303. },
  304. Timestamp: 1541152490872,
  305. },
  306. {
  307. Emitter: name,
  308. Message: map[string]interface{}{
  309. "temp": 28.9,
  310. "hum": 85,
  311. "ts": 1541152491682,
  312. },
  313. Timestamp: 1541152491682,
  314. },
  315. {
  316. Emitter: name,
  317. Message: map[string]interface{}{
  318. "temp": 29.1,
  319. "hum": 92,
  320. "ts": 1541152492492,
  321. },
  322. Timestamp: 1541152492492,
  323. },
  324. {
  325. Emitter: name,
  326. Message: map[string]interface{}{
  327. "temp": 32.2,
  328. "hum": 99,
  329. "ts": 1541152493202,
  330. },
  331. Timestamp: 1541152493202,
  332. },
  333. {
  334. Emitter: name,
  335. Message: map[string]interface{}{
  336. "temp": 30.9,
  337. "hum": 87,
  338. "ts": 1541152494112,
  339. },
  340. Timestamp: 1541152494112,
  341. },
  342. }
  343. }
  344. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false), map[string]string{
  345. "DATASOURCE": name,
  346. })
  347. }
  348. func TestSingleSQL(t *testing.T) {
  349. var tests = []struct {
  350. name string
  351. sql string
  352. r [][]map[string]interface{}
  353. }{
  354. {
  355. name: `rule1`,
  356. sql: `SELECT * FROM demo`,
  357. r: [][]map[string]interface{}{
  358. {{
  359. "color": "red",
  360. "size": float64(3),
  361. "ts": float64(1541152486013),
  362. }},
  363. {{
  364. "color": "blue",
  365. "size": float64(6),
  366. "ts": float64(1541152486822),
  367. }},
  368. {{
  369. "color": "blue",
  370. "size": float64(2),
  371. "ts": float64(1541152487632),
  372. }},
  373. {{
  374. "color": "yellow",
  375. "size": float64(4),
  376. "ts": float64(1541152488442),
  377. }},
  378. {{
  379. "color": "red",
  380. "size": float64(1),
  381. "ts": float64(1541152489252),
  382. }},
  383. },
  384. }, {
  385. name: `rule2`,
  386. sql: `SELECT color, ts FROM demo where size > 3`,
  387. r: [][]map[string]interface{}{
  388. {{
  389. "color": "blue",
  390. "ts": float64(1541152486822),
  391. }},
  392. {{
  393. "color": "yellow",
  394. "ts": float64(1541152488442),
  395. }},
  396. },
  397. },
  398. }
  399. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  400. createStreams(t)
  401. defer dropStreams(t)
  402. done := make(chan struct{})
  403. defer close(done)
  404. for i, tt := range tests {
  405. p := NewRuleProcessor(DbDir)
  406. parser := xsql.NewParser(strings.NewReader(tt.sql))
  407. var sources []*nodes.SourceNode
  408. if stmt, err := xsql.Language.Parse(parser); err != nil{
  409. t.Errorf("parse sql %s error: %s", tt.sql , err)
  410. }else {
  411. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  412. t.Errorf("sql %s is not a select statement", tt.sql)
  413. } else {
  414. streams := xsql.GetStreams(selectStmt)
  415. for _, stream := range streams{
  416. source := getMockSource(stream, done, 5)
  417. sources = append(sources, source)
  418. }
  419. }
  420. }
  421. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  422. if err != nil{
  423. t.Error(err)
  424. }
  425. mockSink := test.NewMockSink()
  426. sink := nodes.NewSinkNode("MockSink", mockSink)
  427. tp.AddSink(inputs, sink)
  428. count := len(sources)
  429. errCh := tp.Open()
  430. func(){
  431. for{
  432. select{
  433. case err = <- errCh:
  434. t.Log(err)
  435. tp.Cancel()
  436. return
  437. case <- done:
  438. count--
  439. log.Infof("%d sources remaining", count)
  440. if count <= 0{
  441. log.Info("stream stopping")
  442. time.Sleep(1 * time.Second)
  443. tp.Cancel()
  444. return
  445. }
  446. default:
  447. }
  448. }
  449. }()
  450. results := mockSink.GetResults()
  451. var maps [][]map[string]interface{}
  452. for _, v := range results{
  453. var mapRes []map[string]interface{}
  454. err := json.Unmarshal(v, &mapRes)
  455. if err != nil {
  456. t.Errorf("Failed to parse the input into map")
  457. continue
  458. }
  459. maps = append(maps, mapRes)
  460. }
  461. if !reflect.DeepEqual(tt.r, maps) {
  462. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  463. }
  464. }
  465. }
  466. func TestWindow(t *testing.T) {
  467. common.IsTesting = true
  468. var tests = []struct {
  469. name string
  470. sql string
  471. size int
  472. r [][]map[string]interface{}
  473. }{
  474. {
  475. name: `rule1`,
  476. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  477. size: 5,
  478. r: [][]map[string]interface{}{
  479. {{
  480. "color": "red",
  481. "size": float64(3),
  482. "ts": float64(1541152486013),
  483. },{
  484. "color": "blue",
  485. "size": float64(6),
  486. "ts": float64(1541152486822),
  487. }},
  488. {{
  489. "color": "red",
  490. "size": float64(3),
  491. "ts": float64(1541152486013),
  492. },{
  493. "color": "blue",
  494. "size": float64(6),
  495. "ts": float64(1541152486822),
  496. },{
  497. "color": "blue",
  498. "size": float64(2),
  499. "ts": float64(1541152487632),
  500. }},
  501. {{
  502. "color": "blue",
  503. "size": float64(2),
  504. "ts": float64(1541152487632),
  505. },{
  506. "color": "yellow",
  507. "size": float64(4),
  508. "ts": float64(1541152488442),
  509. }},
  510. },
  511. }, {
  512. name: `rule2`,
  513. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  514. size: 5,
  515. r: [][]map[string]interface{}{
  516. {{
  517. "color": "red",
  518. "ts": float64(1541152486013),
  519. },{
  520. "color": "blue",
  521. "ts": float64(1541152486822),
  522. }},
  523. {{
  524. "color": "yellow",
  525. "ts": float64(1541152488442),
  526. }},
  527. },
  528. }, {
  529. name: `rule3`,
  530. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  531. size: 5,
  532. r: [][]map[string]interface{}{
  533. {{
  534. "color": "red",
  535. "temp": 25.5,
  536. "ts": float64(1541152486013),
  537. }},{{
  538. "color": "red",
  539. "temp": 25.5,
  540. "ts": float64(1541152486013),
  541. }},{{
  542. "color": "red",
  543. "temp": 25.5,
  544. "ts": float64(1541152486013),
  545. }},{{
  546. "color": "blue",
  547. "temp": 28.1,
  548. "ts": float64(1541152487632),
  549. }},{{
  550. "color": "blue",
  551. "temp": 28.1,
  552. "ts": float64(1541152487632),
  553. }},{{
  554. "color": "blue",
  555. "temp": 28.1,
  556. "ts": float64(1541152487632),
  557. },{
  558. "color": "yellow",
  559. "temp": 27.4,
  560. "ts": float64(1541152488442),
  561. }},{{
  562. "color": "yellow",
  563. "temp": 27.4,
  564. "ts": float64(1541152488442),
  565. }},{{
  566. "color": "yellow",
  567. "temp": 27.4,
  568. "ts": float64(1541152488442),
  569. },{
  570. "color": "red",
  571. "temp": 25.5,
  572. "ts": float64(1541152489252),
  573. }},
  574. },
  575. }, {
  576. name: `rule4`,
  577. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  578. size: 5,
  579. r: [][]map[string]interface{}{
  580. {{
  581. "color": "red",
  582. }},{{
  583. "color": "blue",
  584. },{
  585. "color": "red",
  586. }},{{
  587. "color": "blue",
  588. },{
  589. "color": "red",
  590. }},{{
  591. "color": "blue",
  592. },{
  593. "color": "yellow",
  594. }},{{
  595. "color": "blue",
  596. }, {
  597. "color": "red",
  598. },{
  599. "color": "yellow",
  600. }},
  601. },
  602. },{
  603. name: `rule5`,
  604. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  605. size: 11,
  606. r: [][]map[string]interface{}{
  607. {{
  608. "temp": 25.5,
  609. },{
  610. "temp": 27.5,
  611. }},{{
  612. "temp": 28.1,
  613. },{
  614. "temp": 27.4,
  615. },{
  616. "temp": 25.5,
  617. }},{{
  618. "temp": 26.2,
  619. },{
  620. "temp": 26.8,
  621. },{
  622. "temp": 28.9,
  623. },{
  624. "temp": 29.1,
  625. },{
  626. "temp": 32.2,
  627. }},
  628. },
  629. },{
  630. name: `rule6`,
  631. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  632. size: 5,
  633. r: [][]map[string]interface{}{
  634. {{
  635. "m": 25.5,
  636. "c": float64(1),
  637. }},{{
  638. "m": 25.5,
  639. "c": float64(1),
  640. }},{{
  641. "m": 25.5,
  642. "c": float64(1),
  643. }},{{
  644. "m": 28.1,
  645. "c": float64(1),
  646. }},{{
  647. "m": 28.1,
  648. "c": float64(1),
  649. }},{{
  650. "m": 28.1,
  651. "c": float64(2),
  652. }},{{
  653. "m": 27.4,
  654. "c": float64(1),
  655. }},{{
  656. "m": 27.4,
  657. "c": float64(2),
  658. }},
  659. },
  660. },
  661. }
  662. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  663. createStreams(t)
  664. defer dropStreams(t)
  665. done := make(chan struct{})
  666. defer close(done)
  667. common.ResetMockTicker()
  668. for i, tt := range tests {
  669. p := NewRuleProcessor(DbDir)
  670. parser := xsql.NewParser(strings.NewReader(tt.sql))
  671. var sources []*nodes.SourceNode
  672. if stmt, err := xsql.Language.Parse(parser); err != nil{
  673. t.Errorf("parse sql %s error: %s", tt.sql , err)
  674. }else {
  675. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  676. t.Errorf("sql %s is not a select statement", tt.sql)
  677. } else {
  678. streams := xsql.GetStreams(selectStmt)
  679. for _, stream := range streams{
  680. source := getMockSource(stream, done, tt.size)
  681. sources = append(sources, source)
  682. }
  683. }
  684. }
  685. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  686. if err != nil{
  687. t.Error(err)
  688. }
  689. mockSink := test.NewMockSink()
  690. sink := nodes.NewSinkNode("mockSink", mockSink)
  691. tp.AddSink(inputs, sink)
  692. count := len(sources)
  693. errCh := tp.Open()
  694. func(){
  695. for{
  696. select{
  697. case err = <- errCh:
  698. t.Log(err)
  699. tp.Cancel()
  700. return
  701. case <- done:
  702. count--
  703. log.Infof("%d sources remaining", count)
  704. if count <= 0{
  705. log.Info("stream stopping")
  706. time.Sleep(1 * time.Second)
  707. tp.Cancel()
  708. return
  709. }
  710. default:
  711. }
  712. }
  713. }()
  714. results := mockSink.GetResults()
  715. var maps [][]map[string]interface{}
  716. for _, v := range results{
  717. var mapRes []map[string]interface{}
  718. err := json.Unmarshal(v, &mapRes)
  719. if err != nil {
  720. t.Errorf("Failed to parse the input into map")
  721. continue
  722. }
  723. maps = append(maps, mapRes)
  724. }
  725. if !reflect.DeepEqual(tt.r, maps) {
  726. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  727. }
  728. }
  729. }
  730. func createEventStreams(t *testing.T){
  731. demo := `CREATE STREAM demoE (
  732. color STRING,
  733. size BIGINT,
  734. ts BIGINT
  735. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  736. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  737. if err != nil{
  738. t.Log(err)
  739. }
  740. demo1 := `CREATE STREAM demo1E (
  741. temp FLOAT,
  742. hum BIGINT,
  743. ts BIGINT
  744. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  745. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  746. if err != nil{
  747. t.Log(err)
  748. }
  749. sessionDemo := `CREATE STREAM sessionDemoE (
  750. temp FLOAT,
  751. hum BIGINT,
  752. ts BIGINT
  753. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  754. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  755. if err != nil{
  756. t.Log(err)
  757. }
  758. }
  759. func dropEventStreams(t *testing.T){
  760. demo := `DROP STREAM demoE`
  761. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  762. if err != nil{
  763. t.Log(err)
  764. }
  765. demo1 := `DROP STREAM demo1E`
  766. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  767. if err != nil{
  768. t.Log(err)
  769. }
  770. sessionDemo := `DROP STREAM sessionDemoE`
  771. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  772. if err != nil{
  773. t.Log(err)
  774. }
  775. }
  776. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  777. var data []*xsql.Tuple
  778. switch name{
  779. case "demoE":
  780. data = []*xsql.Tuple{
  781. {
  782. Emitter: name,
  783. Message: map[string]interface{}{
  784. "color": "red",
  785. "size": 3,
  786. "ts": 1541152486013,
  787. },
  788. Timestamp: 1541152486013,
  789. },
  790. {
  791. Emitter: name,
  792. Message: map[string]interface{}{
  793. "color": "blue",
  794. "size": 2,
  795. "ts": 1541152487632,
  796. },
  797. Timestamp: 1541152487632,
  798. },
  799. {
  800. Emitter: name,
  801. Message: map[string]interface{}{
  802. "color": "red",
  803. "size": 1,
  804. "ts": 1541152489252,
  805. },
  806. Timestamp: 1541152489252,
  807. },
  808. { //dropped item
  809. Emitter: name,
  810. Message: map[string]interface{}{
  811. "color": "blue",
  812. "size": 6,
  813. "ts": 1541152486822,
  814. },
  815. Timestamp: 1541152486822,
  816. },
  817. {
  818. Emitter: name,
  819. Message: map[string]interface{}{
  820. "color": "yellow",
  821. "size": 4,
  822. "ts": 1541152488442,
  823. },
  824. Timestamp: 1541152488442,
  825. },
  826. { //To lift the watermark and issue all windows
  827. Emitter: name,
  828. Message: map[string]interface{}{
  829. "color": "yellow",
  830. "size": 4,
  831. "ts": 1541152492342,
  832. },
  833. Timestamp: 1541152488442,
  834. },
  835. }
  836. case "demo1E":
  837. data = []*xsql.Tuple{
  838. {
  839. Emitter: name,
  840. Message: map[string]interface{}{
  841. "temp": 27.5,
  842. "hum": 59,
  843. "ts": 1541152486823,
  844. },
  845. Timestamp: 1541152486823,
  846. },
  847. {
  848. Emitter: name,
  849. Message: map[string]interface{}{
  850. "temp": 25.5,
  851. "hum": 65,
  852. "ts": 1541152486013,
  853. },
  854. Timestamp: 1541152486013,
  855. },
  856. {
  857. Emitter: name,
  858. Message: map[string]interface{}{
  859. "temp": 27.4,
  860. "hum": 80,
  861. "ts": 1541152488442,
  862. },
  863. Timestamp: 1541152488442,
  864. },
  865. {
  866. Emitter: name,
  867. Message: map[string]interface{}{
  868. "temp": 28.1,
  869. "hum": 75,
  870. "ts": 1541152487632,
  871. },
  872. Timestamp: 1541152487632,
  873. },
  874. {
  875. Emitter: name,
  876. Message: map[string]interface{}{
  877. "temp": 25.5,
  878. "hum": 62,
  879. "ts": 1541152489252,
  880. },
  881. Timestamp: 1541152489252,
  882. },
  883. {
  884. Emitter: name,
  885. Message: map[string]interface{}{
  886. "temp": 25.5,
  887. "hum": 62,
  888. "ts": 1541152499252,
  889. },
  890. Timestamp: 1541152499252,
  891. },
  892. }
  893. case "sessionDemoE":
  894. data = []*xsql.Tuple{
  895. {
  896. Emitter: name,
  897. Message: map[string]interface{}{
  898. "temp": 25.5,
  899. "hum": 65,
  900. "ts": 1541152486013,
  901. },
  902. Timestamp: 1541152486013,
  903. },
  904. {
  905. Emitter: name,
  906. Message: map[string]interface{}{
  907. "temp": 28.1,
  908. "hum": 75,
  909. "ts": 1541152487932,
  910. },
  911. Timestamp: 1541152487932,
  912. },
  913. {
  914. Emitter: name,
  915. Message: map[string]interface{}{
  916. "temp": 27.5,
  917. "hum": 59,
  918. "ts": 1541152486823,
  919. },
  920. Timestamp: 1541152486823,
  921. },
  922. {
  923. Emitter: name,
  924. Message: map[string]interface{}{
  925. "temp": 25.5,
  926. "hum": 62,
  927. "ts": 1541152489252,
  928. },
  929. Timestamp: 1541152489252,
  930. },
  931. {
  932. Emitter: name,
  933. Message: map[string]interface{}{
  934. "temp": 27.4,
  935. "hum": 80,
  936. "ts": 1541152488442,
  937. },
  938. Timestamp: 1541152488442,
  939. },
  940. {
  941. Emitter: name,
  942. Message: map[string]interface{}{
  943. "temp": 26.2,
  944. "hum": 63,
  945. "ts": 1541152490062,
  946. },
  947. Timestamp: 1541152490062,
  948. },
  949. {
  950. Emitter: name,
  951. Message: map[string]interface{}{
  952. "temp": 28.9,
  953. "hum": 85,
  954. "ts": 1541152491682,
  955. },
  956. Timestamp: 1541152491682,
  957. },
  958. {
  959. Emitter: name,
  960. Message: map[string]interface{}{
  961. "temp": 26.8,
  962. "hum": 71,
  963. "ts": 1541152490872,
  964. },
  965. Timestamp: 1541152490872,
  966. },
  967. {
  968. Emitter: name,
  969. Message: map[string]interface{}{
  970. "temp": 29.1,
  971. "hum": 92,
  972. "ts": 1541152492492,
  973. },
  974. Timestamp: 1541152492492,
  975. },
  976. {
  977. Emitter: name,
  978. Message: map[string]interface{}{
  979. "temp": 30.9,
  980. "hum": 87,
  981. "ts": 1541152494112,
  982. },
  983. Timestamp: 1541152494112,
  984. },
  985. {
  986. Emitter: name,
  987. Message: map[string]interface{}{
  988. "temp": 32.2,
  989. "hum": 99,
  990. "ts": 1541152493202,
  991. },
  992. Timestamp: 1541152493202,
  993. },
  994. {
  995. Emitter: name,
  996. Message: map[string]interface{}{
  997. "temp": 32.2,
  998. "hum": 99,
  999. "ts": 1541152499202,
  1000. },
  1001. Timestamp: 1541152499202,
  1002. },
  1003. }
  1004. }
  1005. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true), map[string]string{
  1006. "DATASOURCE": name,
  1007. })
  1008. }
  1009. func TestEventWindow(t *testing.T) {
  1010. common.IsTesting = true
  1011. var tests = []struct {
  1012. name string
  1013. sql string
  1014. size int
  1015. r [][]map[string]interface{}
  1016. }{
  1017. {
  1018. name: `rule1`,
  1019. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1020. size: 6,
  1021. r: [][]map[string]interface{}{
  1022. {{
  1023. "color": "red",
  1024. "size": float64(3),
  1025. "ts": float64(1541152486013),
  1026. }},
  1027. {{
  1028. "color": "red",
  1029. "size": float64(3),
  1030. "ts": float64(1541152486013),
  1031. },{
  1032. "color": "blue",
  1033. "size": float64(2),
  1034. "ts": float64(1541152487632),
  1035. }},
  1036. {{
  1037. "color": "blue",
  1038. "size": float64(2),
  1039. "ts": float64(1541152487632),
  1040. },{
  1041. "color": "yellow",
  1042. "size": float64(4),
  1043. "ts": float64(1541152488442),
  1044. }},{{
  1045. "color": "yellow",
  1046. "size": float64(4),
  1047. "ts": float64(1541152488442),
  1048. },{
  1049. "color": "red",
  1050. "size": float64(1),
  1051. "ts": float64(1541152489252),
  1052. }},{{
  1053. "color": "red",
  1054. "size": float64(1),
  1055. "ts": float64(1541152489252),
  1056. }},
  1057. },
  1058. }, {
  1059. name: `rule2`,
  1060. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1061. size: 6,
  1062. r: [][]map[string]interface{}{
  1063. {{
  1064. "color": "red",
  1065. "ts": float64(1541152486013),
  1066. }},
  1067. {{
  1068. "color": "yellow",
  1069. "ts": float64(1541152488442),
  1070. }},
  1071. },
  1072. }, {
  1073. name: `rule3`,
  1074. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1075. size: 6,
  1076. r: [][]map[string]interface{}{
  1077. {{
  1078. "color": "red",
  1079. "temp": 25.5,
  1080. "ts": float64(1541152486013),
  1081. }},{{
  1082. "color": "red",
  1083. "temp": 25.5,
  1084. "ts": float64(1541152486013),
  1085. }},{{
  1086. "color": "blue",
  1087. "temp": 28.1,
  1088. "ts": float64(1541152487632),
  1089. }},{{
  1090. "color": "blue",
  1091. "temp": 28.1,
  1092. "ts": float64(1541152487632),
  1093. },{
  1094. "color": "yellow",
  1095. "temp": 27.4,
  1096. "ts": float64(1541152488442),
  1097. }},{{
  1098. "color": "yellow",
  1099. "temp": 27.4,
  1100. "ts": float64(1541152488442),
  1101. },{
  1102. "color": "red",
  1103. "temp": 25.5,
  1104. "ts": float64(1541152489252),
  1105. }},
  1106. },
  1107. }, {
  1108. name: `rule4`,
  1109. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1110. size: 6,
  1111. r: [][]map[string]interface{}{
  1112. {{
  1113. "color": "red",
  1114. }},{{
  1115. "color": "blue",
  1116. },{
  1117. "color": "red",
  1118. }},{{
  1119. "color": "blue",
  1120. },{
  1121. "color": "yellow",
  1122. }},{{
  1123. "color": "blue",
  1124. }, {
  1125. "color": "red",
  1126. },{
  1127. "color": "yellow",
  1128. }},
  1129. },
  1130. },{
  1131. name: `rule5`,
  1132. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1133. size: 12,
  1134. r: [][]map[string]interface{}{
  1135. {{
  1136. "temp": 25.5,
  1137. }},{{
  1138. "temp": 28.1,
  1139. },{
  1140. "temp": 27.4,
  1141. },{
  1142. "temp": 25.5,
  1143. }},{{
  1144. "temp": 26.2,
  1145. },{
  1146. "temp": 26.8,
  1147. },{
  1148. "temp": 28.9,
  1149. },{
  1150. "temp": 29.1,
  1151. },{
  1152. "temp": 32.2,
  1153. }},{{
  1154. "temp": 30.9,
  1155. }},
  1156. },
  1157. },{
  1158. name: `rule6`,
  1159. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1160. size: 6,
  1161. r: [][]map[string]interface{}{
  1162. {{
  1163. "m": 25.5,
  1164. "c": float64(1),
  1165. }},{{
  1166. "m": 25.5,
  1167. "c": float64(1),
  1168. }},{{
  1169. "m": 28.1,
  1170. "c": float64(1),
  1171. }},{{
  1172. "m": 28.1,
  1173. "c": float64(2),
  1174. }},{{
  1175. "m": 27.4,
  1176. "c": float64(2),
  1177. }},
  1178. },
  1179. },
  1180. }
  1181. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1182. createEventStreams(t)
  1183. defer dropEventStreams(t)
  1184. done := make(chan struct{})
  1185. defer close(done)
  1186. common.ResetMockTicker()
  1187. //mock ticker
  1188. realTicker := time.NewTicker(500 * time.Millisecond)
  1189. tickerDone := make(chan bool)
  1190. go func(){
  1191. ticker := common.GetTicker(1000).(*common.MockTicker)
  1192. timer := common.GetTimer(1000).(*common.MockTimer)
  1193. for {
  1194. select {
  1195. case <-tickerDone:
  1196. log.Infof("real ticker exiting...")
  1197. return
  1198. case t := <-realTicker.C:
  1199. ts := common.TimeToUnixMilli(t)
  1200. if ticker != nil {
  1201. go ticker.DoTick(ts)
  1202. }
  1203. if timer != nil {
  1204. go timer.DoTick(ts)
  1205. }
  1206. }
  1207. }
  1208. }()
  1209. for i, tt := range tests {
  1210. p := NewRuleProcessor(DbDir)
  1211. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1212. var sources []*nodes.SourceNode
  1213. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1214. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1215. }else {
  1216. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1217. t.Errorf("sql %s is not a select statement", tt.sql)
  1218. } else {
  1219. streams := xsql.GetStreams(selectStmt)
  1220. for _, stream := range streams{
  1221. source := getEventMockSource(stream, done, tt.size)
  1222. sources = append(sources, source)
  1223. }
  1224. }
  1225. }
  1226. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1227. Id:tt.name, Sql: tt.sql,
  1228. Options: map[string]interface{}{
  1229. "isEventTime": true,
  1230. "lateTolerance": float64(1000),
  1231. },
  1232. }, sources)
  1233. if err != nil{
  1234. t.Error(err)
  1235. }
  1236. mockSink := test.NewMockSink()
  1237. sink := nodes.NewSinkNode("MockSink", mockSink)
  1238. tp.AddSink(inputs, sink)
  1239. count := len(sources)
  1240. errCh := tp.Open()
  1241. func(){
  1242. for{
  1243. select{
  1244. case err = <- errCh:
  1245. t.Log(err)
  1246. tp.Cancel()
  1247. return
  1248. case <- done:
  1249. count--
  1250. log.Infof("%d sources remaining", count)
  1251. if count <= 0{
  1252. log.Info("stream stopping")
  1253. time.Sleep(1 * time.Second)
  1254. tp.Cancel()
  1255. return
  1256. }
  1257. default:
  1258. }
  1259. }
  1260. }()
  1261. results := mockSink.GetResults()
  1262. var maps [][]map[string]interface{}
  1263. for _, v := range results{
  1264. var mapRes []map[string]interface{}
  1265. err := json.Unmarshal(v, &mapRes)
  1266. if err != nil {
  1267. t.Errorf("Failed to parse the input into map")
  1268. continue
  1269. }
  1270. maps = append(maps, mapRes)
  1271. }
  1272. if !reflect.DeepEqual(tt.r, maps) {
  1273. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1274. }
  1275. }
  1276. realTicker.Stop()
  1277. tickerDone <- true
  1278. close(tickerDone)
  1279. }
  1280. func errstring(err error) string {
  1281. if err != nil {
  1282. return err.Error()
  1283. }
  1284. return ""
  1285. }