xsql_processor_test.go 49 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783
  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/nodes"
  9. "github.com/emqx/kuiper/xstream/test"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. "time"
  15. )
  16. var DbDir = getDbDir()
  17. func getDbDir() string {
  18. dbDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("db location is %s", dbDir)
  23. return dbDir
  24. }
  25. func TestStreamCreateProcessor(t *testing.T) {
  26. var tests = []struct {
  27. s string
  28. r []string
  29. err string
  30. }{
  31. {
  32. s: `SHOW STREAMS;`,
  33. r: []string{"No stream definitions are found."},
  34. },
  35. {
  36. s: `EXPLAIN STREAM topic1;`,
  37. err: "Stream topic1 is not found.",
  38. },
  39. {
  40. s: `CREATE STREAM topic1 (
  41. USERID BIGINT,
  42. FIRST_NAME STRING,
  43. LAST_NAME STRING,
  44. NICKNAMES ARRAY(STRING),
  45. Gender BOOLEAN,
  46. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  47. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  48. r: []string{"Stream topic1 is created."},
  49. },
  50. {
  51. s: `CREATE STREAM topic1 (
  52. USERID BIGINT,
  53. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  54. err: "Create stream fails: Item topic1 already exists.",
  55. },
  56. {
  57. s: `EXPLAIN STREAM topic1;`,
  58. r: []string{"TO BE SUPPORTED"},
  59. },
  60. {
  61. s: `DESCRIBE STREAM topic1;`,
  62. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  63. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  64. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  65. },
  66. {
  67. s: `SHOW STREAMS;`,
  68. r: []string{"topic1"},
  69. },
  70. {
  71. s: `DROP STREAM topic1;`,
  72. r: []string{"Stream topic1 is dropped."},
  73. },
  74. {
  75. s: `DESCRIBE STREAM topic1;`,
  76. err: "Stream topic1 is not found.",
  77. },
  78. {
  79. s: `DROP STREAM topic1;`,
  80. err: "Drop stream fails: topic1 is not found.",
  81. },
  82. }
  83. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  84. streamDB := path.Join(getDbDir(), "streamTest")
  85. for i, tt := range tests {
  86. results, err := NewStreamProcessor(tt.s, streamDB).Exec()
  87. if !reflect.DeepEqual(tt.err, errstring(err)) {
  88. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  89. } else if tt.err == "" {
  90. if !reflect.DeepEqual(tt.r, results) {
  91. t.Errorf("%d. %q\n\nstmt mismatch:\n\ngot=%#v\n\n", i, tt.s, results)
  92. }
  93. }
  94. }
  95. }
  96. func createStreams(t *testing.T) {
  97. demo := `CREATE STREAM demo (
  98. color STRING,
  99. size BIGINT,
  100. ts BIGINT
  101. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  102. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  103. if err != nil {
  104. t.Log(err)
  105. }
  106. demo1 := `CREATE STREAM demo1 (
  107. temp FLOAT,
  108. hum BIGINT,
  109. ts BIGINT
  110. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  111. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  112. if err != nil {
  113. t.Log(err)
  114. }
  115. sessionDemo := `CREATE STREAM sessionDemo (
  116. temp FLOAT,
  117. hum BIGINT,
  118. ts BIGINT
  119. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  120. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  121. if err != nil {
  122. t.Log(err)
  123. }
  124. }
  125. func dropStreams(t *testing.T) {
  126. demo := `DROP STREAM demo`
  127. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  128. if err != nil {
  129. t.Log(err)
  130. }
  131. demo1 := `DROP STREAM demo1`
  132. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  133. if err != nil {
  134. t.Log(err)
  135. }
  136. sessionDemo := `DROP STREAM sessionDemo`
  137. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  138. if err != nil {
  139. t.Log(err)
  140. }
  141. }
  142. func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
  143. var data []*xsql.Tuple
  144. switch name {
  145. case "demo":
  146. data = []*xsql.Tuple{
  147. {
  148. Emitter: name,
  149. Message: map[string]interface{}{
  150. "color": "red",
  151. "size": 3,
  152. "ts": 1541152486013,
  153. },
  154. Timestamp: 1541152486013,
  155. },
  156. {
  157. Emitter: name,
  158. Message: map[string]interface{}{
  159. "color": "blue",
  160. "size": 6,
  161. "ts": 1541152486822,
  162. },
  163. Timestamp: 1541152486822,
  164. },
  165. {
  166. Emitter: name,
  167. Message: map[string]interface{}{
  168. "color": "blue",
  169. "size": 2,
  170. "ts": 1541152487632,
  171. },
  172. Timestamp: 1541152487632,
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "color": "yellow",
  178. "size": 4,
  179. "ts": 1541152488442,
  180. },
  181. Timestamp: 1541152488442,
  182. },
  183. {
  184. Emitter: name,
  185. Message: map[string]interface{}{
  186. "color": "red",
  187. "size": 1,
  188. "ts": 1541152489252,
  189. },
  190. Timestamp: 1541152489252,
  191. },
  192. }
  193. case "demo1":
  194. data = []*xsql.Tuple{
  195. {
  196. Emitter: name,
  197. Message: map[string]interface{}{
  198. "temp": 25.5,
  199. "hum": 65,
  200. "ts": 1541152486013,
  201. },
  202. Timestamp: 1541152486013,
  203. },
  204. {
  205. Emitter: name,
  206. Message: map[string]interface{}{
  207. "temp": 27.5,
  208. "hum": 59,
  209. "ts": 1541152486823,
  210. },
  211. Timestamp: 1541152486823,
  212. },
  213. {
  214. Emitter: name,
  215. Message: map[string]interface{}{
  216. "temp": 28.1,
  217. "hum": 75,
  218. "ts": 1541152487632,
  219. },
  220. Timestamp: 1541152487632,
  221. },
  222. {
  223. Emitter: name,
  224. Message: map[string]interface{}{
  225. "temp": 27.4,
  226. "hum": 80,
  227. "ts": 1541152488442,
  228. },
  229. Timestamp: 1541152488442,
  230. },
  231. {
  232. Emitter: name,
  233. Message: map[string]interface{}{
  234. "temp": 25.5,
  235. "hum": 62,
  236. "ts": 1541152489252,
  237. },
  238. Timestamp: 1541152489252,
  239. },
  240. }
  241. case "sessionDemo":
  242. data = []*xsql.Tuple{
  243. {
  244. Emitter: name,
  245. Message: map[string]interface{}{
  246. "temp": 25.5,
  247. "hum": 65,
  248. "ts": 1541152486013,
  249. },
  250. Timestamp: 1541152486013,
  251. },
  252. {
  253. Emitter: name,
  254. Message: map[string]interface{}{
  255. "temp": 27.5,
  256. "hum": 59,
  257. "ts": 1541152486823,
  258. },
  259. Timestamp: 1541152486823,
  260. },
  261. {
  262. Emitter: name,
  263. Message: map[string]interface{}{
  264. "temp": 28.1,
  265. "hum": 75,
  266. "ts": 1541152487932,
  267. },
  268. Timestamp: 1541152487932,
  269. },
  270. {
  271. Emitter: name,
  272. Message: map[string]interface{}{
  273. "temp": 27.4,
  274. "hum": 80,
  275. "ts": 1541152488442,
  276. },
  277. Timestamp: 1541152488442,
  278. },
  279. {
  280. Emitter: name,
  281. Message: map[string]interface{}{
  282. "temp": 25.5,
  283. "hum": 62,
  284. "ts": 1541152489252,
  285. },
  286. Timestamp: 1541152489252,
  287. },
  288. {
  289. Emitter: name,
  290. Message: map[string]interface{}{
  291. "temp": 26.2,
  292. "hum": 63,
  293. "ts": 1541152490062,
  294. },
  295. Timestamp: 1541152490062,
  296. },
  297. {
  298. Emitter: name,
  299. Message: map[string]interface{}{
  300. "temp": 26.8,
  301. "hum": 71,
  302. "ts": 1541152490872,
  303. },
  304. Timestamp: 1541152490872,
  305. },
  306. {
  307. Emitter: name,
  308. Message: map[string]interface{}{
  309. "temp": 28.9,
  310. "hum": 85,
  311. "ts": 1541152491682,
  312. },
  313. Timestamp: 1541152491682,
  314. },
  315. {
  316. Emitter: name,
  317. Message: map[string]interface{}{
  318. "temp": 29.1,
  319. "hum": 92,
  320. "ts": 1541152492492,
  321. },
  322. Timestamp: 1541152492492,
  323. },
  324. {
  325. Emitter: name,
  326. Message: map[string]interface{}{
  327. "temp": 32.2,
  328. "hum": 99,
  329. "ts": 1541152493202,
  330. },
  331. Timestamp: 1541152493202,
  332. },
  333. {
  334. Emitter: name,
  335. Message: map[string]interface{}{
  336. "temp": 30.9,
  337. "hum": 87,
  338. "ts": 1541152494112,
  339. },
  340. Timestamp: 1541152494112,
  341. },
  342. }
  343. }
  344. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  345. "DATASOURCE": name,
  346. })
  347. }
  348. func TestSingleSQL(t *testing.T) {
  349. var tests = []struct {
  350. name string
  351. sql string
  352. r [][]map[string]interface{}
  353. m map[string]interface{}
  354. }{
  355. {
  356. name: `rule1`,
  357. sql: `SELECT * FROM demo`,
  358. r: [][]map[string]interface{}{
  359. {{
  360. "color": "red",
  361. "size": float64(3),
  362. "ts": float64(1541152486013),
  363. }},
  364. {{
  365. "color": "blue",
  366. "size": float64(6),
  367. "ts": float64(1541152486822),
  368. }},
  369. {{
  370. "color": "blue",
  371. "size": float64(2),
  372. "ts": float64(1541152487632),
  373. }},
  374. {{
  375. "color": "yellow",
  376. "size": float64(4),
  377. "ts": float64(1541152488442),
  378. }},
  379. {{
  380. "color": "red",
  381. "size": float64(1),
  382. "ts": float64(1541152489252),
  383. }},
  384. },
  385. m: map[string]interface{}{
  386. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  387. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  388. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  389. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  390. "kuiper_op_project_0_exceptions_total": int64(0),
  391. "kuiper_op_project_0_process_latency_ms": int64(0),
  392. "kuiper_op_project_0_records_in_total": int64(5),
  393. "kuiper_op_project_0_records_out_total": int64(5),
  394. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  395. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  396. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  397. "kuiper_source_demo_0_exceptions_total": int64(0),
  398. "kuiper_source_demo_0_records_in_total": int64(5),
  399. "kuiper_source_demo_0_records_out_total": int64(5),
  400. },
  401. }, {
  402. name: `rule2`,
  403. sql: `SELECT color, ts FROM demo where size > 3`,
  404. r: [][]map[string]interface{}{
  405. {{
  406. "color": "blue",
  407. "ts": float64(1541152486822),
  408. }},
  409. {{
  410. "color": "yellow",
  411. "ts": float64(1541152488442),
  412. }},
  413. },
  414. m: map[string]interface{}{
  415. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  416. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  417. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  418. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  419. "kuiper_op_project_0_exceptions_total": int64(0),
  420. "kuiper_op_project_0_process_latency_ms": int64(0),
  421. "kuiper_op_project_0_records_in_total": int64(2),
  422. "kuiper_op_project_0_records_out_total": int64(2),
  423. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  424. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  425. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  426. "kuiper_source_demo_0_exceptions_total": int64(0),
  427. "kuiper_source_demo_0_records_in_total": int64(5),
  428. "kuiper_source_demo_0_records_out_total": int64(5),
  429. "kuiper_op_filter_0_exceptions_total": int64(0),
  430. "kuiper_op_filter_0_process_latency_ms": int64(0),
  431. "kuiper_op_filter_0_records_in_total": int64(5),
  432. "kuiper_op_filter_0_records_out_total": int64(2),
  433. },
  434. }, {
  435. name: `rule3`,
  436. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  437. r: [][]map[string]interface{}{
  438. {{
  439. "Int8": float64(6),
  440. "ts": float64(1541152486822),
  441. }},
  442. {{
  443. "Int8": float64(4),
  444. "ts": float64(1541152488442),
  445. }},
  446. },
  447. m: map[string]interface{}{
  448. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  449. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  450. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  451. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  452. "kuiper_op_project_0_exceptions_total": int64(0),
  453. "kuiper_op_project_0_process_latency_ms": int64(0),
  454. "kuiper_op_project_0_records_in_total": int64(2),
  455. "kuiper_op_project_0_records_out_total": int64(2),
  456. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  457. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  458. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  459. "kuiper_source_demo_0_exceptions_total": int64(0),
  460. "kuiper_source_demo_0_records_in_total": int64(5),
  461. "kuiper_source_demo_0_records_out_total": int64(5),
  462. "kuiper_op_filter_0_exceptions_total": int64(0),
  463. "kuiper_op_filter_0_process_latency_ms": int64(0),
  464. "kuiper_op_filter_0_records_in_total": int64(5),
  465. "kuiper_op_filter_0_records_out_total": int64(2),
  466. },
  467. },
  468. }
  469. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  470. createStreams(t)
  471. defer dropStreams(t)
  472. done := make(chan struct{})
  473. //defer close(done)
  474. for i, tt := range tests {
  475. p := NewRuleProcessor(DbDir)
  476. parser := xsql.NewParser(strings.NewReader(tt.sql))
  477. var sources []*nodes.SourceNode
  478. if stmt, err := xsql.Language.Parse(parser); err != nil {
  479. t.Errorf("parse sql %s error: %s", tt.sql, err)
  480. } else {
  481. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  482. t.Errorf("sql %s is not a select statement", tt.sql)
  483. } else {
  484. streams := xsql.GetStreams(selectStmt)
  485. for _, stream := range streams {
  486. source := getMockSource(stream, done, 5)
  487. sources = append(sources, source)
  488. }
  489. }
  490. }
  491. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options:map[string]interface{}{
  492. "bufferLength": float64(100),
  493. }}, sources)
  494. if err != nil {
  495. t.Error(err)
  496. }
  497. mockSink := test.NewMockSink()
  498. sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
  499. tp.AddSink(inputs, sink)
  500. count := len(sources)
  501. errCh := tp.Open()
  502. func() {
  503. for {
  504. select {
  505. case err = <-errCh:
  506. t.Log(err)
  507. tp.Cancel()
  508. return
  509. case <-done:
  510. count--
  511. log.Infof("%d sources remaining", count)
  512. if count <= 0 {
  513. log.Info("stream stopping")
  514. time.Sleep(1 * time.Second)
  515. tp.Cancel()
  516. return
  517. }
  518. default:
  519. }
  520. }
  521. }()
  522. results := mockSink.GetResults()
  523. var maps [][]map[string]interface{}
  524. for _, v := range results {
  525. var mapRes []map[string]interface{}
  526. err := json.Unmarshal(v, &mapRes)
  527. if err != nil {
  528. t.Errorf("Failed to parse the input into map")
  529. continue
  530. }
  531. maps = append(maps, mapRes)
  532. }
  533. //if !reflect.DeepEqual(tt.r, maps) {
  534. // t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  535. // continue
  536. //}
  537. metrics := tp.GetMetrics()
  538. log.Infof("metrics: %v", metrics)
  539. for k, v := range tt.m {
  540. if v != metrics[k] {
  541. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  542. break
  543. }
  544. }
  545. }
  546. }
  547. func TestWindow(t *testing.T) {
  548. common.IsTesting = true
  549. var tests = []struct {
  550. name string
  551. sql string
  552. size int
  553. r [][]map[string]interface{}
  554. m map[string]interface{}
  555. }{
  556. {
  557. name: `rule1`,
  558. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  559. size: 5,
  560. r: [][]map[string]interface{}{
  561. {{
  562. "color": "red",
  563. "size": float64(3),
  564. "ts": float64(1541152486013),
  565. }, {
  566. "color": "blue",
  567. "size": float64(6),
  568. "ts": float64(1541152486822),
  569. }},
  570. {{
  571. "color": "red",
  572. "size": float64(3),
  573. "ts": float64(1541152486013),
  574. }, {
  575. "color": "blue",
  576. "size": float64(6),
  577. "ts": float64(1541152486822),
  578. }, {
  579. "color": "blue",
  580. "size": float64(2),
  581. "ts": float64(1541152487632),
  582. }},
  583. {{
  584. "color": "blue",
  585. "size": float64(2),
  586. "ts": float64(1541152487632),
  587. }, {
  588. "color": "yellow",
  589. "size": float64(4),
  590. "ts": float64(1541152488442),
  591. }},
  592. },
  593. m: map[string]interface{}{
  594. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  595. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  596. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  597. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  598. "kuiper_op_project_0_exceptions_total": int64(0),
  599. "kuiper_op_project_0_process_latency_ms": int64(0),
  600. "kuiper_op_project_0_records_in_total": int64(3),
  601. "kuiper_op_project_0_records_out_total": int64(3),
  602. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  603. "kuiper_sink_mockSink_0_records_in_total": int64(3),
  604. "kuiper_sink_mockSink_0_records_out_total": int64(3),
  605. "kuiper_source_demo_0_exceptions_total": int64(0),
  606. "kuiper_source_demo_0_records_in_total": int64(5),
  607. "kuiper_source_demo_0_records_out_total": int64(5),
  608. "kuiper_op_window_0_exceptions_total": int64(0),
  609. "kuiper_op_window_0_process_latency_ms": int64(0),
  610. "kuiper_op_window_0_records_in_total": int64(5),
  611. "kuiper_op_window_0_records_out_total": int64(3),
  612. },
  613. }, {
  614. name: `rule2`,
  615. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  616. size: 5,
  617. r: [][]map[string]interface{}{
  618. {{
  619. "color": "red",
  620. "ts": float64(1541152486013),
  621. }, {
  622. "color": "blue",
  623. "ts": float64(1541152486822),
  624. }},
  625. {{
  626. "color": "yellow",
  627. "ts": float64(1541152488442),
  628. }},
  629. },
  630. m: map[string]interface{}{
  631. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  632. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  633. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  634. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  635. "kuiper_op_project_0_exceptions_total": int64(0),
  636. "kuiper_op_project_0_process_latency_ms": int64(0),
  637. "kuiper_op_project_0_records_in_total": int64(2),
  638. "kuiper_op_project_0_records_out_total": int64(2),
  639. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  640. "kuiper_sink_mockSink_0_records_in_total": int64(2),
  641. "kuiper_sink_mockSink_0_records_out_total": int64(2),
  642. "kuiper_source_demo_0_exceptions_total": int64(0),
  643. "kuiper_source_demo_0_records_in_total": int64(5),
  644. "kuiper_source_demo_0_records_out_total": int64(5),
  645. "kuiper_op_window_0_exceptions_total": int64(0),
  646. "kuiper_op_window_0_process_latency_ms": int64(0),
  647. "kuiper_op_window_0_records_in_total": int64(5),
  648. "kuiper_op_window_0_records_out_total": int64(3),
  649. "kuiper_op_filter_0_exceptions_total": int64(0),
  650. "kuiper_op_filter_0_process_latency_ms": int64(0),
  651. "kuiper_op_filter_0_records_in_total": int64(3),
  652. "kuiper_op_filter_0_records_out_total": int64(2),
  653. },
  654. }, {
  655. name: `rule3`,
  656. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  657. size: 5,
  658. r: [][]map[string]interface{}{
  659. {{
  660. "color": "red",
  661. "temp": 25.5,
  662. "ts": float64(1541152486013),
  663. }}, {{
  664. "color": "red",
  665. "temp": 25.5,
  666. "ts": float64(1541152486013),
  667. }}, {{
  668. "color": "red",
  669. "temp": 25.5,
  670. "ts": float64(1541152486013),
  671. }}, {{
  672. "color": "blue",
  673. "temp": 28.1,
  674. "ts": float64(1541152487632),
  675. }}, {{
  676. "color": "blue",
  677. "temp": 28.1,
  678. "ts": float64(1541152487632),
  679. }}, {{
  680. "color": "blue",
  681. "temp": 28.1,
  682. "ts": float64(1541152487632),
  683. }, {
  684. "color": "yellow",
  685. "temp": 27.4,
  686. "ts": float64(1541152488442),
  687. }}, {{
  688. "color": "yellow",
  689. "temp": 27.4,
  690. "ts": float64(1541152488442),
  691. }}, {{
  692. "color": "yellow",
  693. "temp": 27.4,
  694. "ts": float64(1541152488442),
  695. }, {
  696. "color": "red",
  697. "temp": 25.5,
  698. "ts": float64(1541152489252),
  699. }},
  700. },
  701. m: map[string]interface{}{
  702. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  703. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  704. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  705. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  706. "kuiper_op_preprocessor_demo1_0_exceptions_total": int64(0),
  707. "kuiper_op_preprocessor_demo1_0_process_latency_ms": int64(0),
  708. "kuiper_op_preprocessor_demo1_0_records_in_total": int64(5),
  709. "kuiper_op_preprocessor_demo1_0_records_out_total": int64(5),
  710. "kuiper_op_project_0_exceptions_total": int64(0),
  711. "kuiper_op_project_0_process_latency_ms": int64(0),
  712. "kuiper_op_project_0_records_in_total": int64(8),
  713. "kuiper_op_project_0_records_out_total": int64(8),
  714. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  715. "kuiper_sink_mockSink_0_records_in_total": int64(8),
  716. "kuiper_sink_mockSink_0_records_out_total": int64(8),
  717. "kuiper_source_demo_0_exceptions_total": int64(0),
  718. "kuiper_source_demo_0_records_in_total": int64(5),
  719. "kuiper_source_demo_0_records_out_total": int64(5),
  720. "kuiper_source_demo1_0_exceptions_total": int64(0),
  721. "kuiper_source_demo1_0_records_in_total": int64(5),
  722. "kuiper_source_demo1_0_records_out_total": int64(5),
  723. "kuiper_op_window_0_exceptions_total": int64(0),
  724. "kuiper_op_window_0_process_latency_ms": int64(0),
  725. "kuiper_op_window_0_records_in_total": int64(10),
  726. "kuiper_op_window_0_records_out_total": int64(10),
  727. "kuiper_op_join_0_exceptions_total": int64(0),
  728. "kuiper_op_join_0_process_latency_ms": int64(0),
  729. "kuiper_op_join_0_records_in_total": int64(10),
  730. "kuiper_op_join_0_records_out_total": int64(8),
  731. },
  732. }, {
  733. name: `rule4`,
  734. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  735. size: 5,
  736. r: [][]map[string]interface{}{
  737. {{
  738. "color": "red",
  739. }}, {{
  740. "color": "blue",
  741. }, {
  742. "color": "red",
  743. }}, {{
  744. "color": "blue",
  745. }, {
  746. "color": "red",
  747. }}, {{
  748. "color": "blue",
  749. }, {
  750. "color": "yellow",
  751. }}, {{
  752. "color": "blue",
  753. }, {
  754. "color": "red",
  755. }, {
  756. "color": "yellow",
  757. }},
  758. },
  759. m: map[string]interface{}{
  760. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  761. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  762. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  763. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  764. "kuiper_op_project_0_exceptions_total": int64(0),
  765. "kuiper_op_project_0_process_latency_ms": int64(0),
  766. "kuiper_op_project_0_records_in_total": int64(5),
  767. "kuiper_op_project_0_records_out_total": int64(5),
  768. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  769. "kuiper_sink_mockSink_0_records_in_total": int64(5),
  770. "kuiper_sink_mockSink_0_records_out_total": int64(5),
  771. "kuiper_source_demo_0_exceptions_total": int64(0),
  772. "kuiper_source_demo_0_records_in_total": int64(5),
  773. "kuiper_source_demo_0_records_out_total": int64(5),
  774. "kuiper_op_window_0_exceptions_total": int64(0),
  775. "kuiper_op_window_0_process_latency_ms": int64(0),
  776. "kuiper_op_window_0_records_in_total": int64(5),
  777. "kuiper_op_window_0_records_out_total": int64(5),
  778. "kuiper_op_aggregate_0_exceptions_total": int64(0),
  779. "kuiper_op_aggregate_0_process_latency_ms": int64(0),
  780. "kuiper_op_aggregate_0_records_in_total": int64(5),
  781. "kuiper_op_aggregate_0_records_out_total": int64(5),
  782. "kuiper_op_order_0_exceptions_total": int64(0),
  783. "kuiper_op_order_0_process_latency_ms": int64(0),
  784. "kuiper_op_order_0_records_in_total": int64(5),
  785. "kuiper_op_order_0_records_out_total": int64(5),
  786. },
  787. }, {
  788. name: `rule5`,
  789. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  790. size: 11,
  791. r: [][]map[string]interface{}{
  792. {{
  793. "temp": 25.5,
  794. }, {
  795. "temp": 27.5,
  796. }}, {{
  797. "temp": 28.1,
  798. }, {
  799. "temp": 27.4,
  800. }, {
  801. "temp": 25.5,
  802. }}, {{
  803. "temp": 26.2,
  804. }, {
  805. "temp": 26.8,
  806. }, {
  807. "temp": 28.9,
  808. }, {
  809. "temp": 29.1,
  810. }, {
  811. "temp": 32.2,
  812. }},
  813. },
  814. m: map[string]interface{}{
  815. "kuiper_op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  816. "kuiper_op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  817. "kuiper_op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  818. "kuiper_op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  819. "kuiper_op_project_0_exceptions_total": int64(0),
  820. "kuiper_op_project_0_process_latency_ms": int64(0),
  821. "kuiper_op_project_0_records_in_total": int64(3),
  822. "kuiper_op_project_0_records_out_total": int64(3),
  823. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  824. "kuiper_sink_mockSink_0_records_in_total": int64(3),
  825. "kuiper_sink_mockSink_0_records_out_total": int64(3),
  826. "kuiper_source_sessionDemo_0_exceptions_total": int64(0),
  827. "kuiper_source_sessionDemo_0_records_in_total": int64(11),
  828. "kuiper_source_sessionDemo_0_records_out_total": int64(11),
  829. "kuiper_op_window_0_exceptions_total": int64(0),
  830. "kuiper_op_window_0_process_latency_ms": int64(0),
  831. "kuiper_op_window_0_records_in_total": int64(11),
  832. "kuiper_op_window_0_records_out_total": int64(3),
  833. },
  834. }, {
  835. name: `rule6`,
  836. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  837. size: 5,
  838. r: [][]map[string]interface{}{
  839. {{
  840. "m": 25.5,
  841. "c": float64(1),
  842. }}, {{
  843. "m": 25.5,
  844. "c": float64(1),
  845. }}, {{
  846. "m": 25.5,
  847. "c": float64(1),
  848. }}, {{
  849. "m": 28.1,
  850. "c": float64(1),
  851. }}, {{
  852. "m": 28.1,
  853. "c": float64(1),
  854. }}, {{
  855. "m": 28.1,
  856. "c": float64(2),
  857. }}, {{
  858. "m": 27.4,
  859. "c": float64(1),
  860. }}, {{
  861. "m": 27.4,
  862. "c": float64(2),
  863. }},
  864. },
  865. m: map[string]interface{}{
  866. "kuiper_op_preprocessor_demo_0_exceptions_total": int64(0),
  867. "kuiper_op_preprocessor_demo_0_process_latency_ms": int64(0),
  868. "kuiper_op_preprocessor_demo_0_records_in_total": int64(5),
  869. "kuiper_op_preprocessor_demo_0_records_out_total": int64(5),
  870. "kuiper_op_preprocessor_demo1_0_exceptions_total": int64(0),
  871. "kuiper_op_preprocessor_demo1_0_process_latency_ms": int64(0),
  872. "kuiper_op_preprocessor_demo1_0_records_in_total": int64(5),
  873. "kuiper_op_preprocessor_demo1_0_records_out_total": int64(5),
  874. "kuiper_op_project_0_exceptions_total": int64(0),
  875. "kuiper_op_project_0_process_latency_ms": int64(0),
  876. "kuiper_op_project_0_records_in_total": int64(8),
  877. "kuiper_op_project_0_records_out_total": int64(8),
  878. "kuiper_sink_mockSink_0_exceptions_total": int64(0),
  879. "kuiper_sink_mockSink_0_records_in_total": int64(8),
  880. "kuiper_sink_mockSink_0_records_out_total": int64(8),
  881. "kuiper_source_demo_0_exceptions_total": int64(0),
  882. "kuiper_source_demo_0_records_in_total": int64(5),
  883. "kuiper_source_demo_0_records_out_total": int64(5),
  884. "kuiper_source_demo1_0_exceptions_total": int64(0),
  885. "kuiper_source_demo1_0_records_in_total": int64(5),
  886. "kuiper_source_demo1_0_records_out_total": int64(5),
  887. "kuiper_op_window_0_exceptions_total": int64(0),
  888. "kuiper_op_window_0_process_latency_ms": int64(0),
  889. "kuiper_op_window_0_records_in_total": int64(10),
  890. "kuiper_op_window_0_records_out_total": int64(10),
  891. "kuiper_op_join_0_exceptions_total": int64(0),
  892. "kuiper_op_join_0_process_latency_ms": int64(0),
  893. "kuiper_op_join_0_records_in_total": int64(10),
  894. "kuiper_op_join_0_records_out_total": int64(8),
  895. },
  896. },
  897. }
  898. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  899. createStreams(t)
  900. defer dropStreams(t)
  901. done := make(chan struct{})
  902. defer close(done)
  903. common.ResetMockTicker()
  904. for i, tt := range tests {
  905. p := NewRuleProcessor(DbDir)
  906. parser := xsql.NewParser(strings.NewReader(tt.sql))
  907. var sources []*nodes.SourceNode
  908. if stmt, err := xsql.Language.Parse(parser); err != nil {
  909. t.Errorf("parse sql %s error: %s", tt.sql, err)
  910. } else {
  911. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  912. t.Errorf("sql %s is not a select statement", tt.sql)
  913. } else {
  914. streams := xsql.GetStreams(selectStmt)
  915. for _, stream := range streams {
  916. source := getMockSource(stream, done, tt.size)
  917. sources = append(sources, source)
  918. }
  919. }
  920. }
  921. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  922. if err != nil {
  923. t.Error(err)
  924. }
  925. mockSink := test.NewMockSink()
  926. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  927. tp.AddSink(inputs, sink)
  928. count := len(sources)
  929. errCh := tp.Open()
  930. func() {
  931. for {
  932. select {
  933. case err = <-errCh:
  934. t.Log(err)
  935. tp.Cancel()
  936. return
  937. case <-done:
  938. count--
  939. log.Infof("%d sources remaining", count)
  940. if count <= 0 {
  941. log.Info("stream stopping")
  942. time.Sleep(1 * time.Second)
  943. tp.Cancel()
  944. return
  945. }
  946. default:
  947. }
  948. }
  949. }()
  950. results := mockSink.GetResults()
  951. var maps [][]map[string]interface{}
  952. for _, v := range results {
  953. var mapRes []map[string]interface{}
  954. err := json.Unmarshal(v, &mapRes)
  955. if err != nil {
  956. t.Errorf("Failed to parse the input into map")
  957. continue
  958. }
  959. maps = append(maps, mapRes)
  960. }
  961. if !reflect.DeepEqual(tt.r, maps) {
  962. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  963. }
  964. metrics := tp.GetMetrics()
  965. for k, v := range tt.m {
  966. if v != metrics[k] {
  967. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  968. break
  969. }
  970. }
  971. }
  972. }
  973. func createEventStreams(t *testing.T) {
  974. demo := `CREATE STREAM demoE (
  975. color STRING,
  976. size BIGINT,
  977. ts BIGINT
  978. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  979. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  980. if err != nil {
  981. t.Log(err)
  982. }
  983. demo1 := `CREATE STREAM demo1E (
  984. temp FLOAT,
  985. hum BIGINT,
  986. ts BIGINT
  987. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  988. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  989. if err != nil {
  990. t.Log(err)
  991. }
  992. sessionDemo := `CREATE STREAM sessionDemoE (
  993. temp FLOAT,
  994. hum BIGINT,
  995. ts BIGINT
  996. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  997. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  998. if err != nil {
  999. t.Log(err)
  1000. }
  1001. }
  1002. func dropEventStreams(t *testing.T) {
  1003. demo := `DROP STREAM demoE`
  1004. _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
  1005. if err != nil {
  1006. t.Log(err)
  1007. }
  1008. demo1 := `DROP STREAM demo1E`
  1009. _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
  1010. if err != nil {
  1011. t.Log(err)
  1012. }
  1013. sessionDemo := `DROP STREAM sessionDemoE`
  1014. _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
  1015. if err != nil {
  1016. t.Log(err)
  1017. }
  1018. }
  1019. func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
  1020. var data []*xsql.Tuple
  1021. switch name {
  1022. case "demoE":
  1023. data = []*xsql.Tuple{
  1024. {
  1025. Emitter: name,
  1026. Message: map[string]interface{}{
  1027. "color": "red",
  1028. "size": 3,
  1029. "ts": 1541152486013,
  1030. },
  1031. Timestamp: 1541152486013,
  1032. },
  1033. {
  1034. Emitter: name,
  1035. Message: map[string]interface{}{
  1036. "color": "blue",
  1037. "size": 2,
  1038. "ts": 1541152487632,
  1039. },
  1040. Timestamp: 1541152487632,
  1041. },
  1042. {
  1043. Emitter: name,
  1044. Message: map[string]interface{}{
  1045. "color": "red",
  1046. "size": 1,
  1047. "ts": 1541152489252,
  1048. },
  1049. Timestamp: 1541152489252,
  1050. },
  1051. { //dropped item
  1052. Emitter: name,
  1053. Message: map[string]interface{}{
  1054. "color": "blue",
  1055. "size": 6,
  1056. "ts": 1541152486822,
  1057. },
  1058. Timestamp: 1541152486822,
  1059. },
  1060. {
  1061. Emitter: name,
  1062. Message: map[string]interface{}{
  1063. "color": "yellow",
  1064. "size": 4,
  1065. "ts": 1541152488442,
  1066. },
  1067. Timestamp: 1541152488442,
  1068. },
  1069. { //To lift the watermark and issue all windows
  1070. Emitter: name,
  1071. Message: map[string]interface{}{
  1072. "color": "yellow",
  1073. "size": 4,
  1074. "ts": 1541152492342,
  1075. },
  1076. Timestamp: 1541152488442,
  1077. },
  1078. }
  1079. case "demo1E":
  1080. data = []*xsql.Tuple{
  1081. {
  1082. Emitter: name,
  1083. Message: map[string]interface{}{
  1084. "temp": 27.5,
  1085. "hum": 59,
  1086. "ts": 1541152486823,
  1087. },
  1088. Timestamp: 1541152486823,
  1089. },
  1090. {
  1091. Emitter: name,
  1092. Message: map[string]interface{}{
  1093. "temp": 25.5,
  1094. "hum": 65,
  1095. "ts": 1541152486013,
  1096. },
  1097. Timestamp: 1541152486013,
  1098. },
  1099. {
  1100. Emitter: name,
  1101. Message: map[string]interface{}{
  1102. "temp": 27.4,
  1103. "hum": 80,
  1104. "ts": 1541152488442,
  1105. },
  1106. Timestamp: 1541152488442,
  1107. },
  1108. {
  1109. Emitter: name,
  1110. Message: map[string]interface{}{
  1111. "temp": 28.1,
  1112. "hum": 75,
  1113. "ts": 1541152487632,
  1114. },
  1115. Timestamp: 1541152487632,
  1116. },
  1117. {
  1118. Emitter: name,
  1119. Message: map[string]interface{}{
  1120. "temp": 25.5,
  1121. "hum": 62,
  1122. "ts": 1541152489252,
  1123. },
  1124. Timestamp: 1541152489252,
  1125. },
  1126. {
  1127. Emitter: name,
  1128. Message: map[string]interface{}{
  1129. "temp": 25.5,
  1130. "hum": 62,
  1131. "ts": 1541152499252,
  1132. },
  1133. Timestamp: 1541152499252,
  1134. },
  1135. }
  1136. case "sessionDemoE":
  1137. data = []*xsql.Tuple{
  1138. {
  1139. Emitter: name,
  1140. Message: map[string]interface{}{
  1141. "temp": 25.5,
  1142. "hum": 65,
  1143. "ts": 1541152486013,
  1144. },
  1145. Timestamp: 1541152486013,
  1146. },
  1147. {
  1148. Emitter: name,
  1149. Message: map[string]interface{}{
  1150. "temp": 28.1,
  1151. "hum": 75,
  1152. "ts": 1541152487932,
  1153. },
  1154. Timestamp: 1541152487932,
  1155. },
  1156. {
  1157. Emitter: name,
  1158. Message: map[string]interface{}{
  1159. "temp": 27.5,
  1160. "hum": 59,
  1161. "ts": 1541152486823,
  1162. },
  1163. Timestamp: 1541152486823,
  1164. },
  1165. {
  1166. Emitter: name,
  1167. Message: map[string]interface{}{
  1168. "temp": 25.5,
  1169. "hum": 62,
  1170. "ts": 1541152489252,
  1171. },
  1172. Timestamp: 1541152489252,
  1173. },
  1174. {
  1175. Emitter: name,
  1176. Message: map[string]interface{}{
  1177. "temp": 27.4,
  1178. "hum": 80,
  1179. "ts": 1541152488442,
  1180. },
  1181. Timestamp: 1541152488442,
  1182. },
  1183. {
  1184. Emitter: name,
  1185. Message: map[string]interface{}{
  1186. "temp": 26.2,
  1187. "hum": 63,
  1188. "ts": 1541152490062,
  1189. },
  1190. Timestamp: 1541152490062,
  1191. },
  1192. {
  1193. Emitter: name,
  1194. Message: map[string]interface{}{
  1195. "temp": 28.9,
  1196. "hum": 85,
  1197. "ts": 1541152491682,
  1198. },
  1199. Timestamp: 1541152491682,
  1200. },
  1201. {
  1202. Emitter: name,
  1203. Message: map[string]interface{}{
  1204. "temp": 26.8,
  1205. "hum": 71,
  1206. "ts": 1541152490872,
  1207. },
  1208. Timestamp: 1541152490872,
  1209. },
  1210. {
  1211. Emitter: name,
  1212. Message: map[string]interface{}{
  1213. "temp": 29.1,
  1214. "hum": 92,
  1215. "ts": 1541152492492,
  1216. },
  1217. Timestamp: 1541152492492,
  1218. },
  1219. {
  1220. Emitter: name,
  1221. Message: map[string]interface{}{
  1222. "temp": 30.9,
  1223. "hum": 87,
  1224. "ts": 1541152494112,
  1225. },
  1226. Timestamp: 1541152494112,
  1227. },
  1228. {
  1229. Emitter: name,
  1230. Message: map[string]interface{}{
  1231. "temp": 32.2,
  1232. "hum": 99,
  1233. "ts": 1541152493202,
  1234. },
  1235. Timestamp: 1541152493202,
  1236. },
  1237. {
  1238. Emitter: name,
  1239. Message: map[string]interface{}{
  1240. "temp": 32.2,
  1241. "hum": 99,
  1242. "ts": 1541152499202,
  1243. },
  1244. Timestamp: 1541152499202,
  1245. },
  1246. }
  1247. }
  1248. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  1249. "DATASOURCE": name,
  1250. })
  1251. }
  1252. func TestEventWindow(t *testing.T) {
  1253. common.IsTesting = true
  1254. var tests = []struct {
  1255. name string
  1256. sql string
  1257. size int
  1258. r [][]map[string]interface{}
  1259. m map[string]interface{}
  1260. }{
  1261. {
  1262. name: `rule1`,
  1263. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1264. size: 6,
  1265. r: [][]map[string]interface{}{
  1266. {{
  1267. "color": "red",
  1268. "size": float64(3),
  1269. "ts": float64(1541152486013),
  1270. }},
  1271. {{
  1272. "color": "red",
  1273. "size": float64(3),
  1274. "ts": float64(1541152486013),
  1275. }, {
  1276. "color": "blue",
  1277. "size": float64(2),
  1278. "ts": float64(1541152487632),
  1279. }},
  1280. {{
  1281. "color": "blue",
  1282. "size": float64(2),
  1283. "ts": float64(1541152487632),
  1284. }, {
  1285. "color": "yellow",
  1286. "size": float64(4),
  1287. "ts": float64(1541152488442),
  1288. }}, {{
  1289. "color": "yellow",
  1290. "size": float64(4),
  1291. "ts": float64(1541152488442),
  1292. }, {
  1293. "color": "red",
  1294. "size": float64(1),
  1295. "ts": float64(1541152489252),
  1296. }}, {{
  1297. "color": "red",
  1298. "size": float64(1),
  1299. "ts": float64(1541152489252),
  1300. }},
  1301. },
  1302. m: map[string]interface{}{
  1303. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1304. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1305. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1306. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1307. "kuiper_op_project_0_exceptions_total": int64(0),
  1308. "kuiper_op_project_0_process_latency_ms": int64(0),
  1309. "kuiper_op_project_0_records_in_total": int64(5),
  1310. "kuiper_op_project_0_records_out_total": int64(5),
  1311. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1312. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1313. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1314. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1315. "kuiper_source_demoE_0_records_in_total": int64(6),
  1316. "kuiper_source_demoE_0_records_out_total": int64(6),
  1317. "kuiper_op_window_0_exceptions_total": int64(0),
  1318. "kuiper_op_window_0_process_latency_ms": int64(0),
  1319. "kuiper_op_window_0_records_in_total": int64(6),
  1320. "kuiper_op_window_0_records_out_total": int64(5),
  1321. },
  1322. }, {
  1323. name: `rule2`,
  1324. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1325. size: 6,
  1326. r: [][]map[string]interface{}{
  1327. {{
  1328. "color": "red",
  1329. "ts": float64(1541152486013),
  1330. }},
  1331. {{
  1332. "color": "yellow",
  1333. "ts": float64(1541152488442),
  1334. }},
  1335. },
  1336. m: map[string]interface{}{
  1337. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1338. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1339. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1340. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1341. "kuiper_op_project_0_exceptions_total": int64(0),
  1342. "kuiper_op_project_0_process_latency_ms": int64(0),
  1343. "kuiper_op_project_0_records_in_total": int64(2),
  1344. "kuiper_op_project_0_records_out_total": int64(2),
  1345. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1346. "kuiper_sink_MockSink_0_records_in_total": int64(2),
  1347. "kuiper_sink_MockSink_0_records_out_total": int64(2),
  1348. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1349. "kuiper_source_demoE_0_records_in_total": int64(6),
  1350. "kuiper_source_demoE_0_records_out_total": int64(6),
  1351. "kuiper_op_window_0_exceptions_total": int64(0),
  1352. "kuiper_op_window_0_process_latency_ms": int64(0),
  1353. "kuiper_op_window_0_records_in_total": int64(6),
  1354. "kuiper_op_window_0_records_out_total": int64(4),
  1355. "kuiper_op_filter_0_exceptions_total": int64(0),
  1356. "kuiper_op_filter_0_process_latency_ms": int64(0),
  1357. "kuiper_op_filter_0_records_in_total": int64(4),
  1358. "kuiper_op_filter_0_records_out_total": int64(2),
  1359. },
  1360. }, {
  1361. name: `rule3`,
  1362. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1363. size: 6,
  1364. r: [][]map[string]interface{}{
  1365. {{
  1366. "color": "red",
  1367. "temp": 25.5,
  1368. "ts": float64(1541152486013),
  1369. }}, {{
  1370. "color": "red",
  1371. "temp": 25.5,
  1372. "ts": float64(1541152486013),
  1373. }}, {{
  1374. "color": "blue",
  1375. "temp": 28.1,
  1376. "ts": float64(1541152487632),
  1377. }}, {{
  1378. "color": "blue",
  1379. "temp": 28.1,
  1380. "ts": float64(1541152487632),
  1381. }, {
  1382. "color": "yellow",
  1383. "temp": 27.4,
  1384. "ts": float64(1541152488442),
  1385. }}, {{
  1386. "color": "yellow",
  1387. "temp": 27.4,
  1388. "ts": float64(1541152488442),
  1389. }, {
  1390. "color": "red",
  1391. "temp": 25.5,
  1392. "ts": float64(1541152489252),
  1393. }},
  1394. },
  1395. m: map[string]interface{}{
  1396. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1397. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1398. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1399. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1400. "kuiper_op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1401. "kuiper_op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1402. "kuiper_op_preprocessor_demo1E_0_records_in_total": int64(6),
  1403. "kuiper_op_preprocessor_demo1E_0_records_out_total": int64(6),
  1404. "kuiper_op_project_0_exceptions_total": int64(0),
  1405. "kuiper_op_project_0_process_latency_ms": int64(0),
  1406. "kuiper_op_project_0_records_in_total": int64(5),
  1407. "kuiper_op_project_0_records_out_total": int64(5),
  1408. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1409. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1410. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1411. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1412. "kuiper_source_demoE_0_records_in_total": int64(6),
  1413. "kuiper_source_demoE_0_records_out_total": int64(6),
  1414. "kuiper_source_demo1E_0_exceptions_total": int64(0),
  1415. "kuiper_source_demo1E_0_records_in_total": int64(6),
  1416. "kuiper_source_demo1E_0_records_out_total": int64(6),
  1417. "kuiper_op_window_0_exceptions_total": int64(0),
  1418. "kuiper_op_window_0_process_latency_ms": int64(0),
  1419. "kuiper_op_window_0_records_in_total": int64(12),
  1420. "kuiper_op_window_0_records_out_total": int64(5),
  1421. "kuiper_op_join_0_exceptions_total": int64(0),
  1422. "kuiper_op_join_0_process_latency_ms": int64(0),
  1423. "kuiper_op_join_0_records_in_total": int64(5),
  1424. "kuiper_op_join_0_records_out_total": int64(5),
  1425. },
  1426. }, {
  1427. name: `rule4`,
  1428. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1429. size: 6,
  1430. r: [][]map[string]interface{}{
  1431. {{
  1432. "color": "red",
  1433. }}, {{
  1434. "color": "blue",
  1435. }, {
  1436. "color": "red",
  1437. }}, {{
  1438. "color": "blue",
  1439. }, {
  1440. "color": "yellow",
  1441. }}, {{
  1442. "color": "blue",
  1443. }, {
  1444. "color": "red",
  1445. }, {
  1446. "color": "yellow",
  1447. }},
  1448. },
  1449. m: map[string]interface{}{
  1450. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1451. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1452. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1453. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1454. "kuiper_op_project_0_exceptions_total": int64(0),
  1455. "kuiper_op_project_0_process_latency_ms": int64(0),
  1456. "kuiper_op_project_0_records_in_total": int64(4),
  1457. "kuiper_op_project_0_records_out_total": int64(4),
  1458. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1459. "kuiper_sink_MockSink_0_records_in_total": int64(4),
  1460. "kuiper_sink_MockSink_0_records_out_total": int64(4),
  1461. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1462. "kuiper_source_demoE_0_records_in_total": int64(6),
  1463. "kuiper_source_demoE_0_records_out_total": int64(6),
  1464. "kuiper_op_window_0_exceptions_total": int64(0),
  1465. "kuiper_op_window_0_process_latency_ms": int64(0),
  1466. "kuiper_op_window_0_records_in_total": int64(6),
  1467. "kuiper_op_window_0_records_out_total": int64(4),
  1468. "kuiper_op_aggregate_0_exceptions_total": int64(0),
  1469. "kuiper_op_aggregate_0_process_latency_ms": int64(0),
  1470. "kuiper_op_aggregate_0_records_in_total": int64(4),
  1471. "kuiper_op_aggregate_0_records_out_total": int64(4),
  1472. "kuiper_op_order_0_exceptions_total": int64(0),
  1473. "kuiper_op_order_0_process_latency_ms": int64(0),
  1474. "kuiper_op_order_0_records_in_total": int64(4),
  1475. "kuiper_op_order_0_records_out_total": int64(4),
  1476. },
  1477. }, {
  1478. name: `rule5`,
  1479. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1480. size: 12,
  1481. r: [][]map[string]interface{}{
  1482. {{
  1483. "temp": 25.5,
  1484. }}, {{
  1485. "temp": 28.1,
  1486. }, {
  1487. "temp": 27.4,
  1488. }, {
  1489. "temp": 25.5,
  1490. }}, {{
  1491. "temp": 26.2,
  1492. }, {
  1493. "temp": 26.8,
  1494. }, {
  1495. "temp": 28.9,
  1496. }, {
  1497. "temp": 29.1,
  1498. }, {
  1499. "temp": 32.2,
  1500. }}, {{
  1501. "temp": 30.9,
  1502. }},
  1503. },
  1504. m: map[string]interface{}{
  1505. "kuiper_op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  1506. "kuiper_op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  1507. "kuiper_op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  1508. "kuiper_op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  1509. "kuiper_op_project_0_exceptions_total": int64(0),
  1510. "kuiper_op_project_0_process_latency_ms": int64(0),
  1511. "kuiper_op_project_0_records_in_total": int64(4),
  1512. "kuiper_op_project_0_records_out_total": int64(4),
  1513. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1514. "kuiper_sink_MockSink_0_records_in_total": int64(4),
  1515. "kuiper_sink_MockSink_0_records_out_total": int64(4),
  1516. "kuiper_source_sessionDemoE_0_exceptions_total": int64(0),
  1517. "kuiper_source_sessionDemoE_0_records_in_total": int64(12),
  1518. "kuiper_source_sessionDemoE_0_records_out_total": int64(12),
  1519. "kuiper_op_window_0_exceptions_total": int64(0),
  1520. "kuiper_op_window_0_process_latency_ms": int64(0),
  1521. "kuiper_op_window_0_records_in_total": int64(12),
  1522. "kuiper_op_window_0_records_out_total": int64(4),
  1523. },
  1524. }, {
  1525. name: `rule6`,
  1526. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1527. size: 6,
  1528. r: [][]map[string]interface{}{
  1529. {{
  1530. "m": 25.5,
  1531. "c": float64(1),
  1532. }}, {{
  1533. "m": 25.5,
  1534. "c": float64(1),
  1535. }}, {{
  1536. "m": 28.1,
  1537. "c": float64(1),
  1538. }}, {{
  1539. "m": 28.1,
  1540. "c": float64(2),
  1541. }}, {{
  1542. "m": 27.4,
  1543. "c": float64(2),
  1544. }},
  1545. },
  1546. m: map[string]interface{}{
  1547. "kuiper_op_preprocessor_demoE_0_exceptions_total": int64(0),
  1548. "kuiper_op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1549. "kuiper_op_preprocessor_demoE_0_records_in_total": int64(6),
  1550. "kuiper_op_preprocessor_demoE_0_records_out_total": int64(6),
  1551. "kuiper_op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1552. "kuiper_op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1553. "kuiper_op_preprocessor_demo1E_0_records_in_total": int64(6),
  1554. "kuiper_op_preprocessor_demo1E_0_records_out_total": int64(6),
  1555. "kuiper_op_project_0_exceptions_total": int64(0),
  1556. "kuiper_op_project_0_process_latency_ms": int64(0),
  1557. "kuiper_op_project_0_records_in_total": int64(5),
  1558. "kuiper_op_project_0_records_out_total": int64(5),
  1559. "kuiper_sink_MockSink_0_exceptions_total": int64(0),
  1560. "kuiper_sink_MockSink_0_records_in_total": int64(5),
  1561. "kuiper_sink_MockSink_0_records_out_total": int64(5),
  1562. "kuiper_source_demoE_0_exceptions_total": int64(0),
  1563. "kuiper_source_demoE_0_records_in_total": int64(6),
  1564. "kuiper_source_demoE_0_records_out_total": int64(6),
  1565. "kuiper_source_demo1E_0_exceptions_total": int64(0),
  1566. "kuiper_source_demo1E_0_records_in_total": int64(6),
  1567. "kuiper_source_demo1E_0_records_out_total": int64(6),
  1568. "kuiper_op_window_0_exceptions_total": int64(0),
  1569. "kuiper_op_window_0_records_in_total": int64(12),
  1570. "kuiper_op_window_0_records_out_total": int64(5),
  1571. "kuiper_op_join_0_exceptions_total": int64(0),
  1572. "kuiper_op_join_0_process_latency_ms": int64(0),
  1573. "kuiper_op_join_0_records_in_total": int64(5),
  1574. "kuiper_op_join_0_records_out_total": int64(5),
  1575. },
  1576. },
  1577. }
  1578. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1579. createEventStreams(t)
  1580. defer dropEventStreams(t)
  1581. done := make(chan struct{})
  1582. defer close(done)
  1583. common.ResetMockTicker()
  1584. //mock ticker
  1585. realTicker := time.NewTicker(500 * time.Millisecond)
  1586. tickerDone := make(chan bool)
  1587. go func() {
  1588. ticker := common.GetTicker(1000).(*common.MockTicker)
  1589. timer := common.GetTimer(1000).(*common.MockTimer)
  1590. for {
  1591. select {
  1592. case <-tickerDone:
  1593. log.Infof("real ticker exiting...")
  1594. return
  1595. case t := <-realTicker.C:
  1596. ts := common.TimeToUnixMilli(t)
  1597. if ticker != nil {
  1598. go ticker.DoTick(ts)
  1599. }
  1600. if timer != nil {
  1601. go timer.DoTick(ts)
  1602. }
  1603. }
  1604. }
  1605. }()
  1606. for i, tt := range tests {
  1607. p := NewRuleProcessor(DbDir)
  1608. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1609. var sources []*nodes.SourceNode
  1610. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1611. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1612. } else {
  1613. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1614. t.Errorf("sql %s is not a select statement", tt.sql)
  1615. } else {
  1616. streams := xsql.GetStreams(selectStmt)
  1617. for _, stream := range streams {
  1618. source := getEventMockSource(stream, done, tt.size)
  1619. sources = append(sources, source)
  1620. }
  1621. }
  1622. }
  1623. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1624. Id: tt.name, Sql: tt.sql,
  1625. Options: map[string]interface{}{
  1626. "isEventTime": true,
  1627. "lateTolerance": float64(1000),
  1628. },
  1629. }, sources)
  1630. if err != nil {
  1631. t.Error(err)
  1632. }
  1633. mockSink := test.NewMockSink()
  1634. sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
  1635. tp.AddSink(inputs, sink)
  1636. count := len(sources)
  1637. errCh := tp.Open()
  1638. func() {
  1639. for {
  1640. select {
  1641. case err = <-errCh:
  1642. t.Log(err)
  1643. tp.Cancel()
  1644. return
  1645. case <-done:
  1646. count--
  1647. log.Infof("%d sources remaining", count)
  1648. if count <= 0 {
  1649. log.Info("stream stopping")
  1650. time.Sleep(1 * time.Second)
  1651. tp.Cancel()
  1652. return
  1653. }
  1654. default:
  1655. }
  1656. }
  1657. }()
  1658. results := mockSink.GetResults()
  1659. var maps [][]map[string]interface{}
  1660. for _, v := range results {
  1661. var mapRes []map[string]interface{}
  1662. err := json.Unmarshal(v, &mapRes)
  1663. if err != nil {
  1664. t.Errorf("Failed to parse the input into map")
  1665. continue
  1666. }
  1667. maps = append(maps, mapRes)
  1668. }
  1669. if !reflect.DeepEqual(tt.r, maps) {
  1670. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1671. }
  1672. metrics := tp.GetMetrics()
  1673. for k, v := range tt.m {
  1674. if v != metrics[k] {
  1675. t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, k, v, metrics[k])
  1676. break
  1677. }
  1678. }
  1679. }
  1680. realTicker.Stop()
  1681. tickerDone <- true
  1682. close(tickerDone)
  1683. }
  1684. func errstring(err error) string {
  1685. if err != nil {
  1686. return err.Error()
  1687. }
  1688. return ""
  1689. }