xsql_processor_test.go 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833
  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "path"
  12. "reflect"
  13. "strings"
  14. "testing"
  15. "time"
  16. )
  17. var DbDir = getDbDir()
  18. func getDbDir() string {
  19. dbDir, err := common.GetAndCreateDataLoc("test")
  20. if err != nil {
  21. log.Panic(err)
  22. }
  23. log.Infof("db location is %s", dbDir)
  24. return dbDir
  25. }
  26. func TestStreamCreateProcessor(t *testing.T) {
  27. var tests = []struct {
  28. s string
  29. r []string
  30. err string
  31. }{
  32. {
  33. s: `SHOW STREAMS;`,
  34. r: []string{"No stream definitions are found."},
  35. },
  36. {
  37. s: `EXPLAIN STREAM topic1;`,
  38. err: "Stream topic1 is not found.",
  39. },
  40. {
  41. s: `CREATE STREAM topic1 (
  42. USERID BIGINT,
  43. FIRST_NAME STRING,
  44. LAST_NAME STRING,
  45. NICKNAMES ARRAY(STRING),
  46. Gender BOOLEAN,
  47. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  48. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  49. r: []string{"Stream topic1 is created."},
  50. },
  51. {
  52. s: `CREATE STREAM topic1 (
  53. USERID BIGINT,
  54. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  55. err: "Create stream fails: Item topic1 already exists.",
  56. },
  57. {
  58. s: `EXPLAIN STREAM topic1;`,
  59. r: []string{"TO BE SUPPORTED"},
  60. },
  61. {
  62. s: `DESCRIBE STREAM topic1;`,
  63. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  64. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  65. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  66. },
  67. {
  68. s: `SHOW STREAMS;`,
  69. r: []string{"topic1"},
  70. },
  71. {
  72. s: `DROP STREAM topic1;`,
  73. r: []string{"Stream topic1 is dropped."},
  74. },
  75. {
  76. s: `DESCRIBE STREAM topic1;`,
  77. err: "Stream topic1 is not found.",
  78. },
  79. {
  80. s: `DROP STREAM topic1;`,
  81. err: "Drop stream fails: topic1 is not found.",
  82. },
  83. }
  84. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  85. streamDB := path.Join(getDbDir(), "streamTest")
  86. for i, tt := range tests {
  87. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  88. if !reflect.DeepEqual(tt.err, errstring(err)) {
  89. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  90. } else if tt.err == "" {
  91. if !reflect.DeepEqual(tt.r, results) {
  92. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  93. }
  94. }
  95. }
  96. }
  97. func createStreams(t *testing.T) {
  98. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  99. demo := `CREATE STREAM demo (
  100. color STRING,
  101. size BIGINT,
  102. ts BIGINT
  103. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  104. _, err := p.ExecStmt(demo)
  105. if err != nil {
  106. t.Log(err)
  107. }
  108. demo1 := `CREATE STREAM demo1 (
  109. temp FLOAT,
  110. hum BIGINT,
  111. ts BIGINT
  112. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  113. _, err = p.ExecStmt(demo1)
  114. if err != nil {
  115. t.Log(err)
  116. }
  117. sessionDemo := `CREATE STREAM sessionDemo (
  118. temp FLOAT,
  119. hum BIGINT,
  120. ts BIGINT
  121. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  122. _, err = p.ExecStmt(sessionDemo)
  123. if err != nil {
  124. t.Log(err)
  125. }
  126. }
  127. func dropStreams(t *testing.T) {
  128. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  129. demo := `DROP STREAM demo`
  130. _, err := p.ExecStmt(demo)
  131. if err != nil {
  132. t.Log(err)
  133. }
  134. demo1 := `DROP STREAM demo1`
  135. _, err = p.ExecStmt(demo1)
  136. if err != nil {
  137. t.Log(err)
  138. }
  139. sessionDemo := `DROP STREAM sessionDemo`
  140. _, err = p.ExecStmt(sessionDemo)
  141. if err != nil {
  142. t.Log(err)
  143. }
  144. }
  145. func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  146. var data []*xsql.Tuple
  147. switch name {
  148. case "demo":
  149. data = []*xsql.Tuple{
  150. {
  151. Emitter: name,
  152. Message: map[string]interface{}{
  153. "color": "red",
  154. "size": 3,
  155. "ts": 1541152486013,
  156. },
  157. Timestamp: 1541152486013,
  158. },
  159. {
  160. Emitter: name,
  161. Message: map[string]interface{}{
  162. "color": "blue",
  163. "size": 6,
  164. "ts": 1541152486822,
  165. },
  166. Timestamp: 1541152486822,
  167. },
  168. {
  169. Emitter: name,
  170. Message: map[string]interface{}{
  171. "color": "blue",
  172. "size": 2,
  173. "ts": 1541152487632,
  174. },
  175. Timestamp: 1541152487632,
  176. },
  177. {
  178. Emitter: name,
  179. Message: map[string]interface{}{
  180. "color": "yellow",
  181. "size": 4,
  182. "ts": 1541152488442,
  183. },
  184. Timestamp: 1541152488442,
  185. },
  186. {
  187. Emitter: name,
  188. Message: map[string]interface{}{
  189. "color": "red",
  190. "size": 1,
  191. "ts": 1541152489252,
  192. },
  193. Timestamp: 1541152489252,
  194. },
  195. }
  196. case "demo1":
  197. data = []*xsql.Tuple{
  198. {
  199. Emitter: name,
  200. Message: map[string]interface{}{
  201. "temp": 25.5,
  202. "hum": 65,
  203. "ts": 1541152486013,
  204. },
  205. Timestamp: 1541152486013,
  206. },
  207. {
  208. Emitter: name,
  209. Message: map[string]interface{}{
  210. "temp": 27.5,
  211. "hum": 59,
  212. "ts": 1541152486823,
  213. },
  214. Timestamp: 1541152486823,
  215. },
  216. {
  217. Emitter: name,
  218. Message: map[string]interface{}{
  219. "temp": 28.1,
  220. "hum": 75,
  221. "ts": 1541152487632,
  222. },
  223. Timestamp: 1541152487632,
  224. },
  225. {
  226. Emitter: name,
  227. Message: map[string]interface{}{
  228. "temp": 27.4,
  229. "hum": 80,
  230. "ts": 1541152488442,
  231. },
  232. Timestamp: 1541152488442,
  233. },
  234. {
  235. Emitter: name,
  236. Message: map[string]interface{}{
  237. "temp": 25.5,
  238. "hum": 62,
  239. "ts": 1541152489252,
  240. },
  241. Timestamp: 1541152489252,
  242. },
  243. }
  244. case "sessionDemo":
  245. data = []*xsql.Tuple{
  246. {
  247. Emitter: name,
  248. Message: map[string]interface{}{
  249. "temp": 25.5,
  250. "hum": 65,
  251. "ts": 1541152486013,
  252. },
  253. Timestamp: 1541152486013,
  254. },
  255. {
  256. Emitter: name,
  257. Message: map[string]interface{}{
  258. "temp": 27.5,
  259. "hum": 59,
  260. "ts": 1541152486823,
  261. },
  262. Timestamp: 1541152486823,
  263. },
  264. {
  265. Emitter: name,
  266. Message: map[string]interface{}{
  267. "temp": 28.1,
  268. "hum": 75,
  269. "ts": 1541152487932,
  270. },
  271. Timestamp: 1541152487932,
  272. },
  273. {
  274. Emitter: name,
  275. Message: map[string]interface{}{
  276. "temp": 27.4,
  277. "hum": 80,
  278. "ts": 1541152488442,
  279. },
  280. Timestamp: 1541152488442,
  281. },
  282. {
  283. Emitter: name,
  284. Message: map[string]interface{}{
  285. "temp": 25.5,
  286. "hum": 62,
  287. "ts": 1541152489252,
  288. },
  289. Timestamp: 1541152489252,
  290. },
  291. {
  292. Emitter: name,
  293. Message: map[string]interface{}{
  294. "temp": 26.2,
  295. "hum": 63,
  296. "ts": 1541152490062,
  297. },
  298. Timestamp: 1541152490062,
  299. },
  300. {
  301. Emitter: name,
  302. Message: map[string]interface{}{
  303. "temp": 26.8,
  304. "hum": 71,
  305. "ts": 1541152490872,
  306. },
  307. Timestamp: 1541152490872,
  308. },
  309. {
  310. Emitter: name,
  311. Message: map[string]interface{}{
  312. "temp": 28.9,
  313. "hum": 85,
  314. "ts": 1541152491682,
  315. },
  316. Timestamp: 1541152491682,
  317. },
  318. {
  319. Emitter: name,
  320. Message: map[string]interface{}{
  321. "temp": 29.1,
  322. "hum": 92,
  323. "ts": 1541152492492,
  324. },
  325. Timestamp: 1541152492492,
  326. },
  327. {
  328. Emitter: name,
  329. Message: map[string]interface{}{
  330. "temp": 32.2,
  331. "hum": 99,
  332. "ts": 1541152493202,
  333. },
  334. Timestamp: 1541152493202,
  335. },
  336. {
  337. Emitter: name,
  338. Message: map[string]interface{}{
  339. "temp": 30.9,
  340. "hum": 87,
  341. "ts": 1541152494112,
  342. },
  343. Timestamp: 1541152494112,
  344. },
  345. }
  346. }
  347. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  348. "DATASOURCE": name,
  349. })
  350. }
  351. func TestSingleSQL(t *testing.T) {
  352. var tests = []struct {
  353. name string
  354. sql string
  355. r [][]map[string]interface{}
  356. s string
  357. m map[string]interface{}
  358. }{
  359. {
  360. name: `rule1`,
  361. sql: `SELECT * FROM demo`,
  362. r: [][]map[string]interface{}{
  363. {{
  364. "color": "red",
  365. "size": float64(3),
  366. "ts": float64(1541152486013),
  367. }},
  368. {{
  369. "color": "blue",
  370. "size": float64(6),
  371. "ts": float64(1541152486822),
  372. }},
  373. {{
  374. "color": "blue",
  375. "size": float64(2),
  376. "ts": float64(1541152487632),
  377. }},
  378. {{
  379. "color": "yellow",
  380. "size": float64(4),
  381. "ts": float64(1541152488442),
  382. }},
  383. {{
  384. "color": "red",
  385. "size": float64(1),
  386. "ts": float64(1541152489252),
  387. }},
  388. },
  389. m: map[string]interface{}{
  390. "op_preprocessor_demo_0_exceptions_total": int64(0),
  391. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  392. "op_preprocessor_demo_0_records_in_total": int64(5),
  393. "op_preprocessor_demo_0_records_out_total": int64(5),
  394. "op_project_0_exceptions_total": int64(0),
  395. "op_project_0_process_latency_ms": int64(0),
  396. "op_project_0_records_in_total": int64(5),
  397. "op_project_0_records_out_total": int64(5),
  398. "sink_mockSink_0_exceptions_total": int64(0),
  399. "sink_mockSink_0_records_in_total": int64(5),
  400. "sink_mockSink_0_records_out_total": int64(5),
  401. "source_demo_0_exceptions_total": int64(0),
  402. "source_demo_0_records_in_total": int64(5),
  403. "source_demo_0_records_out_total": int64(5),
  404. },
  405. s: "sink_mockSink_0_records_out_total",
  406. }, {
  407. name: `rule2`,
  408. sql: `SELECT color, ts FROM demo where size > 3`,
  409. r: [][]map[string]interface{}{
  410. {{
  411. "color": "blue",
  412. "ts": float64(1541152486822),
  413. }},
  414. {{
  415. "color": "yellow",
  416. "ts": float64(1541152488442),
  417. }},
  418. },
  419. s: "op_filter_0_records_in_total",
  420. m: map[string]interface{}{
  421. "op_preprocessor_demo_0_exceptions_total": int64(0),
  422. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  423. "op_preprocessor_demo_0_records_in_total": int64(5),
  424. "op_preprocessor_demo_0_records_out_total": int64(5),
  425. "op_project_0_exceptions_total": int64(0),
  426. "op_project_0_process_latency_ms": int64(0),
  427. "op_project_0_records_in_total": int64(2),
  428. "op_project_0_records_out_total": int64(2),
  429. "sink_mockSink_0_exceptions_total": int64(0),
  430. "sink_mockSink_0_records_in_total": int64(2),
  431. "sink_mockSink_0_records_out_total": int64(2),
  432. "source_demo_0_exceptions_total": int64(0),
  433. "source_demo_0_records_in_total": int64(5),
  434. "source_demo_0_records_out_total": int64(5),
  435. "op_filter_0_exceptions_total": int64(0),
  436. "op_filter_0_process_latency_ms": int64(0),
  437. "op_filter_0_records_in_total": int64(5),
  438. "op_filter_0_records_out_total": int64(2),
  439. },
  440. }, {
  441. name: `rule3`,
  442. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  443. r: [][]map[string]interface{}{
  444. {{
  445. "Int8": float64(6),
  446. "ts": float64(1541152486822),
  447. }},
  448. {{
  449. "Int8": float64(4),
  450. "ts": float64(1541152488442),
  451. }},
  452. },
  453. s: "op_filter_0_records_in_total",
  454. m: map[string]interface{}{
  455. "op_preprocessor_demo_0_exceptions_total": int64(0),
  456. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  457. "op_preprocessor_demo_0_records_in_total": int64(5),
  458. "op_preprocessor_demo_0_records_out_total": int64(5),
  459. "op_project_0_exceptions_total": int64(0),
  460. "op_project_0_process_latency_ms": int64(0),
  461. "op_project_0_records_in_total": int64(2),
  462. "op_project_0_records_out_total": int64(2),
  463. "sink_mockSink_0_exceptions_total": int64(0),
  464. "sink_mockSink_0_records_in_total": int64(2),
  465. "sink_mockSink_0_records_out_total": int64(2),
  466. "source_demo_0_exceptions_total": int64(0),
  467. "source_demo_0_records_in_total": int64(5),
  468. "source_demo_0_records_out_total": int64(5),
  469. "op_filter_0_exceptions_total": int64(0),
  470. "op_filter_0_process_latency_ms": int64(0),
  471. "op_filter_0_records_in_total": int64(5),
  472. "op_filter_0_records_out_total": int64(2),
  473. },
  474. },
  475. }
  476. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  477. createStreams(t)
  478. defer dropStreams(t)
  479. //defer close(done)
  480. for i, tt := range tests {
  481. test.ResetClock(1541152486000)
  482. p := NewRuleProcessor(DbDir)
  483. parser := xsql.NewParser(strings.NewReader(tt.sql))
  484. var (
  485. sources []*nodes.SourceNode
  486. syncs []chan int
  487. )
  488. if stmt, err := xsql.Language.Parse(parser); err != nil {
  489. t.Errorf("parse sql %s error: %s", tt.sql, err)
  490. } else {
  491. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  492. t.Errorf("sql %s is not a select statement", tt.sql)
  493. } else {
  494. streams := xsql.GetStreams(selectStmt)
  495. for _, stream := range streams {
  496. next := make(chan int)
  497. syncs = append(syncs, next)
  498. source := getMockSource(stream, next, 5)
  499. sources = append(sources, source)
  500. }
  501. }
  502. }
  503. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  504. "bufferLength": float64(100),
  505. }}, sources)
  506. if err != nil {
  507. t.Error(err)
  508. }
  509. mockSink := test.NewMockSink()
  510. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  511. tp.AddSink(inputs, sink)
  512. errCh := tp.Open()
  513. func() {
  514. for i := 0; i < 5; i++ {
  515. syncs[i%len(syncs)] <- i
  516. select {
  517. case err = <-errCh:
  518. t.Log(err)
  519. tp.Cancel()
  520. return
  521. default:
  522. }
  523. }
  524. for retry := 100; retry > 0; retry-- {
  525. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  526. break
  527. }
  528. time.Sleep(time.Duration(retry) * time.Millisecond)
  529. }
  530. }()
  531. results := mockSink.GetResults()
  532. var maps [][]map[string]interface{}
  533. for _, v := range results {
  534. var mapRes []map[string]interface{}
  535. err := json.Unmarshal(v, &mapRes)
  536. if err != nil {
  537. t.Errorf("Failed to parse the input into map")
  538. continue
  539. }
  540. maps = append(maps, mapRes)
  541. }
  542. if !reflect.DeepEqual(tt.r, maps) {
  543. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  544. continue
  545. }
  546. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  547. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  548. }
  549. tp.Cancel()
  550. }
  551. }
  552. func TestWindow(t *testing.T) {
  553. var tests = []struct {
  554. name string
  555. sql string
  556. size int
  557. r [][]map[string]interface{}
  558. m map[string]interface{}
  559. }{
  560. {
  561. name: `rule1`,
  562. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  563. size: 5,
  564. r: [][]map[string]interface{}{
  565. {{
  566. "color": "red",
  567. "size": float64(3),
  568. "ts": float64(1541152486013),
  569. }, {
  570. "color": "blue",
  571. "size": float64(6),
  572. "ts": float64(1541152486822),
  573. }},
  574. {{
  575. "color": "red",
  576. "size": float64(3),
  577. "ts": float64(1541152486013),
  578. }, {
  579. "color": "blue",
  580. "size": float64(6),
  581. "ts": float64(1541152486822),
  582. }, {
  583. "color": "blue",
  584. "size": float64(2),
  585. "ts": float64(1541152487632),
  586. }},
  587. {{
  588. "color": "blue",
  589. "size": float64(2),
  590. "ts": float64(1541152487632),
  591. }, {
  592. "color": "yellow",
  593. "size": float64(4),
  594. "ts": float64(1541152488442),
  595. }},
  596. },
  597. m: map[string]interface{}{
  598. "op_preprocessor_demo_0_exceptions_total": int64(0),
  599. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  600. "op_preprocessor_demo_0_records_in_total": int64(5),
  601. "op_preprocessor_demo_0_records_out_total": int64(5),
  602. "op_project_0_exceptions_total": int64(0),
  603. "op_project_0_process_latency_ms": int64(0),
  604. "op_project_0_records_in_total": int64(3),
  605. "op_project_0_records_out_total": int64(3),
  606. "sink_mockSink_0_exceptions_total": int64(0),
  607. "sink_mockSink_0_records_in_total": int64(3),
  608. "sink_mockSink_0_records_out_total": int64(3),
  609. "source_demo_0_exceptions_total": int64(0),
  610. "source_demo_0_records_in_total": int64(5),
  611. "source_demo_0_records_out_total": int64(5),
  612. "op_window_0_exceptions_total": int64(0),
  613. "op_window_0_process_latency_ms": int64(0),
  614. "op_window_0_records_in_total": int64(5),
  615. "op_window_0_records_out_total": int64(3),
  616. },
  617. }, {
  618. name: `rule2`,
  619. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  620. size: 5,
  621. r: [][]map[string]interface{}{
  622. {{
  623. "color": "red",
  624. "ts": float64(1541152486013),
  625. }, {
  626. "color": "blue",
  627. "ts": float64(1541152486822),
  628. }},
  629. {{
  630. "color": "yellow",
  631. "ts": float64(1541152488442),
  632. }},
  633. },
  634. m: map[string]interface{}{
  635. "op_preprocessor_demo_0_exceptions_total": int64(0),
  636. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  637. "op_preprocessor_demo_0_records_in_total": int64(5),
  638. "op_preprocessor_demo_0_records_out_total": int64(5),
  639. "op_project_0_exceptions_total": int64(0),
  640. "op_project_0_process_latency_ms": int64(0),
  641. "op_project_0_records_in_total": int64(2),
  642. "op_project_0_records_out_total": int64(2),
  643. "sink_mockSink_0_exceptions_total": int64(0),
  644. "sink_mockSink_0_records_in_total": int64(2),
  645. "sink_mockSink_0_records_out_total": int64(2),
  646. "source_demo_0_exceptions_total": int64(0),
  647. "source_demo_0_records_in_total": int64(5),
  648. "source_demo_0_records_out_total": int64(5),
  649. "op_window_0_exceptions_total": int64(0),
  650. "op_window_0_process_latency_ms": int64(0),
  651. "op_window_0_records_in_total": int64(5),
  652. "op_window_0_records_out_total": int64(3),
  653. "op_filter_0_exceptions_total": int64(0),
  654. "op_filter_0_process_latency_ms": int64(0),
  655. "op_filter_0_records_in_total": int64(3),
  656. "op_filter_0_records_out_total": int64(2),
  657. },
  658. }, {
  659. name: `rule3`,
  660. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  661. size: 5,
  662. r: [][]map[string]interface{}{
  663. {{
  664. "color": "red",
  665. "temp": 25.5,
  666. "ts": float64(1541152486013),
  667. }}, {{
  668. "color": "red",
  669. "temp": 25.5,
  670. "ts": float64(1541152486013),
  671. }}, {{
  672. "color": "red",
  673. "temp": 25.5,
  674. "ts": float64(1541152486013),
  675. }}, {{
  676. "color": "blue",
  677. "temp": 28.1,
  678. "ts": float64(1541152487632),
  679. }}, {{
  680. "color": "blue",
  681. "temp": 28.1,
  682. "ts": float64(1541152487632),
  683. }}, {{
  684. "color": "blue",
  685. "temp": 28.1,
  686. "ts": float64(1541152487632),
  687. }, {
  688. "color": "yellow",
  689. "temp": 27.4,
  690. "ts": float64(1541152488442),
  691. }}, {{
  692. "color": "yellow",
  693. "temp": 27.4,
  694. "ts": float64(1541152488442),
  695. }}, {{
  696. "color": "yellow",
  697. "temp": 27.4,
  698. "ts": float64(1541152488442),
  699. }, {
  700. "color": "red",
  701. "temp": 25.5,
  702. "ts": float64(1541152489252),
  703. }},
  704. },
  705. m: map[string]interface{}{
  706. "op_preprocessor_demo_0_exceptions_total": int64(0),
  707. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  708. "op_preprocessor_demo_0_records_in_total": int64(5),
  709. "op_preprocessor_demo_0_records_out_total": int64(5),
  710. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  711. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  712. "op_preprocessor_demo1_0_records_in_total": int64(5),
  713. "op_preprocessor_demo1_0_records_out_total": int64(5),
  714. "op_project_0_exceptions_total": int64(0),
  715. "op_project_0_process_latency_ms": int64(0),
  716. "op_project_0_records_in_total": int64(8),
  717. "op_project_0_records_out_total": int64(8),
  718. "sink_mockSink_0_exceptions_total": int64(0),
  719. "sink_mockSink_0_records_in_total": int64(8),
  720. "sink_mockSink_0_records_out_total": int64(8),
  721. "source_demo_0_exceptions_total": int64(0),
  722. "source_demo_0_records_in_total": int64(5),
  723. "source_demo_0_records_out_total": int64(5),
  724. "source_demo1_0_exceptions_total": int64(0),
  725. "source_demo1_0_records_in_total": int64(5),
  726. "source_demo1_0_records_out_total": int64(5),
  727. "op_window_0_exceptions_total": int64(0),
  728. "op_window_0_process_latency_ms": int64(0),
  729. "op_window_0_records_in_total": int64(10),
  730. "op_window_0_records_out_total": int64(10),
  731. "op_join_0_exceptions_total": int64(0),
  732. "op_join_0_process_latency_ms": int64(0),
  733. "op_join_0_records_in_total": int64(10),
  734. "op_join_0_records_out_total": int64(8),
  735. },
  736. }, {
  737. name: `rule4`,
  738. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  739. size: 5,
  740. r: [][]map[string]interface{}{
  741. {{
  742. "color": "red",
  743. }}, {{
  744. "color": "blue",
  745. }, {
  746. "color": "red",
  747. }}, {{
  748. "color": "blue",
  749. }, {
  750. "color": "red",
  751. }}, {{
  752. "color": "blue",
  753. }, {
  754. "color": "yellow",
  755. }}, {{
  756. "color": "blue",
  757. }, {
  758. "color": "red",
  759. }, {
  760. "color": "yellow",
  761. }},
  762. },
  763. m: map[string]interface{}{
  764. "op_preprocessor_demo_0_exceptions_total": int64(0),
  765. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  766. "op_preprocessor_demo_0_records_in_total": int64(5),
  767. "op_preprocessor_demo_0_records_out_total": int64(5),
  768. "op_project_0_exceptions_total": int64(0),
  769. "op_project_0_process_latency_ms": int64(0),
  770. "op_project_0_records_in_total": int64(5),
  771. "op_project_0_records_out_total": int64(5),
  772. "sink_mockSink_0_exceptions_total": int64(0),
  773. "sink_mockSink_0_records_in_total": int64(5),
  774. "sink_mockSink_0_records_out_total": int64(5),
  775. "source_demo_0_exceptions_total": int64(0),
  776. "source_demo_0_records_in_total": int64(5),
  777. "source_demo_0_records_out_total": int64(5),
  778. "op_window_0_exceptions_total": int64(0),
  779. "op_window_0_process_latency_ms": int64(0),
  780. "op_window_0_records_in_total": int64(5),
  781. "op_window_0_records_out_total": int64(5),
  782. "op_aggregate_0_exceptions_total": int64(0),
  783. "op_aggregate_0_process_latency_ms": int64(0),
  784. "op_aggregate_0_records_in_total": int64(5),
  785. "op_aggregate_0_records_out_total": int64(5),
  786. "op_order_0_exceptions_total": int64(0),
  787. "op_order_0_process_latency_ms": int64(0),
  788. "op_order_0_records_in_total": int64(5),
  789. "op_order_0_records_out_total": int64(5),
  790. },
  791. }, {
  792. name: `rule5`,
  793. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  794. size: 11,
  795. r: [][]map[string]interface{}{
  796. {{
  797. "temp": 25.5,
  798. }, {
  799. "temp": 27.5,
  800. }}, {{
  801. "temp": 28.1,
  802. }, {
  803. "temp": 27.4,
  804. }, {
  805. "temp": 25.5,
  806. }}, {{
  807. "temp": 26.2,
  808. }, {
  809. "temp": 26.8,
  810. }, {
  811. "temp": 28.9,
  812. }, {
  813. "temp": 29.1,
  814. }, {
  815. "temp": 32.2,
  816. }},
  817. },
  818. m: map[string]interface{}{
  819. "op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  820. "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  821. "op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  822. "op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  823. "op_project_0_exceptions_total": int64(0),
  824. "op_project_0_process_latency_ms": int64(0),
  825. "op_project_0_records_in_total": int64(3),
  826. "op_project_0_records_out_total": int64(3),
  827. "sink_mockSink_0_exceptions_total": int64(0),
  828. "sink_mockSink_0_records_in_total": int64(3),
  829. "sink_mockSink_0_records_out_total": int64(3),
  830. "source_sessionDemo_0_exceptions_total": int64(0),
  831. "source_sessionDemo_0_records_in_total": int64(11),
  832. "source_sessionDemo_0_records_out_total": int64(11),
  833. "op_window_0_exceptions_total": int64(0),
  834. "op_window_0_process_latency_ms": int64(0),
  835. "op_window_0_records_in_total": int64(11),
  836. "op_window_0_records_out_total": int64(3),
  837. },
  838. }, {
  839. name: `rule6`,
  840. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  841. size: 5,
  842. r: [][]map[string]interface{}{
  843. {{
  844. "m": 25.5,
  845. "c": float64(1),
  846. }}, {{
  847. "m": 25.5,
  848. "c": float64(1),
  849. }}, {{
  850. "m": 25.5,
  851. "c": float64(1),
  852. }}, {{
  853. "m": 28.1,
  854. "c": float64(1),
  855. }}, {{
  856. "m": 28.1,
  857. "c": float64(1),
  858. }}, {{
  859. "m": 28.1,
  860. "c": float64(2),
  861. }}, {{
  862. "m": 27.4,
  863. "c": float64(1),
  864. }}, {{
  865. "m": 27.4,
  866. "c": float64(2),
  867. }},
  868. },
  869. m: map[string]interface{}{
  870. "op_preprocessor_demo_0_exceptions_total": int64(0),
  871. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  872. "op_preprocessor_demo_0_records_in_total": int64(5),
  873. "op_preprocessor_demo_0_records_out_total": int64(5),
  874. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  875. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  876. "op_preprocessor_demo1_0_records_in_total": int64(5),
  877. "op_preprocessor_demo1_0_records_out_total": int64(5),
  878. "op_project_0_exceptions_total": int64(0),
  879. "op_project_0_process_latency_ms": int64(0),
  880. "op_project_0_records_in_total": int64(8),
  881. "op_project_0_records_out_total": int64(8),
  882. "sink_mockSink_0_exceptions_total": int64(0),
  883. "sink_mockSink_0_records_in_total": int64(8),
  884. "sink_mockSink_0_records_out_total": int64(8),
  885. "source_demo_0_exceptions_total": int64(0),
  886. "source_demo_0_records_in_total": int64(5),
  887. "source_demo_0_records_out_total": int64(5),
  888. "source_demo1_0_exceptions_total": int64(0),
  889. "source_demo1_0_records_in_total": int64(5),
  890. "source_demo1_0_records_out_total": int64(5),
  891. "op_window_0_exceptions_total": int64(0),
  892. "op_window_0_process_latency_ms": int64(0),
  893. "op_window_0_records_in_total": int64(10),
  894. "op_window_0_records_out_total": int64(10),
  895. "op_join_0_exceptions_total": int64(0),
  896. "op_join_0_process_latency_ms": int64(0),
  897. "op_join_0_records_in_total": int64(10),
  898. "op_join_0_records_out_total": int64(8),
  899. },
  900. },
  901. }
  902. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  903. createStreams(t)
  904. defer dropStreams(t)
  905. for i, tt := range tests {
  906. test.ResetClock(1541152486000)
  907. p := NewRuleProcessor(DbDir)
  908. parser := xsql.NewParser(strings.NewReader(tt.sql))
  909. var (
  910. sources []*nodes.SourceNode
  911. syncs []chan int
  912. )
  913. if stmt, err := xsql.Language.Parse(parser); err != nil {
  914. t.Errorf("parse sql %s error: %s", tt.sql, err)
  915. } else {
  916. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  917. t.Errorf("sql %s is not a select statement", tt.sql)
  918. } else {
  919. streams := xsql.GetStreams(selectStmt)
  920. for _, stream := range streams {
  921. next := make(chan int)
  922. syncs = append(syncs, next)
  923. source := getMockSource(stream, next, tt.size)
  924. sources = append(sources, source)
  925. }
  926. }
  927. }
  928. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  929. if err != nil {
  930. t.Error(err)
  931. }
  932. mockSink := test.NewMockSink()
  933. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  934. tp.AddSink(inputs, sink)
  935. errCh := tp.Open()
  936. func() {
  937. for i := 0; i < tt.size*len(syncs); i++ {
  938. syncs[i%len(syncs)] <- i
  939. for {
  940. time.Sleep(1)
  941. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  942. break
  943. }
  944. }
  945. select {
  946. case err = <-errCh:
  947. t.Log(err)
  948. tp.Cancel()
  949. return
  950. default:
  951. }
  952. }
  953. retry := 100
  954. for ; retry > 0; retry-- {
  955. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  956. break
  957. }
  958. t.Logf("wait to try another %d times", retry)
  959. time.Sleep(time.Duration(retry) * time.Millisecond)
  960. }
  961. if retry == 0 {
  962. err := compareMetrics(tp, tt.m, tt.sql)
  963. t.Errorf("could not get correct metrics: %v", err)
  964. }
  965. }()
  966. results := mockSink.GetResults()
  967. var maps [][]map[string]interface{}
  968. for _, v := range results {
  969. var mapRes []map[string]interface{}
  970. err := json.Unmarshal(v, &mapRes)
  971. if err != nil {
  972. t.Errorf("Failed to parse the input into map")
  973. continue
  974. }
  975. maps = append(maps, mapRes)
  976. }
  977. if !reflect.DeepEqual(tt.r, maps) {
  978. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  979. }
  980. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  981. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  982. }
  983. tp.Cancel()
  984. }
  985. }
  986. func createEventStreams(t *testing.T) {
  987. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  988. demo := `CREATE STREAM demoE (
  989. color STRING,
  990. size BIGINT,
  991. ts BIGINT
  992. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  993. _, err := p.ExecStmt(demo)
  994. if err != nil {
  995. t.Log(err)
  996. }
  997. demo1 := `CREATE STREAM demo1E (
  998. temp FLOAT,
  999. hum BIGINT,
  1000. ts BIGINT
  1001. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1002. _, err = p.ExecStmt(demo1)
  1003. if err != nil {
  1004. t.Log(err)
  1005. }
  1006. sessionDemo := `CREATE STREAM sessionDemoE (
  1007. temp FLOAT,
  1008. hum BIGINT,
  1009. ts BIGINT
  1010. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1011. _, err = p.ExecStmt(sessionDemo)
  1012. if err != nil {
  1013. t.Log(err)
  1014. }
  1015. }
  1016. func dropEventStreams(t *testing.T) {
  1017. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1018. demo := `DROP STREAM demoE`
  1019. _, err := p.ExecStmt(demo)
  1020. if err != nil {
  1021. t.Log(err)
  1022. }
  1023. demo1 := `DROP STREAM demo1E`
  1024. _, err = p.ExecStmt(demo1)
  1025. if err != nil {
  1026. t.Log(err)
  1027. }
  1028. sessionDemo := `DROP STREAM sessionDemoE`
  1029. _, err = p.ExecStmt(sessionDemo)
  1030. if err != nil {
  1031. t.Log(err)
  1032. }
  1033. }
  1034. func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  1035. var data []*xsql.Tuple
  1036. switch name {
  1037. case "demoE":
  1038. data = []*xsql.Tuple{
  1039. {
  1040. Emitter: name,
  1041. Message: map[string]interface{}{
  1042. "color": "red",
  1043. "size": 3,
  1044. "ts": 1541152486013,
  1045. },
  1046. Timestamp: 1541152486013,
  1047. },
  1048. {
  1049. Emitter: name,
  1050. Message: map[string]interface{}{
  1051. "color": "blue",
  1052. "size": 2,
  1053. "ts": 1541152487632,
  1054. },
  1055. Timestamp: 1541152487632,
  1056. },
  1057. {
  1058. Emitter: name,
  1059. Message: map[string]interface{}{
  1060. "color": "red",
  1061. "size": 1,
  1062. "ts": 1541152489252,
  1063. },
  1064. Timestamp: 1541152489252,
  1065. },
  1066. { //dropped item
  1067. Emitter: name,
  1068. Message: map[string]interface{}{
  1069. "color": "blue",
  1070. "size": 6,
  1071. "ts": 1541152486822,
  1072. },
  1073. Timestamp: 1541152486822,
  1074. },
  1075. {
  1076. Emitter: name,
  1077. Message: map[string]interface{}{
  1078. "color": "yellow",
  1079. "size": 4,
  1080. "ts": 1541152488442,
  1081. },
  1082. Timestamp: 1541152488442,
  1083. },
  1084. { //To lift the watermark and issue all windows
  1085. Emitter: name,
  1086. Message: map[string]interface{}{
  1087. "color": "yellow",
  1088. "size": 4,
  1089. "ts": 1541152492342,
  1090. },
  1091. Timestamp: 1541152488442,
  1092. },
  1093. }
  1094. case "demo1E":
  1095. data = []*xsql.Tuple{
  1096. {
  1097. Emitter: name,
  1098. Message: map[string]interface{}{
  1099. "temp": 27.5,
  1100. "hum": 59,
  1101. "ts": 1541152486823,
  1102. },
  1103. Timestamp: 1541152486823,
  1104. },
  1105. {
  1106. Emitter: name,
  1107. Message: map[string]interface{}{
  1108. "temp": 25.5,
  1109. "hum": 65,
  1110. "ts": 1541152486013,
  1111. },
  1112. Timestamp: 1541152486013,
  1113. },
  1114. {
  1115. Emitter: name,
  1116. Message: map[string]interface{}{
  1117. "temp": 27.4,
  1118. "hum": 80,
  1119. "ts": 1541152488442,
  1120. },
  1121. Timestamp: 1541152488442,
  1122. },
  1123. {
  1124. Emitter: name,
  1125. Message: map[string]interface{}{
  1126. "temp": 28.1,
  1127. "hum": 75,
  1128. "ts": 1541152487632,
  1129. },
  1130. Timestamp: 1541152487632,
  1131. },
  1132. {
  1133. Emitter: name,
  1134. Message: map[string]interface{}{
  1135. "temp": 25.5,
  1136. "hum": 62,
  1137. "ts": 1541152489252,
  1138. },
  1139. Timestamp: 1541152489252,
  1140. },
  1141. {
  1142. Emitter: name,
  1143. Message: map[string]interface{}{
  1144. "temp": 25.5,
  1145. "hum": 62,
  1146. "ts": 1541152499252,
  1147. },
  1148. Timestamp: 1541152499252,
  1149. },
  1150. }
  1151. case "sessionDemoE":
  1152. data = []*xsql.Tuple{
  1153. {
  1154. Emitter: name,
  1155. Message: map[string]interface{}{
  1156. "temp": 25.5,
  1157. "hum": 65,
  1158. "ts": 1541152486013,
  1159. },
  1160. Timestamp: 1541152486013,
  1161. },
  1162. {
  1163. Emitter: name,
  1164. Message: map[string]interface{}{
  1165. "temp": 28.1,
  1166. "hum": 75,
  1167. "ts": 1541152487932,
  1168. },
  1169. Timestamp: 1541152487932,
  1170. },
  1171. {
  1172. Emitter: name,
  1173. Message: map[string]interface{}{
  1174. "temp": 27.5,
  1175. "hum": 59,
  1176. "ts": 1541152486823,
  1177. },
  1178. Timestamp: 1541152486823,
  1179. },
  1180. {
  1181. Emitter: name,
  1182. Message: map[string]interface{}{
  1183. "temp": 25.5,
  1184. "hum": 62,
  1185. "ts": 1541152489252,
  1186. },
  1187. Timestamp: 1541152489252,
  1188. },
  1189. {
  1190. Emitter: name,
  1191. Message: map[string]interface{}{
  1192. "temp": 27.4,
  1193. "hum": 80,
  1194. "ts": 1541152488442,
  1195. },
  1196. Timestamp: 1541152488442,
  1197. },
  1198. {
  1199. Emitter: name,
  1200. Message: map[string]interface{}{
  1201. "temp": 26.2,
  1202. "hum": 63,
  1203. "ts": 1541152490062,
  1204. },
  1205. Timestamp: 1541152490062,
  1206. },
  1207. {
  1208. Emitter: name,
  1209. Message: map[string]interface{}{
  1210. "temp": 28.9,
  1211. "hum": 85,
  1212. "ts": 1541152491682,
  1213. },
  1214. Timestamp: 1541152491682,
  1215. },
  1216. {
  1217. Emitter: name,
  1218. Message: map[string]interface{}{
  1219. "temp": 26.8,
  1220. "hum": 71,
  1221. "ts": 1541152490872,
  1222. },
  1223. Timestamp: 1541152490872,
  1224. },
  1225. {
  1226. Emitter: name,
  1227. Message: map[string]interface{}{
  1228. "temp": 29.1,
  1229. "hum": 92,
  1230. "ts": 1541152492492,
  1231. },
  1232. Timestamp: 1541152492492,
  1233. },
  1234. {
  1235. Emitter: name,
  1236. Message: map[string]interface{}{
  1237. "temp": 30.9,
  1238. "hum": 87,
  1239. "ts": 1541152494112,
  1240. },
  1241. Timestamp: 1541152494112,
  1242. },
  1243. {
  1244. Emitter: name,
  1245. Message: map[string]interface{}{
  1246. "temp": 32.2,
  1247. "hum": 99,
  1248. "ts": 1541152493202,
  1249. },
  1250. Timestamp: 1541152493202,
  1251. },
  1252. {
  1253. Emitter: name,
  1254. Message: map[string]interface{}{
  1255. "temp": 32.2,
  1256. "hum": 99,
  1257. "ts": 1541152499202,
  1258. },
  1259. Timestamp: 1541152499202,
  1260. },
  1261. }
  1262. }
  1263. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  1264. "DATASOURCE": name,
  1265. })
  1266. }
  1267. func TestEventWindow(t *testing.T) {
  1268. var tests = []struct {
  1269. name string
  1270. sql string
  1271. size int
  1272. r [][]map[string]interface{}
  1273. m map[string]interface{}
  1274. }{
  1275. {
  1276. name: `rule1`,
  1277. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1278. size: 6,
  1279. r: [][]map[string]interface{}{
  1280. {{
  1281. "color": "red",
  1282. "size": float64(3),
  1283. "ts": float64(1541152486013),
  1284. }},
  1285. {{
  1286. "color": "red",
  1287. "size": float64(3),
  1288. "ts": float64(1541152486013),
  1289. }, {
  1290. "color": "blue",
  1291. "size": float64(2),
  1292. "ts": float64(1541152487632),
  1293. }},
  1294. {{
  1295. "color": "blue",
  1296. "size": float64(2),
  1297. "ts": float64(1541152487632),
  1298. }, {
  1299. "color": "yellow",
  1300. "size": float64(4),
  1301. "ts": float64(1541152488442),
  1302. }}, {{
  1303. "color": "yellow",
  1304. "size": float64(4),
  1305. "ts": float64(1541152488442),
  1306. }, {
  1307. "color": "red",
  1308. "size": float64(1),
  1309. "ts": float64(1541152489252),
  1310. }}, {{
  1311. "color": "red",
  1312. "size": float64(1),
  1313. "ts": float64(1541152489252),
  1314. }},
  1315. },
  1316. m: map[string]interface{}{
  1317. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  1318. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1319. "op_preprocessor_demoE_0_records_in_total": int64(6),
  1320. "op_preprocessor_demoE_0_records_out_total": int64(6),
  1321. "op_project_0_exceptions_total": int64(0),
  1322. "op_project_0_process_latency_ms": int64(0),
  1323. "op_project_0_records_in_total": int64(5),
  1324. "op_project_0_records_out_total": int64(5),
  1325. "sink_mockSink_0_exceptions_total": int64(0),
  1326. "sink_mockSink_0_records_in_total": int64(5),
  1327. "sink_mockSink_0_records_out_total": int64(5),
  1328. "source_demoE_0_exceptions_total": int64(0),
  1329. "source_demoE_0_records_in_total": int64(6),
  1330. "source_demoE_0_records_out_total": int64(6),
  1331. "op_window_0_exceptions_total": int64(0),
  1332. "op_window_0_process_latency_ms": int64(0),
  1333. "op_window_0_records_in_total": int64(6),
  1334. "op_window_0_records_out_total": int64(5),
  1335. },
  1336. }, {
  1337. name: `rule2`,
  1338. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1339. size: 6,
  1340. r: [][]map[string]interface{}{
  1341. {{
  1342. "color": "red",
  1343. "ts": float64(1541152486013),
  1344. }},
  1345. {{
  1346. "color": "yellow",
  1347. "ts": float64(1541152488442),
  1348. }},
  1349. },
  1350. m: map[string]interface{}{
  1351. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  1352. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1353. "op_preprocessor_demoE_0_records_in_total": int64(6),
  1354. "op_preprocessor_demoE_0_records_out_total": int64(6),
  1355. "op_project_0_exceptions_total": int64(0),
  1356. "op_project_0_process_latency_ms": int64(0),
  1357. "op_project_0_records_in_total": int64(2),
  1358. "op_project_0_records_out_total": int64(2),
  1359. "sink_mockSink_0_exceptions_total": int64(0),
  1360. "sink_mockSink_0_records_in_total": int64(2),
  1361. "sink_mockSink_0_records_out_total": int64(2),
  1362. "source_demoE_0_exceptions_total": int64(0),
  1363. "source_demoE_0_records_in_total": int64(6),
  1364. "source_demoE_0_records_out_total": int64(6),
  1365. "op_window_0_exceptions_total": int64(0),
  1366. "op_window_0_process_latency_ms": int64(0),
  1367. "op_window_0_records_in_total": int64(6),
  1368. "op_window_0_records_out_total": int64(4),
  1369. "op_filter_0_exceptions_total": int64(0),
  1370. "op_filter_0_process_latency_ms": int64(0),
  1371. "op_filter_0_records_in_total": int64(4),
  1372. "op_filter_0_records_out_total": int64(2),
  1373. },
  1374. }, {
  1375. name: `rule3`,
  1376. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1377. size: 6,
  1378. r: [][]map[string]interface{}{
  1379. {{
  1380. "color": "red",
  1381. "temp": 25.5,
  1382. "ts": float64(1541152486013),
  1383. }}, {{
  1384. "color": "red",
  1385. "temp": 25.5,
  1386. "ts": float64(1541152486013),
  1387. }}, {{
  1388. "color": "blue",
  1389. "temp": 28.1,
  1390. "ts": float64(1541152487632),
  1391. }}, {{
  1392. "color": "blue",
  1393. "temp": 28.1,
  1394. "ts": float64(1541152487632),
  1395. }, {
  1396. "color": "yellow",
  1397. "temp": 27.4,
  1398. "ts": float64(1541152488442),
  1399. }}, {{
  1400. "color": "yellow",
  1401. "temp": 27.4,
  1402. "ts": float64(1541152488442),
  1403. }, {
  1404. "color": "red",
  1405. "temp": 25.5,
  1406. "ts": float64(1541152489252),
  1407. }},
  1408. },
  1409. m: map[string]interface{}{
  1410. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  1411. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1412. "op_preprocessor_demoE_0_records_in_total": int64(6),
  1413. "op_preprocessor_demoE_0_records_out_total": int64(6),
  1414. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1415. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1416. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  1417. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  1418. "op_project_0_exceptions_total": int64(0),
  1419. "op_project_0_process_latency_ms": int64(0),
  1420. "op_project_0_records_in_total": int64(5),
  1421. "op_project_0_records_out_total": int64(5),
  1422. "sink_mockSink_0_exceptions_total": int64(0),
  1423. "sink_mockSink_0_records_in_total": int64(5),
  1424. "sink_mockSink_0_records_out_total": int64(5),
  1425. "source_demoE_0_exceptions_total": int64(0),
  1426. "source_demoE_0_records_in_total": int64(6),
  1427. "source_demoE_0_records_out_total": int64(6),
  1428. "source_demo1E_0_exceptions_total": int64(0),
  1429. "source_demo1E_0_records_in_total": int64(6),
  1430. "source_demo1E_0_records_out_total": int64(6),
  1431. "op_window_0_exceptions_total": int64(0),
  1432. "op_window_0_process_latency_ms": int64(0),
  1433. "op_window_0_records_in_total": int64(12),
  1434. "op_window_0_records_out_total": int64(5),
  1435. "op_join_0_exceptions_total": int64(0),
  1436. "op_join_0_process_latency_ms": int64(0),
  1437. "op_join_0_records_in_total": int64(5),
  1438. "op_join_0_records_out_total": int64(5),
  1439. },
  1440. }, {
  1441. name: `rule4`,
  1442. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1443. size: 6,
  1444. r: [][]map[string]interface{}{
  1445. {{
  1446. "color": "red",
  1447. }}, {{
  1448. "color": "blue",
  1449. }, {
  1450. "color": "red",
  1451. }}, {{
  1452. "color": "blue",
  1453. }, {
  1454. "color": "yellow",
  1455. }}, {{
  1456. "color": "blue",
  1457. }, {
  1458. "color": "red",
  1459. }, {
  1460. "color": "yellow",
  1461. }},
  1462. },
  1463. m: map[string]interface{}{
  1464. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  1465. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1466. "op_preprocessor_demoE_0_records_in_total": int64(6),
  1467. "op_preprocessor_demoE_0_records_out_total": int64(6),
  1468. "op_project_0_exceptions_total": int64(0),
  1469. "op_project_0_process_latency_ms": int64(0),
  1470. "op_project_0_records_in_total": int64(4),
  1471. "op_project_0_records_out_total": int64(4),
  1472. "sink_mockSink_0_exceptions_total": int64(0),
  1473. "sink_mockSink_0_records_in_total": int64(4),
  1474. "sink_mockSink_0_records_out_total": int64(4),
  1475. "source_demoE_0_exceptions_total": int64(0),
  1476. "source_demoE_0_records_in_total": int64(6),
  1477. "source_demoE_0_records_out_total": int64(6),
  1478. "op_window_0_exceptions_total": int64(0),
  1479. "op_window_0_process_latency_ms": int64(0),
  1480. "op_window_0_records_in_total": int64(6),
  1481. "op_window_0_records_out_total": int64(4),
  1482. "op_aggregate_0_exceptions_total": int64(0),
  1483. "op_aggregate_0_process_latency_ms": int64(0),
  1484. "op_aggregate_0_records_in_total": int64(4),
  1485. "op_aggregate_0_records_out_total": int64(4),
  1486. "op_order_0_exceptions_total": int64(0),
  1487. "op_order_0_process_latency_ms": int64(0),
  1488. "op_order_0_records_in_total": int64(4),
  1489. "op_order_0_records_out_total": int64(4),
  1490. },
  1491. }, {
  1492. name: `rule5`,
  1493. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1494. size: 12,
  1495. r: [][]map[string]interface{}{
  1496. {{
  1497. "temp": 25.5,
  1498. }}, {{
  1499. "temp": 28.1,
  1500. }, {
  1501. "temp": 27.4,
  1502. }, {
  1503. "temp": 25.5,
  1504. }}, {{
  1505. "temp": 26.2,
  1506. }, {
  1507. "temp": 26.8,
  1508. }, {
  1509. "temp": 28.9,
  1510. }, {
  1511. "temp": 29.1,
  1512. }, {
  1513. "temp": 32.2,
  1514. }}, {{
  1515. "temp": 30.9,
  1516. }},
  1517. },
  1518. m: map[string]interface{}{
  1519. "op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  1520. "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  1521. "op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  1522. "op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  1523. "op_project_0_exceptions_total": int64(0),
  1524. "op_project_0_process_latency_ms": int64(0),
  1525. "op_project_0_records_in_total": int64(4),
  1526. "op_project_0_records_out_total": int64(4),
  1527. "sink_mockSink_0_exceptions_total": int64(0),
  1528. "sink_mockSink_0_records_in_total": int64(4),
  1529. "sink_mockSink_0_records_out_total": int64(4),
  1530. "source_sessionDemoE_0_exceptions_total": int64(0),
  1531. "source_sessionDemoE_0_records_in_total": int64(12),
  1532. "source_sessionDemoE_0_records_out_total": int64(12),
  1533. "op_window_0_exceptions_total": int64(0),
  1534. "op_window_0_process_latency_ms": int64(0),
  1535. "op_window_0_records_in_total": int64(12),
  1536. "op_window_0_records_out_total": int64(4),
  1537. },
  1538. }, {
  1539. name: `rule6`,
  1540. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1541. size: 6,
  1542. r: [][]map[string]interface{}{
  1543. {{
  1544. "m": 25.5,
  1545. "c": float64(1),
  1546. }}, {{
  1547. "m": 25.5,
  1548. "c": float64(1),
  1549. }}, {{
  1550. "m": 28.1,
  1551. "c": float64(1),
  1552. }}, {{
  1553. "m": 28.1,
  1554. "c": float64(2),
  1555. }}, {{
  1556. "m": 27.4,
  1557. "c": float64(2),
  1558. }},
  1559. },
  1560. m: map[string]interface{}{
  1561. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  1562. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1563. "op_preprocessor_demoE_0_records_in_total": int64(6),
  1564. "op_preprocessor_demoE_0_records_out_total": int64(6),
  1565. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  1566. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  1567. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  1568. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  1569. "op_project_0_exceptions_total": int64(0),
  1570. "op_project_0_process_latency_ms": int64(0),
  1571. "op_project_0_records_in_total": int64(5),
  1572. "op_project_0_records_out_total": int64(5),
  1573. "sink_mockSink_0_exceptions_total": int64(0),
  1574. "sink_mockSink_0_records_in_total": int64(5),
  1575. "sink_mockSink_0_records_out_total": int64(5),
  1576. "source_demoE_0_exceptions_total": int64(0),
  1577. "source_demoE_0_records_in_total": int64(6),
  1578. "source_demoE_0_records_out_total": int64(6),
  1579. "source_demo1E_0_exceptions_total": int64(0),
  1580. "source_demo1E_0_records_in_total": int64(6),
  1581. "source_demo1E_0_records_out_total": int64(6),
  1582. "op_window_0_exceptions_total": int64(0),
  1583. "op_window_0_records_in_total": int64(12),
  1584. "op_window_0_records_out_total": int64(5),
  1585. "op_join_0_exceptions_total": int64(0),
  1586. "op_join_0_process_latency_ms": int64(0),
  1587. "op_join_0_records_in_total": int64(5),
  1588. "op_join_0_records_out_total": int64(5),
  1589. },
  1590. },
  1591. }
  1592. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1593. createEventStreams(t)
  1594. defer dropEventStreams(t)
  1595. for i, tt := range tests {
  1596. test.ResetClock(1541152486000)
  1597. p := NewRuleProcessor(DbDir)
  1598. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1599. var (
  1600. sources []*nodes.SourceNode
  1601. syncs []chan int
  1602. )
  1603. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1604. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1605. } else {
  1606. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1607. t.Errorf("sql %s is not a select statement", tt.sql)
  1608. } else {
  1609. streams := xsql.GetStreams(selectStmt)
  1610. for _, stream := range streams {
  1611. next := make(chan int)
  1612. syncs = append(syncs, next)
  1613. source := getEventMockSource(stream, next, tt.size)
  1614. sources = append(sources, source)
  1615. }
  1616. }
  1617. }
  1618. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  1619. Id: tt.name, Sql: tt.sql,
  1620. Options: map[string]interface{}{
  1621. "isEventTime": true,
  1622. "lateTolerance": float64(1000),
  1623. },
  1624. }, sources)
  1625. if err != nil {
  1626. t.Error(err)
  1627. }
  1628. mockSink := test.NewMockSink()
  1629. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  1630. tp.AddSink(inputs, sink)
  1631. errCh := tp.Open()
  1632. func() {
  1633. for i := 0; i < tt.size*len(syncs); i++ {
  1634. syncs[i%len(syncs)] <- i
  1635. for {
  1636. time.Sleep(1)
  1637. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  1638. break
  1639. }
  1640. }
  1641. select {
  1642. case err = <-errCh:
  1643. t.Log(err)
  1644. tp.Cancel()
  1645. return
  1646. default:
  1647. }
  1648. }
  1649. mockClock := test.GetMockClock()
  1650. mockClock.Add(1000 * time.Millisecond)
  1651. retry := 100
  1652. for ; retry > 0; retry-- {
  1653. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1654. break
  1655. }
  1656. t.Logf("wait to try another %d times", retry)
  1657. time.Sleep(time.Duration(retry) * time.Millisecond)
  1658. }
  1659. if retry == 0 {
  1660. err := compareMetrics(tp, tt.m, tt.sql)
  1661. t.Errorf("could not get correct metrics: %v", err)
  1662. }
  1663. }()
  1664. results := mockSink.GetResults()
  1665. var maps [][]map[string]interface{}
  1666. for _, v := range results {
  1667. var mapRes []map[string]interface{}
  1668. err := json.Unmarshal(v, &mapRes)
  1669. if err != nil {
  1670. t.Errorf("Failed to parse the input into map")
  1671. continue
  1672. }
  1673. maps = append(maps, mapRes)
  1674. }
  1675. if !reflect.DeepEqual(tt.r, maps) {
  1676. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1677. }
  1678. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1679. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1680. }
  1681. tp.Cancel()
  1682. }
  1683. }
  1684. func getMetric(tp *xstream.TopologyNew, name string) int {
  1685. keys, values := tp.GetMetrics()
  1686. for index, key := range keys {
  1687. if key == name {
  1688. return int(values[index].(int64))
  1689. }
  1690. }
  1691. fmt.Println("can't find " + name)
  1692. return 0
  1693. }
  1694. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  1695. keys, values := tp.GetMetrics()
  1696. //for i, k := range keys{
  1697. // log.Printf("%s:%v", k, values[i])
  1698. //}
  1699. for k, v := range m {
  1700. var (
  1701. index int
  1702. key string
  1703. matched bool
  1704. )
  1705. for index, key = range keys {
  1706. if k == key {
  1707. if strings.HasSuffix(k, "process_latency_ms") {
  1708. if values[index].(int64) >= v.(int64) {
  1709. matched = true
  1710. continue
  1711. } else {
  1712. break
  1713. }
  1714. }
  1715. if values[index] == v {
  1716. matched = true
  1717. }
  1718. break
  1719. }
  1720. }
  1721. if matched {
  1722. continue
  1723. }
  1724. //do not find
  1725. if index < len(values) {
  1726. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  1727. } else {
  1728. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  1729. }
  1730. }
  1731. return nil
  1732. }
  1733. func errstring(err error) string {
  1734. if err != nil {
  1735. return err.Error()
  1736. }
  1737. return ""
  1738. }