window_rule_test.go 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554
  1. package topotest
  2. import (
  3. "github.com/emqx/kuiper/internal/topo"
  4. "github.com/emqx/kuiper/pkg/api"
  5. "testing"
  6. )
  7. func TestWindow(t *testing.T) {
  8. //Reset
  9. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  10. HandleStream(false, streamList, t)
  11. var tests = []RuleTest{
  12. {
  13. Name: `TestWindowRule1`,
  14. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  15. R: [][]map[string]interface{}{
  16. {{
  17. "color": "red",
  18. "size": float64(3),
  19. "ts": float64(1541152486013),
  20. }, {
  21. "color": "blue",
  22. "size": float64(6),
  23. "ts": float64(1541152486822),
  24. }},
  25. {{
  26. "color": "red",
  27. "size": float64(3),
  28. "ts": float64(1541152486013),
  29. }, {
  30. "color": "blue",
  31. "size": float64(6),
  32. "ts": float64(1541152486822),
  33. }, {
  34. "color": "blue",
  35. "size": float64(2),
  36. "ts": float64(1541152487632),
  37. }},
  38. {{
  39. "color": "blue",
  40. "size": float64(2),
  41. "ts": float64(1541152487632),
  42. }, {
  43. "color": "yellow",
  44. "size": float64(4),
  45. "ts": float64(1541152488442),
  46. }},
  47. {{
  48. "color": "yellow",
  49. "size": float64(4),
  50. "ts": float64(1541152488442),
  51. }, {
  52. "color": "red",
  53. "size": float64(1),
  54. "ts": float64(1541152489252),
  55. }},
  56. },
  57. M: map[string]interface{}{
  58. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  59. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  60. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  61. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  62. "op_3_project_0_exceptions_total": int64(0),
  63. "op_3_project_0_process_latency_us": int64(0),
  64. "op_3_project_0_records_in_total": int64(4),
  65. "op_3_project_0_records_out_total": int64(4),
  66. "sink_mockSink_0_exceptions_total": int64(0),
  67. "sink_mockSink_0_records_in_total": int64(4),
  68. "sink_mockSink_0_records_out_total": int64(4),
  69. "source_demo_0_exceptions_total": int64(0),
  70. "source_demo_0_records_in_total": int64(5),
  71. "source_demo_0_records_out_total": int64(5),
  72. "op_2_window_0_exceptions_total": int64(0),
  73. "op_2_window_0_process_latency_us": int64(0),
  74. "op_2_window_0_records_in_total": int64(5),
  75. "op_2_window_0_records_out_total": int64(4),
  76. },
  77. }, {
  78. Name: `TestWindowRule2`,
  79. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  80. R: [][]map[string]interface{}{
  81. {{
  82. "color": "red",
  83. "ts": float64(1541152486013),
  84. }, {
  85. "color": "blue",
  86. "ts": float64(1541152486822),
  87. }},
  88. {{
  89. "color": "yellow",
  90. "ts": float64(1541152488442),
  91. }},
  92. },
  93. M: map[string]interface{}{
  94. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  95. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  96. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  97. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  98. "op_4_project_0_exceptions_total": int64(0),
  99. "op_4_project_0_process_latency_us": int64(0),
  100. "op_4_project_0_records_in_total": int64(2),
  101. "op_4_project_0_records_out_total": int64(2),
  102. "sink_mockSink_0_exceptions_total": int64(0),
  103. "sink_mockSink_0_records_in_total": int64(2),
  104. "sink_mockSink_0_records_out_total": int64(2),
  105. "source_demo_0_exceptions_total": int64(0),
  106. "source_demo_0_records_in_total": int64(5),
  107. "source_demo_0_records_out_total": int64(5),
  108. "op_3_window_0_exceptions_total": int64(0),
  109. "op_3_window_0_process_latency_us": int64(0),
  110. "op_3_window_0_records_in_total": int64(3),
  111. "op_3_window_0_records_out_total": int64(2),
  112. "op_2_filter_0_exceptions_total": int64(0),
  113. "op_2_filter_0_process_latency_us": int64(0),
  114. "op_2_filter_0_records_in_total": int64(5),
  115. "op_2_filter_0_records_out_total": int64(3),
  116. },
  117. }, {
  118. Name: `TestWindowRule3`,
  119. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  120. R: [][]map[string]interface{}{
  121. {{
  122. "color": "red",
  123. "temp": 25.5,
  124. "ts1": float64(1541152486013),
  125. "ts2": float64(1541152486013),
  126. "diff": float64(0),
  127. }}, {{
  128. "color": "red",
  129. "temp": 25.5,
  130. "ts1": float64(1541152486013),
  131. "ts2": float64(1541152486013),
  132. "diff": float64(0),
  133. }}, {{
  134. "color": "red",
  135. "temp": 25.5,
  136. "ts1": float64(1541152486013),
  137. "ts2": float64(1541152486013),
  138. "diff": float64(0),
  139. }}, {{
  140. "color": "blue",
  141. "temp": 28.1,
  142. "ts1": float64(1541152487632),
  143. "ts2": float64(1541152487632),
  144. "diff": float64(0),
  145. }}, {{
  146. "color": "blue",
  147. "temp": 28.1,
  148. "ts1": float64(1541152487632),
  149. "ts2": float64(1541152487632),
  150. "diff": float64(0),
  151. }}, {{
  152. "color": "blue",
  153. "temp": 28.1,
  154. "ts1": float64(1541152487632),
  155. "ts2": float64(1541152487632),
  156. "diff": float64(0),
  157. }, {
  158. "color": "yellow",
  159. "temp": 27.4,
  160. "ts1": float64(1541152488442),
  161. "ts2": float64(1541152488442),
  162. "diff": float64(0),
  163. }}, {{
  164. "color": "yellow",
  165. "temp": 27.4,
  166. "ts1": float64(1541152488442),
  167. "ts2": float64(1541152488442),
  168. "diff": float64(0),
  169. }}, {{
  170. "color": "yellow",
  171. "temp": 27.4,
  172. "ts1": float64(1541152488442),
  173. "ts2": float64(1541152488442),
  174. "diff": float64(0),
  175. }, {
  176. "color": "red",
  177. "temp": 25.5,
  178. "ts1": float64(1541152489252),
  179. "ts2": float64(1541152489252),
  180. "diff": float64(0),
  181. }},
  182. },
  183. M: map[string]interface{}{
  184. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  185. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  186. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  187. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  188. "op_2_preprocessor_demo1_0_exceptions_total": int64(0),
  189. "op_2_preprocessor_demo1_0_process_latency_us": int64(0),
  190. "op_2_preprocessor_demo1_0_records_in_total": int64(5),
  191. "op_2_preprocessor_demo1_0_records_out_total": int64(5),
  192. "op_5_project_0_exceptions_total": int64(0),
  193. "op_5_project_0_process_latency_us": int64(0),
  194. "op_5_project_0_records_in_total": int64(8),
  195. "op_5_project_0_records_out_total": int64(8),
  196. "sink_mockSink_0_exceptions_total": int64(0),
  197. "sink_mockSink_0_records_in_total": int64(8),
  198. "sink_mockSink_0_records_out_total": int64(8),
  199. "source_demo_0_exceptions_total": int64(0),
  200. "source_demo_0_records_in_total": int64(5),
  201. "source_demo_0_records_out_total": int64(5),
  202. "source_demo1_0_exceptions_total": int64(0),
  203. "source_demo1_0_records_in_total": int64(5),
  204. "source_demo1_0_records_out_total": int64(5),
  205. "op_3_window_0_exceptions_total": int64(0),
  206. "op_3_window_0_process_latency_us": int64(0),
  207. "op_3_window_0_records_in_total": int64(10),
  208. "op_3_window_0_records_out_total": int64(10),
  209. "op_4_join_0_exceptions_total": int64(0),
  210. "op_4_join_0_process_latency_us": int64(0),
  211. "op_4_join_0_records_in_total": int64(10),
  212. "op_4_join_0_records_out_total": int64(8),
  213. },
  214. T: &topo.PrintableTopo{
  215. Sources: []string{"source_demo", "source_demo1"},
  216. Edges: map[string][]string{
  217. "source_demo": {"op_1_preprocessor_demo"},
  218. "source_demo1": {"op_2_preprocessor_demo1"},
  219. "op_1_preprocessor_demo": {"op_3_window"},
  220. "op_2_preprocessor_demo1": {"op_3_window"},
  221. "op_3_window": {"op_4_join"},
  222. "op_4_join": {"op_5_project"},
  223. "op_5_project": {"sink_mockSink"},
  224. },
  225. },
  226. }, {
  227. Name: `TestWindowRule4`,
  228. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  229. R: [][]map[string]interface{}{
  230. {{
  231. "color": "red",
  232. "c": float64(1),
  233. }}, {{
  234. "color": "blue",
  235. "c": float64(1),
  236. }, {
  237. "color": "red",
  238. "c": float64(1),
  239. }}, {{
  240. "color": "blue",
  241. "c": float64(2),
  242. }, {
  243. "color": "red",
  244. "c": float64(1),
  245. }}, {{
  246. "color": "blue",
  247. "c": float64(2),
  248. }, {
  249. "color": "yellow",
  250. "c": float64(1),
  251. }}, {{
  252. "color": "blue",
  253. "c": float64(1),
  254. }, {
  255. "color": "red",
  256. "c": float64(1),
  257. }, {
  258. "color": "yellow",
  259. "c": float64(1),
  260. }},
  261. },
  262. M: map[string]interface{}{
  263. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  264. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  265. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  266. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  267. "op_5_project_0_exceptions_total": int64(0),
  268. "op_5_project_0_process_latency_us": int64(0),
  269. "op_5_project_0_records_in_total": int64(5),
  270. "op_5_project_0_records_out_total": int64(5),
  271. "sink_mockSink_0_exceptions_total": int64(0),
  272. "sink_mockSink_0_records_in_total": int64(5),
  273. "sink_mockSink_0_records_out_total": int64(5),
  274. "source_demo_0_exceptions_total": int64(0),
  275. "source_demo_0_records_in_total": int64(5),
  276. "source_demo_0_records_out_total": int64(5),
  277. "op_2_window_0_exceptions_total": int64(0),
  278. "op_2_window_0_process_latency_us": int64(0),
  279. "op_2_window_0_records_in_total": int64(5),
  280. "op_2_window_0_records_out_total": int64(5),
  281. "op_3_aggregate_0_exceptions_total": int64(0),
  282. "op_3_aggregate_0_process_latency_us": int64(0),
  283. "op_3_aggregate_0_records_in_total": int64(5),
  284. "op_3_aggregate_0_records_out_total": int64(5),
  285. "op_4_order_0_exceptions_total": int64(0),
  286. "op_4_order_0_process_latency_us": int64(0),
  287. "op_4_order_0_records_in_total": int64(5),
  288. "op_4_order_0_records_out_total": int64(5),
  289. },
  290. }, {
  291. Name: `TestWindowRule5`,
  292. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  293. R: [][]map[string]interface{}{
  294. {{
  295. "count": float64(2),
  296. "ws": float64(1541152486013),
  297. "window_end": float64(1541152487823), // timeout
  298. }}, {{
  299. "count": float64(3),
  300. "ws": float64(1541152487932),
  301. "window_end": float64(1541152490000), // tick
  302. }}, {{
  303. "count": float64(5),
  304. "ws": float64(1541152490000),
  305. "window_end": float64(1541152494000), // tick
  306. }}, {{
  307. "count": float64(1),
  308. "ws": float64(1541152494000),
  309. "window_end": float64(1541152495112), // timeout
  310. }},
  311. },
  312. M: map[string]interface{}{
  313. "op_1_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  314. "op_1_preprocessor_sessionDemo_0_process_latency_us": int64(0),
  315. "op_1_preprocessor_sessionDemo_0_records_in_total": int64(11),
  316. "op_1_preprocessor_sessionDemo_0_records_out_total": int64(11),
  317. "op_3_project_0_exceptions_total": int64(0),
  318. "op_3_project_0_process_latency_us": int64(0),
  319. "op_3_project_0_records_in_total": int64(4),
  320. "op_3_project_0_records_out_total": int64(4),
  321. "sink_mockSink_0_exceptions_total": int64(0),
  322. "sink_mockSink_0_records_in_total": int64(4),
  323. "sink_mockSink_0_records_out_total": int64(4),
  324. "source_sessionDemo_0_exceptions_total": int64(0),
  325. "source_sessionDemo_0_records_in_total": int64(11),
  326. "source_sessionDemo_0_records_out_total": int64(11),
  327. "op_2_window_0_exceptions_total": int64(0),
  328. "op_2_window_0_process_latency_us": int64(0),
  329. "op_2_window_0_records_in_total": int64(11),
  330. "op_2_window_0_records_out_total": int64(4),
  331. },
  332. }, {
  333. Name: `TestWindowRule6`,
  334. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  335. R: [][]map[string]interface{}{
  336. {{
  337. "temp": 25.5,
  338. "c": float64(1),
  339. "window_start": float64(1541152485115),
  340. "window_end": float64(1541152486115),
  341. }}, {{
  342. "temp": 25.5,
  343. "c": float64(1),
  344. "window_start": float64(1541152485822),
  345. "window_end": float64(1541152486822),
  346. }}, {{
  347. "temp": 25.5,
  348. "c": float64(1),
  349. "window_start": float64(1541152485903),
  350. "window_end": float64(1541152486903),
  351. }}, {{
  352. "temp": 28.1,
  353. "c": float64(1),
  354. "window_start": float64(1541152486702),
  355. "window_end": float64(1541152487702),
  356. }}, {{
  357. "temp": 28.1,
  358. "c": float64(1),
  359. "window_start": float64(1541152487442),
  360. "window_end": float64(1541152488442),
  361. }}, {{
  362. "temp": 55.5,
  363. "c": float64(2),
  364. "window_start": float64(1541152487605),
  365. "window_end": float64(1541152488605),
  366. }}, {{
  367. "temp": 27.4,
  368. "c": float64(1),
  369. "window_start": float64(1541152488252),
  370. "window_end": float64(1541152489252),
  371. }}, {{
  372. "temp": 52.9,
  373. "c": float64(2),
  374. "window_start": float64(1541152488305),
  375. "window_end": float64(1541152489305),
  376. }},
  377. },
  378. M: map[string]interface{}{
  379. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  380. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  381. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  382. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  383. "op_2_preprocessor_demo1_0_exceptions_total": int64(0),
  384. "op_2_preprocessor_demo1_0_process_latency_us": int64(0),
  385. "op_2_preprocessor_demo1_0_records_in_total": int64(5),
  386. "op_2_preprocessor_demo1_0_records_out_total": int64(5),
  387. "op_5_project_0_exceptions_total": int64(0),
  388. "op_5_project_0_process_latency_us": int64(0),
  389. "op_5_project_0_records_in_total": int64(8),
  390. "op_5_project_0_records_out_total": int64(8),
  391. "sink_mockSink_0_exceptions_total": int64(0),
  392. "sink_mockSink_0_records_in_total": int64(8),
  393. "sink_mockSink_0_records_out_total": int64(8),
  394. "source_demo_0_exceptions_total": int64(0),
  395. "source_demo_0_records_in_total": int64(5),
  396. "source_demo_0_records_out_total": int64(5),
  397. "source_demo1_0_exceptions_total": int64(0),
  398. "source_demo1_0_records_in_total": int64(5),
  399. "source_demo1_0_records_out_total": int64(5),
  400. "op_3_window_0_exceptions_total": int64(0),
  401. "op_3_window_0_process_latency_us": int64(0),
  402. "op_3_window_0_records_in_total": int64(10),
  403. "op_3_window_0_records_out_total": int64(10),
  404. "op_4_join_0_exceptions_total": int64(0),
  405. "op_4_join_0_process_latency_us": int64(0),
  406. "op_4_join_0_records_in_total": int64(10),
  407. "op_4_join_0_records_out_total": int64(8),
  408. },
  409. }, {
  410. Name: `TestWindowRule7`,
  411. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  412. R: [][]map[string]interface{}{
  413. {{
  414. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  415. }},
  416. {{
  417. "color": "blue",
  418. "size": float64(6),
  419. "ts": float64(1541152486822),
  420. }},
  421. {{
  422. "color": "blue",
  423. "size": float64(6),
  424. "ts": float64(1541152486822),
  425. }, {
  426. "color": "blue",
  427. "size": float64(2),
  428. "ts": float64(1541152487632),
  429. }},
  430. {{
  431. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  432. }},
  433. {{
  434. "color": "blue",
  435. "size": float64(2),
  436. "ts": float64(1541152487632),
  437. }},
  438. {{
  439. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  440. }},
  441. },
  442. M: map[string]interface{}{
  443. "op_1_preprocessor_demoError_0_exceptions_total": int64(3),
  444. "op_1_preprocessor_demoError_0_process_latency_us": int64(0),
  445. "op_1_preprocessor_demoError_0_records_in_total": int64(5),
  446. "op_1_preprocessor_demoError_0_records_out_total": int64(2),
  447. "op_3_project_0_exceptions_total": int64(3),
  448. "op_3_project_0_process_latency_us": int64(0),
  449. "op_3_project_0_records_in_total": int64(6),
  450. "op_3_project_0_records_out_total": int64(3),
  451. "sink_mockSink_0_exceptions_total": int64(0),
  452. "sink_mockSink_0_records_in_total": int64(6),
  453. "sink_mockSink_0_records_out_total": int64(6),
  454. "source_demoError_0_exceptions_total": int64(0),
  455. "source_demoError_0_records_in_total": int64(5),
  456. "source_demoError_0_records_out_total": int64(5),
  457. "op_2_window_0_exceptions_total": int64(3),
  458. "op_2_window_0_process_latency_us": int64(0),
  459. "op_2_window_0_records_in_total": int64(5),
  460. "op_2_window_0_records_out_total": int64(3),
  461. },
  462. }, {
  463. Name: `TestWindowRule8`,
  464. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  465. R: [][]map[string]interface{}{
  466. {{
  467. "color": "red",
  468. "ts": float64(1541152486013),
  469. "c": float64(2),
  470. "window_start": float64(1541152486000),
  471. "window_end": float64(1541152487000),
  472. }},
  473. },
  474. M: map[string]interface{}{
  475. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  476. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  477. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  478. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  479. "op_5_project_0_exceptions_total": int64(0),
  480. "op_5_project_0_process_latency_us": int64(0),
  481. "op_5_project_0_records_in_total": int64(1),
  482. "op_5_project_0_records_out_total": int64(1),
  483. "sink_mockSink_0_exceptions_total": int64(0),
  484. "sink_mockSink_0_records_in_total": int64(1),
  485. "sink_mockSink_0_records_out_total": int64(1),
  486. "source_demo_0_exceptions_total": int64(0),
  487. "source_demo_0_records_in_total": int64(5),
  488. "source_demo_0_records_out_total": int64(5),
  489. "op_3_window_0_exceptions_total": int64(0),
  490. "op_3_window_0_process_latency_us": int64(0),
  491. "op_3_window_0_records_in_total": int64(3),
  492. "op_3_window_0_records_out_total": int64(2),
  493. "op_2_filter_0_exceptions_total": int64(0),
  494. "op_2_filter_0_process_latency_us": int64(0),
  495. "op_2_filter_0_records_in_total": int64(5),
  496. "op_2_filter_0_records_out_total": int64(3),
  497. "op_4_having_0_exceptions_total": int64(0),
  498. "op_4_having_0_process_latency_us": int64(0),
  499. "op_4_having_0_records_in_total": int64(2),
  500. "op_4_having_0_records_out_total": int64(1),
  501. },
  502. }, {
  503. Name: `TestWindowRule9`,
  504. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  505. R: [][]map[string]interface{}{
  506. {{
  507. "color": "red",
  508. "window_start": float64(1541152485000),
  509. "window_end": float64(1541152487000),
  510. }},
  511. {{
  512. "color": "red",
  513. "window_start": float64(1541152486000),
  514. "window_end": float64(1541152488000),
  515. }},
  516. {{
  517. "color": "yellow",
  518. "window_start": float64(1541152487000),
  519. "window_end": float64(1541152489000),
  520. }},
  521. {{
  522. "color": "yellow",
  523. "window_start": float64(1541152488000),
  524. "window_end": float64(1541152490000),
  525. }},
  526. },
  527. M: map[string]interface{}{
  528. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  529. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  530. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  531. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  532. "op_4_project_0_exceptions_total": int64(0),
  533. "op_4_project_0_process_latency_us": int64(0),
  534. "op_4_project_0_records_in_total": int64(4),
  535. "op_4_project_0_records_out_total": int64(4),
  536. "sink_mockSink_0_exceptions_total": int64(0),
  537. "sink_mockSink_0_records_in_total": int64(4),
  538. "sink_mockSink_0_records_out_total": int64(4),
  539. "source_demo_0_exceptions_total": int64(0),
  540. "source_demo_0_records_in_total": int64(5),
  541. "source_demo_0_records_out_total": int64(5),
  542. "op_3_window_0_exceptions_total": int64(0),
  543. "op_3_window_0_process_latency_us": int64(0),
  544. "op_3_window_0_records_in_total": int64(3),
  545. "op_3_window_0_records_out_total": int64(4),
  546. },
  547. }, {
  548. Name: `TestCountWindowRule1`,
  549. Sql: `SELECT collect(*)[0]->color as c FROM demo GROUP BY COUNTWINDOW(3)`,
  550. R: [][]map[string]interface{}{
  551. {{
  552. "c": "red",
  553. }},
  554. },
  555. M: map[string]interface{}{
  556. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  557. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  558. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  559. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  560. "op_3_project_0_exceptions_total": int64(0),
  561. "op_3_project_0_process_latency_us": int64(0),
  562. "op_3_project_0_records_in_total": int64(1),
  563. "op_3_project_0_records_out_total": int64(1),
  564. "sink_mockSink_0_exceptions_total": int64(0),
  565. "sink_mockSink_0_records_in_total": int64(1),
  566. "sink_mockSink_0_records_out_total": int64(1),
  567. "source_demo_0_exceptions_total": int64(0),
  568. "source_demo_0_records_in_total": int64(5),
  569. "source_demo_0_records_out_total": int64(5),
  570. "op_2_window_0_exceptions_total": int64(0),
  571. "op_2_window_0_process_latency_us": int64(0),
  572. "op_2_window_0_records_in_total": int64(5),
  573. "op_2_window_0_records_out_total": int64(1),
  574. },
  575. }, {
  576. Name: `TestWindowRule10`,
  577. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  578. R: [][]map[string]interface{}{
  579. {{
  580. "c": "red",
  581. }}, {{
  582. "c": "blue",
  583. }}, {{}}, {{
  584. "c": "yellow",
  585. }}, {{}},
  586. },
  587. M: map[string]interface{}{
  588. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  589. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  590. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  591. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  592. "op_3_project_0_exceptions_total": int64(0),
  593. "op_3_project_0_process_latency_us": int64(0),
  594. "op_3_project_0_records_in_total": int64(5),
  595. "op_3_project_0_records_out_total": int64(5),
  596. "sink_mockSink_0_exceptions_total": int64(0),
  597. "sink_mockSink_0_records_in_total": int64(5),
  598. "sink_mockSink_0_records_out_total": int64(5),
  599. "source_demo_0_exceptions_total": int64(0),
  600. "source_demo_0_records_in_total": int64(5),
  601. "source_demo_0_records_out_total": int64(5),
  602. "op_2_window_0_exceptions_total": int64(0),
  603. "op_2_window_0_process_latency_us": int64(0),
  604. "op_2_window_0_records_in_total": int64(5),
  605. "op_2_window_0_records_out_total": int64(5),
  606. },
  607. }, {
  608. Name: `TestWindowRule11`,
  609. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  610. R: [][]map[string]interface{}{
  611. {{
  612. "color": "red",
  613. "name": "name1",
  614. "window_start": float64(1541152486000),
  615. "window_end": float64(1541152487000),
  616. }},
  617. },
  618. M: map[string]interface{}{
  619. //"op_4_project_0_exceptions_total": int64(0),
  620. //"op_4_project_0_process_latency_us": int64(0),
  621. //"op_4_project_0_records_in_total": int64(2),
  622. //"op_4_project_0_records_out_total": int64(2),
  623. "op_3_window_0_exceptions_total": int64(0),
  624. "op_3_window_0_process_latency_us": int64(0),
  625. "op_3_window_0_records_in_total": int64(3),
  626. "op_3_window_0_records_out_total": int64(2),
  627. "op_2_filter_0_exceptions_total": int64(0),
  628. "op_2_filter_0_process_latency_us": int64(0),
  629. "op_2_filter_0_records_in_total": int64(5),
  630. "op_2_filter_0_records_out_total": int64(3),
  631. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  632. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  633. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  634. "op_4_tableprocessor_table1_0_exceptions_total": int64(0),
  635. "op_4_tableprocessor_table1_0_records_in_total": int64(4),
  636. "op_4_tableprocessor_table1_0_records_out_total": int64(1),
  637. "op_5_filter_0_exceptions_total": int64(0),
  638. "op_5_filter_0_records_in_total": int64(1),
  639. "op_5_filter_0_records_out_total": int64(1),
  640. "op_6_join_aligner_0_records_in_total": int64(3),
  641. "op_6_join_aligner_0_records_out_total": int64(2),
  642. "op_7_join_0_exceptions_total": int64(0),
  643. "op_7_join_0_records_in_total": int64(2),
  644. "op_7_join_0_records_out_total": int64(1),
  645. "op_8_project_0_exceptions_total": int64(0),
  646. "op_8_project_0_records_in_total": int64(1),
  647. "op_8_project_0_records_out_total": int64(1),
  648. "sink_mockSink_0_exceptions_total": int64(0),
  649. "sink_mockSink_0_records_in_total": int64(1),
  650. "sink_mockSink_0_records_out_total": int64(1),
  651. "source_demo_0_exceptions_total": int64(0),
  652. "source_demo_0_records_in_total": int64(5),
  653. "source_demo_0_records_out_total": int64(5),
  654. "source_table1_0_exceptions_total": int64(0),
  655. "source_table1_0_records_in_total": int64(4),
  656. "source_table1_0_records_out_total": int64(4),
  657. },
  658. },
  659. }
  660. HandleStream(true, streamList, t)
  661. options := []*api.RuleOption{
  662. {
  663. BufferLength: 100,
  664. SendError: true,
  665. }, {
  666. BufferLength: 100,
  667. SendError: true,
  668. Qos: api.AtLeastOnce,
  669. CheckpointInterval: 5000,
  670. }, {
  671. BufferLength: 100,
  672. SendError: true,
  673. Qos: api.ExactlyOnce,
  674. CheckpointInterval: 5000,
  675. },
  676. }
  677. for j, opt := range options {
  678. DoRuleTest(t, tests, j, opt, 15)
  679. }
  680. }
  681. func TestEventWindow(t *testing.T) {
  682. //Reset
  683. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  684. HandleStream(false, streamList, t)
  685. var tests = []RuleTest{
  686. {
  687. Name: `TestEventWindowRule1`,
  688. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  689. R: [][]map[string]interface{}{
  690. {{
  691. "color": "red",
  692. "size": float64(3),
  693. "ts": float64(1541152486013),
  694. }},
  695. {{
  696. "color": "red",
  697. "size": float64(3),
  698. "ts": float64(1541152486013),
  699. }, {
  700. "color": "blue",
  701. "size": float64(2),
  702. "ts": float64(1541152487632),
  703. }},
  704. {{
  705. "color": "blue",
  706. "size": float64(2),
  707. "ts": float64(1541152487632),
  708. }, {
  709. "color": "yellow",
  710. "size": float64(4),
  711. "ts": float64(1541152488442),
  712. }}, {{
  713. "color": "yellow",
  714. "size": float64(4),
  715. "ts": float64(1541152488442),
  716. }, {
  717. "color": "red",
  718. "size": float64(1),
  719. "ts": float64(1541152489252),
  720. }}, {{
  721. "color": "red",
  722. "size": float64(1),
  723. "ts": float64(1541152489252),
  724. }},
  725. },
  726. M: map[string]interface{}{
  727. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  728. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  729. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  730. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  731. "op_3_project_0_exceptions_total": int64(0),
  732. "op_3_project_0_process_latency_us": int64(0),
  733. "op_3_project_0_records_in_total": int64(5),
  734. "op_3_project_0_records_out_total": int64(5),
  735. "sink_mockSink_0_exceptions_total": int64(0),
  736. "sink_mockSink_0_records_in_total": int64(5),
  737. "sink_mockSink_0_records_out_total": int64(5),
  738. "source_demoE_0_exceptions_total": int64(0),
  739. "source_demoE_0_records_in_total": int64(6),
  740. "source_demoE_0_records_out_total": int64(6),
  741. "op_2_window_0_exceptions_total": int64(0),
  742. "op_2_window_0_process_latency_us": int64(0),
  743. "op_2_window_0_records_in_total": int64(6),
  744. "op_2_window_0_records_out_total": int64(5),
  745. },
  746. }, {
  747. Name: `TestEventWindowRule2`,
  748. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  749. R: [][]map[string]interface{}{
  750. {{
  751. "window_start": float64(1541152486000),
  752. "window_end": float64(1541152487000),
  753. "color": "red",
  754. "ts": float64(1541152486013),
  755. }},
  756. {{
  757. "window_start": float64(1541152488000),
  758. "window_end": float64(1541152489000),
  759. "color": "yellow",
  760. "ts": float64(1541152488442),
  761. }},
  762. },
  763. M: map[string]interface{}{
  764. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  765. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  766. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  767. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  768. "op_4_project_0_exceptions_total": int64(0),
  769. "op_4_project_0_process_latency_us": int64(0),
  770. "op_4_project_0_records_in_total": int64(2),
  771. "op_4_project_0_records_out_total": int64(2),
  772. "sink_mockSink_0_exceptions_total": int64(0),
  773. "sink_mockSink_0_records_in_total": int64(2),
  774. "sink_mockSink_0_records_out_total": int64(2),
  775. "source_demoE_0_exceptions_total": int64(0),
  776. "source_demoE_0_records_in_total": int64(6),
  777. "source_demoE_0_records_out_total": int64(6),
  778. "op_2_window_0_exceptions_total": int64(0),
  779. "op_2_window_0_process_latency_us": int64(0),
  780. "op_2_window_0_records_in_total": int64(6),
  781. "op_2_window_0_records_out_total": int64(4),
  782. "op_3_filter_0_exceptions_total": int64(0),
  783. "op_3_filter_0_process_latency_us": int64(0),
  784. "op_3_filter_0_records_in_total": int64(4),
  785. "op_3_filter_0_records_out_total": int64(2),
  786. },
  787. }, {
  788. Name: `TestEventWindowRule3`,
  789. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  790. R: [][]map[string]interface{}{
  791. {{
  792. "color": "red",
  793. "temp": 25.5,
  794. "ts": float64(1541152486013),
  795. }}, {{
  796. "color": "red",
  797. "temp": 25.5,
  798. "ts": float64(1541152486013),
  799. }}, {{
  800. "color": "blue",
  801. "temp": 28.1,
  802. "ts": float64(1541152487632),
  803. }}, {{
  804. "color": "blue",
  805. "temp": 28.1,
  806. "ts": float64(1541152487632),
  807. }, {
  808. "color": "yellow",
  809. "temp": 27.4,
  810. "ts": float64(1541152488442),
  811. }}, {{
  812. "color": "yellow",
  813. "temp": 27.4,
  814. "ts": float64(1541152488442),
  815. }, {
  816. "color": "red",
  817. "temp": 25.5,
  818. "ts": float64(1541152489252),
  819. }},
  820. },
  821. M: map[string]interface{}{
  822. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  823. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  824. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  825. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  826. "op_2_preprocessor_demo1E_0_exceptions_total": int64(0),
  827. "op_2_preprocessor_demo1E_0_process_latency_us": int64(0),
  828. "op_2_preprocessor_demo1E_0_records_in_total": int64(6),
  829. "op_2_preprocessor_demo1E_0_records_out_total": int64(6),
  830. "op_5_project_0_exceptions_total": int64(0),
  831. "op_5_project_0_process_latency_us": int64(0),
  832. "op_5_project_0_records_in_total": int64(5),
  833. "op_5_project_0_records_out_total": int64(5),
  834. "sink_mockSink_0_exceptions_total": int64(0),
  835. "sink_mockSink_0_records_in_total": int64(5),
  836. "sink_mockSink_0_records_out_total": int64(5),
  837. "source_demoE_0_exceptions_total": int64(0),
  838. "source_demoE_0_records_in_total": int64(6),
  839. "source_demoE_0_records_out_total": int64(6),
  840. "source_demo1E_0_exceptions_total": int64(0),
  841. "source_demo1E_0_records_in_total": int64(6),
  842. "source_demo1E_0_records_out_total": int64(6),
  843. "op_3_window_0_exceptions_total": int64(0),
  844. "op_3_window_0_process_latency_us": int64(0),
  845. "op_3_window_0_records_in_total": int64(12),
  846. "op_3_window_0_records_out_total": int64(5),
  847. "op_4_join_0_exceptions_total": int64(0),
  848. "op_4_join_0_process_latency_us": int64(0),
  849. "op_4_join_0_records_in_total": int64(5),
  850. "op_4_join_0_records_out_total": int64(5),
  851. },
  852. }, {
  853. Name: `TestEventWindowRule4`,
  854. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  855. R: [][]map[string]interface{}{
  856. {{
  857. "color": "red",
  858. "ws": float64(1541152484013),
  859. "we": float64(1541152486013),
  860. }}, {{
  861. "color": "blue",
  862. "ws": float64(1541152485632),
  863. "we": float64(1541152487632),
  864. }, {
  865. "color": "red",
  866. "ws": float64(1541152485632),
  867. "we": float64(1541152487632),
  868. }}, {{
  869. "color": "blue",
  870. "ws": float64(1541152486442),
  871. "we": float64(1541152488442),
  872. }, {
  873. "color": "yellow",
  874. "ws": float64(1541152486442),
  875. "we": float64(1541152488442),
  876. }}, {{
  877. "color": "blue",
  878. "ws": float64(1541152487252),
  879. "we": float64(1541152489252),
  880. }, {
  881. "color": "red",
  882. "ws": float64(1541152487252),
  883. "we": float64(1541152489252),
  884. }, {
  885. "color": "yellow",
  886. "ws": float64(1541152487252),
  887. "we": float64(1541152489252),
  888. }},
  889. },
  890. M: map[string]interface{}{
  891. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  892. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  893. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  894. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  895. "op_5_project_0_exceptions_total": int64(0),
  896. "op_5_project_0_process_latency_us": int64(0),
  897. "op_5_project_0_records_in_total": int64(4),
  898. "op_5_project_0_records_out_total": int64(4),
  899. "sink_mockSink_0_exceptions_total": int64(0),
  900. "sink_mockSink_0_records_in_total": int64(4),
  901. "sink_mockSink_0_records_out_total": int64(4),
  902. "source_demoE_0_exceptions_total": int64(0),
  903. "source_demoE_0_records_in_total": int64(6),
  904. "source_demoE_0_records_out_total": int64(6),
  905. "op_2_window_0_exceptions_total": int64(0),
  906. "op_2_window_0_process_latency_us": int64(0),
  907. "op_2_window_0_records_in_total": int64(6),
  908. "op_2_window_0_records_out_total": int64(4),
  909. "op_3_aggregate_0_exceptions_total": int64(0),
  910. "op_3_aggregate_0_process_latency_us": int64(0),
  911. "op_3_aggregate_0_records_in_total": int64(4),
  912. "op_3_aggregate_0_records_out_total": int64(4),
  913. "op_4_order_0_exceptions_total": int64(0),
  914. "op_4_order_0_process_latency_us": int64(0),
  915. "op_4_order_0_records_in_total": int64(4),
  916. "op_4_order_0_records_out_total": int64(4),
  917. },
  918. }, {
  919. Name: `TestEventWindowRule5`,
  920. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  921. R: [][]map[string]interface{}{
  922. {{
  923. "temp": 25.5,
  924. }}, {{
  925. "temp": 28.1,
  926. }, {
  927. "temp": 27.4,
  928. }, {
  929. "temp": 25.5,
  930. }}, {{
  931. "temp": 26.2,
  932. }, {
  933. "temp": 26.8,
  934. }, {
  935. "temp": 28.9,
  936. }, {
  937. "temp": 29.1,
  938. }, {
  939. "temp": 32.2,
  940. }}, {{
  941. "temp": 30.9,
  942. }},
  943. },
  944. M: map[string]interface{}{
  945. "op_1_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  946. "op_1_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
  947. "op_1_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  948. "op_1_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  949. "op_3_project_0_exceptions_total": int64(0),
  950. "op_3_project_0_process_latency_us": int64(0),
  951. "op_3_project_0_records_in_total": int64(4),
  952. "op_3_project_0_records_out_total": int64(4),
  953. "sink_mockSink_0_exceptions_total": int64(0),
  954. "sink_mockSink_0_records_in_total": int64(4),
  955. "sink_mockSink_0_records_out_total": int64(4),
  956. "source_sessionDemoE_0_exceptions_total": int64(0),
  957. "source_sessionDemoE_0_records_in_total": int64(12),
  958. "source_sessionDemoE_0_records_out_total": int64(12),
  959. "op_2_window_0_exceptions_total": int64(0),
  960. "op_2_window_0_process_latency_us": int64(0),
  961. "op_2_window_0_records_in_total": int64(12),
  962. "op_2_window_0_records_out_total": int64(4),
  963. },
  964. }, {
  965. Name: `TestEventWindowRule6`,
  966. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  967. R: [][]map[string]interface{}{
  968. {{
  969. "m": 25.5,
  970. "c": float64(1),
  971. }}, {{
  972. "m": 25.5,
  973. "c": float64(1),
  974. }}, {{
  975. "m": 28.1,
  976. "c": float64(1),
  977. }}, {{
  978. "m": 28.1,
  979. "c": float64(2),
  980. }}, {{
  981. "m": 27.4,
  982. "c": float64(2),
  983. }},
  984. },
  985. M: map[string]interface{}{
  986. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  987. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  988. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  989. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  990. "op_2_preprocessor_demo1E_0_exceptions_total": int64(0),
  991. "op_2_preprocessor_demo1E_0_process_latency_us": int64(0),
  992. "op_2_preprocessor_demo1E_0_records_in_total": int64(6),
  993. "op_2_preprocessor_demo1E_0_records_out_total": int64(6),
  994. "op_5_project_0_exceptions_total": int64(0),
  995. "op_5_project_0_process_latency_us": int64(0),
  996. "op_5_project_0_records_in_total": int64(5),
  997. "op_5_project_0_records_out_total": int64(5),
  998. "sink_mockSink_0_exceptions_total": int64(0),
  999. "sink_mockSink_0_records_in_total": int64(5),
  1000. "sink_mockSink_0_records_out_total": int64(5),
  1001. "source_demoE_0_exceptions_total": int64(0),
  1002. "source_demoE_0_records_in_total": int64(6),
  1003. "source_demoE_0_records_out_total": int64(6),
  1004. "source_demo1E_0_exceptions_total": int64(0),
  1005. "source_demo1E_0_records_in_total": int64(6),
  1006. "source_demo1E_0_records_out_total": int64(6),
  1007. "op_3_window_0_exceptions_total": int64(0),
  1008. "op_3_window_0_records_in_total": int64(12),
  1009. "op_3_window_0_records_out_total": int64(5),
  1010. "op_4_join_0_exceptions_total": int64(0),
  1011. "op_4_join_0_process_latency_us": int64(0),
  1012. "op_4_join_0_records_in_total": int64(5),
  1013. "op_4_join_0_records_out_total": int64(5),
  1014. },
  1015. }, {
  1016. Name: `TestEventWindowRule7`,
  1017. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1018. R: [][]map[string]interface{}{
  1019. {{
  1020. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  1021. }},
  1022. {{
  1023. "color": "red",
  1024. "size": float64(3),
  1025. "ts": float64(1541152486013),
  1026. }},
  1027. {{
  1028. "color": "red",
  1029. "size": float64(3),
  1030. "ts": float64(1541152486013),
  1031. }},
  1032. {{
  1033. "color": "yellow",
  1034. "size": float64(4),
  1035. "ts": float64(1541152488442),
  1036. }}, {{
  1037. "color": "yellow",
  1038. "size": float64(4),
  1039. "ts": float64(1541152488442),
  1040. }, {
  1041. "color": "red",
  1042. "size": float64(1),
  1043. "ts": float64(1541152489252),
  1044. }}, {{
  1045. "color": "red",
  1046. "size": float64(1),
  1047. "ts": float64(1541152489252),
  1048. }},
  1049. },
  1050. M: map[string]interface{}{
  1051. "op_1_preprocessor_demoErr_0_exceptions_total": int64(1),
  1052. "op_1_preprocessor_demoErr_0_process_latency_us": int64(0),
  1053. "op_1_preprocessor_demoErr_0_records_in_total": int64(6),
  1054. "op_1_preprocessor_demoErr_0_records_out_total": int64(5),
  1055. "op_3_project_0_exceptions_total": int64(1),
  1056. "op_3_project_0_process_latency_us": int64(0),
  1057. "op_3_project_0_records_in_total": int64(6),
  1058. "op_3_project_0_records_out_total": int64(5),
  1059. "sink_mockSink_0_exceptions_total": int64(0),
  1060. "sink_mockSink_0_records_in_total": int64(6),
  1061. "sink_mockSink_0_records_out_total": int64(6),
  1062. "source_demoErr_0_exceptions_total": int64(0),
  1063. "source_demoErr_0_records_in_total": int64(6),
  1064. "source_demoErr_0_records_out_total": int64(6),
  1065. "op_2_window_0_exceptions_total": int64(1),
  1066. "op_2_window_0_process_latency_us": int64(0),
  1067. "op_2_window_0_records_in_total": int64(6),
  1068. "op_2_window_0_records_out_total": int64(5),
  1069. },
  1070. }, {
  1071. Name: `TestEventWindowRule8`,
  1072. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1073. R: [][]map[string]interface{}{
  1074. {{
  1075. "temp": 25.5,
  1076. "window_start": float64(1541152486013),
  1077. "window_end": float64(1541152487013),
  1078. }}, {{
  1079. "temp": 28.1,
  1080. "window_start": float64(1541152487932),
  1081. "window_end": float64(1541152490000),
  1082. }}, {{
  1083. "temp": 26.2,
  1084. "window_start": float64(1541152490000),
  1085. "window_end": float64(1541152494000),
  1086. }}, {{
  1087. "temp": 30.9,
  1088. "window_start": float64(1541152494000),
  1089. "window_end": float64(1541152495112),
  1090. }},
  1091. },
  1092. M: map[string]interface{}{
  1093. "op_1_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  1094. "op_1_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
  1095. "op_1_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  1096. "op_1_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  1097. "op_3_project_0_exceptions_total": int64(0),
  1098. "op_3_project_0_process_latency_us": int64(0),
  1099. "op_3_project_0_records_in_total": int64(4),
  1100. "op_3_project_0_records_out_total": int64(4),
  1101. "sink_mockSink_0_exceptions_total": int64(0),
  1102. "sink_mockSink_0_records_in_total": int64(4),
  1103. "sink_mockSink_0_records_out_total": int64(4),
  1104. "source_sessionDemoE_0_exceptions_total": int64(0),
  1105. "source_sessionDemoE_0_records_in_total": int64(12),
  1106. "source_sessionDemoE_0_records_out_total": int64(12),
  1107. "op_2_window_0_exceptions_total": int64(0),
  1108. "op_2_window_0_process_latency_us": int64(0),
  1109. "op_2_window_0_records_in_total": int64(12),
  1110. "op_2_window_0_records_out_total": int64(4),
  1111. },
  1112. }, {
  1113. Name: `TestEventWindowRule9`,
  1114. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1115. R: [][]map[string]interface{}{
  1116. {{
  1117. "color": "red",
  1118. "window_start": float64(1541152485000),
  1119. "window_end": float64(1541152487000),
  1120. }},
  1121. {{
  1122. "color": "red",
  1123. "window_start": float64(1541152486000),
  1124. "window_end": float64(1541152488000),
  1125. }},
  1126. {{
  1127. "color": "blue",
  1128. "window_start": float64(1541152487000),
  1129. "window_end": float64(1541152489000),
  1130. }}, {{
  1131. "color": "yellow",
  1132. "window_start": float64(1541152488000),
  1133. "window_end": float64(1541152490000),
  1134. }}, {{
  1135. "color": "red",
  1136. "window_start": float64(1541152489000),
  1137. "window_end": float64(1541152491000),
  1138. }},
  1139. },
  1140. M: map[string]interface{}{
  1141. "op_1_preprocessor_demoE_0_exceptions_total": int64(0),
  1142. "op_1_preprocessor_demoE_0_process_latency_us": int64(0),
  1143. "op_1_preprocessor_demoE_0_records_in_total": int64(6),
  1144. "op_1_preprocessor_demoE_0_records_out_total": int64(6),
  1145. "op_3_project_0_exceptions_total": int64(0),
  1146. "op_3_project_0_process_latency_us": int64(0),
  1147. "op_3_project_0_records_in_total": int64(5),
  1148. "op_3_project_0_records_out_total": int64(5),
  1149. "sink_mockSink_0_exceptions_total": int64(0),
  1150. "sink_mockSink_0_records_in_total": int64(5),
  1151. "sink_mockSink_0_records_out_total": int64(5),
  1152. "source_demoE_0_exceptions_total": int64(0),
  1153. "source_demoE_0_records_in_total": int64(6),
  1154. "source_demoE_0_records_out_total": int64(6),
  1155. "op_2_window_0_exceptions_total": int64(0),
  1156. "op_2_window_0_process_latency_us": int64(0),
  1157. "op_2_window_0_records_in_total": int64(6),
  1158. "op_2_window_0_records_out_total": int64(5),
  1159. },
  1160. },
  1161. }
  1162. HandleStream(true, streamList, t)
  1163. options := []*api.RuleOption{
  1164. {
  1165. BufferLength: 100,
  1166. SendError: true,
  1167. IsEventTime: true,
  1168. LateTol: 1000,
  1169. }, {
  1170. BufferLength: 100,
  1171. SendError: true,
  1172. Qos: api.AtLeastOnce,
  1173. CheckpointInterval: 5000,
  1174. IsEventTime: true,
  1175. LateTol: 1000,
  1176. }, {
  1177. BufferLength: 100,
  1178. SendError: true,
  1179. Qos: api.ExactlyOnce,
  1180. CheckpointInterval: 5000,
  1181. IsEventTime: true,
  1182. LateTol: 1000,
  1183. },
  1184. }
  1185. for j, opt := range options {
  1186. DoRuleTest(t, tests, j, opt, 10)
  1187. }
  1188. }
  1189. func TestWindowError(t *testing.T) {
  1190. //Reset
  1191. streamList := []string{"ldemo", "ldemo1"}
  1192. HandleStream(false, streamList, t)
  1193. var tests = []RuleTest{
  1194. {
  1195. Name: `TestWindowErrorRule1`,
  1196. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1197. R: [][]map[string]interface{}{
  1198. {{
  1199. "error": "run Select error: invalid operation string(string) * int64(3)",
  1200. }}, {{
  1201. "kuiper_field_0": float64(6),
  1202. }, {}},
  1203. },
  1204. M: map[string]interface{}{
  1205. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  1206. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  1207. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  1208. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  1209. "op_3_project_0_exceptions_total": int64(1),
  1210. "op_3_project_0_process_latency_us": int64(0),
  1211. "op_3_project_0_records_in_total": int64(2),
  1212. "op_3_project_0_records_out_total": int64(1),
  1213. "sink_mockSink_0_exceptions_total": int64(0),
  1214. "sink_mockSink_0_records_in_total": int64(2),
  1215. "sink_mockSink_0_records_out_total": int64(2),
  1216. "source_ldemo_0_exceptions_total": int64(0),
  1217. "source_ldemo_0_records_in_total": int64(5),
  1218. "source_ldemo_0_records_out_total": int64(5),
  1219. "op_2_window_0_exceptions_total": int64(0),
  1220. "op_2_window_0_process_latency_us": int64(0),
  1221. "op_2_window_0_records_in_total": int64(5),
  1222. "op_2_window_0_records_out_total": int64(2),
  1223. },
  1224. }, {
  1225. Name: `TestWindowErrorRule2`,
  1226. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1227. R: [][]map[string]interface{}{
  1228. {{
  1229. "error": "run Where error: invalid operation string(string) > int64(2)",
  1230. }}, {{
  1231. "color": "red",
  1232. "ts": float64(1541152486013),
  1233. }}, {{
  1234. "ts": float64(1541152487632),
  1235. }},
  1236. },
  1237. M: map[string]interface{}{
  1238. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  1239. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  1240. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  1241. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  1242. "op_4_project_0_exceptions_total": int64(1),
  1243. "op_4_project_0_process_latency_us": int64(0),
  1244. "op_4_project_0_records_in_total": int64(3),
  1245. "op_4_project_0_records_out_total": int64(2),
  1246. "sink_mockSink_0_exceptions_total": int64(0),
  1247. "sink_mockSink_0_records_in_total": int64(3),
  1248. "sink_mockSink_0_records_out_total": int64(3),
  1249. "source_ldemo_0_exceptions_total": int64(0),
  1250. "source_ldemo_0_records_in_total": int64(5),
  1251. "source_ldemo_0_records_out_total": int64(5),
  1252. "op_3_window_0_exceptions_total": int64(1),
  1253. "op_3_window_0_process_latency_us": int64(0),
  1254. "op_3_window_0_records_in_total": int64(3),
  1255. "op_3_window_0_records_out_total": int64(2),
  1256. "op_2_filter_0_exceptions_total": int64(1),
  1257. "op_2_filter_0_process_latency_us": int64(0),
  1258. "op_2_filter_0_records_in_total": int64(5),
  1259. "op_2_filter_0_records_out_total": int64(2),
  1260. },
  1261. }, {
  1262. Name: `TestWindowErrorRule3`,
  1263. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1264. R: [][]map[string]interface{}{
  1265. {{
  1266. "color": "red",
  1267. "temp": 25.5,
  1268. "ts": float64(1541152486013),
  1269. }}, {{
  1270. "color": "red",
  1271. "temp": 25.5,
  1272. "ts": float64(1541152486013),
  1273. }}, {{
  1274. "color": "red",
  1275. "temp": 25.5,
  1276. "ts": float64(1541152486013),
  1277. }}, {{
  1278. "temp": 28.1,
  1279. "ts": float64(1541152487632),
  1280. }}, {{
  1281. "temp": 28.1,
  1282. "ts": float64(1541152487632),
  1283. }}, {{
  1284. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1285. }}, {{
  1286. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1287. }}, {{
  1288. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1289. }},
  1290. },
  1291. M: map[string]interface{}{
  1292. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  1293. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  1294. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  1295. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  1296. "op_2_preprocessor_ldemo1_0_exceptions_total": int64(0),
  1297. "op_2_preprocessor_ldemo1_0_process_latency_us": int64(0),
  1298. "op_2_preprocessor_ldemo1_0_records_in_total": int64(5),
  1299. "op_2_preprocessor_ldemo1_0_records_out_total": int64(5),
  1300. "op_5_project_0_exceptions_total": int64(3),
  1301. "op_5_project_0_process_latency_us": int64(0),
  1302. "op_5_project_0_records_in_total": int64(8),
  1303. "op_5_project_0_records_out_total": int64(5),
  1304. "sink_mockSink_0_exceptions_total": int64(0),
  1305. "sink_mockSink_0_records_in_total": int64(8),
  1306. "sink_mockSink_0_records_out_total": int64(8),
  1307. "source_ldemo_0_exceptions_total": int64(0),
  1308. "source_ldemo_0_records_in_total": int64(5),
  1309. "source_ldemo_0_records_out_total": int64(5),
  1310. "source_ldemo1_0_exceptions_total": int64(0),
  1311. "source_ldemo1_0_records_in_total": int64(5),
  1312. "source_ldemo1_0_records_out_total": int64(5),
  1313. "op_3_window_0_exceptions_total": int64(0),
  1314. "op_3_window_0_process_latency_us": int64(0),
  1315. "op_3_window_0_records_in_total": int64(10),
  1316. "op_3_window_0_records_out_total": int64(10),
  1317. "op_4_join_0_exceptions_total": int64(3),
  1318. "op_4_join_0_process_latency_us": int64(0),
  1319. "op_4_join_0_records_in_total": int64(10),
  1320. "op_4_join_0_records_out_total": int64(5),
  1321. },
  1322. }, {
  1323. Name: `TestWindowErrorRule4`,
  1324. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1325. R: [][]map[string]interface{}{
  1326. {{
  1327. "color": "red",
  1328. }}, {{
  1329. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1330. }}, {{
  1331. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1332. }}, {{
  1333. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1334. }}, {{
  1335. "color": float64(49),
  1336. }, {}},
  1337. },
  1338. M: map[string]interface{}{
  1339. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  1340. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  1341. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  1342. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  1343. "op_6_project_0_exceptions_total": int64(3),
  1344. "op_6_project_0_process_latency_us": int64(0),
  1345. "op_6_project_0_records_in_total": int64(5),
  1346. "op_6_project_0_records_out_total": int64(2),
  1347. "sink_mockSink_0_exceptions_total": int64(0),
  1348. "sink_mockSink_0_records_in_total": int64(5),
  1349. "sink_mockSink_0_records_out_total": int64(5),
  1350. "source_ldemo_0_exceptions_total": int64(0),
  1351. "source_ldemo_0_records_in_total": int64(5),
  1352. "source_ldemo_0_records_out_total": int64(5),
  1353. "op_2_window_0_exceptions_total": int64(0),
  1354. "op_2_window_0_process_latency_us": int64(0),
  1355. "op_2_window_0_records_in_total": int64(5),
  1356. "op_2_window_0_records_out_total": int64(5),
  1357. "op_3_aggregate_0_exceptions_total": int64(0),
  1358. "op_3_aggregate_0_process_latency_us": int64(0),
  1359. "op_3_aggregate_0_records_in_total": int64(5),
  1360. "op_3_aggregate_0_records_out_total": int64(5),
  1361. "op_4_having_0_exceptions_total": int64(3),
  1362. "op_4_having_0_process_latency_us": int64(0),
  1363. "op_4_having_0_records_in_total": int64(5),
  1364. "op_4_having_0_records_out_total": int64(2),
  1365. },
  1366. }, {
  1367. Name: `TestWindowErrorRule5`,
  1368. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1369. R: [][]map[string]interface{}{
  1370. {{
  1371. "error": "run Order By error: incompatible types for comparison: int and string",
  1372. }}, {{
  1373. "size": float64(3),
  1374. }}, {{
  1375. "color": float64(49),
  1376. "size": float64(2),
  1377. }}, {{
  1378. "color": "red",
  1379. }},
  1380. },
  1381. M: map[string]interface{}{
  1382. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  1383. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  1384. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  1385. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  1386. "op_4_project_0_exceptions_total": int64(1),
  1387. "op_4_project_0_process_latency_us": int64(0),
  1388. "op_4_project_0_records_in_total": int64(4),
  1389. "op_4_project_0_records_out_total": int64(3),
  1390. "sink_mockSink_0_exceptions_total": int64(0),
  1391. "sink_mockSink_0_records_in_total": int64(4),
  1392. "sink_mockSink_0_records_out_total": int64(4),
  1393. "source_ldemo_0_exceptions_total": int64(0),
  1394. "source_ldemo_0_records_in_total": int64(5),
  1395. "source_ldemo_0_records_out_total": int64(5),
  1396. "op_2_window_0_exceptions_total": int64(0),
  1397. "op_2_window_0_process_latency_us": int64(0),
  1398. "op_2_window_0_records_in_total": int64(5),
  1399. "op_2_window_0_records_out_total": int64(4),
  1400. "op_3_order_0_exceptions_total": int64(1),
  1401. "op_3_order_0_process_latency_us": int64(0),
  1402. "op_3_order_0_records_in_total": int64(4),
  1403. "op_3_order_0_records_out_total": int64(3),
  1404. },
  1405. },
  1406. }
  1407. HandleStream(true, streamList, t)
  1408. DoRuleTest(t, tests, 0, &api.RuleOption{
  1409. BufferLength: 100,
  1410. SendError: true,
  1411. }, 0)
  1412. }