xsql_processor_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293
  1. package processors
  2. import (
  3. "encoding/json"
  4. "engine/common"
  5. "engine/xsql"
  6. "engine/xstream/api"
  7. "engine/xstream/nodes"
  8. "engine/xstream/test"
  9. "fmt"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var BadgerDir string
  17. func init(){
  18. BadgerDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("badge location is %s", BadgerDir)
  23. }
  24. func TestStreamCreateProcessor(t *testing.T) {
  25. var tests = []struct {
  26. s string
  27. r []string
  28. err string
  29. }{
  30. {
  31. s: `SHOW STREAMS;`,
  32. r: []string{"no stream definition found"},
  33. },
  34. {
  35. s: `EXPLAIN STREAM topic1;`,
  36. err: "stream topic1 not found",
  37. },
  38. {
  39. s: `CREATE STREAM topic1 (
  40. USERID BIGINT,
  41. FIRST_NAME STRING,
  42. LAST_NAME STRING,
  43. NICKNAMES ARRAY(STRING),
  44. Gender BOOLEAN,
  45. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  46. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  47. r: []string{"stream topic1 created"},
  48. },
  49. {
  50. s: `CREATE STREAM topic1 (
  51. USERID BIGINT,
  52. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  53. err: "key topic1 already exist, delete it before creating a new one",
  54. },
  55. {
  56. s: `EXPLAIN STREAM topic1;`,
  57. r: []string{"TO BE SUPPORTED"},
  58. },
  59. {
  60. s: `DESCRIBE STREAM topic1;`,
  61. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  62. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  63. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  64. },
  65. {
  66. s: `SHOW STREAMS;`,
  67. r: []string{"topic1"},
  68. },
  69. {
  70. s: `DROP STREAM topic1;`,
  71. r: []string{"stream topic1 dropped"},
  72. },
  73. {
  74. s: `DESCRIBE STREAM topic1;`,
  75. err: "stream topic1 not found",
  76. },
  77. {
  78. s: `DROP STREAM topic1;`,
  79. err: "Key not found",
  80. },
  81. }
  82. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  83. streamDB := path.Join(BadgerDir, "streamTest")
  84. for i, tt := range tests {
  85. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  86. if !reflect.DeepEqual(tt.err, errstring(err)) {
  87. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  88. } else if tt.err == "" {
  89. if !reflect.DeepEqual(tt.r, results) {
  90. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  91. }
  92. }
  93. }
  94. }
  95. func createStreams(t *testing.T){
  96. demo := `CREATE STREAM demo (
  97. color STRING,
  98. size BIGINT,
  99. ts BIGINT
  100. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  101. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  102. if err != nil{
  103. t.Log(err)
  104. }
  105. demo1 := `CREATE STREAM demo1 (
  106. temp FLOAT,
  107. hum BIGINT,
  108. ts BIGINT
  109. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  110. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  111. if err != nil{
  112. t.Log(err)
  113. }
  114. sessionDemo := `CREATE STREAM sessionDemo (
  115. temp FLOAT,
  116. hum BIGINT,
  117. ts BIGINT
  118. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  119. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  120. if err != nil{
  121. t.Log(err)
  122. }
  123. }
  124. func dropStreams(t *testing.T){
  125. demo := `DROP STREAM demo`
  126. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  127. if err != nil{
  128. t.Log(err)
  129. }
  130. demo1 := `DROP STREAM demo1`
  131. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  132. if err != nil{
  133. t.Log(err)
  134. }
  135. sessionDemo := `DROP STREAM sessionDemo`
  136. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  137. if err != nil{
  138. t.Log(err)
  139. }
  140. }
  141. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  142. var data []*xsql.Tuple
  143. switch name{
  144. case "demo":
  145. data = []*xsql.Tuple{
  146. {
  147. Emitter: name,
  148. Message: map[string]interface{}{
  149. "color": "red",
  150. "size": 3,
  151. "ts": 1541152486013,
  152. },
  153. Timestamp: 1541152486013,
  154. },
  155. {
  156. Emitter: name,
  157. Message: map[string]interface{}{
  158. "color": "blue",
  159. "size": 6,
  160. "ts": 1541152486822,
  161. },
  162. Timestamp: 1541152486822,
  163. },
  164. {
  165. Emitter: name,
  166. Message: map[string]interface{}{
  167. "color": "blue",
  168. "size": 2,
  169. "ts": 1541152487632,
  170. },
  171. Timestamp: 1541152487632,
  172. },
  173. {
  174. Emitter: name,
  175. Message: map[string]interface{}{
  176. "color": "yellow",
  177. "size": 4,
  178. "ts": 1541152488442,
  179. },
  180. Timestamp: 1541152488442,
  181. },
  182. {
  183. Emitter: name,
  184. Message: map[string]interface{}{
  185. "color": "red",
  186. "size": 1,
  187. "ts": 1541152489252,
  188. },
  189. Timestamp: 1541152489252,
  190. },
  191. }
  192. case "demo1":
  193. data = []*xsql.Tuple{
  194. {
  195. Emitter: name,
  196. Message: map[string]interface{}{
  197. "temp": 25.5,
  198. "hum": 65,
  199. "ts": 1541152486013,
  200. },
  201. Timestamp: 1541152486013,
  202. },
  203. {
  204. Emitter: name,
  205. Message: map[string]interface{}{
  206. "temp": 27.5,
  207. "hum": 59,
  208. "ts": 1541152486823,
  209. },
  210. Timestamp: 1541152486823,
  211. },
  212. {
  213. Emitter: name,
  214. Message: map[string]interface{}{
  215. "temp": 28.1,
  216. "hum": 75,
  217. "ts": 1541152487632,
  218. },
  219. Timestamp: 1541152487632,
  220. },
  221. {
  222. Emitter: name,
  223. Message: map[string]interface{}{
  224. "temp": 27.4,
  225. "hum": 80,
  226. "ts": 1541152488442,
  227. },
  228. Timestamp: 1541152488442,
  229. },
  230. {
  231. Emitter: name,
  232. Message: map[string]interface{}{
  233. "temp": 25.5,
  234. "hum": 62,
  235. "ts": 1541152489252,
  236. },
  237. Timestamp: 1541152489252,
  238. },
  239. }
  240. case "sessionDemo":
  241. data = []*xsql.Tuple{
  242. {
  243. Emitter: name,
  244. Message: map[string]interface{}{
  245. "temp": 25.5,
  246. "hum": 65,
  247. "ts": 1541152486013,
  248. },
  249. Timestamp: 1541152486013,
  250. },
  251. {
  252. Emitter: name,
  253. Message: map[string]interface{}{
  254. "temp": 27.5,
  255. "hum": 59,
  256. "ts": 1541152486823,
  257. },
  258. Timestamp: 1541152486823,
  259. },
  260. {
  261. Emitter: name,
  262. Message: map[string]interface{}{
  263. "temp": 28.1,
  264. "hum": 75,
  265. "ts": 1541152487932,
  266. },
  267. Timestamp: 1541152487932,
  268. },
  269. {
  270. Emitter: name,
  271. Message: map[string]interface{}{
  272. "temp": 27.4,
  273. "hum": 80,
  274. "ts": 1541152488442,
  275. },
  276. Timestamp: 1541152488442,
  277. },
  278. {
  279. Emitter: name,
  280. Message: map[string]interface{}{
  281. "temp": 25.5,
  282. "hum": 62,
  283. "ts": 1541152489252,
  284. },
  285. Timestamp: 1541152489252,
  286. },
  287. {
  288. Emitter: name,
  289. Message: map[string]interface{}{
  290. "temp": 26.2,
  291. "hum": 63,
  292. "ts": 1541152490062,
  293. },
  294. Timestamp: 1541152490062,
  295. },
  296. {
  297. Emitter: name,
  298. Message: map[string]interface{}{
  299. "temp": 26.8,
  300. "hum": 71,
  301. "ts": 1541152490872,
  302. },
  303. Timestamp: 1541152490872,
  304. },
  305. {
  306. Emitter: name,
  307. Message: map[string]interface{}{
  308. "temp": 28.9,
  309. "hum": 85,
  310. "ts": 1541152491682,
  311. },
  312. Timestamp: 1541152491682,
  313. },
  314. {
  315. Emitter: name,
  316. Message: map[string]interface{}{
  317. "temp": 29.1,
  318. "hum": 92,
  319. "ts": 1541152492492,
  320. },
  321. Timestamp: 1541152492492,
  322. },
  323. {
  324. Emitter: name,
  325. Message: map[string]interface{}{
  326. "temp": 32.2,
  327. "hum": 99,
  328. "ts": 1541152493202,
  329. },
  330. Timestamp: 1541152493202,
  331. },
  332. {
  333. Emitter: name,
  334. Message: map[string]interface{}{
  335. "temp": 30.9,
  336. "hum": 87,
  337. "ts": 1541152494112,
  338. },
  339. Timestamp: 1541152494112,
  340. },
  341. }
  342. }
  343. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false))
  344. }
  345. func TestSingleSQL(t *testing.T) {
  346. var tests = []struct {
  347. name string
  348. sql string
  349. r [][]map[string]interface{}
  350. }{
  351. {
  352. name: `rule1`,
  353. sql: `SELECT * FROM demo`,
  354. r: [][]map[string]interface{}{
  355. {{
  356. "color": "red",
  357. "size": float64(3),
  358. "ts": float64(1541152486013),
  359. }},
  360. {{
  361. "color": "blue",
  362. "size": float64(6),
  363. "ts": float64(1541152486822),
  364. }},
  365. {{
  366. "color": "blue",
  367. "size": float64(2),
  368. "ts": float64(1541152487632),
  369. }},
  370. {{
  371. "color": "yellow",
  372. "size": float64(4),
  373. "ts": float64(1541152488442),
  374. }},
  375. {{
  376. "color": "red",
  377. "size": float64(1),
  378. "ts": float64(1541152489252),
  379. }},
  380. },
  381. }, {
  382. name: `rule2`,
  383. sql: `SELECT color, ts FROM demo where size > 3`,
  384. r: [][]map[string]interface{}{
  385. {{
  386. "color": "blue",
  387. "ts": float64(1541152486822),
  388. }},
  389. {{
  390. "color": "yellow",
  391. "ts": float64(1541152488442),
  392. }},
  393. },
  394. },
  395. }
  396. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  397. createStreams(t)
  398. defer dropStreams(t)
  399. done := make(chan struct{})
  400. defer close(done)
  401. for i, tt := range tests {
  402. p := NewRuleProcessor(BadgerDir)
  403. parser := xsql.NewParser(strings.NewReader(tt.sql))
  404. var sources []*nodes.SourceNode
  405. if stmt, err := xsql.Language.Parse(parser); err != nil{
  406. t.Errorf("parse sql %s error: %s", tt.sql , err)
  407. }else {
  408. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  409. t.Errorf("sql %s is not a select statement", tt.sql)
  410. } else {
  411. streams := xsql.GetStreams(selectStmt)
  412. for _, stream := range streams{
  413. source := getMockSource(stream, done, 5)
  414. sources = append(sources, source)
  415. }
  416. }
  417. }
  418. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  419. if err != nil{
  420. t.Error(err)
  421. }
  422. sink := test.NewMockSink("mockSink", tt.name)
  423. tp.AddSink(inputs, sink)
  424. count := len(sources)
  425. errCh := tp.Open()
  426. func(){
  427. for{
  428. select{
  429. case err = <- errCh:
  430. t.Log(err)
  431. tp.Cancel()
  432. return
  433. case <- done:
  434. count--
  435. log.Infof("%d sources remaining", count)
  436. if count <= 0{
  437. log.Info("stream stopping")
  438. time.Sleep(1 * time.Second)
  439. tp.Cancel()
  440. return
  441. }
  442. default:
  443. }
  444. }
  445. }()
  446. results := sink.GetResults()
  447. var maps [][]map[string]interface{}
  448. for _, v := range results{
  449. var mapRes []map[string]interface{}
  450. err := json.Unmarshal(v, &mapRes)
  451. if err != nil {
  452. t.Errorf("Failed to parse the input into map")
  453. continue
  454. }
  455. maps = append(maps, mapRes)
  456. }
  457. if !reflect.DeepEqual(tt.r, maps) {
  458. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  459. }
  460. }
  461. }
  462. func TestWindow(t *testing.T) {
  463. common.IsTesting = true
  464. var tests = []struct {
  465. name string
  466. sql string
  467. size int
  468. r [][]map[string]interface{}
  469. }{
  470. {
  471. name: `rule1`,
  472. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  473. size: 5,
  474. r: [][]map[string]interface{}{
  475. {{
  476. "color": "red",
  477. "size": float64(3),
  478. "ts": float64(1541152486013),
  479. },{
  480. "color": "blue",
  481. "size": float64(6),
  482. "ts": float64(1541152486822),
  483. }},
  484. {{
  485. "color": "red",
  486. "size": float64(3),
  487. "ts": float64(1541152486013),
  488. },{
  489. "color": "blue",
  490. "size": float64(6),
  491. "ts": float64(1541152486822),
  492. },{
  493. "color": "blue",
  494. "size": float64(2),
  495. "ts": float64(1541152487632),
  496. }},
  497. {{
  498. "color": "blue",
  499. "size": float64(2),
  500. "ts": float64(1541152487632),
  501. },{
  502. "color": "yellow",
  503. "size": float64(4),
  504. "ts": float64(1541152488442),
  505. }},
  506. },
  507. }, {
  508. name: `rule2`,
  509. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  510. size: 5,
  511. r: [][]map[string]interface{}{
  512. {{
  513. "color": "red",
  514. "ts": float64(1541152486013),
  515. },{
  516. "color": "blue",
  517. "ts": float64(1541152486822),
  518. }},
  519. {{
  520. "color": "yellow",
  521. "ts": float64(1541152488442),
  522. }},
  523. },
  524. }, {
  525. name: `rule3`,
  526. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  527. size: 5,
  528. r: [][]map[string]interface{}{
  529. {{
  530. "color": "red",
  531. "temp": 25.5,
  532. "ts": float64(1541152486013),
  533. }},{{
  534. "color": "red",
  535. "temp": 25.5,
  536. "ts": float64(1541152486013),
  537. }},{{
  538. "color": "red",
  539. "temp": 25.5,
  540. "ts": float64(1541152486013),
  541. }},{{
  542. "color": "blue",
  543. "temp": 28.1,
  544. "ts": float64(1541152487632),
  545. }},{{
  546. "color": "blue",
  547. "temp": 28.1,
  548. "ts": float64(1541152487632),
  549. }},{{
  550. "color": "blue",
  551. "temp": 28.1,
  552. "ts": float64(1541152487632),
  553. },{
  554. "color": "yellow",
  555. "temp": 27.4,
  556. "ts": float64(1541152488442),
  557. }},{{
  558. "color": "yellow",
  559. "temp": 27.4,
  560. "ts": float64(1541152488442),
  561. }},{{
  562. "color": "yellow",
  563. "temp": 27.4,
  564. "ts": float64(1541152488442),
  565. },{
  566. "color": "red",
  567. "temp": 25.5,
  568. "ts": float64(1541152489252),
  569. }},
  570. },
  571. }, {
  572. name: `rule4`,
  573. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  574. size: 5,
  575. r: [][]map[string]interface{}{
  576. {{
  577. "color": "red",
  578. }},{{
  579. "color": "blue",
  580. },{
  581. "color": "red",
  582. }},{{
  583. "color": "blue",
  584. },{
  585. "color": "red",
  586. }},{{
  587. "color": "blue",
  588. },{
  589. "color": "yellow",
  590. }},{{
  591. "color": "blue",
  592. }, {
  593. "color": "red",
  594. },{
  595. "color": "yellow",
  596. }},
  597. },
  598. },{
  599. name: `rule5`,
  600. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  601. size: 11,
  602. r: [][]map[string]interface{}{
  603. {{
  604. "temp": 25.5,
  605. },{
  606. "temp": 27.5,
  607. }},{{
  608. "temp": 28.1,
  609. },{
  610. "temp": 27.4,
  611. },{
  612. "temp": 25.5,
  613. }},{{
  614. "temp": 26.2,
  615. },{
  616. "temp": 26.8,
  617. },{
  618. "temp": 28.9,
  619. },{
  620. "temp": 29.1,
  621. },{
  622. "temp": 32.2,
  623. }},
  624. },
  625. },{
  626. name: `rule6`,
  627. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  628. size: 5,
  629. r: [][]map[string]interface{}{
  630. {{
  631. "m": 25.5,
  632. "c": float64(1),
  633. }},{{
  634. "m": 25.5,
  635. "c": float64(1),
  636. }},{{
  637. "m": 25.5,
  638. "c": float64(1),
  639. }},{{
  640. "m": 28.1,
  641. "c": float64(1),
  642. }},{{
  643. "m": 28.1,
  644. "c": float64(1),
  645. }},{{
  646. "m": 28.1,
  647. "c": float64(2),
  648. }},{{
  649. "m": 27.4,
  650. "c": float64(1),
  651. }},{{
  652. "m": 27.4,
  653. "c": float64(2),
  654. }},
  655. },
  656. },
  657. }
  658. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  659. createStreams(t)
  660. defer dropStreams(t)
  661. done := make(chan struct{})
  662. defer close(done)
  663. common.ResetMockTicker()
  664. for i, tt := range tests {
  665. p := NewRuleProcessor(BadgerDir)
  666. parser := xsql.NewParser(strings.NewReader(tt.sql))
  667. var sources []*nodes.SourceNode
  668. if stmt, err := xsql.Language.Parse(parser); err != nil{
  669. t.Errorf("parse sql %s error: %s", tt.sql , err)
  670. }else {
  671. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  672. t.Errorf("sql %s is not a select statement", tt.sql)
  673. } else {
  674. streams := xsql.GetStreams(selectStmt)
  675. for _, stream := range streams{
  676. source := getMockSource(stream, done, tt.size)
  677. sources = append(sources, source)
  678. }
  679. }
  680. }
  681. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  682. if err != nil{
  683. t.Error(err)
  684. }
  685. sink := test.NewMockSink("mockSink", tt.name)
  686. tp.AddSink(inputs, sink)
  687. count := len(sources)
  688. errCh := tp.Open()
  689. func(){
  690. for{
  691. select{
  692. case err = <- errCh:
  693. t.Log(err)
  694. tp.Cancel()
  695. return
  696. case <- done:
  697. count--
  698. log.Infof("%d sources remaining", count)
  699. if count <= 0{
  700. log.Info("stream stopping")
  701. time.Sleep(1 * time.Second)
  702. tp.Cancel()
  703. return
  704. }
  705. default:
  706. }
  707. }
  708. }()
  709. results := sink.GetResults()
  710. var maps [][]map[string]interface{}
  711. for _, v := range results{
  712. var mapRes []map[string]interface{}
  713. err := json.Unmarshal(v, &mapRes)
  714. if err != nil {
  715. t.Errorf("Failed to parse the input into map")
  716. continue
  717. }
  718. maps = append(maps, mapRes)
  719. }
  720. if !reflect.DeepEqual(tt.r, maps) {
  721. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  722. }
  723. }
  724. }
  725. func createEventStreams(t *testing.T){
  726. demo := `CREATE STREAM demoE (
  727. color STRING,
  728. size BIGINT,
  729. ts BIGINT
  730. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  731. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  732. if err != nil{
  733. t.Log(err)
  734. }
  735. demo1 := `CREATE STREAM demo1E (
  736. temp FLOAT,
  737. hum BIGINT,
  738. ts BIGINT
  739. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  740. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  741. if err != nil{
  742. t.Log(err)
  743. }
  744. sessionDemo := `CREATE STREAM sessionDemoE (
  745. temp FLOAT,
  746. hum BIGINT,
  747. ts BIGINT
  748. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  749. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  750. if err != nil{
  751. t.Log(err)
  752. }
  753. }
  754. func dropEventStreams(t *testing.T){
  755. demo := `DROP STREAM demoE`
  756. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  757. if err != nil{
  758. t.Log(err)
  759. }
  760. demo1 := `DROP STREAM demo1E`
  761. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  762. if err != nil{
  763. t.Log(err)
  764. }
  765. sessionDemo := `DROP STREAM sessionDemoE`
  766. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  767. if err != nil{
  768. t.Log(err)
  769. }
  770. }
  771. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
  772. var data []*xsql.Tuple
  773. switch name{
  774. case "demoE":
  775. data = []*xsql.Tuple{
  776. {
  777. Emitter: name,
  778. Message: map[string]interface{}{
  779. "color": "red",
  780. "size": 3,
  781. "ts": 1541152486013,
  782. },
  783. Timestamp: 1541152486013,
  784. },
  785. {
  786. Emitter: name,
  787. Message: map[string]interface{}{
  788. "color": "blue",
  789. "size": 2,
  790. "ts": 1541152487632,
  791. },
  792. Timestamp: 1541152487632,
  793. },
  794. {
  795. Emitter: name,
  796. Message: map[string]interface{}{
  797. "color": "red",
  798. "size": 1,
  799. "ts": 1541152489252,
  800. },
  801. Timestamp: 1541152489252,
  802. },
  803. { //dropped item
  804. Emitter: name,
  805. Message: map[string]interface{}{
  806. "color": "blue",
  807. "size": 6,
  808. "ts": 1541152486822,
  809. },
  810. Timestamp: 1541152486822,
  811. },
  812. {
  813. Emitter: name,
  814. Message: map[string]interface{}{
  815. "color": "yellow",
  816. "size": 4,
  817. "ts": 1541152488442,
  818. },
  819. Timestamp: 1541152488442,
  820. },
  821. { //To lift the watermark and issue all windows
  822. Emitter: name,
  823. Message: map[string]interface{}{
  824. "color": "yellow",
  825. "size": 4,
  826. "ts": 1541152492342,
  827. },
  828. Timestamp: 1541152488442,
  829. },
  830. }
  831. case "demo1E":
  832. data = []*xsql.Tuple{
  833. {
  834. Emitter: name,
  835. Message: map[string]interface{}{
  836. "temp": 27.5,
  837. "hum": 59,
  838. "ts": 1541152486823,
  839. },
  840. Timestamp: 1541152486823,
  841. },
  842. {
  843. Emitter: name,
  844. Message: map[string]interface{}{
  845. "temp": 25.5,
  846. "hum": 65,
  847. "ts": 1541152486013,
  848. },
  849. Timestamp: 1541152486013,
  850. },
  851. {
  852. Emitter: name,
  853. Message: map[string]interface{}{
  854. "temp": 27.4,
  855. "hum": 80,
  856. "ts": 1541152488442,
  857. },
  858. Timestamp: 1541152488442,
  859. },
  860. {
  861. Emitter: name,
  862. Message: map[string]interface{}{
  863. "temp": 28.1,
  864. "hum": 75,
  865. "ts": 1541152487632,
  866. },
  867. Timestamp: 1541152487632,
  868. },
  869. {
  870. Emitter: name,
  871. Message: map[string]interface{}{
  872. "temp": 25.5,
  873. "hum": 62,
  874. "ts": 1541152489252,
  875. },
  876. Timestamp: 1541152489252,
  877. },
  878. {
  879. Emitter: name,
  880. Message: map[string]interface{}{
  881. "temp": 25.5,
  882. "hum": 62,
  883. "ts": 1541152499252,
  884. },
  885. Timestamp: 1541152499252,
  886. },
  887. }
  888. case "sessionDemoE":
  889. data = []*xsql.Tuple{
  890. {
  891. Emitter: name,
  892. Message: map[string]interface{}{
  893. "temp": 25.5,
  894. "hum": 65,
  895. "ts": 1541152486013,
  896. },
  897. Timestamp: 1541152486013,
  898. },
  899. {
  900. Emitter: name,
  901. Message: map[string]interface{}{
  902. "temp": 28.1,
  903. "hum": 75,
  904. "ts": 1541152487932,
  905. },
  906. Timestamp: 1541152487932,
  907. },
  908. {
  909. Emitter: name,
  910. Message: map[string]interface{}{
  911. "temp": 27.5,
  912. "hum": 59,
  913. "ts": 1541152486823,
  914. },
  915. Timestamp: 1541152486823,
  916. },
  917. {
  918. Emitter: name,
  919. Message: map[string]interface{}{
  920. "temp": 25.5,
  921. "hum": 62,
  922. "ts": 1541152489252,
  923. },
  924. Timestamp: 1541152489252,
  925. },
  926. {
  927. Emitter: name,
  928. Message: map[string]interface{}{
  929. "temp": 27.4,
  930. "hum": 80,
  931. "ts": 1541152488442,
  932. },
  933. Timestamp: 1541152488442,
  934. },
  935. {
  936. Emitter: name,
  937. Message: map[string]interface{}{
  938. "temp": 26.2,
  939. "hum": 63,
  940. "ts": 1541152490062,
  941. },
  942. Timestamp: 1541152490062,
  943. },
  944. {
  945. Emitter: name,
  946. Message: map[string]interface{}{
  947. "temp": 28.9,
  948. "hum": 85,
  949. "ts": 1541152491682,
  950. },
  951. Timestamp: 1541152491682,
  952. },
  953. {
  954. Emitter: name,
  955. Message: map[string]interface{}{
  956. "temp": 26.8,
  957. "hum": 71,
  958. "ts": 1541152490872,
  959. },
  960. Timestamp: 1541152490872,
  961. },
  962. {
  963. Emitter: name,
  964. Message: map[string]interface{}{
  965. "temp": 29.1,
  966. "hum": 92,
  967. "ts": 1541152492492,
  968. },
  969. Timestamp: 1541152492492,
  970. },
  971. {
  972. Emitter: name,
  973. Message: map[string]interface{}{
  974. "temp": 30.9,
  975. "hum": 87,
  976. "ts": 1541152494112,
  977. },
  978. Timestamp: 1541152494112,
  979. },
  980. {
  981. Emitter: name,
  982. Message: map[string]interface{}{
  983. "temp": 32.2,
  984. "hum": 99,
  985. "ts": 1541152493202,
  986. },
  987. Timestamp: 1541152493202,
  988. },
  989. {
  990. Emitter: name,
  991. Message: map[string]interface{}{
  992. "temp": 32.2,
  993. "hum": 99,
  994. "ts": 1541152499202,
  995. },
  996. Timestamp: 1541152499202,
  997. },
  998. }
  999. }
  1000. return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true))
  1001. }
  1002. func TestEventWindow(t *testing.T) {
  1003. common.IsTesting = true
  1004. var tests = []struct {
  1005. name string
  1006. sql string
  1007. size int
  1008. r [][]map[string]interface{}
  1009. }{
  1010. {
  1011. name: `rule1`,
  1012. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1013. size: 6,
  1014. r: [][]map[string]interface{}{
  1015. {{
  1016. "color": "red",
  1017. "size": float64(3),
  1018. "ts": float64(1541152486013),
  1019. }},
  1020. {{
  1021. "color": "red",
  1022. "size": float64(3),
  1023. "ts": float64(1541152486013),
  1024. },{
  1025. "color": "blue",
  1026. "size": float64(2),
  1027. "ts": float64(1541152487632),
  1028. }},
  1029. {{
  1030. "color": "blue",
  1031. "size": float64(2),
  1032. "ts": float64(1541152487632),
  1033. },{
  1034. "color": "yellow",
  1035. "size": float64(4),
  1036. "ts": float64(1541152488442),
  1037. }},{{
  1038. "color": "yellow",
  1039. "size": float64(4),
  1040. "ts": float64(1541152488442),
  1041. },{
  1042. "color": "red",
  1043. "size": float64(1),
  1044. "ts": float64(1541152489252),
  1045. }},{{
  1046. "color": "red",
  1047. "size": float64(1),
  1048. "ts": float64(1541152489252),
  1049. }},
  1050. },
  1051. }, {
  1052. name: `rule2`,
  1053. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1054. size: 6,
  1055. r: [][]map[string]interface{}{
  1056. {{
  1057. "color": "red",
  1058. "ts": float64(1541152486013),
  1059. }},
  1060. {{
  1061. "color": "yellow",
  1062. "ts": float64(1541152488442),
  1063. }},
  1064. },
  1065. }, {
  1066. name: `rule3`,
  1067. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1068. size: 6,
  1069. r: [][]map[string]interface{}{
  1070. {{
  1071. "color": "red",
  1072. "temp": 25.5,
  1073. "ts": float64(1541152486013),
  1074. }},{{
  1075. "color": "red",
  1076. "temp": 25.5,
  1077. "ts": float64(1541152486013),
  1078. }},{{
  1079. "color": "blue",
  1080. "temp": 28.1,
  1081. "ts": float64(1541152487632),
  1082. }},{{
  1083. "color": "blue",
  1084. "temp": 28.1,
  1085. "ts": float64(1541152487632),
  1086. },{
  1087. "color": "yellow",
  1088. "temp": 27.4,
  1089. "ts": float64(1541152488442),
  1090. }},{{
  1091. "color": "yellow",
  1092. "temp": 27.4,
  1093. "ts": float64(1541152488442),
  1094. },{
  1095. "color": "red",
  1096. "temp": 25.5,
  1097. "ts": float64(1541152489252),
  1098. }},
  1099. },
  1100. }, {
  1101. name: `rule4`,
  1102. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1103. size: 6,
  1104. r: [][]map[string]interface{}{
  1105. {{
  1106. "color": "red",
  1107. }},{{
  1108. "color": "blue",
  1109. },{
  1110. "color": "red",
  1111. }},{{
  1112. "color": "blue",
  1113. },{
  1114. "color": "yellow",
  1115. }},{{
  1116. "color": "blue",
  1117. }, {
  1118. "color": "red",
  1119. },{
  1120. "color": "yellow",
  1121. }},
  1122. },
  1123. },{
  1124. name: `rule5`,
  1125. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1126. size: 12,
  1127. r: [][]map[string]interface{}{
  1128. {{
  1129. "temp": 25.5,
  1130. }},{{
  1131. "temp": 28.1,
  1132. },{
  1133. "temp": 27.4,
  1134. },{
  1135. "temp": 25.5,
  1136. }},{{
  1137. "temp": 26.2,
  1138. },{
  1139. "temp": 26.8,
  1140. },{
  1141. "temp": 28.9,
  1142. },{
  1143. "temp": 29.1,
  1144. },{
  1145. "temp": 32.2,
  1146. }},{{
  1147. "temp": 30.9,
  1148. }},
  1149. },
  1150. },{
  1151. name: `rule6`,
  1152. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1153. size: 6,
  1154. r: [][]map[string]interface{}{
  1155. {{
  1156. "m": 25.5,
  1157. "c": float64(1),
  1158. }},{{
  1159. "m": 25.5,
  1160. "c": float64(1),
  1161. }},{{
  1162. "m": 28.1,
  1163. "c": float64(1),
  1164. }},{{
  1165. "m": 28.1,
  1166. "c": float64(2),
  1167. }},{{
  1168. "m": 27.4,
  1169. "c": float64(2),
  1170. }},
  1171. },
  1172. },
  1173. }
  1174. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1175. createEventStreams(t)
  1176. defer dropEventStreams(t)
  1177. done := make(chan struct{})
  1178. defer close(done)
  1179. common.ResetMockTicker()
  1180. //mock ticker
  1181. realTicker := time.NewTicker(500 * time.Millisecond)
  1182. tickerDone := make(chan bool)
  1183. go func(){
  1184. ticker := common.GetTicker(1000).(*common.MockTicker)
  1185. timer := common.GetTimer(1000).(*common.MockTimer)
  1186. for {
  1187. select {
  1188. case <-tickerDone:
  1189. log.Infof("real ticker exiting...")
  1190. return
  1191. case t := <-realTicker.C:
  1192. ts := common.TimeToUnixMilli(t)
  1193. if ticker != nil {
  1194. go ticker.DoTick(ts)
  1195. }
  1196. if timer != nil {
  1197. go timer.DoTick(ts)
  1198. }
  1199. }
  1200. }
  1201. }()
  1202. for i, tt := range tests {
  1203. p := NewRuleProcessor(BadgerDir)
  1204. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1205. var sources []*nodes.SourceNode
  1206. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1207. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1208. }else {
  1209. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1210. t.Errorf("sql %s is not a select statement", tt.sql)
  1211. } else {
  1212. streams := xsql.GetStreams(selectStmt)
  1213. for _, stream := range streams{
  1214. source := getEventMockSource(stream, done, tt.size)
  1215. sources = append(sources, source)
  1216. }
  1217. }
  1218. }
  1219. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1220. Id:tt.name, Sql: tt.sql,
  1221. Options: map[string]interface{}{
  1222. "isEventTime": true,
  1223. "lateTolerance": float64(1000),
  1224. },
  1225. }, sources)
  1226. if err != nil{
  1227. t.Error(err)
  1228. }
  1229. sink := test.NewMockSink("mockSink", tt.name)
  1230. tp.AddSink(inputs, sink)
  1231. count := len(sources)
  1232. errCh := tp.Open()
  1233. func(){
  1234. for{
  1235. select{
  1236. case err = <- errCh:
  1237. t.Log(err)
  1238. tp.Cancel()
  1239. return
  1240. case <- done:
  1241. count--
  1242. log.Infof("%d sources remaining", count)
  1243. if count <= 0{
  1244. log.Info("stream stopping")
  1245. time.Sleep(1 * time.Second)
  1246. tp.Cancel()
  1247. return
  1248. }
  1249. default:
  1250. }
  1251. }
  1252. }()
  1253. results := sink.GetResults()
  1254. var maps [][]map[string]interface{}
  1255. for _, v := range results{
  1256. var mapRes []map[string]interface{}
  1257. err := json.Unmarshal(v, &mapRes)
  1258. if err != nil {
  1259. t.Errorf("Failed to parse the input into map")
  1260. continue
  1261. }
  1262. maps = append(maps, mapRes)
  1263. }
  1264. if !reflect.DeepEqual(tt.r, maps) {
  1265. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1266. }
  1267. }
  1268. realTicker.Stop()
  1269. tickerDone <- true
  1270. close(tickerDone)
  1271. }
  1272. func errstring(err error) string {
  1273. if err != nil {
  1274. return err.Error()
  1275. }
  1276. return ""
  1277. }