xsql_processor_test.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292
  1. package processors
  2. import (
  3. "encoding/json"
  4. "engine/common"
  5. "engine/xsql"
  6. "engine/xstream"
  7. "engine/xstream/test"
  8. "fmt"
  9. "path"
  10. "reflect"
  11. "strings"
  12. "testing"
  13. "time"
  14. )
  15. var BadgerDir string
  16. func init(){
  17. BadgerDir, err := common.GetAndCreateDataLoc("test")
  18. if err != nil {
  19. log.Panic(err)
  20. }
  21. log.Infof("badge location is %s", BadgerDir)
  22. }
  23. func TestStreamCreateProcessor(t *testing.T) {
  24. var tests = []struct {
  25. s string
  26. r []string
  27. err string
  28. }{
  29. {
  30. s: `SHOW STREAMS;`,
  31. r: []string{"no stream definition found"},
  32. },
  33. {
  34. s: `EXPLAIN STREAM topic1;`,
  35. err: "stream topic1 not found",
  36. },
  37. {
  38. s: `CREATE STREAM topic1 (
  39. USERID BIGINT,
  40. FIRST_NAME STRING,
  41. LAST_NAME STRING,
  42. NICKNAMES ARRAY(STRING),
  43. Gender BOOLEAN,
  44. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  45. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  46. r: []string{"stream topic1 created"},
  47. },
  48. {
  49. s: `CREATE STREAM topic1 (
  50. USERID BIGINT,
  51. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  52. err: "key topic1 already exist, delete it before creating a new one",
  53. },
  54. {
  55. s: `EXPLAIN STREAM topic1;`,
  56. r: []string{"TO BE SUPPORTED"},
  57. },
  58. {
  59. s: `DESCRIBE STREAM topic1;`,
  60. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  61. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  62. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  63. },
  64. {
  65. s: `SHOW STREAMS;`,
  66. r: []string{"topic1"},
  67. },
  68. {
  69. s: `DROP STREAM topic1;`,
  70. r: []string{"stream topic1 dropped"},
  71. },
  72. {
  73. s: `DESCRIBE STREAM topic1;`,
  74. err: "stream topic1 not found",
  75. },
  76. {
  77. s: `DROP STREAM topic1;`,
  78. err: "Key not found",
  79. },
  80. }
  81. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  82. streamDB := path.Join(BadgerDir, "streamTest")
  83. for i, tt := range tests {
  84. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  85. if !reflect.DeepEqual(tt.err, errstring(err)) {
  86. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  87. } else if tt.err == "" {
  88. if !reflect.DeepEqual(tt.r, results) {
  89. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  90. }
  91. }
  92. }
  93. }
  94. func createStreams(t *testing.T){
  95. demo := `CREATE STREAM demo (
  96. color STRING,
  97. size BIGINT,
  98. ts BIGINT
  99. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  100. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  101. if err != nil{
  102. t.Log(err)
  103. }
  104. demo1 := `CREATE STREAM demo1 (
  105. temp FLOAT,
  106. hum BIGINT,
  107. ts BIGINT
  108. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  109. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  110. if err != nil{
  111. t.Log(err)
  112. }
  113. sessionDemo := `CREATE STREAM sessionDemo (
  114. temp FLOAT,
  115. hum BIGINT,
  116. ts BIGINT
  117. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  118. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  119. if err != nil{
  120. t.Log(err)
  121. }
  122. }
  123. func dropStreams(t *testing.T){
  124. demo := `DROP STREAM demo`
  125. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  126. if err != nil{
  127. t.Log(err)
  128. }
  129. demo1 := `DROP STREAM demo1`
  130. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  131. if err != nil{
  132. t.Log(err)
  133. }
  134. sessionDemo := `DROP STREAM sessionDemo`
  135. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  136. if err != nil{
  137. t.Log(err)
  138. }
  139. }
  140. func getMockSource(name string, done chan<- struct{}, size int) *test.MockSource{
  141. var data []*xsql.Tuple
  142. switch name{
  143. case "demo":
  144. data = []*xsql.Tuple{
  145. {
  146. Emitter: name,
  147. Message: map[string]interface{}{
  148. "color": "red",
  149. "size": 3,
  150. "ts": 1541152486013,
  151. },
  152. Timestamp: 1541152486013,
  153. },
  154. {
  155. Emitter: name,
  156. Message: map[string]interface{}{
  157. "color": "blue",
  158. "size": 6,
  159. "ts": 1541152486822,
  160. },
  161. Timestamp: 1541152486822,
  162. },
  163. {
  164. Emitter: name,
  165. Message: map[string]interface{}{
  166. "color": "blue",
  167. "size": 2,
  168. "ts": 1541152487632,
  169. },
  170. Timestamp: 1541152487632,
  171. },
  172. {
  173. Emitter: name,
  174. Message: map[string]interface{}{
  175. "color": "yellow",
  176. "size": 4,
  177. "ts": 1541152488442,
  178. },
  179. Timestamp: 1541152488442,
  180. },
  181. {
  182. Emitter: name,
  183. Message: map[string]interface{}{
  184. "color": "red",
  185. "size": 1,
  186. "ts": 1541152489252,
  187. },
  188. Timestamp: 1541152489252,
  189. },
  190. }
  191. case "demo1":
  192. data = []*xsql.Tuple{
  193. {
  194. Emitter: name,
  195. Message: map[string]interface{}{
  196. "temp": 25.5,
  197. "hum": 65,
  198. "ts": 1541152486013,
  199. },
  200. Timestamp: 1541152486013,
  201. },
  202. {
  203. Emitter: name,
  204. Message: map[string]interface{}{
  205. "temp": 27.5,
  206. "hum": 59,
  207. "ts": 1541152486823,
  208. },
  209. Timestamp: 1541152486823,
  210. },
  211. {
  212. Emitter: name,
  213. Message: map[string]interface{}{
  214. "temp": 28.1,
  215. "hum": 75,
  216. "ts": 1541152487632,
  217. },
  218. Timestamp: 1541152487632,
  219. },
  220. {
  221. Emitter: name,
  222. Message: map[string]interface{}{
  223. "temp": 27.4,
  224. "hum": 80,
  225. "ts": 1541152488442,
  226. },
  227. Timestamp: 1541152488442,
  228. },
  229. {
  230. Emitter: name,
  231. Message: map[string]interface{}{
  232. "temp": 25.5,
  233. "hum": 62,
  234. "ts": 1541152489252,
  235. },
  236. Timestamp: 1541152489252,
  237. },
  238. }
  239. case "sessionDemo":
  240. data = []*xsql.Tuple{
  241. {
  242. Emitter: name,
  243. Message: map[string]interface{}{
  244. "temp": 25.5,
  245. "hum": 65,
  246. "ts": 1541152486013,
  247. },
  248. Timestamp: 1541152486013,
  249. },
  250. {
  251. Emitter: name,
  252. Message: map[string]interface{}{
  253. "temp": 27.5,
  254. "hum": 59,
  255. "ts": 1541152486823,
  256. },
  257. Timestamp: 1541152486823,
  258. },
  259. {
  260. Emitter: name,
  261. Message: map[string]interface{}{
  262. "temp": 28.1,
  263. "hum": 75,
  264. "ts": 1541152487932,
  265. },
  266. Timestamp: 1541152487932,
  267. },
  268. {
  269. Emitter: name,
  270. Message: map[string]interface{}{
  271. "temp": 27.4,
  272. "hum": 80,
  273. "ts": 1541152488442,
  274. },
  275. Timestamp: 1541152488442,
  276. },
  277. {
  278. Emitter: name,
  279. Message: map[string]interface{}{
  280. "temp": 25.5,
  281. "hum": 62,
  282. "ts": 1541152489252,
  283. },
  284. Timestamp: 1541152489252,
  285. },
  286. {
  287. Emitter: name,
  288. Message: map[string]interface{}{
  289. "temp": 26.2,
  290. "hum": 63,
  291. "ts": 1541152490062,
  292. },
  293. Timestamp: 1541152490062,
  294. },
  295. {
  296. Emitter: name,
  297. Message: map[string]interface{}{
  298. "temp": 26.8,
  299. "hum": 71,
  300. "ts": 1541152490872,
  301. },
  302. Timestamp: 1541152490872,
  303. },
  304. {
  305. Emitter: name,
  306. Message: map[string]interface{}{
  307. "temp": 28.9,
  308. "hum": 85,
  309. "ts": 1541152491682,
  310. },
  311. Timestamp: 1541152491682,
  312. },
  313. {
  314. Emitter: name,
  315. Message: map[string]interface{}{
  316. "temp": 29.1,
  317. "hum": 92,
  318. "ts": 1541152492492,
  319. },
  320. Timestamp: 1541152492492,
  321. },
  322. {
  323. Emitter: name,
  324. Message: map[string]interface{}{
  325. "temp": 32.2,
  326. "hum": 99,
  327. "ts": 1541152493202,
  328. },
  329. Timestamp: 1541152493202,
  330. },
  331. {
  332. Emitter: name,
  333. Message: map[string]interface{}{
  334. "temp": 30.9,
  335. "hum": 87,
  336. "ts": 1541152494112,
  337. },
  338. Timestamp: 1541152494112,
  339. },
  340. }
  341. }
  342. return test.NewMockSource(data[:size], name, done, false)
  343. }
  344. func TestSingleSQL(t *testing.T) {
  345. var tests = []struct {
  346. name string
  347. sql string
  348. r [][]map[string]interface{}
  349. }{
  350. {
  351. name: `rule1`,
  352. sql: `SELECT * FROM demo`,
  353. r: [][]map[string]interface{}{
  354. {{
  355. "color": "red",
  356. "size": float64(3),
  357. "ts": float64(1541152486013),
  358. }},
  359. {{
  360. "color": "blue",
  361. "size": float64(6),
  362. "ts": float64(1541152486822),
  363. }},
  364. {{
  365. "color": "blue",
  366. "size": float64(2),
  367. "ts": float64(1541152487632),
  368. }},
  369. {{
  370. "color": "yellow",
  371. "size": float64(4),
  372. "ts": float64(1541152488442),
  373. }},
  374. {{
  375. "color": "red",
  376. "size": float64(1),
  377. "ts": float64(1541152489252),
  378. }},
  379. },
  380. }, {
  381. name: `rule2`,
  382. sql: `SELECT color, ts FROM demo where size > 3`,
  383. r: [][]map[string]interface{}{
  384. {{
  385. "color": "blue",
  386. "ts": float64(1541152486822),
  387. }},
  388. {{
  389. "color": "yellow",
  390. "ts": float64(1541152488442),
  391. }},
  392. },
  393. },
  394. }
  395. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  396. createStreams(t)
  397. defer dropStreams(t)
  398. done := make(chan struct{})
  399. defer close(done)
  400. for i, tt := range tests {
  401. p := NewRuleProcessor(BadgerDir)
  402. parser := xsql.NewParser(strings.NewReader(tt.sql))
  403. var sources []xstream.Source
  404. if stmt, err := xsql.Language.Parse(parser); err != nil{
  405. t.Errorf("parse sql %s error: %s", tt.sql , err)
  406. }else {
  407. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  408. t.Errorf("sql %s is not a select statement", tt.sql)
  409. } else {
  410. streams := xsql.GetStreams(selectStmt)
  411. for _, stream := range streams{
  412. source := getMockSource(stream, done, 5)
  413. sources = append(sources, source)
  414. }
  415. }
  416. }
  417. tp, inputs, err := p.createTopoWithSources(&xstream.Rule{Id:tt.name, Sql: tt.sql}, sources)
  418. if err != nil{
  419. t.Error(err)
  420. }
  421. sink := test.NewMockSink("mockSink", tt.name)
  422. tp.AddSink(inputs, sink)
  423. count := len(sources)
  424. errCh := tp.Open()
  425. func(){
  426. for{
  427. select{
  428. case err = <- errCh:
  429. t.Log(err)
  430. tp.Cancel()
  431. return
  432. case <- done:
  433. count--
  434. log.Infof("%d sources remaining", count)
  435. if count <= 0{
  436. log.Info("stream stopping")
  437. time.Sleep(1 * time.Second)
  438. tp.Cancel()
  439. return
  440. }
  441. default:
  442. }
  443. }
  444. }()
  445. results := sink.GetResults()
  446. var maps [][]map[string]interface{}
  447. for _, v := range results{
  448. var mapRes []map[string]interface{}
  449. err := json.Unmarshal(v, &mapRes)
  450. if err != nil {
  451. t.Errorf("Failed to parse the input into map")
  452. continue
  453. }
  454. maps = append(maps, mapRes)
  455. }
  456. if !reflect.DeepEqual(tt.r, maps) {
  457. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  458. }
  459. }
  460. }
  461. func TestWindow(t *testing.T) {
  462. common.IsTesting = true
  463. var tests = []struct {
  464. name string
  465. sql string
  466. size int
  467. r [][]map[string]interface{}
  468. }{
  469. {
  470. name: `rule1`,
  471. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  472. size: 5,
  473. r: [][]map[string]interface{}{
  474. {{
  475. "color": "red",
  476. "size": float64(3),
  477. "ts": float64(1541152486013),
  478. },{
  479. "color": "blue",
  480. "size": float64(6),
  481. "ts": float64(1541152486822),
  482. }},
  483. {{
  484. "color": "red",
  485. "size": float64(3),
  486. "ts": float64(1541152486013),
  487. },{
  488. "color": "blue",
  489. "size": float64(6),
  490. "ts": float64(1541152486822),
  491. },{
  492. "color": "blue",
  493. "size": float64(2),
  494. "ts": float64(1541152487632),
  495. }},
  496. {{
  497. "color": "blue",
  498. "size": float64(2),
  499. "ts": float64(1541152487632),
  500. },{
  501. "color": "yellow",
  502. "size": float64(4),
  503. "ts": float64(1541152488442),
  504. }},
  505. },
  506. }, {
  507. name: `rule2`,
  508. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  509. size: 5,
  510. r: [][]map[string]interface{}{
  511. {{
  512. "color": "red",
  513. "ts": float64(1541152486013),
  514. },{
  515. "color": "blue",
  516. "ts": float64(1541152486822),
  517. }},
  518. {{
  519. "color": "yellow",
  520. "ts": float64(1541152488442),
  521. }},
  522. },
  523. }, {
  524. name: `rule3`,
  525. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  526. size: 5,
  527. r: [][]map[string]interface{}{
  528. {{
  529. "color": "red",
  530. "temp": 25.5,
  531. "ts": float64(1541152486013),
  532. }},{{
  533. "color": "red",
  534. "temp": 25.5,
  535. "ts": float64(1541152486013),
  536. }},{{
  537. "color": "red",
  538. "temp": 25.5,
  539. "ts": float64(1541152486013),
  540. }},{{
  541. "color": "blue",
  542. "temp": 28.1,
  543. "ts": float64(1541152487632),
  544. }},{{
  545. "color": "blue",
  546. "temp": 28.1,
  547. "ts": float64(1541152487632),
  548. }},{{
  549. "color": "blue",
  550. "temp": 28.1,
  551. "ts": float64(1541152487632),
  552. },{
  553. "color": "yellow",
  554. "temp": 27.4,
  555. "ts": float64(1541152488442),
  556. }},{{
  557. "color": "yellow",
  558. "temp": 27.4,
  559. "ts": float64(1541152488442),
  560. }},{{
  561. "color": "yellow",
  562. "temp": 27.4,
  563. "ts": float64(1541152488442),
  564. },{
  565. "color": "red",
  566. "temp": 25.5,
  567. "ts": float64(1541152489252),
  568. }},
  569. },
  570. }, {
  571. name: `rule4`,
  572. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  573. size: 5,
  574. r: [][]map[string]interface{}{
  575. {{
  576. "color": "red",
  577. }},{{
  578. "color": "blue",
  579. },{
  580. "color": "red",
  581. }},{{
  582. "color": "blue",
  583. },{
  584. "color": "red",
  585. }},{{
  586. "color": "blue",
  587. },{
  588. "color": "yellow",
  589. }},{{
  590. "color": "blue",
  591. }, {
  592. "color": "red",
  593. },{
  594. "color": "yellow",
  595. }},
  596. },
  597. },{
  598. name: `rule5`,
  599. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  600. size: 11,
  601. r: [][]map[string]interface{}{
  602. {{
  603. "temp": 25.5,
  604. },{
  605. "temp": 27.5,
  606. }},{{
  607. "temp": 28.1,
  608. },{
  609. "temp": 27.4,
  610. },{
  611. "temp": 25.5,
  612. }},{{
  613. "temp": 26.2,
  614. },{
  615. "temp": 26.8,
  616. },{
  617. "temp": 28.9,
  618. },{
  619. "temp": 29.1,
  620. },{
  621. "temp": 32.2,
  622. }},
  623. },
  624. },{
  625. name: `rule6`,
  626. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  627. size: 5,
  628. r: [][]map[string]interface{}{
  629. {{
  630. "m": 25.5,
  631. "c": float64(1),
  632. }},{{
  633. "m": 25.5,
  634. "c": float64(1),
  635. }},{{
  636. "m": 25.5,
  637. "c": float64(1),
  638. }},{{
  639. "m": 28.1,
  640. "c": float64(1),
  641. }},{{
  642. "m": 28.1,
  643. "c": float64(1),
  644. }},{{
  645. "m": 28.1,
  646. "c": float64(2),
  647. }},{{
  648. "m": 27.4,
  649. "c": float64(1),
  650. }},{{
  651. "m": 27.4,
  652. "c": float64(2),
  653. }},
  654. },
  655. },
  656. }
  657. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  658. createStreams(t)
  659. defer dropStreams(t)
  660. done := make(chan struct{})
  661. defer close(done)
  662. common.ResetMockTicker()
  663. for i, tt := range tests {
  664. p := NewRuleProcessor(BadgerDir)
  665. parser := xsql.NewParser(strings.NewReader(tt.sql))
  666. var sources []xstream.Source
  667. if stmt, err := xsql.Language.Parse(parser); err != nil{
  668. t.Errorf("parse sql %s error: %s", tt.sql , err)
  669. }else {
  670. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  671. t.Errorf("sql %s is not a select statement", tt.sql)
  672. } else {
  673. streams := xsql.GetStreams(selectStmt)
  674. for _, stream := range streams{
  675. source := getMockSource(stream, done, tt.size)
  676. sources = append(sources, source)
  677. }
  678. }
  679. }
  680. tp, inputs, err := p.createTopoWithSources(&xstream.Rule{Id:tt.name, Sql: tt.sql}, sources)
  681. if err != nil{
  682. t.Error(err)
  683. }
  684. sink := test.NewMockSink("mockSink", tt.name)
  685. tp.AddSink(inputs, sink)
  686. count := len(sources)
  687. errCh := tp.Open()
  688. func(){
  689. for{
  690. select{
  691. case err = <- errCh:
  692. t.Log(err)
  693. tp.Cancel()
  694. return
  695. case <- done:
  696. count--
  697. log.Infof("%d sources remaining", count)
  698. if count <= 0{
  699. log.Info("stream stopping")
  700. time.Sleep(1 * time.Second)
  701. tp.Cancel()
  702. return
  703. }
  704. default:
  705. }
  706. }
  707. }()
  708. results := sink.GetResults()
  709. var maps [][]map[string]interface{}
  710. for _, v := range results{
  711. var mapRes []map[string]interface{}
  712. err := json.Unmarshal(v, &mapRes)
  713. if err != nil {
  714. t.Errorf("Failed to parse the input into map")
  715. continue
  716. }
  717. maps = append(maps, mapRes)
  718. }
  719. if !reflect.DeepEqual(tt.r, maps) {
  720. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  721. }
  722. }
  723. }
  724. func createEventStreams(t *testing.T){
  725. demo := `CREATE STREAM demoE (
  726. color STRING,
  727. size BIGINT,
  728. ts BIGINT
  729. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  730. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  731. if err != nil{
  732. t.Log(err)
  733. }
  734. demo1 := `CREATE STREAM demo1E (
  735. temp FLOAT,
  736. hum BIGINT,
  737. ts BIGINT
  738. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  739. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  740. if err != nil{
  741. t.Log(err)
  742. }
  743. sessionDemo := `CREATE STREAM sessionDemoE (
  744. temp FLOAT,
  745. hum BIGINT,
  746. ts BIGINT
  747. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  748. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  749. if err != nil{
  750. t.Log(err)
  751. }
  752. }
  753. func dropEventStreams(t *testing.T){
  754. demo := `DROP STREAM demoE`
  755. _, err := NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  756. if err != nil{
  757. t.Log(err)
  758. }
  759. demo1 := `DROP STREAM demo1E`
  760. _, err = NewStreamProcessor(demo1, path.Join(BadgerDir, "stream")).Exec()
  761. if err != nil{
  762. t.Log(err)
  763. }
  764. sessionDemo := `DROP STREAM sessionDemoE`
  765. _, err = NewStreamProcessor(sessionDemo, path.Join(BadgerDir, "stream")).Exec()
  766. if err != nil{
  767. t.Log(err)
  768. }
  769. }
  770. func getEventMockSource(name string, done chan<- struct{}, size int) *test.MockSource{
  771. var data []*xsql.Tuple
  772. switch name{
  773. case "demoE":
  774. data = []*xsql.Tuple{
  775. {
  776. Emitter: name,
  777. Message: map[string]interface{}{
  778. "color": "red",
  779. "size": 3,
  780. "ts": 1541152486013,
  781. },
  782. Timestamp: 1541152486013,
  783. },
  784. {
  785. Emitter: name,
  786. Message: map[string]interface{}{
  787. "color": "blue",
  788. "size": 2,
  789. "ts": 1541152487632,
  790. },
  791. Timestamp: 1541152487632,
  792. },
  793. {
  794. Emitter: name,
  795. Message: map[string]interface{}{
  796. "color": "red",
  797. "size": 1,
  798. "ts": 1541152489252,
  799. },
  800. Timestamp: 1541152489252,
  801. },
  802. { //dropped item
  803. Emitter: name,
  804. Message: map[string]interface{}{
  805. "color": "blue",
  806. "size": 6,
  807. "ts": 1541152486822,
  808. },
  809. Timestamp: 1541152486822,
  810. },
  811. {
  812. Emitter: name,
  813. Message: map[string]interface{}{
  814. "color": "yellow",
  815. "size": 4,
  816. "ts": 1541152488442,
  817. },
  818. Timestamp: 1541152488442,
  819. },
  820. { //To lift the watermark and issue all windows
  821. Emitter: name,
  822. Message: map[string]interface{}{
  823. "color": "yellow",
  824. "size": 4,
  825. "ts": 1541152492342,
  826. },
  827. Timestamp: 1541152488442,
  828. },
  829. }
  830. case "demo1E":
  831. data = []*xsql.Tuple{
  832. {
  833. Emitter: name,
  834. Message: map[string]interface{}{
  835. "temp": 27.5,
  836. "hum": 59,
  837. "ts": 1541152486823,
  838. },
  839. Timestamp: 1541152486823,
  840. },
  841. {
  842. Emitter: name,
  843. Message: map[string]interface{}{
  844. "temp": 25.5,
  845. "hum": 65,
  846. "ts": 1541152486013,
  847. },
  848. Timestamp: 1541152486013,
  849. },
  850. {
  851. Emitter: name,
  852. Message: map[string]interface{}{
  853. "temp": 27.4,
  854. "hum": 80,
  855. "ts": 1541152488442,
  856. },
  857. Timestamp: 1541152488442,
  858. },
  859. {
  860. Emitter: name,
  861. Message: map[string]interface{}{
  862. "temp": 28.1,
  863. "hum": 75,
  864. "ts": 1541152487632,
  865. },
  866. Timestamp: 1541152487632,
  867. },
  868. {
  869. Emitter: name,
  870. Message: map[string]interface{}{
  871. "temp": 25.5,
  872. "hum": 62,
  873. "ts": 1541152489252,
  874. },
  875. Timestamp: 1541152489252,
  876. },
  877. {
  878. Emitter: name,
  879. Message: map[string]interface{}{
  880. "temp": 25.5,
  881. "hum": 62,
  882. "ts": 1541152499252,
  883. },
  884. Timestamp: 1541152499252,
  885. },
  886. }
  887. case "sessionDemoE":
  888. data = []*xsql.Tuple{
  889. {
  890. Emitter: name,
  891. Message: map[string]interface{}{
  892. "temp": 25.5,
  893. "hum": 65,
  894. "ts": 1541152486013,
  895. },
  896. Timestamp: 1541152486013,
  897. },
  898. {
  899. Emitter: name,
  900. Message: map[string]interface{}{
  901. "temp": 28.1,
  902. "hum": 75,
  903. "ts": 1541152487932,
  904. },
  905. Timestamp: 1541152487932,
  906. },
  907. {
  908. Emitter: name,
  909. Message: map[string]interface{}{
  910. "temp": 27.5,
  911. "hum": 59,
  912. "ts": 1541152486823,
  913. },
  914. Timestamp: 1541152486823,
  915. },
  916. {
  917. Emitter: name,
  918. Message: map[string]interface{}{
  919. "temp": 25.5,
  920. "hum": 62,
  921. "ts": 1541152489252,
  922. },
  923. Timestamp: 1541152489252,
  924. },
  925. {
  926. Emitter: name,
  927. Message: map[string]interface{}{
  928. "temp": 27.4,
  929. "hum": 80,
  930. "ts": 1541152488442,
  931. },
  932. Timestamp: 1541152488442,
  933. },
  934. {
  935. Emitter: name,
  936. Message: map[string]interface{}{
  937. "temp": 26.2,
  938. "hum": 63,
  939. "ts": 1541152490062,
  940. },
  941. Timestamp: 1541152490062,
  942. },
  943. {
  944. Emitter: name,
  945. Message: map[string]interface{}{
  946. "temp": 28.9,
  947. "hum": 85,
  948. "ts": 1541152491682,
  949. },
  950. Timestamp: 1541152491682,
  951. },
  952. {
  953. Emitter: name,
  954. Message: map[string]interface{}{
  955. "temp": 26.8,
  956. "hum": 71,
  957. "ts": 1541152490872,
  958. },
  959. Timestamp: 1541152490872,
  960. },
  961. {
  962. Emitter: name,
  963. Message: map[string]interface{}{
  964. "temp": 29.1,
  965. "hum": 92,
  966. "ts": 1541152492492,
  967. },
  968. Timestamp: 1541152492492,
  969. },
  970. {
  971. Emitter: name,
  972. Message: map[string]interface{}{
  973. "temp": 30.9,
  974. "hum": 87,
  975. "ts": 1541152494112,
  976. },
  977. Timestamp: 1541152494112,
  978. },
  979. {
  980. Emitter: name,
  981. Message: map[string]interface{}{
  982. "temp": 32.2,
  983. "hum": 99,
  984. "ts": 1541152493202,
  985. },
  986. Timestamp: 1541152493202,
  987. },
  988. {
  989. Emitter: name,
  990. Message: map[string]interface{}{
  991. "temp": 32.2,
  992. "hum": 99,
  993. "ts": 1541152499202,
  994. },
  995. Timestamp: 1541152499202,
  996. },
  997. }
  998. }
  999. return test.NewMockSource(data[:size], name, done, true)
  1000. }
  1001. func TestEventWindow(t *testing.T) {
  1002. common.IsTesting = true
  1003. var tests = []struct {
  1004. name string
  1005. sql string
  1006. size int
  1007. r [][]map[string]interface{}
  1008. }{
  1009. {
  1010. name: `rule1`,
  1011. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1012. size: 6,
  1013. r: [][]map[string]interface{}{
  1014. {{
  1015. "color": "red",
  1016. "size": float64(3),
  1017. "ts": float64(1541152486013),
  1018. }},
  1019. {{
  1020. "color": "red",
  1021. "size": float64(3),
  1022. "ts": float64(1541152486013),
  1023. },{
  1024. "color": "blue",
  1025. "size": float64(2),
  1026. "ts": float64(1541152487632),
  1027. }},
  1028. {{
  1029. "color": "blue",
  1030. "size": float64(2),
  1031. "ts": float64(1541152487632),
  1032. },{
  1033. "color": "yellow",
  1034. "size": float64(4),
  1035. "ts": float64(1541152488442),
  1036. }},{{
  1037. "color": "yellow",
  1038. "size": float64(4),
  1039. "ts": float64(1541152488442),
  1040. },{
  1041. "color": "red",
  1042. "size": float64(1),
  1043. "ts": float64(1541152489252),
  1044. }},{{
  1045. "color": "red",
  1046. "size": float64(1),
  1047. "ts": float64(1541152489252),
  1048. }},
  1049. },
  1050. }, {
  1051. name: `rule2`,
  1052. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1053. size: 6,
  1054. r: [][]map[string]interface{}{
  1055. {{
  1056. "color": "red",
  1057. "ts": float64(1541152486013),
  1058. }},
  1059. {{
  1060. "color": "yellow",
  1061. "ts": float64(1541152488442),
  1062. }},
  1063. },
  1064. }, {
  1065. name: `rule3`,
  1066. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1067. size: 6,
  1068. r: [][]map[string]interface{}{
  1069. {{
  1070. "color": "red",
  1071. "temp": 25.5,
  1072. "ts": float64(1541152486013),
  1073. }},{{
  1074. "color": "red",
  1075. "temp": 25.5,
  1076. "ts": float64(1541152486013),
  1077. }},{{
  1078. "color": "blue",
  1079. "temp": 28.1,
  1080. "ts": float64(1541152487632),
  1081. }},{{
  1082. "color": "blue",
  1083. "temp": 28.1,
  1084. "ts": float64(1541152487632),
  1085. },{
  1086. "color": "yellow",
  1087. "temp": 27.4,
  1088. "ts": float64(1541152488442),
  1089. }},{{
  1090. "color": "yellow",
  1091. "temp": 27.4,
  1092. "ts": float64(1541152488442),
  1093. },{
  1094. "color": "red",
  1095. "temp": 25.5,
  1096. "ts": float64(1541152489252),
  1097. }},
  1098. },
  1099. }, {
  1100. name: `rule4`,
  1101. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1102. size: 6,
  1103. r: [][]map[string]interface{}{
  1104. {{
  1105. "color": "red",
  1106. }},{{
  1107. "color": "blue",
  1108. },{
  1109. "color": "red",
  1110. }},{{
  1111. "color": "blue",
  1112. },{
  1113. "color": "yellow",
  1114. }},{{
  1115. "color": "blue",
  1116. }, {
  1117. "color": "red",
  1118. },{
  1119. "color": "yellow",
  1120. }},
  1121. },
  1122. },{
  1123. name: `rule5`,
  1124. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1125. size: 12,
  1126. r: [][]map[string]interface{}{
  1127. {{
  1128. "temp": 25.5,
  1129. }},{{
  1130. "temp": 28.1,
  1131. },{
  1132. "temp": 27.4,
  1133. },{
  1134. "temp": 25.5,
  1135. }},{{
  1136. "temp": 26.2,
  1137. },{
  1138. "temp": 26.8,
  1139. },{
  1140. "temp": 28.9,
  1141. },{
  1142. "temp": 29.1,
  1143. },{
  1144. "temp": 32.2,
  1145. }},{{
  1146. "temp": 30.9,
  1147. }},
  1148. },
  1149. },{
  1150. name: `rule6`,
  1151. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1152. size: 6,
  1153. r: [][]map[string]interface{}{
  1154. {{
  1155. "m": 25.5,
  1156. "c": float64(1),
  1157. }},{{
  1158. "m": 25.5,
  1159. "c": float64(1),
  1160. }},{{
  1161. "m": 28.1,
  1162. "c": float64(1),
  1163. }},{{
  1164. "m": 28.1,
  1165. "c": float64(2),
  1166. }},{{
  1167. "m": 27.4,
  1168. "c": float64(2),
  1169. }},
  1170. },
  1171. },
  1172. }
  1173. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1174. createEventStreams(t)
  1175. defer dropEventStreams(t)
  1176. done := make(chan struct{})
  1177. defer close(done)
  1178. common.ResetMockTicker()
  1179. //mock ticker
  1180. realTicker := time.NewTicker(500 * time.Millisecond)
  1181. tickerDone := make(chan bool)
  1182. go func(){
  1183. ticker := common.GetTicker(1000).(*common.MockTicker)
  1184. timer := common.GetTimer(1000).(*common.MockTimer)
  1185. for {
  1186. select {
  1187. case <-tickerDone:
  1188. log.Infof("real ticker exiting...")
  1189. return
  1190. case t := <-realTicker.C:
  1191. ts := common.TimeToUnixMilli(t)
  1192. if ticker != nil {
  1193. go ticker.DoTick(ts)
  1194. }
  1195. if timer != nil {
  1196. go timer.DoTick(ts)
  1197. }
  1198. }
  1199. }
  1200. }()
  1201. for i, tt := range tests {
  1202. p := NewRuleProcessor(BadgerDir)
  1203. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1204. var sources []xstream.Source
  1205. if stmt, err := xsql.Language.Parse(parser); err != nil{
  1206. t.Errorf("parse sql %s error: %s", tt.sql , err)
  1207. }else {
  1208. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1209. t.Errorf("sql %s is not a select statement", tt.sql)
  1210. } else {
  1211. streams := xsql.GetStreams(selectStmt)
  1212. for _, stream := range streams{
  1213. source := getEventMockSource(stream, done, tt.size)
  1214. sources = append(sources, source)
  1215. }
  1216. }
  1217. }
  1218. tp, inputs, err := p.createTopoWithSources(&xstream.Rule{
  1219. Id:tt.name, Sql: tt.sql,
  1220. Options: map[string]interface{}{
  1221. "isEventTime": true,
  1222. "lateTolerance": float64(1000),
  1223. },
  1224. }, sources)
  1225. if err != nil{
  1226. t.Error(err)
  1227. }
  1228. sink := test.NewMockSink("mockSink", tt.name)
  1229. tp.AddSink(inputs, sink)
  1230. count := len(sources)
  1231. errCh := tp.Open()
  1232. func(){
  1233. for{
  1234. select{
  1235. case err = <- errCh:
  1236. t.Log(err)
  1237. tp.Cancel()
  1238. return
  1239. case <- done:
  1240. count--
  1241. log.Infof("%d sources remaining", count)
  1242. if count <= 0{
  1243. log.Info("stream stopping")
  1244. time.Sleep(1 * time.Second)
  1245. tp.Cancel()
  1246. return
  1247. }
  1248. default:
  1249. }
  1250. }
  1251. }()
  1252. results := sink.GetResults()
  1253. var maps [][]map[string]interface{}
  1254. for _, v := range results{
  1255. var mapRes []map[string]interface{}
  1256. err := json.Unmarshal(v, &mapRes)
  1257. if err != nil {
  1258. t.Errorf("Failed to parse the input into map")
  1259. continue
  1260. }
  1261. maps = append(maps, mapRes)
  1262. }
  1263. if !reflect.DeepEqual(tt.r, maps) {
  1264. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1265. }
  1266. }
  1267. realTicker.Stop()
  1268. tickerDone <- true
  1269. close(tickerDone)
  1270. }
  1271. func errstring(err error) string {
  1272. if err != nil {
  1273. return err.Error()
  1274. }
  1275. return ""
  1276. }