// xsql_processor_test.go (48 KB)
  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/nodes"
  9. "github.com/emqx/kuiper/xstream/test"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var DbDir = getDbDir()
  17. func getDbDir() string {
  18. dbDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("db location is %s", dbDir)
  23. return dbDir
  24. }
  25. func TestStreamCreateProcessor(t *testing.T) {
  26. var tests = []struct {
  27. s string
  28. r []string
  29. err string
  30. }{
  31. {
  32. s: `SHOW STREAMS;`,
  33. r: []string{"No stream definitions are found."},
  34. },
  35. {
  36. s: `EXPLAIN STREAM topic1;`,
  37. err: "Stream topic1 is not found.",
  38. },
  39. {
  40. s: `CREATE STREAM topic1 (
  41. USERID BIGINT,
  42. FIRST_NAME STRING,
  43. LAST_NAME STRING,
  44. NICKNAMES ARRAY(STRING),
  45. Gender BOOLEAN,
  46. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  47. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  48. r: []string{"Stream topic1 is created."},
  49. },
  50. {
  51. s: `CREATE STREAM topic1 (
  52. USERID BIGINT,
  53. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  54. err: "Create stream fails: Item topic1 already exists.",
  55. },
  56. {
  57. s: `EXPLAIN STREAM topic1;`,
  58. r: []string{"TO BE SUPPORTED"},
  59. },
  60. {
  61. s: `DESCRIBE STREAM topic1;`,
  62. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  63. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  64. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  65. },
  66. {
  67. s: `SHOW STREAMS;`,
  68. r: []string{"topic1"},
  69. },
  70. {
  71. s: `DROP STREAM topic1;`,
  72. r: []string{"Stream topic1 is dropped."},
  73. },
  74. {
  75. s: `DESCRIBE STREAM topic1;`,
  76. err: "Stream topic1 is not found.",
  77. },
  78. {
  79. s: `DROP STREAM topic1;`,
  80. err: "Drop stream fails: topic1 is not found.",
  81. },
  82. }
  83. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  84. streamDB := path.Join(getDbDir(), "streamTest")
  85. for i, tt := range tests {
  86. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  87. if !reflect.DeepEqual(tt.err, errstring(err)) {
  88. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  89. } else if tt.err == "" {
  90. if !reflect.DeepEqual(tt.r, results) {
  91. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  92. }
  93. }
  94. }
  95. }
  96. func createStreams(t *testing.T) {
  97. demo := `CREATE STREAM demo (
  98. color STRING,
  99. size BIGINT,
  100. ts BIGINT
  101. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  102. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  103. if err != nil {
  104. t.Log(err)
  105. }
  106. demo1 := `CREATE STREAM demo1 (
  107. temp FLOAT,
  108. hum BIGINT,
  109. ts BIGINT
  110. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  111. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  112. if err != nil {
  113. t.Log(err)
  114. }
  115. sessionDemo := `CREATE STREAM sessionDemo (
  116. temp FLOAT,
  117. hum BIGINT,
  118. ts BIGINT
  119. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  120. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  121. if err != nil {
  122. t.Log(err)
  123. }
  124. }
  125. func dropStreams(t *testing.T) {
  126. demo := `DROP STREAM demo`
  127. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  128. if err != nil {
  129. t.Log(err)
  130. }
  131. demo1 := `DROP STREAM demo1`
  132. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  133. if err != nil {
  134. t.Log(err)
  135. }
  136. sessionDemo := `DROP STREAM sessionDemo`
  137. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  138. if err != nil {
  139. t.Log(err)
  140. }
  141. }
  142. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
  143. var data []*xsql.Tuple
  144. switch name {
  145. case "demo":
  146. data = []*xsql.Tuple{
  147. {
  148. Emitter: name,
  149. Message: map[string]interface{}{
  150. "color": "red",
  151. "size": 3,
  152. "ts": 1541152486013,
  153. },
  154. Timestamp: 1541152486013,
  155. },
  156. {
  157. Emitter: name,
  158. Message: map[string]interface{}{
  159. "color": "blue",
  160. "size": 6,
  161. "ts": 1541152486822,
  162. },
  163. Timestamp: 1541152486822,
  164. },
  165. {
  166. Emitter: name,
  167. Message: map[string]interface{}{
  168. "color": "blue",
  169. "size": 2,
  170. "ts": 1541152487632,
  171. },
  172. Timestamp: 1541152487632,
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "color": "yellow",
  178. "size": 4,
  179. "ts": 1541152488442,
  180. },
  181. Timestamp: 1541152488442,
  182. },
  183. {
  184. Emitter: name,
  185. Message: map[string]interface{}{
  186. "color": "red",
  187. "size": 1,
  188. "ts": 1541152489252,
  189. },
  190. Timestamp: 1541152489252,
  191. },
  192. }
  193. case "demo1":
  194. data = []*xsql.Tuple{
  195. {
  196. Emitter: name,
  197. Message: map[string]interface{}{
  198. "temp": 25.5,
  199. "hum": 65,
  200. "ts": 1541152486013,
  201. },
  202. Timestamp: 1541152486013,
  203. },
  204. {
  205. Emitter: name,
  206. Message: map[string]interface{}{
  207. "temp": 27.5,
  208. "hum": 59,
  209. "ts": 1541152486823,
  210. },
  211. Timestamp: 1541152486823,
  212. },
  213. {
  214. Emitter: name,
  215. Message: map[string]interface{}{
  216. "temp": 28.1,
  217. "hum": 75,
  218. "ts": 1541152487632,
  219. },
  220. Timestamp: 1541152487632,
  221. },
  222. {
  223. Emitter: name,
  224. Message: map[string]interface{}{
  225. "temp": 27.4,
  226. "hum": 80,
  227. "ts": 1541152488442,
  228. },
  229. Timestamp: 1541152488442,
  230. },
  231. {
  232. Emitter: name,
  233. Message: map[string]interface{}{
  234. "temp": 25.5,
  235. "hum": 62,
  236. "ts": 1541152489252,
  237. },
  238. Timestamp: 1541152489252,
  239. },
  240. }
  241. case "sessionDemo":
  242. data = []*xsql.Tuple{
  243. {
  244. Emitter: name,
  245. Message: map[string]interface{}{
  246. "temp": 25.5,
  247. "hum": 65,
  248. "ts": 1541152486013,
  249. },
  250. Timestamp: 1541152486013,
  251. },
  252. {
  253. Emitter: name,
  254. Message: map[string]interface{}{
  255. "temp": 27.5,
  256. "hum": 59,
  257. "ts": 1541152486823,
  258. },
  259. Timestamp: 1541152486823,
  260. },
  261. {
  262. Emitter: name,
  263. Message: map[string]interface{}{
  264. "temp": 28.1,
  265. "hum": 75,
  266. "ts": 1541152487932,
  267. },
  268. Timestamp: 1541152487932,
  269. },
  270. {
  271. Emitter: name,
  272. Message: map[string]interface{}{
  273. "temp": 27.4,
  274. "hum": 80,
  275. "ts": 1541152488442,
  276. },
  277. Timestamp: 1541152488442,
  278. },
  279. {
  280. Emitter: name,
  281. Message: map[string]interface{}{
  282. "temp": 25.5,
  283. "hum": 62,
  284. "ts": 1541152489252,
  285. },
  286. Timestamp: 1541152489252,
  287. },
  288. {
  289. Emitter: name,
  290. Message: map[string]interface{}{
  291. "temp": 26.2,
  292. "hum": 63,
  293. "ts": 1541152490062,
  294. },
  295. Timestamp: 1541152490062,
  296. },
  297. {
  298. Emitter: name,
  299. Message: map[string]interface{}{
  300. "temp": 26.8,
  301. "hum": 71,
  302. "ts": 1541152490872,
  303. },
  304. Timestamp: 1541152490872,
  305. },
  306. {
  307. Emitter: name,
  308. Message: map[string]interface{}{
  309. "temp": 28.9,
  310. "hum": 85,
  311. "ts": 1541152491682,
  312. },
  313. Timestamp: 1541152491682,
  314. },
  315. {
  316. Emitter: name,
  317. Message: map[string]interface{}{
  318. "temp": 29.1,
  319. "hum": 92,
  320. "ts": 1541152492492,
  321. },
  322. Timestamp: 1541152492492,
  323. },
  324. {
  325. Emitter: name,
  326. Message: map[string]interface{}{
  327. "temp": 32.2,
  328. "hum": 99,
  329. "ts": 1541152493202,
  330. },
  331. Timestamp: 1541152493202,
  332. },
  333. {
  334. Emitter: name,
  335. Message: map[string]interface{}{
  336. "temp": 30.9,
  337. "hum": 87,
  338. "ts": 1541152494112,
  339. },
  340. Timestamp: 1541152494112,
  341. },
  342. }
  343. }
  344. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  345. "DATASOURCE": name,
  346. })
  347. }
  348. func TestSingleSQL(t *testing.T) {
  349. var tests = []struct {
  350. name string
  351. sql string
  352. r [][]map[string]interface{}
  353. m map[string]interface{}
  354. }{
  355. {
  356. name: `rule1`,
  357. sql: `SELECT * FROM demo`,
  358. r: [][]map[string]interface{}{
  359. {{
  360. "color": "red",
  361. "size": float64(3),
  362. "ts": float64(1541152486013),
  363. }},
  364. {{
  365. "color": "blue",
  366. "size": float64(6),
  367. "ts": float64(1541152486822),
  368. }},
  369. {{
  370. "color": "blue",
  371. "size": float64(2),
  372. "ts": float64(1541152487632),
  373. }},
  374. {{
  375. "color": "yellow",
  376. "size": float64(4),
  377. "ts": float64(1541152488442),
  378. }},
  379. {{
  380. "color": "red",
  381. "size": float64(1),
  382. "ts": float64(1541152489252),
  383. }},
  384. },
  385. m: map[string]interface{}{
  386. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  387. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  388. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  389. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  390. "kuiper_op_project_0_exceptions_total": int64(0),
  391. "kuiper_op_project_0_process_latency_ms": int64(0),
  392. "kuiper_op_project_0_records_in_total": int64(5),
  393. "kuiper_op_project_0_records_out_total": int64(5),
  394. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  395. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  396. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  397. "kuiper_source_demo_0_exceptions_total": int64(0),
  398. "kuiper_source_demo_0_records_in_total": int64(5),
  399. "kuiper_source_demo_0_records_out_total": int64(5),
  400. },
  401. }, {
  402. name: `rule2`,
  403. sql: `SELECT color, ts FROM demo where size > 3`,
  404. r: [][]map[string]interface{}{
  405. {{
  406. "color": "blue",
  407. "ts": float64(1541152486822),
  408. }},
  409. {{
  410. "color": "yellow",
  411. "ts": float64(1541152488442),
  412. }},
  413. },
  414. m: map[string]interface{}{
  415. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  416. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  417. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  418. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  419. "kuiper_op_project_0_exceptions_total": int64(0),
  420. "kuiper_op_project_0_process_latency_ms": int64(0),
  421. "kuiper_op_project_0_records_in_total": int64(2),
  422. "kuiper_op_project_0_records_out_total": int64(2),
  423. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  424. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  425. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  426. "kuiper_source_demo_0_exceptions_total": int64(0),
  427. "kuiper_source_demo_0_records_in_total": int64(5),
  428. "kuiper_source_demo_0_records_out_total": int64(5),
  429. "kuiper_op_filter_0_exceptions_total": int64(0),
  430. "kuiper_op_filter_0_process_latency_ms": int64(0),
  431. "kuiper_op_filter_0_records_in_total": int64(5),
  432. "kuiper_op_filter_0_records_out_total": int64(2),
  433. },
  434. }, {
  435. name: `rule3`,
  436. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  437. r: [][]map[string]interface{}{
  438. {{
  439. "Int8": float64(6),
  440. "ts": float64(1541152486822),
  441. }},
  442. {{
  443. "Int8": float64(4),
  444. "ts": float64(1541152488442),
  445. }},
  446. },
  447. m: map[string]interface{}{
  448. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  449. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  450. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  451. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  452. "kuiper_op_project_0_exceptions_total": int64(0),
  453. "kuiper_op_project_0_process_latency_ms": int64(0),
  454. "kuiper_op_project_0_records_in_total": int64(2),
  455. "kuiper_op_project_0_records_out_total": int64(2),
  456. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  457. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  458. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  459. "kuiper_source_demo_0_exceptions_total": int64(0),
  460. "kuiper_source_demo_0_records_in_total": int64(5),
  461. "kuiper_source_demo_0_records_out_total": int64(5),
  462. "kuiper_op_filter_0_exceptions_total": int64(0),
  463. "kuiper_op_filter_0_process_latency_ms": int64(0),
  464. "kuiper_op_filter_0_records_in_total": int64(5),
  465. "kuiper_op_filter_0_records_out_total": int64(2),
  466. },
  467. },
  468. }
  469. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  470. createStreams(t)
  471. defer dropStreams(t)
  472. done := make(chan struct{})
  473. //defer close(done)
  474. for i, tt := range tests {
  475. p := NewRuleProcessor(DbDir)
  476. parser := xsql.NewParser(strings.NewReader(tt.sql))
  477. var sources []*nodes.SourceNode
  478. if stmt, err := xsql.Language.Parse(parser); err != nil {
  479. t.Errorf("parse sql %s error: %s", tt.sql, err)
  480. } else {
  481. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  482. t.Errorf("sql %s is not a select statement", tt.sql)
  483. } else {
  484. streams := xsql.GetStreams(selectStmt)
  485. for _, stream := range streams {
  486. source := getMockSource(stream, done, 5)
  487. sources = append(sources, source)
  488. }
  489. }
  490. }
  491. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  492. if err != nil {
  493. t.Error(err)
  494. }
  495. mockSink := test.NewMockSink()
  496. sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
  497. tp.AddSink(inputs, sink)
  498. count := len(sources)
  499. errCh := tp.Open()
  500. func() {
  501. for {
  502. select {
  503. case err = <-errCh:
  504. t.Log(err)
  505. tp.Cancel()
  506. return
  507. case <-done:
  508. count--
  509. log.Infof("%d sources remaining", count)
  510. if count <= 0 {
  511. log.Info("stream stopping")
  512. time.Sleep(1 * time.Second)
  513. tp.Cancel()
  514. return
  515. }
  516. default:
  517. }
  518. }
  519. }()
  520. results := mockSink.GetResults()
  521. var maps [][]map[string]interface{}
  522. for _, v := range results {
  523. var mapRes []map[string]interface{}
  524. err := json.Unmarshal(v, &mapRes)
  525. if err != nil {
  526. t.Errorf("Failed to parse the input into map")
  527. continue
  528. }
  529. maps = append(maps, mapRes)
  530. }
  531. //if !reflect.DeepEqual(tt.r, maps) {
  532. // t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  533. // continue
  534. //}
  535. metrics := tp.GetMetrics()
  536. log.Infof("metrics: %v", metrics)
  537. for k, v := range tt.m {
  538. if v != metrics[k] {
  539. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  540. break
  541. }
  542. }
  543. }
  544. }
  545. func TestWindow(t *testing.T) {
  546. common.IsTesting = true
  547. var tests = []struct {
  548. name string
  549. sql string
  550. size int
  551. r [][]map[string]interface{}
  552. m map[string]interface{}
  553. }{
  554. {
  555. name: `rule1`,
  556. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  557. size: 5,
  558. r: [][]map[string]interface{}{
  559. {{
  560. "color": "red",
  561. "size": float64(3),
  562. "ts": float64(1541152486013),
  563. }, {
  564. "color": "blue",
  565. "size": float64(6),
  566. "ts": float64(1541152486822),
  567. }},
  568. {{
  569. "color": "red",
  570. "size": float64(3),
  571. "ts": float64(1541152486013),
  572. }, {
  573. "color": "blue",
  574. "size": float64(6),
  575. "ts": float64(1541152486822),
  576. }, {
  577. "color": "blue",
  578. "size": float64(2),
  579. "ts": float64(1541152487632),
  580. }},
  581. {{
  582. "color": "blue",
  583. "size": float64(2),
  584. "ts": float64(1541152487632),
  585. }, {
  586. "color": "yellow",
  587. "size": float64(4),
  588. "ts": float64(1541152488442),
  589. }},
  590. },
  591. m: map[string]interface{}{
  592. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  593. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  594. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  595. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  596. "kuiper_op_project_0_exceptions_total": int64(0),
  597. "kuiper_op_project_0_process_latency_ms": int64(0),
  598. "kuiper_op_project_0_records_in_total": int64(3),
  599. "kuiper_op_project_0_records_out_total": int64(3),
  600. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  601. "kuiper_sink_mockSink_0_records_in_total": int64(3),
  602. "kuiper_sink_mockSink_0_records_out_total": int64(3),
  603. "kuiper_source_demo_0_exceptions_total": int64(0),
  604. "kuiper_source_demo_0_records_in_total": int64(5),
  605. "kuiper_source_demo_0_records_out_total": int64(5),
  606. "kuiper_op_window_0_exceptions_total": int64(0),
  607. "kuiper_op_window_0_process_latency_ms": int64(0),
  608. "kuiper_op_window_0_records_in_total": int64(5),
  609. "kuiper_op_window_0_records_out_total": int64(3),
  610. },
  611. }, {
  612. name: `rule2`,
  613. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  614. size: 5,
  615. r: [][]map[string]interface{}{
  616. {{
  617. "color": "red",
  618. "ts": float64(1541152486013),
  619. }, {
  620. "color": "blue",
  621. "ts": float64(1541152486822),
  622. }},
  623. {{
  624. "color": "yellow",
  625. "ts": float64(1541152488442),
  626. }},
  627. },
  628. m: map[string]interface{}{
  629. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  630. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  631. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  632. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  633. "kuiper_op_project_0_exceptions_total": int64(0),
  634. "kuiper_op_project_0_process_latency_ms": int64(0),
  635. "kuiper_op_project_0_records_in_total": int64(2),
  636. "kuiper_op_project_0_records_out_total": int64(2),
  637. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  638. "kuiper_sink_mockSink_0_records_in_total": int64(2),
  639. "kuiper_sink_mockSink_0_records_out_total": int64(2),
  640. "kuiper_source_demo_0_exceptions_total": int64(0),
  641. "kuiper_source_demo_0_records_in_total": int64(5),
  642. "kuiper_source_demo_0_records_out_total": int64(5),
  643. "kuiper_op_window_0_exceptions_total": int64(0),
  644. "kuiper_op_window_0_process_latency_ms": int64(0),
  645. "kuiper_op_window_0_records_in_total": int64(5),
  646. "kuiper_op_window_0_records_out_total": int64(3),
  647. "kuiper_op_filter_0_exceptions_total": int64(0),
  648. "kuiper_op_filter_0_process_latency_ms": int64(0),
  649. "kuiper_op_filter_0_records_in_total": int64(3),
  650. "kuiper_op_filter_0_records_out_total": int64(2),
  651. },
  652. }, {
  653. name: `rule3`,
  654. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  655. size: 5,
  656. r: [][]map[string]interface{}{
  657. {{
  658. "color": "red",
  659. "temp": 25.5,
  660. "ts": float64(1541152486013),
  661. }}, {{
  662. "color": "red",
  663. "temp": 25.5,
  664. "ts": float64(1541152486013),
  665. }}, {{
  666. "color": "red",
  667. "temp": 25.5,
  668. "ts": float64(1541152486013),
  669. }}, {{
  670. "color": "blue",
  671. "temp": 28.1,
  672. "ts": float64(1541152487632),
  673. }}, {{
  674. "color": "blue",
  675. "temp": 28.1,
  676. "ts": float64(1541152487632),
  677. }}, {{
  678. "color": "blue",
  679. "temp": 28.1,
  680. "ts": float64(1541152487632),
  681. }, {
  682. "color": "yellow",
  683. "temp": 27.4,
  684. "ts": float64(1541152488442),
  685. }}, {{
  686. "color": "yellow",
  687. "temp": 27.4,
  688. "ts": float64(1541152488442),
  689. }}, {{
  690. "color": "yellow",
  691. "temp": 27.4,
  692. "ts": float64(1541152488442),
  693. }, {
  694. "color": "red",
  695. "temp": 25.5,
  696. "ts": float64(1541152489252),
  697. }},
  698. },
  699. m: map[string]interface{}{
  700. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  701. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  702. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  703. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  704. "kuiper_op_preprocessor_demo1_0_exceptions_total": int64(0),
  705. "kuiper_op_preprocessor_demo1_0_process_latency_ms": int64(0),
  706. "kuiper_op_preprocessor_demo1_0_records_in_total": int64(5),
  707. "kuiper_op_preprocessor_demo1_0_records_out_total": int64(5),
  708. "kuiper_op_project_0_exceptions_total": int64(0),
  709. "kuiper_op_project_0_process_latency_ms": int64(0),
  710. "kuiper_op_project_0_records_in_total": int64(8),
  711. "kuiper_op_project_0_records_out_total": int64(8),
  712. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  713. "kuiper_sink_mockSink_0_records_in_total": int64(8),
  714. "kuiper_sink_mockSink_0_records_out_total": int64(8),
  715. "kuiper_source_demo_0_exceptions_total": int64(0),
  716. "kuiper_source_demo_0_records_in_total": int64(5),
  717. "kuiper_source_demo_0_records_out_total": int64(5),
  718. "kuiper_source_demo1_0_exceptions_total": int64(0),
  719. "kuiper_source_demo1_0_records_in_total": int64(5),
  720. "kuiper_source_demo1_0_records_out_total": int64(5),
  721. "kuiper_op_window_0_exceptions_total": int64(0),
  722. "kuiper_op_window_0_process_latency_ms": int64(0),
  723. "kuiper_op_window_0_records_in_total": int64(10),
  724. "kuiper_op_window_0_records_out_total": int64(10),
  725. "kuiper_op_join_0_exceptions_total": int64(0),
  726. "kuiper_op_join_0_process_latency_ms": int64(0),
  727. "kuiper_op_join_0_records_in_total": int64(10),
  728. "kuiper_op_join_0_records_out_total": int64(8),
  729. },
  730. }, {
  731. name: `rule4`,
  732. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  733. size: 5,
  734. r: [][]map[string]interface{}{
  735. {{
  736. "color": "red",
  737. }}, {{
  738. "color": "blue",
  739. }, {
  740. "color": "red",
  741. }}, {{
  742. "color": "blue",
  743. }, {
  744. "color": "red",
  745. }}, {{
  746. "color": "blue",
  747. }, {
  748. "color": "yellow",
  749. }}, {{
  750. "color": "blue",
  751. }, {
  752. "color": "red",
  753. }, {
  754. "color": "yellow",
  755. }},
  756. },
  757. m: map[string]interface{}{
  758. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  759. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  760. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  761. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  762. "kuiper_op_project_0_exceptions_total": int64(0),
  763. "kuiper_op_project_0_process_latency_ms": int64(0),
  764. "kuiper_op_project_0_records_in_total": int64(5),
  765. "kuiper_op_project_0_records_out_total": int64(5),
  766. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  767. "kuiper_sink_mockSink_0_records_in_total": int64(5),
  768. "kuiper_sink_mockSink_0_records_out_total": int64(5),
  769. "kuiper_source_demo_0_exceptions_total": int64(0),
  770. "kuiper_source_demo_0_records_in_total": int64(5),
  771. "kuiper_source_demo_0_records_out_total": int64(5),
  772. "kuiper_op_window_0_exceptions_total": int64(0),
  773. "kuiper_op_window_0_process_latency_ms": int64(0),
  774. "kuiper_op_window_0_records_in_total": int64(5),
  775. "kuiper_op_window_0_records_out_total": int64(5),
  776. "kuiper_op_aggregate_0_exceptions_total": int64(0),
  777. "kuiper_op_aggregate_0_process_latency_ms": int64(0),
  778. "kuiper_op_aggregate_0_records_in_total": int64(5),
  779. "kuiper_op_aggregate_0_records_out_total": int64(5),
  780. "kuiper_op_order_0_exceptions_total": int64(0),
  781. "kuiper_op_order_0_process_latency_ms": int64(0),
  782. "kuiper_op_order_0_records_in_total": int64(5),
  783. "kuiper_op_order_0_records_out_total": int64(5),
  784. },
  785. }, {
  786. name: `rule5`,
  787. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  788. size: 11,
  789. r: [][]map[string]interface{}{
  790. {{
  791. "temp": 25.5,
  792. }, {
  793. "temp": 27.5,
  794. }}, {{
  795. "temp": 28.1,
  796. }, {
  797. "temp": 27.4,
  798. }, {
  799. "temp": 25.5,
  800. }}, {{
  801. "temp": 26.2,
  802. }, {
  803. "temp": 26.8,
  804. }, {
  805. "temp": 28.9,
  806. }, {
  807. "temp": 29.1,
  808. }, {
  809. "temp": 32.2,
  810. }},
  811. },
  812. m: map[string]interface{}{
  813. "kuiper_op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  814. "kuiper_op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  815. "kuiper_op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  816. "kuiper_op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  817. "kuiper_op_project_0_exceptions_total": int64(0),
  818. "kuiper_op_project_0_process_latency_ms": int64(0),
  819. "kuiper_op_project_0_records_in_total": int64(3),
  820. "kuiper_op_project_0_records_out_total": int64(3),
  821. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  822. "kuiper_sink_mockSink_0_records_in_total": int64(3),
  823. "kuiper_sink_mockSink_0_records_out_total": int64(3),
  824. "kuiper_source_sessionDemo_0_exceptions_total": int64(0),
  825. "kuiper_source_sessionDemo_0_records_in_total": int64(11),
  826. "kuiper_source_sessionDemo_0_records_out_total": int64(11),
  827. "kuiper_op_window_0_exceptions_total": int64(0),
  828. "kuiper_op_window_0_process_latency_ms": int64(0),
  829. "kuiper_op_window_0_records_in_total": int64(11),
  830. "kuiper_op_window_0_records_out_total": int64(3),
  831. },
  832. }, {
  833. name: `rule6`,
  834. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  835. size: 5,
  836. r: [][]map[string]interface{}{
  837. {{
  838. "m": 25.5,
  839. "c": float64(1),
  840. }}, {{
  841. "m": 25.5,
  842. "c": float64(1),
  843. }}, {{
  844. "m": 25.5,
  845. "c": float64(1),
  846. }}, {{
  847. "m": 28.1,
  848. "c": float64(1),
  849. }}, {{
  850. "m": 28.1,
  851. "c": float64(1),
  852. }}, {{
  853. "m": 28.1,
  854. "c": float64(2),
  855. }}, {{
  856. "m": 27.4,
  857. "c": float64(1),
  858. }}, {{
  859. "m": 27.4,
  860. "c": float64(2),
  861. }},
  862. },
  863. m: map[string]interface{}{
  864. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  865. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  866. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  867. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  868. "kuiper_op_preprocessor_demo1_0_exceptions_total": int64(0),
  869. "kuiper_op_preprocessor_demo1_0_process_latency_ms": int64(0),
  870. "kuiper_op_preprocessor_demo1_0_records_in_total": int64(5),
  871. "kuiper_op_preprocessor_demo1_0_records_out_total": int64(5),
  872. "kuiper_op_project_0_exceptions_total": int64(0),
  873. "kuiper_op_project_0_process_latency_ms": int64(0),
  874. "kuiper_op_project_0_records_in_total": int64(8),
  875. "kuiper_op_project_0_records_out_total": int64(8),
  876. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  877. "kuiper_sink_mockSink_0_records_in_total": int64(8),
  878. "kuiper_sink_mockSink_0_records_out_total": int64(8),
  879. "kuiper_source_demo_0_exceptions_total": int64(0),
  880. "kuiper_source_demo_0_records_in_total": int64(5),
  881. "kuiper_source_demo_0_records_out_total": int64(5),
  882. "kuiper_source_demo1_0_exceptions_total": int64(0),
  883. "kuiper_source_demo1_0_records_in_total": int64(5),
  884. "kuiper_source_demo1_0_records_out_total": int64(5),
  885. "kuiper_op_window_0_exceptions_total": int64(0),
  886. "kuiper_op_window_0_process_latency_ms": int64(0),
  887. "kuiper_op_window_0_records_in_total": int64(10),
  888. "kuiper_op_window_0_records_out_total": int64(10),
  889. "kuiper_op_join_0_exceptions_total": int64(0),
  890. "kuiper_op_join_0_process_latency_ms": int64(0),
  891. "kuiper_op_join_0_records_in_total": int64(10),
  892. "kuiper_op_join_0_records_out_total": int64(8),
  893. },
  894. },
  895. }
  896. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  897. createStreams(t)
  898. defer dropStreams(t)
  899. done := make(chan struct{})
  900. defer close(done)
  901. common.ResetMockTicker()
  902. for i, tt := range tests {
  903. p := NewRuleProcessor(DbDir)
  904. parser := xsql.NewParser(strings.NewReader(tt.sql))
  905. var sources []*nodes.SourceNode
  906. if stmt, err := xsql.Language.Parse(parser); err != nil {
  907. t.Errorf("parse sql %s error: %s", tt.sql, err)
  908. } else {
  909. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  910. t.Errorf("sql %s is not a select statement", tt.sql)
  911. } else {
  912. streams := xsql.GetStreams(selectStmt)
  913. for _, stream := range streams {
  914. source := getMockSource(stream, done, tt.size)
  915. sources = append(sources, source)
  916. }
  917. }
  918. }
  919. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  920. if err != nil {
  921. t.Error(err)
  922. }
  923. mockSink := test.NewMockSink()
  924. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  925. tp.AddSink(inputs, sink)
  926. count := len(sources)
  927. errCh := tp.Open()
  928. func() {
  929. for {
  930. select {
  931. case err = <-errCh:
  932. t.Log(err)
  933. tp.Cancel()
  934. return
  935. case <-done:
  936. count--
  937. log.Infof("%d sources remaining", count)
  938. if count <= 0 {
  939. log.Info("stream stopping")
  940. time.Sleep(1 * time.Second)
  941. tp.Cancel()
  942. return
  943. }
  944. default:
  945. }
  946. }
  947. }()
  948. results := mockSink.GetResults()
  949. var maps [][]map[string]interface{}
  950. for _, v := range results {
  951. var mapRes []map[string]interface{}
  952. err := json.Unmarshal(v, &mapRes)
  953. if err != nil {
  954. t.Errorf("Failed to parse the input into map")
  955. continue
  956. }
  957. maps = append(maps, mapRes)
  958. }
  959. if !reflect.DeepEqual(tt.r, maps) {
  960. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  961. }
  962. metrics := tp.GetMetrics()
  963. for k, v := range tt.m {
  964. if v != metrics[k] {
  965. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  966. break
  967. }
  968. }
  969. }
  970. }
  971. func createEventStreams(t *testing.T) {
  972. demo := `CREATE STREAM demoE (
  973. color STRING,
  974. size BIGINT,
  975. ts BIGINT
  976. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  977. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  978. if err != nil {
  979. t.Log(err)
  980. }
  981. demo1 := `CREATE STREAM demo1E (
  982. temp FLOAT,
  983. hum BIGINT,
  984. ts BIGINT
  985. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  986. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  987. if err != nil {
  988. t.Log(err)
  989. }
  990. sessionDemo := `CREATE STREAM sessionDemoE (
  991. temp FLOAT,
  992. hum BIGINT,
  993. ts BIGINT
  994. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  995. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  996. if err != nil {
  997. t.Log(err)
  998. }
  999. }
  1000. func dropEventStreams(t *testing.T) {
  1001. demo := `DROP STREAM demoE`
  1002. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  1003. if err != nil {
  1004. t.Log(err)
  1005. }
  1006. demo1 := `DROP STREAM demo1E`
  1007. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  1008. if err != nil {
  1009. t.Log(err)
  1010. }
  1011. sessionDemo := `DROP STREAM sessionDemoE`
  1012. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  1013. if err != nil {
  1014. t.Log(err)
  1015. }
  1016. }
  1017. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
  1018. var data []*xsql.Tuple
  1019. switch name {
  1020. case "demoE":
  1021. data = []*xsql.Tuple{
  1022. {
  1023. Emitter: name,
  1024. Message: map[string]interface{}{
  1025. "color": "red",
  1026. "size": 3,
  1027. "ts": 1541152486013,
  1028. },
  1029. Timestamp: 1541152486013,
  1030. },
  1031. {
  1032. Emitter: name,
  1033. Message: map[string]interface{}{
  1034. "color": "blue",
  1035. "size": 2,
  1036. "ts": 1541152487632,
  1037. },
  1038. Timestamp: 1541152487632,
  1039. },
  1040. {
  1041. Emitter: name,
  1042. Message: map[string]interface{}{
  1043. "color": "red",
  1044. "size": 1,
  1045. "ts": 1541152489252,
  1046. },
  1047. Timestamp: 1541152489252,
  1048. },
  1049. { //dropped item
  1050. Emitter: name,
  1051. Message: map[string]interface{}{
  1052. "color": "blue",
  1053. "size": 6,
  1054. "ts": 1541152486822,
  1055. },
  1056. Timestamp: 1541152486822,
  1057. },
  1058. {
  1059. Emitter: name,
  1060. Message: map[string]interface{}{
  1061. "color": "yellow",
  1062. "size": 4,
  1063. "ts": 1541152488442,
  1064. },
  1065. Timestamp: 1541152488442,
  1066. },
  1067. { //To lift the watermark and issue all windows
  1068. Emitter: name,
  1069. Message: map[string]interface{}{
  1070. "color": "yellow",
  1071. "size": 4,
  1072. "ts": 1541152492342,
  1073. },
  1074. Timestamp: 1541152488442,
  1075. },
  1076. }
  1077. case "demo1E":
  1078. data = []*xsql.Tuple{
  1079. {
  1080. Emitter: name,
  1081. Message: map[string]interface{}{
  1082. "temp": 27.5,
  1083. "hum": 59,
  1084. "ts": 1541152486823,
  1085. },
  1086. Timestamp: 1541152486823,
  1087. },
  1088. {
  1089. Emitter: name,
  1090. Message: map[string]interface{}{
  1091. "temp": 25.5,
  1092. "hum": 65,
  1093. "ts": 1541152486013,
  1094. },
  1095. Timestamp: 1541152486013,
  1096. },
  1097. {
  1098. Emitter: name,
  1099. Message: map[string]interface{}{
  1100. "temp": 27.4,
  1101. "hum": 80,
  1102. "ts": 1541152488442,
  1103. },
  1104. Timestamp: 1541152488442,
  1105. },
  1106. {
  1107. Emitter: name,
  1108. Message: map[string]interface{}{
  1109. "temp": 28.1,
  1110. "hum": 75,
  1111. "ts": 1541152487632,
  1112. },
  1113. Timestamp: 1541152487632,
  1114. },
  1115. {
  1116. Emitter: name,
  1117. Message: map[string]interface{}{
  1118. "temp": 25.5,
  1119. "hum": 62,
  1120. "ts": 1541152489252,
  1121. },
  1122. Timestamp: 1541152489252,
  1123. },
  1124. {
  1125. Emitter: name,
  1126. Message: map[string]interface{}{
  1127. "temp": 25.5,
  1128. "hum": 62,
  1129. "ts": 1541152499252,
  1130. },
  1131. Timestamp: 1541152499252,
  1132. },
  1133. }
  1134. case "sessionDemoE":
  1135. data = []*xsql.Tuple{
  1136. {
  1137. Emitter: name,
  1138. Message: map[string]interface{}{
  1139. "temp": 25.5,
  1140. "hum": 65,
  1141. "ts": 1541152486013,
  1142. },
  1143. Timestamp: 1541152486013,
  1144. },
  1145. {
  1146. Emitter: name,
  1147. Message: map[string]interface{}{
  1148. "temp": 28.1,
  1149. "hum": 75,
  1150. "ts": 1541152487932,
  1151. },
  1152. Timestamp: 1541152487932,
  1153. },
  1154. {
  1155. Emitter: name,
  1156. Message: map[string]interface{}{
  1157. "temp": 27.5,
  1158. "hum": 59,
  1159. "ts": 1541152486823,
  1160. },
  1161. Timestamp: 1541152486823,
  1162. },
  1163. {
  1164. Emitter: name,
  1165. Message: map[string]interface{}{
  1166. "temp": 25.5,
  1167. "hum": 62,
  1168. "ts": 1541152489252,
  1169. },
  1170. Timestamp: 1541152489252,
  1171. },
  1172. {
  1173. Emitter: name,
  1174. Message: map[string]interface{}{
  1175. "temp": 27.4,
  1176. "hum": 80,
  1177. "ts": 1541152488442,
  1178. },
  1179. Timestamp: 1541152488442,
  1180. },
  1181. {
  1182. Emitter: name,
  1183. Message: map[string]interface{}{
  1184. "temp": 26.2,
  1185. "hum": 63,
  1186. "ts": 1541152490062,
  1187. },
  1188. Timestamp: 1541152490062,
  1189. },
  1190. {
  1191. Emitter: name,
  1192. Message: map[string]interface{}{
  1193. "temp": 28.9,
  1194. "hum": 85,
  1195. "ts": 1541152491682,
  1196. },
  1197. Timestamp: 1541152491682,
  1198. },
  1199. {
  1200. Emitter: name,
  1201. Message: map[string]interface{}{
  1202. "temp": 26.8,
  1203. "hum": 71,
  1204. "ts": 1541152490872,
  1205. },
  1206. Timestamp: 1541152490872,
  1207. },
  1208. {
  1209. Emitter: name,
  1210. Message: map[string]interface{}{
  1211. "temp": 29.1,
  1212. "hum": 92,
  1213. "ts": 1541152492492,
  1214. },
  1215. Timestamp: 1541152492492,
  1216. },
  1217. {
  1218. Emitter: name,
  1219. Message: map[string]interface{}{
  1220. "temp": 30.9,
  1221. "hum": 87,
  1222. "ts": 1541152494112,
  1223. },
  1224. Timestamp: 1541152494112,
  1225. },
  1226. {
  1227. Emitter: name,
  1228. Message: map[string]interface{}{
  1229. "temp": 32.2,
  1230. "hum": 99,
  1231. "ts": 1541152493202,
  1232. },
  1233. Timestamp: 1541152493202,
  1234. },
  1235. {
  1236. Emitter: name,
  1237. Message: map[string]interface{}{
  1238. "temp": 32.2,
  1239. "hum": 99,
  1240. "ts": 1541152499202,
  1241. },
  1242. Timestamp: 1541152499202,
  1243. },
  1244. }
  1245. }
  1246. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  1247. "DATASOURCE": name,
  1248. })
  1249. }
  1250. func TestEventWindow(t *testing.T) {
  1251. common.IsTesting = true
  1252. var tests = []struct {
  1253. name string
  1254. sql string
  1255. size int
  1256. r [][]map[string]interface{}
  1257. m map[string]interface{}
  1258. }{
  1259. {
  1260. name: `rule1`,
  1261. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1262. size: 6,
  1263. r: [][]map[string]interface{}{
  1264. {{
  1265. "color": "red",
  1266. "size": float64(3),
  1267. "ts": float64(1541152486013),
  1268. }},
  1269. {{
  1270. "color": "red",
  1271. "size": float64(3),
  1272. "ts": float64(1541152486013),
  1273. }, {
  1274. "color": "blue",
  1275. "size": float64(2),
  1276. "ts": float64(1541152487632),
  1277. }},
  1278. {{
  1279. "color": "blue",
  1280. "size": float64(2),
  1281. "ts": float64(1541152487632),
  1282. }, {
  1283. "color": "yellow",
  1284. "size": float64(4),
  1285. "ts": float64(1541152488442),
  1286. }}, {{
  1287. "color": "yellow",
  1288. "size": float64(4),
  1289. "ts": float64(1541152488442),
  1290. }, {
  1291. "color": "red",
  1292. "size": float64(1),
  1293. "ts": float64(1541152489252),
  1294. }}, {{
  1295. "color": "red",
  1296. "size": float64(1),
  1297. "ts": float64(1541152489252),
  1298. }},
  1299. },
  1300. m: map[string]interface{}{
  1301. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1302. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1303. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1304. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1305. "kuiper_op_project_0_exceptions_total": int64(0),
  1306. "kuiper_op_project_0_process_latency_ms": int64(0),
  1307. "kuiper_op_project_0_records_in_total": int64(5),
  1308. "kuiper_op_project_0_records_out_total": int64(5),
  1309. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1310. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1311. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1312. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1313. "kuiper_source_demoE_0_records_in_total": int64(6),
  1314. "kuiper_source_demoE_0_records_out_total": int64(6),
  1315. "kuiper_op_window_0_exceptions_total": int64(0),
  1316. "kuiper_op_window_0_process_latency_ms": int64(0),
  1317. "kuiper_op_window_0_records_in_total": int64(6),
  1318. "kuiper_op_window_0_records_out_total": int64(5),
  1319. },
  1320. }, {
  1321. name: `rule2`,
  1322. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1323. size: 6,
  1324. r: [][]map[string]interface{}{
  1325. {{
  1326. "color": "red",
  1327. "ts": float64(1541152486013),
  1328. }},
  1329. {{
  1330. "color": "yellow",
  1331. "ts": float64(1541152488442),
  1332. }},
  1333. },
  1334. m: map[string]interface{}{
  1335. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1336. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1337. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1338. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1339. "kuiper_op_project_0_exceptions_total": int64(0),
  1340. "kuiper_op_project_0_process_latency_ms": int64(0),
  1341. "kuiper_op_project_0_records_in_total": int64(2),
  1342. "kuiper_op_project_0_records_out_total": int64(2),
  1343. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1344. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  1345. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  1346. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1347. "kuiper_source_demoE_0_records_in_total": int64(6),
  1348. "kuiper_source_demoE_0_records_out_total": int64(6),
  1349. "kuiper_op_window_0_exceptions_total": int64(0),
  1350. "kuiper_op_window_0_process_latency_ms": int64(0),
  1351. "kuiper_op_window_0_records_in_total": int64(6),
  1352. "kuiper_op_window_0_records_out_total": int64(4),
  1353. "kuiper_op_filter_0_exceptions_total": int64(0),
  1354. "kuiper_op_filter_0_process_latency_ms": int64(0),
  1355. "kuiper_op_filter_0_records_in_total": int64(4),
  1356. "kuiper_op_filter_0_records_out_total": int64(2),
  1357. },
  1358. }, {
  1359. name: `rule3`,
  1360. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1361. size: 6,
  1362. r: [][]map[string]interface{}{
  1363. {{
  1364. "color": "red",
  1365. "temp": 25.5,
  1366. "ts": float64(1541152486013),
  1367. }}, {{
  1368. "color": "red",
  1369. "temp": 25.5,
  1370. "ts": float64(1541152486013),
  1371. }}, {{
  1372. "color": "blue",
  1373. "temp": 28.1,
  1374. "ts": float64(1541152487632),
  1375. }}, {{
  1376. "color": "blue",
  1377. "temp": 28.1,
  1378. "ts": float64(1541152487632),
  1379. }, {
  1380. "color": "yellow",
  1381. "temp": 27.4,
  1382. "ts": float64(1541152488442),
  1383. }}, {{
  1384. "color": "yellow",
  1385. "temp": 27.4,
  1386. "ts": float64(1541152488442),
  1387. }, {
  1388. "color": "red",
  1389. "temp": 25.5,
  1390. "ts": float64(1541152489252),
  1391. }},
  1392. },
  1393. m: map[string]interface{}{
  1394. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1395. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1396. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1397. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1398. "kuiper_op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1399. "kuiper_op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1400. "kuiper_op_preprocessor_demo1E_0_records_in_total": int64(6),
  1401. "kuiper_op_preprocessor_demo1E_0_records_out_total": int64(6),
  1402. "kuiper_op_project_0_exceptions_total": int64(0),
  1403. "kuiper_op_project_0_process_latency_ms": int64(0),
  1404. "kuiper_op_project_0_records_in_total": int64(5),
  1405. "kuiper_op_project_0_records_out_total": int64(5),
  1406. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1407. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1408. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1409. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1410. "kuiper_source_demoE_0_records_in_total": int64(6),
  1411. "kuiper_source_demoE_0_records_out_total": int64(6),
  1412. "kuiper_source_demo1E_0_exceptions_total": int64(0),
  1413. "kuiper_source_demo1E_0_records_in_total": int64(6),
  1414. "kuiper_source_demo1E_0_records_out_total": int64(6),
  1415. "kuiper_op_window_0_exceptions_total": int64(0),
  1416. "kuiper_op_window_0_process_latency_ms": int64(0),
  1417. "kuiper_op_window_0_records_in_total": int64(12),
  1418. "kuiper_op_window_0_records_out_total": int64(5),
  1419. "kuiper_op_join_0_exceptions_total": int64(0),
  1420. "kuiper_op_join_0_process_latency_ms": int64(0),
  1421. "kuiper_op_join_0_records_in_total": int64(5),
  1422. "kuiper_op_join_0_records_out_total": int64(5),
  1423. },
  1424. }, {
  1425. name: `rule4`,
  1426. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1427. size: 6,
  1428. r: [][]map[string]interface{}{
  1429. {{
  1430. "color": "red",
  1431. }}, {{
  1432. "color": "blue",
  1433. }, {
  1434. "color": "red",
  1435. }}, {{
  1436. "color": "blue",
  1437. }, {
  1438. "color": "yellow",
  1439. }}, {{
  1440. "color": "blue",
  1441. }, {
  1442. "color": "red",
  1443. }, {
  1444. "color": "yellow",
  1445. }},
  1446. },
  1447. m: map[string]interface{}{
  1448. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1449. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1450. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1451. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1452. "kuiper_op_project_0_exceptions_total": int64(0),
  1453. "kuiper_op_project_0_process_latency_ms": int64(0),
  1454. "kuiper_op_project_0_records_in_total": int64(4),
  1455. "kuiper_op_project_0_records_out_total": int64(4),
  1456. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1457. "kuiper_sink_MockSink_0_records_in_total": int64(4),
  1458. "kuiper_sink_MockSink_0_records_out_total": int64(4),
  1459. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1460. "kuiper_source_demoE_0_records_in_total": int64(6),
  1461. "kuiper_source_demoE_0_records_out_total": int64(6),
  1462. "kuiper_op_window_0_exceptions_total": int64(0),
  1463. "kuiper_op_window_0_process_latency_ms": int64(0),
  1464. "kuiper_op_window_0_records_in_total": int64(6),
  1465. "kuiper_op_window_0_records_out_total": int64(4),
  1466. "kuiper_op_aggregate_0_exceptions_total": int64(0),
  1467. "kuiper_op_aggregate_0_process_latency_ms": int64(0),
  1468. "kuiper_op_aggregate_0_records_in_total": int64(4),
  1469. "kuiper_op_aggregate_0_records_out_total": int64(4),
  1470. "kuiper_op_order_0_exceptions_total": int64(0),
  1471. "kuiper_op_order_0_process_latency_ms": int64(0),
  1472. "kuiper_op_order_0_records_in_total": int64(4),
  1473. "kuiper_op_order_0_records_out_total": int64(4),
  1474. },
  1475. }, {
  1476. name: `rule5`,
  1477. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1478. size: 12,
  1479. r: [][]map[string]interface{}{
  1480. {{
  1481. "temp": 25.5,
  1482. }}, {{
  1483. "temp": 28.1,
  1484. }, {
  1485. "temp": 27.4,
  1486. }, {
  1487. "temp": 25.5,
  1488. }}, {{
  1489. "temp": 26.2,
  1490. }, {
  1491. "temp": 26.8,
  1492. }, {
  1493. "temp": 28.9,
  1494. }, {
  1495. "temp": 29.1,
  1496. }, {
  1497. "temp": 32.2,
  1498. }}, {{
  1499. "temp": 30.9,
  1500. }},
  1501. },
  1502. m: map[string]interface{}{
  1503. "kuiper_op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  1504. "kuiper_op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  1505. "kuiper_op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  1506. "kuiper_op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  1507. "kuiper_op_project_0_exceptions_total": int64(0),
  1508. "kuiper_op_project_0_process_latency_ms": int64(0),
  1509. "kuiper_op_project_0_records_in_total": int64(4),
  1510. "kuiper_op_project_0_records_out_total": int64(4),
  1511. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1512. "kuiper_sink_MockSink_0_records_in_total": int64(4),
  1513. "kuiper_sink_MockSink_0_records_out_total": int64(4),
  1514. "kuiper_source_sessionDemoE_0_exceptions_total": int64(0),
  1515. "kuiper_source_sessionDemoE_0_records_in_total": int64(12),
  1516. "kuiper_source_sessionDemoE_0_records_out_total": int64(12),
  1517. "kuiper_op_window_0_exceptions_total": int64(0),
  1518. "kuiper_op_window_0_process_latency_ms": int64(0),
  1519. "kuiper_op_window_0_records_in_total": int64(12),
  1520. "kuiper_op_window_0_records_out_total": int64(4),
  1521. },
  1522. }, {
  1523. name: `rule6`,
  1524. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1525. size: 6,
  1526. r: [][]map[string]interface{}{
  1527. {{
  1528. "m": 25.5,
  1529. "c": float64(1),
  1530. }}, {{
  1531. "m": 25.5,
  1532. "c": float64(1),
  1533. }}, {{
  1534. "m": 28.1,
  1535. "c": float64(1),
  1536. }}, {{
  1537. "m": 28.1,
  1538. "c": float64(2),
  1539. }}, {{
  1540. "m": 27.4,
  1541. "c": float64(2),
  1542. }},
  1543. },
  1544. m: map[string]interface{}{
  1545. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1546. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1547. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1548. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1549. "kuiper_op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1550. "kuiper_op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1551. "kuiper_op_preprocessor_demo1E_0_records_in_total": int64(6),
  1552. "kuiper_op_preprocessor_demo1E_0_records_out_total": int64(6),
  1553. "kuiper_op_project_0_exceptions_total": int64(0),
  1554. "kuiper_op_project_0_process_latency_ms": int64(0),
  1555. "kuiper_op_project_0_records_in_total": int64(5),
  1556. "kuiper_op_project_0_records_out_total": int64(5),
  1557. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1558. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1559. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1560. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1561. "kuiper_source_demoE_0_records_in_total": int64(6),
  1562. "kuiper_source_demoE_0_records_out_total": int64(6),
  1563. "kuiper_source_demo1E_0_exceptions_total": int64(0),
  1564. "kuiper_source_demo1E_0_records_in_total": int64(6),
  1565. "kuiper_source_demo1E_0_records_out_total": int64(6),
  1566. "kuiper_op_window_0_exceptions_total": int64(0),
  1567. "kuiper_op_window_0_records_in_total": int64(12),
  1568. "kuiper_op_window_0_records_out_total": int64(5),
  1569. "kuiper_op_join_0_exceptions_total": int64(0),
  1570. "kuiper_op_join_0_process_latency_ms": int64(0),
  1571. "kuiper_op_join_0_records_in_total": int64(5),
  1572. "kuiper_op_join_0_records_out_total": int64(5),
  1573. },
  1574. },
  1575. }
  1576. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1577. createEventStreams(t)
  1578. defer dropEventStreams(t)
  1579. done := make(chan struct{})
  1580. defer close(done)
  1581. common.ResetMockTicker()
  1582. //mock ticker
  1583. realTicker := time.NewTicker(500 * time.Millisecond)
  1584. tickerDone := make(chan bool)
  1585. go func() {
  1586. ticker := common.GetTicker(1000).(*common.MockTicker)
  1587. timer := common.GetTimer(1000).(*common.MockTimer)
  1588. for {
  1589. select {
  1590. case <-tickerDone:
  1591. log.Infof("real ticker exiting...")
  1592. return
  1593. case t := <-realTicker.C:
  1594. ts := common.TimeToUnixMilli(t)
  1595. if ticker != nil {
  1596. go ticker.DoTick(ts)
  1597. }
  1598. if timer != nil {
  1599. go timer.DoTick(ts)
  1600. }
  1601. }
  1602. }
  1603. }()
  1604. for i, tt := range tests {
  1605. p := NewRuleProcessor(DbDir)
  1606. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1607. var sources []*nodes.SourceNode
  1608. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1609. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1610. } else {
  1611. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1612. t.Errorf("sql %s is not a select statement", tt.sql)
  1613. } else {
  1614. streams := xsql.GetStreams(selectStmt)
  1615. for _, stream := range streams {
  1616. source := getEventMockSource(stream, done, tt.size)
  1617. sources = append(sources, source)
  1618. }
  1619. }
  1620. }
  1621. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1622. Id: tt.name, Sql: tt.sql,
  1623. Options: map[string]interface{}{
  1624. "isEventTime": true,
  1625. "lateTolerance": float64(1000),
  1626. },
  1627. }, sources)
  1628. if err != nil {
  1629. t.Error(err)
  1630. }
  1631. mockSink := test.NewMockSink()
  1632. sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
  1633. tp.AddSink(inputs, sink)
  1634. count := len(sources)
  1635. errCh := tp.Open()
  1636. func() {
  1637. for {
  1638. select {
  1639. case err = <-errCh:
  1640. t.Log(err)
  1641. tp.Cancel()
  1642. return
  1643. case <-done:
  1644. count--
  1645. log.Infof("%d sources remaining", count)
  1646. if count <= 0 {
  1647. log.Info("stream stopping")
  1648. time.Sleep(1 * time.Second)
  1649. tp.Cancel()
  1650. return
  1651. }
  1652. default:
  1653. }
  1654. }
  1655. }()
  1656. results := mockSink.GetResults()
  1657. var maps [][]map[string]interface{}
  1658. for _, v := range results {
  1659. var mapRes []map[string]interface{}
  1660. err := json.Unmarshal(v, &mapRes)
  1661. if err != nil {
  1662. t.Errorf("Failed to parse the input into map")
  1663. continue
  1664. }
  1665. maps = append(maps, mapRes)
  1666. }
  1667. if !reflect.DeepEqual(tt.r, maps) {
  1668. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1669. }
  1670. metrics := tp.GetMetrics()
  1671. for k, v := range tt.m {
  1672. if v != metrics[k] {
  1673. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  1674. break
  1675. }
  1676. }
  1677. }
  1678. realTicker.Stop()
  1679. tickerDone <- true
  1680. close(tickerDone)
  1681. }
  1682. func errstring(err error) string {
  1683. if err != nil {
  1684. return err.Error()
  1685. }
  1686. return ""
  1687. }