// File: window_rule_test.go
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "testing"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. )
  19. func TestWindow(t *testing.T) {
  20. // Reset
  21. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  22. HandleStream(false, streamList, t)
  23. tests := []RuleTest{
  24. {
  25. Name: `TestWindowRule0`,
  26. Sql: `SELECT size,color FROM demo GROUP BY SlidingWindow(ss, 5) Filter (where color = "red") Over (when size = 1)`,
  27. R: [][]map[string]interface{}{
  28. {
  29. {
  30. "size": float64(3),
  31. "color": "red",
  32. },
  33. {
  34. "size": float64(1),
  35. "color": "red",
  36. },
  37. },
  38. },
  39. M: map[string]interface{}{},
  40. },
  41. {
  42. Name: `TestWindowRule1`,
  43. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  44. R: [][]map[string]interface{}{
  45. {{
  46. "color": "red",
  47. "size": float64(3),
  48. "ts": float64(1541152486013),
  49. }, {
  50. "color": "blue",
  51. "size": float64(6),
  52. "ts": float64(1541152486822),
  53. }},
  54. {{
  55. "color": "red",
  56. "size": float64(3),
  57. "ts": float64(1541152486013),
  58. }, {
  59. "color": "blue",
  60. "size": float64(6),
  61. "ts": float64(1541152486822),
  62. }, {
  63. "color": "blue",
  64. "size": float64(2),
  65. "ts": float64(1541152487632),
  66. }},
  67. {{
  68. "color": "blue",
  69. "size": float64(2),
  70. "ts": float64(1541152487632),
  71. }, {
  72. "color": "yellow",
  73. "size": float64(4),
  74. "ts": float64(1541152488442),
  75. }},
  76. {{
  77. "color": "yellow",
  78. "size": float64(4),
  79. "ts": float64(1541152488442),
  80. }, {
  81. "color": "red",
  82. "size": float64(1),
  83. "ts": float64(1541152489252),
  84. }},
  85. },
  86. M: map[string]interface{}{
  87. "op_3_project_0_exceptions_total": int64(0),
  88. "op_3_project_0_process_latency_us": int64(0),
  89. "op_3_project_0_records_in_total": int64(4),
  90. "op_3_project_0_records_out_total": int64(4),
  91. "sink_mockSink_0_exceptions_total": int64(0),
  92. "sink_mockSink_0_records_in_total": int64(4),
  93. "sink_mockSink_0_records_out_total": int64(4),
  94. "source_demo_0_exceptions_total": int64(0),
  95. "source_demo_0_records_in_total": int64(5),
  96. "source_demo_0_records_out_total": int64(5),
  97. "op_2_window_0_exceptions_total": int64(0),
  98. "op_2_window_0_process_latency_us": int64(0),
  99. "op_2_window_0_records_in_total": int64(5),
  100. "op_2_window_0_records_out_total": int64(4),
  101. },
  102. },
  103. {
  104. Name: `TestWindowRule2`,
  105. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  106. R: [][]map[string]interface{}{
  107. {{
  108. "color": "red",
  109. "ts": float64(1541152486013),
  110. }, {
  111. "color": "blue",
  112. "ts": float64(1541152486822),
  113. }},
  114. {},
  115. {{
  116. "color": "yellow",
  117. "ts": float64(1541152488442),
  118. }},
  119. {},
  120. },
  121. M: map[string]interface{}{
  122. "op_4_project_0_exceptions_total": int64(0),
  123. "op_4_project_0_process_latency_us": int64(0),
  124. "op_4_project_0_records_in_total": int64(4),
  125. "op_4_project_0_records_out_total": int64(4),
  126. "sink_mockSink_0_exceptions_total": int64(0),
  127. "sink_mockSink_0_records_in_total": int64(4),
  128. "sink_mockSink_0_records_out_total": int64(4),
  129. "source_demo_0_exceptions_total": int64(0),
  130. "source_demo_0_records_in_total": int64(5),
  131. "source_demo_0_records_out_total": int64(5),
  132. "op_3_window_0_exceptions_total": int64(0),
  133. "op_3_window_0_process_latency_us": int64(0),
  134. "op_3_window_0_records_in_total": int64(3),
  135. "op_3_window_0_records_out_total": int64(4),
  136. "op_2_filter_0_exceptions_total": int64(0),
  137. "op_2_filter_0_process_latency_us": int64(0),
  138. "op_2_filter_0_records_in_total": int64(5),
  139. "op_2_filter_0_records_out_total": int64(3),
  140. },
  141. },
  142. {
  143. Name: `TestWindowRule3`,
  144. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  145. R: [][]map[string]interface{}{
  146. {{
  147. "color": "red",
  148. "temp": 25.5,
  149. "ts1": float64(1541152486013),
  150. "ts2": float64(1541152486013),
  151. "diff": float64(0),
  152. }}, {{
  153. "color": "red",
  154. "temp": 25.5,
  155. "ts1": float64(1541152486013),
  156. "ts2": float64(1541152486013),
  157. "diff": float64(0),
  158. }}, {{
  159. "color": "red",
  160. "temp": 25.5,
  161. "ts1": float64(1541152486013),
  162. "ts2": float64(1541152486013),
  163. "diff": float64(0),
  164. }}, {{
  165. "color": "blue",
  166. "temp": 28.1,
  167. "ts1": float64(1541152487632),
  168. "ts2": float64(1541152487632),
  169. "diff": float64(0),
  170. }}, {{
  171. "color": "blue",
  172. "temp": 28.1,
  173. "ts1": float64(1541152487632),
  174. "ts2": float64(1541152487632),
  175. "diff": float64(0),
  176. }}, {{
  177. "color": "blue",
  178. "temp": 28.1,
  179. "ts1": float64(1541152487632),
  180. "ts2": float64(1541152487632),
  181. "diff": float64(0),
  182. }, {
  183. "color": "yellow",
  184. "temp": 27.4,
  185. "ts1": float64(1541152488442),
  186. "ts2": float64(1541152488442),
  187. "diff": float64(0),
  188. }}, {{
  189. "color": "yellow",
  190. "temp": 27.4,
  191. "ts1": float64(1541152488442),
  192. "ts2": float64(1541152488442),
  193. "diff": float64(0),
  194. }}, {{
  195. "color": "yellow",
  196. "temp": 27.4,
  197. "ts1": float64(1541152488442),
  198. "ts2": float64(1541152488442),
  199. "diff": float64(0),
  200. }, {
  201. "color": "red",
  202. "temp": 25.5,
  203. "ts1": float64(1541152489252),
  204. "ts2": float64(1541152489252),
  205. "diff": float64(0),
  206. }},
  207. },
  208. M: map[string]interface{}{
  209. "op_5_project_0_exceptions_total": int64(0),
  210. "op_5_project_0_process_latency_us": int64(0),
  211. "op_5_project_0_records_in_total": int64(8),
  212. "op_5_project_0_records_out_total": int64(8),
  213. "sink_mockSink_0_exceptions_total": int64(0),
  214. "sink_mockSink_0_records_in_total": int64(8),
  215. "sink_mockSink_0_records_out_total": int64(8),
  216. "source_demo_0_exceptions_total": int64(0),
  217. "source_demo_0_records_in_total": int64(5),
  218. "source_demo_0_records_out_total": int64(5),
  219. "source_demo1_0_exceptions_total": int64(0),
  220. "source_demo1_0_records_in_total": int64(5),
  221. "source_demo1_0_records_out_total": int64(5),
  222. "op_3_window_0_exceptions_total": int64(0),
  223. "op_3_window_0_process_latency_us": int64(0),
  224. "op_3_window_0_records_in_total": int64(10),
  225. "op_3_window_0_records_out_total": int64(10),
  226. "op_4_join_0_exceptions_total": int64(0),
  227. "op_4_join_0_process_latency_us": int64(0),
  228. "op_4_join_0_records_in_total": int64(10),
  229. "op_4_join_0_records_out_total": int64(8),
  230. },
  231. T: &api.PrintableTopo{
  232. Sources: []string{"source_demo", "source_demo1"},
  233. Edges: map[string][]interface{}{
  234. "source_demo": {"op_3_window"},
  235. "source_demo1": {"op_3_window"},
  236. "op_3_window": {"op_4_join"},
  237. "op_4_join": {"op_5_project"},
  238. "op_5_project": {"sink_mockSink"},
  239. },
  240. },
  241. },
  242. {
  243. Name: `TestWindowRule4`,
  244. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  245. R: [][]map[string]interface{}{
  246. {{
  247. "color": "red",
  248. "c": float64(1),
  249. }}, {{
  250. "color": "blue",
  251. "c": float64(1),
  252. }, {
  253. "color": "red",
  254. "c": float64(1),
  255. }}, {{
  256. "color": "blue",
  257. "c": float64(2),
  258. }, {
  259. "color": "red",
  260. "c": float64(1),
  261. }}, {{
  262. "color": "blue",
  263. "c": float64(2),
  264. }, {
  265. "color": "yellow",
  266. "c": float64(1),
  267. }}, {{
  268. "color": "blue",
  269. "c": float64(1),
  270. }, {
  271. "color": "red",
  272. "c": float64(1),
  273. }, {
  274. "color": "yellow",
  275. "c": float64(1),
  276. }},
  277. },
  278. M: map[string]interface{}{
  279. "op_5_project_0_exceptions_total": int64(0),
  280. "op_5_project_0_process_latency_us": int64(0),
  281. "op_5_project_0_records_in_total": int64(5),
  282. "op_5_project_0_records_out_total": int64(5),
  283. "sink_mockSink_0_exceptions_total": int64(0),
  284. "sink_mockSink_0_records_in_total": int64(5),
  285. "sink_mockSink_0_records_out_total": int64(5),
  286. "source_demo_0_exceptions_total": int64(0),
  287. "source_demo_0_records_in_total": int64(5),
  288. "source_demo_0_records_out_total": int64(5),
  289. "op_2_window_0_exceptions_total": int64(0),
  290. "op_2_window_0_process_latency_us": int64(0),
  291. "op_2_window_0_records_in_total": int64(5),
  292. "op_2_window_0_records_out_total": int64(5),
  293. "op_3_aggregate_0_exceptions_total": int64(0),
  294. "op_3_aggregate_0_process_latency_us": int64(0),
  295. "op_3_aggregate_0_records_in_total": int64(5),
  296. "op_3_aggregate_0_records_out_total": int64(5),
  297. "op_4_order_0_exceptions_total": int64(0),
  298. "op_4_order_0_process_latency_us": int64(0),
  299. "op_4_order_0_records_in_total": int64(5),
  300. "op_4_order_0_records_out_total": int64(5),
  301. },
  302. },
  303. {
  304. Name: `TestWindowRule5`,
  305. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  306. R: [][]map[string]interface{}{
  307. {{
  308. "count": float64(2),
  309. "ws": float64(1541152486013),
  310. "window_end": float64(1541152487823), // timeout
  311. }}, {{
  312. "count": float64(3),
  313. "ws": float64(1541152487932),
  314. "window_end": float64(1541152490000), // tick
  315. }}, {{
  316. "count": float64(5),
  317. "ws": float64(1541152490000),
  318. "window_end": float64(1541152494000), // tick
  319. }}, {{
  320. "count": float64(1),
  321. "ws": float64(1541152494000),
  322. "window_end": float64(1541152495112), // timeout
  323. }},
  324. },
  325. M: map[string]interface{}{
  326. "op_3_project_0_exceptions_total": int64(0),
  327. "op_3_project_0_process_latency_us": int64(0),
  328. "op_3_project_0_records_in_total": int64(4),
  329. "op_3_project_0_records_out_total": int64(4),
  330. "sink_mockSink_0_exceptions_total": int64(0),
  331. "sink_mockSink_0_records_in_total": int64(4),
  332. "sink_mockSink_0_records_out_total": int64(4),
  333. "source_sessionDemo_0_exceptions_total": int64(0),
  334. "source_sessionDemo_0_records_in_total": int64(11),
  335. "source_sessionDemo_0_records_out_total": int64(11),
  336. "op_2_window_0_exceptions_total": int64(0),
  337. "op_2_window_0_process_latency_us": int64(0),
  338. "op_2_window_0_records_in_total": int64(11),
  339. "op_2_window_0_records_out_total": int64(4),
  340. },
  341. },
  342. {
  343. Name: `TestWindowRule6`,
  344. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  345. R: [][]map[string]interface{}{
  346. {{
  347. "temp": 25.5,
  348. "c": float64(1),
  349. "window_start": float64(1541152485115),
  350. "window_end": float64(1541152486115),
  351. }}, {{
  352. "temp": 25.5,
  353. "c": float64(1),
  354. "window_start": float64(1541152485822),
  355. "window_end": float64(1541152486822),
  356. }}, {{
  357. "temp": 25.5,
  358. "c": float64(1),
  359. "window_start": float64(1541152485903),
  360. "window_end": float64(1541152486903),
  361. }}, {{
  362. "temp": 28.1,
  363. "c": float64(1),
  364. "window_start": float64(1541152486702),
  365. "window_end": float64(1541152487702),
  366. }}, {{
  367. "temp": 28.1,
  368. "c": float64(1),
  369. "window_start": float64(1541152487442),
  370. "window_end": float64(1541152488442),
  371. }}, {{
  372. "temp": 55.5,
  373. "c": float64(2),
  374. "window_start": float64(1541152487605),
  375. "window_end": float64(1541152488605),
  376. }}, {{
  377. "temp": 27.4,
  378. "c": float64(1),
  379. "window_start": float64(1541152488252),
  380. "window_end": float64(1541152489252),
  381. }}, {{
  382. "temp": 52.9,
  383. "c": float64(2),
  384. "window_start": float64(1541152488305),
  385. "window_end": float64(1541152489305),
  386. }},
  387. },
  388. M: map[string]interface{}{
  389. "op_5_project_0_exceptions_total": int64(0),
  390. "op_5_project_0_process_latency_us": int64(0),
  391. "op_5_project_0_records_in_total": int64(8),
  392. "op_5_project_0_records_out_total": int64(8),
  393. "sink_mockSink_0_exceptions_total": int64(0),
  394. "sink_mockSink_0_records_in_total": int64(8),
  395. "sink_mockSink_0_records_out_total": int64(8),
  396. "source_demo_0_exceptions_total": int64(0),
  397. "source_demo_0_records_in_total": int64(5),
  398. "source_demo_0_records_out_total": int64(5),
  399. "source_demo1_0_exceptions_total": int64(0),
  400. "source_demo1_0_records_in_total": int64(5),
  401. "source_demo1_0_records_out_total": int64(5),
  402. "op_3_window_0_exceptions_total": int64(0),
  403. "op_3_window_0_process_latency_us": int64(0),
  404. "op_3_window_0_records_in_total": int64(10),
  405. "op_3_window_0_records_out_total": int64(10),
  406. "op_4_join_0_exceptions_total": int64(0),
  407. "op_4_join_0_process_latency_us": int64(0),
  408. "op_4_join_0_records_in_total": int64(10),
  409. "op_4_join_0_records_out_total": int64(8),
  410. },
  411. },
  412. {
  413. Name: `TestWindowRule7`,
  414. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  415. R: [][]map[string]interface{}{
  416. {{
  417. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  418. }},
  419. {{
  420. "color": "blue",
  421. "size": float64(6),
  422. "ts": float64(1541152486822),
  423. }},
  424. {{
  425. "color": "blue",
  426. "size": float64(6),
  427. "ts": float64(1541152486822),
  428. }, {
  429. "color": "blue",
  430. "size": float64(2),
  431. "ts": float64(1541152487632),
  432. }},
  433. {{
  434. "error": "error in preprocessor: field color type mismatch: cannot convert int(7) to string",
  435. }},
  436. {{
  437. "color": "blue",
  438. "size": float64(2),
  439. "ts": float64(1541152487632),
  440. }},
  441. {{
  442. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  443. }},
  444. {},
  445. },
  446. M: map[string]interface{}{
  447. "op_3_project_0_exceptions_total": int64(3),
  448. "op_3_project_0_process_latency_us": int64(0),
  449. "op_3_project_0_records_in_total": int64(4),
  450. "op_3_project_0_records_out_total": int64(4),
  451. "sink_mockSink_0_exceptions_total": int64(0),
  452. "sink_mockSink_0_records_in_total": int64(7),
  453. "sink_mockSink_0_records_out_total": int64(7),
  454. "source_demoError_0_exceptions_total": int64(3),
  455. "source_demoError_0_records_in_total": int64(5),
  456. "source_demoError_0_records_out_total": int64(2),
  457. "op_2_window_0_exceptions_total": int64(3),
  458. "op_2_window_0_process_latency_us": int64(0),
  459. "op_2_window_0_records_in_total": int64(5),
  460. "op_2_window_0_records_out_total": int64(4),
  461. },
  462. },
  463. {
  464. Name: `TestWindowRule8`,
  465. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  466. R: [][]map[string]interface{}{
  467. {{
  468. "color": "red",
  469. "ts": float64(1541152486013),
  470. "c": float64(2),
  471. "window_start": float64(1541152486000),
  472. "window_end": float64(1541152487000),
  473. }},
  474. },
  475. M: map[string]interface{}{
  476. "op_5_project_0_exceptions_total": int64(0),
  477. "op_5_project_0_process_latency_us": int64(0),
  478. "op_5_project_0_records_in_total": int64(1),
  479. "op_5_project_0_records_out_total": int64(1),
  480. "sink_mockSink_0_exceptions_total": int64(0),
  481. "sink_mockSink_0_records_in_total": int64(1),
  482. "sink_mockSink_0_records_out_total": int64(1),
  483. "source_demo_0_exceptions_total": int64(0),
  484. "source_demo_0_records_in_total": int64(5),
  485. "source_demo_0_records_out_total": int64(5),
  486. "op_3_window_0_exceptions_total": int64(0),
  487. "op_3_window_0_process_latency_us": int64(0),
  488. "op_3_window_0_records_in_total": int64(3),
  489. "op_3_window_0_records_out_total": int64(4),
  490. "op_2_filter_0_exceptions_total": int64(0),
  491. "op_2_filter_0_process_latency_us": int64(0),
  492. "op_2_filter_0_records_in_total": int64(5),
  493. "op_2_filter_0_records_out_total": int64(3),
  494. "op_4_having_0_exceptions_total": int64(0),
  495. "op_4_having_0_process_latency_us": int64(0),
  496. "op_4_having_0_records_in_total": int64(4),
  497. "op_4_having_0_records_out_total": int64(1),
  498. },
  499. },
  500. {
  501. Name: `TestWindowRule9`,
  502. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  503. R: [][]map[string]interface{}{
  504. {{
  505. "color": "red",
  506. "window_start": float64(1541152485000),
  507. "window_end": float64(1541152487000),
  508. }, {
  509. "color": "blue",
  510. "window_start": float64(1541152485000),
  511. "window_end": float64(1541152487000),
  512. }},
  513. {{
  514. "color": "red",
  515. "window_start": float64(1541152486000),
  516. "window_end": float64(1541152488000),
  517. }, {
  518. "color": "blue",
  519. "window_start": float64(1541152486000),
  520. "window_end": float64(1541152488000),
  521. }},
  522. {{
  523. "color": "yellow",
  524. "window_start": float64(1541152487000),
  525. "window_end": float64(1541152489000),
  526. }},
  527. {{
  528. "color": "yellow",
  529. "window_start": float64(1541152488000),
  530. "window_end": float64(1541152490000),
  531. }},
  532. },
  533. M: map[string]interface{}{
  534. "op_4_project_0_exceptions_total": int64(0),
  535. "op_4_project_0_process_latency_us": int64(0),
  536. "op_4_project_0_records_in_total": int64(4),
  537. "op_4_project_0_records_out_total": int64(4),
  538. "sink_mockSink_0_exceptions_total": int64(0),
  539. "sink_mockSink_0_records_in_total": int64(4),
  540. "sink_mockSink_0_records_out_total": int64(4),
  541. "source_demo_0_exceptions_total": int64(0),
  542. "source_demo_0_records_in_total": int64(5),
  543. "source_demo_0_records_out_total": int64(5),
  544. "op_3_window_0_exceptions_total": int64(0),
  545. "op_3_window_0_process_latency_us": int64(0),
  546. "op_3_window_0_records_in_total": int64(3),
  547. "op_3_window_0_records_out_total": int64(4),
  548. },
  549. },
  550. {
  551. Name: `TestCountWindowRule1`,
  552. Sql: `SELECT collect(*)[0]->color as c, window_end() as we FROM demo GROUP BY COUNTWINDOW(3)`,
  553. R: [][]map[string]interface{}{
  554. {{
  555. "c": "red",
  556. "we": 1.541152487632e+12,
  557. }},
  558. },
  559. M: map[string]interface{}{
  560. "op_3_project_0_exceptions_total": int64(0),
  561. "op_3_project_0_process_latency_us": int64(0),
  562. "op_3_project_0_records_in_total": int64(1),
  563. "op_3_project_0_records_out_total": int64(1),
  564. "sink_mockSink_0_exceptions_total": int64(0),
  565. "sink_mockSink_0_records_in_total": int64(1),
  566. "sink_mockSink_0_records_out_total": int64(1),
  567. "source_demo_0_exceptions_total": int64(0),
  568. "source_demo_0_records_in_total": int64(5),
  569. "source_demo_0_records_out_total": int64(5),
  570. "op_2_window_0_exceptions_total": int64(0),
  571. "op_2_window_0_process_latency_us": int64(0),
  572. "op_2_window_0_records_in_total": int64(5),
  573. "op_2_window_0_records_out_total": int64(1),
  574. },
  575. },
  576. {
  577. Name: `TestWindowRule10`,
  578. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  579. R: [][]map[string]interface{}{
  580. {{
  581. "c": "red",
  582. }}, {{
  583. "c": "blue",
  584. }}, {{}}, {{
  585. "c": "yellow",
  586. }}, {{}},
  587. },
  588. M: map[string]interface{}{
  589. "op_3_project_0_exceptions_total": int64(0),
  590. "op_3_project_0_process_latency_us": int64(0),
  591. "op_3_project_0_records_in_total": int64(5),
  592. "op_3_project_0_records_out_total": int64(5),
  593. "sink_mockSink_0_exceptions_total": int64(0),
  594. "sink_mockSink_0_records_in_total": int64(5),
  595. "sink_mockSink_0_records_out_total": int64(5),
  596. "source_demo_0_exceptions_total": int64(0),
  597. "source_demo_0_records_in_total": int64(5),
  598. "source_demo_0_records_out_total": int64(5),
  599. "op_2_window_0_exceptions_total": int64(0),
  600. "op_2_window_0_process_latency_us": int64(0),
  601. "op_2_window_0_records_in_total": int64(5),
  602. "op_2_window_0_records_out_total": int64(5),
  603. },
  604. },
  605. {
  606. Name: `TestWindowRule11`,
  607. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  608. R: [][]map[string]interface{}{
  609. {{
  610. "color": "red",
  611. "name": "name1",
  612. "window_start": float64(1541152486000),
  613. "window_end": float64(1541152487000),
  614. }},
  615. },
  616. M: map[string]interface{}{
  617. "op_2_window_0_exceptions_total": int64(0),
  618. "op_2_window_0_process_latency_us": int64(0),
  619. "op_2_window_0_records_in_total": int64(5),
  620. "op_2_window_0_records_out_total": int64(4),
  621. "op_4_join_aligner_0_records_in_total": int64(5),
  622. "op_4_join_aligner_0_records_out_total": int64(4),
  623. "op_5_join_0_exceptions_total": int64(0),
  624. "op_5_join_0_records_in_total": int64(4),
  625. "op_5_join_0_records_out_total": int64(1),
  626. "op_6_project_0_exceptions_total": int64(0),
  627. "op_6_project_0_records_in_total": int64(1),
  628. "op_6_project_0_records_out_total": int64(1),
  629. "sink_mockSink_0_exceptions_total": int64(0),
  630. "sink_mockSink_0_records_in_total": int64(1),
  631. "sink_mockSink_0_records_out_total": int64(1),
  632. "source_demo_0_exceptions_total": int64(0),
  633. "source_demo_0_records_in_total": int64(5),
  634. "source_demo_0_records_out_total": int64(5),
  635. "source_table1_0_exceptions_total": int64(0),
  636. "source_table1_0_records_in_total": int64(4),
  637. "source_table1_0_records_out_total": int64(1),
  638. },
  639. },
  640. {
  641. Name: `TestWindowRule12`,
  642. Sql: `SELECT collect(size) as allSize FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1), color ORDER BY color`,
  643. R: [][]map[string]interface{}{
  644. {{
  645. "allSize": []interface{}{float64(6)},
  646. }, {
  647. "allSize": []interface{}{float64(3)},
  648. }},
  649. {{
  650. "allSize": []interface{}{float64(6), float64(2)},
  651. }, {
  652. "allSize": []interface{}{float64(3)},
  653. }},
  654. {{
  655. "allSize": []interface{}{float64(2)},
  656. }, {
  657. "allSize": []interface{}{float64(4)},
  658. }},
  659. {{
  660. "allSize": []interface{}{float64(1)},
  661. }, {
  662. "allSize": []interface{}{float64(4)},
  663. }},
  664. },
  665. M: map[string]interface{}{
  666. "sink_mockSink_0_exceptions_total": int64(0),
  667. "sink_mockSink_0_records_in_total": int64(4),
  668. "sink_mockSink_0_records_out_total": int64(4),
  669. "source_demo_0_exceptions_total": int64(0),
  670. "source_demo_0_records_in_total": int64(5),
  671. "source_demo_0_records_out_total": int64(5),
  672. "op_2_window_0_exceptions_total": int64(0),
  673. "op_2_window_0_process_latency_us": int64(0),
  674. "op_2_window_0_records_in_total": int64(5),
  675. "op_2_window_0_records_out_total": int64(4),
  676. },
  677. },
  678. {
  679. Name: `TestWindowRule13`,
  680. Sql: `SELECT color as c FROM demo GROUP BY SlidingWindow(ss, 3600,1) filter (where size = 3 )`,
  681. R: [][]map[string]interface{}{
  682. {{
  683. "c": "red",
  684. }},
  685. },
  686. M: map[string]interface{}{
  687. "op_3_project_0_exceptions_total": int64(0),
  688. "op_3_project_0_process_latency_us": int64(0),
  689. "op_3_project_0_records_in_total": int64(1),
  690. "op_3_project_0_records_out_total": int64(1),
  691. "sink_mockSink_0_exceptions_total": int64(0),
  692. "sink_mockSink_0_records_in_total": int64(1),
  693. "sink_mockSink_0_records_out_total": int64(1),
  694. "source_demo_0_exceptions_total": int64(0),
  695. "source_demo_0_records_in_total": int64(5),
  696. "source_demo_0_records_out_total": int64(5),
  697. "op_2_window_0_exceptions_total": int64(0),
  698. "op_2_window_0_process_latency_us": int64(0),
  699. "op_2_window_0_records_in_total": int64(1),
  700. "op_2_window_0_records_out_total": int64(1),
  701. },
  702. },
  703. }
  704. HandleStream(true, streamList, t)
  705. options := []*api.RuleOption{
  706. {
  707. BufferLength: 100,
  708. SendError: true,
  709. }, {
  710. BufferLength: 100,
  711. SendError: true,
  712. Qos: api.AtLeastOnce,
  713. CheckpointInterval: 5000,
  714. }, {
  715. BufferLength: 100,
  716. SendError: true,
  717. Qos: api.ExactlyOnce,
  718. CheckpointInterval: 5000,
  719. },
  720. }
  721. for j, opt := range options {
  722. DoRuleTest(t, tests, j, opt, 15)
  723. }
  724. }
  725. func TestEventWindow(t *testing.T) {
  726. // Reset
  727. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  728. HandleStream(false, streamList, t)
  729. tests := []RuleTest{
  730. {
  731. Name: `TestEventWindowDelayRule0`,
  732. Sql: `SELECT size FROM demoE GROUP BY SlidingWindow(ss, 1,4) FILTER (where color = "red")`,
  733. R: [][]map[string]interface{}{
  734. {
  735. {
  736. "size": float64(3),
  737. },
  738. {
  739. "size": float64(1),
  740. },
  741. },
  742. },
  743. M: map[string]interface{}{
  744. "op_2_watermark_0_records_in_total": int64(6),
  745. "op_2_watermark_0_records_out_total": int64(4),
  746. "op_2_watermark_0_exceptions_total": int64(0),
  747. "op_3_windowFilter_0_records_in_total": int64(4),
  748. "op_3_windowFilter_0_records_out_total": int64(2),
  749. "op_3_windowFilter_0_exceptions_total": int64(0),
  750. "op_3_window_0_records_in_total": int64(2),
  751. "op_3_window_0_records_out_total": int64(1),
  752. "op_3_window_0_exceptions_total": int64(0),
  753. },
  754. },
  755. {
  756. Name: `TestEventWindowRule1`,
  757. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  758. R: [][]map[string]interface{}{
  759. {{
  760. "color": "red",
  761. "size": float64(3),
  762. "ts": float64(1541152486013),
  763. }},
  764. {{
  765. "color": "red",
  766. "size": float64(3),
  767. "ts": float64(1541152486013),
  768. }, {
  769. "color": "blue",
  770. "size": float64(2),
  771. "ts": float64(1541152487632),
  772. }},
  773. {{
  774. "color": "blue",
  775. "size": float64(2),
  776. "ts": float64(1541152487632),
  777. }, {
  778. "color": "yellow",
  779. "size": float64(4),
  780. "ts": float64(1541152488442),
  781. }},
  782. {{
  783. "color": "yellow",
  784. "size": float64(4),
  785. "ts": float64(1541152488442),
  786. }, {
  787. "color": "red",
  788. "size": float64(1),
  789. "ts": float64(1541152489252),
  790. }},
  791. {{
  792. "color": "red",
  793. "size": float64(1),
  794. "ts": float64(1541152489252),
  795. }},
  796. },
  797. M: map[string]interface{}{
  798. "op_4_project_0_exceptions_total": int64(0),
  799. "op_4_project_0_process_latency_us": int64(0),
  800. "op_4_project_0_records_in_total": int64(5),
  801. "op_4_project_0_records_out_total": int64(5),
  802. "sink_mockSink_0_exceptions_total": int64(0),
  803. "sink_mockSink_0_records_in_total": int64(5),
  804. "sink_mockSink_0_records_out_total": int64(5),
  805. "source_demoE_0_exceptions_total": int64(0),
  806. "source_demoE_0_records_in_total": int64(6),
  807. "source_demoE_0_records_out_total": int64(6),
  808. "op_3_window_0_exceptions_total": int64(0),
  809. "op_3_window_0_process_latency_us": int64(0),
  810. "op_3_window_0_records_in_total": int64(4),
  811. "op_3_window_0_records_out_total": int64(5),
  812. "op_2_watermark_0_records_in_total": int64(6),
  813. "op_2_watermark_0_records_out_total": int64(4),
  814. },
  815. },
  816. {
  817. Name: `TestEventWindowRule2`,
  818. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  819. R: [][]map[string]interface{}{
  820. {{
  821. "window_start": float64(1541152486013),
  822. "window_end": float64(1541152487000),
  823. "color": "red",
  824. "ts": float64(1541152486013),
  825. }},
  826. {{
  827. "window_start": float64(1541152488000),
  828. "window_end": float64(1541152489000),
  829. "color": "yellow",
  830. "ts": float64(1541152488442),
  831. }},
  832. },
  833. M: map[string]interface{}{
  834. "op_5_project_0_exceptions_total": int64(0),
  835. "op_5_project_0_process_latency_us": int64(0),
  836. "op_5_project_0_records_in_total": int64(2),
  837. "op_5_project_0_records_out_total": int64(2),
  838. "sink_mockSink_0_exceptions_total": int64(0),
  839. "sink_mockSink_0_records_in_total": int64(2),
  840. "sink_mockSink_0_records_out_total": int64(2),
  841. "source_demoE_0_exceptions_total": int64(0),
  842. "source_demoE_0_records_in_total": int64(6),
  843. "source_demoE_0_records_out_total": int64(6),
  844. "op_3_window_0_exceptions_total": int64(0),
  845. "op_3_window_0_process_latency_us": int64(0),
  846. "op_3_window_0_records_in_total": int64(4),
  847. "op_3_window_0_records_out_total": int64(5),
  848. "op_4_filter_0_exceptions_total": int64(0),
  849. "op_4_filter_0_process_latency_us": int64(0),
  850. "op_4_filter_0_records_in_total": int64(5),
  851. "op_4_filter_0_records_out_total": int64(2),
  852. },
  853. },
  854. {
  855. Name: `TestEventWindowRule3`,
  856. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  857. R: [][]map[string]interface{}{
  858. {{
  859. "color": "red",
  860. "temp": 25.5,
  861. "ts": float64(1541152486013),
  862. }}, {{
  863. "color": "red",
  864. "temp": 25.5,
  865. "ts": float64(1541152486013),
  866. }}, {{
  867. "color": "blue",
  868. "temp": 28.1,
  869. "ts": float64(1541152487632),
  870. }}, {{
  871. "color": "blue",
  872. "temp": 28.1,
  873. "ts": float64(1541152487632),
  874. }, {
  875. "color": "yellow",
  876. "temp": 27.4,
  877. "ts": float64(1541152488442),
  878. }}, {{
  879. "color": "yellow",
  880. "temp": 27.4,
  881. "ts": float64(1541152488442),
  882. }, {
  883. "color": "red",
  884. "temp": 25.5,
  885. "ts": float64(1541152489252),
  886. }},
  887. },
  888. M: map[string]interface{}{
  889. "op_6_project_0_exceptions_total": int64(0),
  890. "op_6_project_0_process_latency_us": int64(0),
  891. "op_6_project_0_records_in_total": int64(5),
  892. "op_6_project_0_records_out_total": int64(5),
  893. "sink_mockSink_0_exceptions_total": int64(0),
  894. "sink_mockSink_0_records_in_total": int64(5),
  895. "sink_mockSink_0_records_out_total": int64(5),
  896. "source_demoE_0_exceptions_total": int64(0),
  897. "source_demoE_0_records_in_total": int64(6),
  898. "source_demoE_0_records_out_total": int64(6),
  899. "source_demo1E_0_exceptions_total": int64(0),
  900. "source_demo1E_0_records_in_total": int64(6),
  901. "source_demo1E_0_records_out_total": int64(6),
  902. "op_4_window_0_exceptions_total": int64(0),
  903. "op_4_window_0_process_latency_us": int64(0),
  904. "op_4_window_0_records_in_total": int64(9),
  905. "op_4_window_0_records_out_total": int64(5),
  906. "op_5_join_0_exceptions_total": int64(0),
  907. "op_5_join_0_process_latency_us": int64(0),
  908. "op_5_join_0_records_in_total": int64(5),
  909. "op_5_join_0_records_out_total": int64(5),
  910. },
  911. },
  912. {
  913. Name: `TestEventWindowRule4`,
  914. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  915. R: [][]map[string]interface{}{
  916. {{
  917. "color": "red",
  918. "ws": float64(1541152484013),
  919. "we": float64(1541152486013),
  920. }}, {{
  921. "color": "blue",
  922. "ws": float64(1541152485632),
  923. "we": float64(1541152487632),
  924. }, {
  925. "color": "red",
  926. "ws": float64(1541152485632),
  927. "we": float64(1541152487632),
  928. }}, {{
  929. "color": "blue",
  930. "ws": float64(1541152486442),
  931. "we": float64(1541152488442),
  932. }, {
  933. "color": "yellow",
  934. "ws": float64(1541152486442),
  935. "we": float64(1541152488442),
  936. }}, {{
  937. "color": "blue",
  938. "ws": float64(1541152487252),
  939. "we": float64(1541152489252),
  940. }, {
  941. "color": "red",
  942. "ws": float64(1541152487252),
  943. "we": float64(1541152489252),
  944. }, {
  945. "color": "yellow",
  946. "ws": float64(1541152487252),
  947. "we": float64(1541152489252),
  948. }},
  949. },
  950. M: map[string]interface{}{
  951. "op_6_project_0_exceptions_total": int64(0),
  952. "op_6_project_0_process_latency_us": int64(0),
  953. "op_6_project_0_records_in_total": int64(4),
  954. "op_6_project_0_records_out_total": int64(4),
  955. "sink_mockSink_0_exceptions_total": int64(0),
  956. "sink_mockSink_0_records_in_total": int64(4),
  957. "sink_mockSink_0_records_out_total": int64(4),
  958. "source_demoE_0_exceptions_total": int64(0),
  959. "source_demoE_0_records_in_total": int64(6),
  960. "source_demoE_0_records_out_total": int64(6),
  961. "op_3_window_0_exceptions_total": int64(0),
  962. "op_3_window_0_process_latency_us": int64(0),
  963. "op_3_window_0_records_in_total": int64(4),
  964. "op_3_window_0_records_out_total": int64(4),
  965. "op_4_aggregate_0_exceptions_total": int64(0),
  966. "op_4_aggregate_0_process_latency_us": int64(0),
  967. "op_4_aggregate_0_records_in_total": int64(4),
  968. "op_4_aggregate_0_records_out_total": int64(4),
  969. "op_5_order_0_exceptions_total": int64(0),
  970. "op_5_order_0_process_latency_us": int64(0),
  971. "op_5_order_0_records_in_total": int64(4),
  972. "op_5_order_0_records_out_total": int64(4),
  973. },
  974. },
  975. {
  976. Name: `TestEventWindowRule5`,
  977. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  978. R: [][]map[string]interface{}{
  979. {{
  980. "temp": 25.5,
  981. }}, {{
  982. "temp": 28.1,
  983. }, {
  984. "temp": 27.4,
  985. }, {
  986. "temp": 25.5,
  987. }}, {{
  988. "temp": 26.2,
  989. }, {
  990. "temp": 26.8,
  991. }, {
  992. "temp": 28.9,
  993. }, {
  994. "temp": 29.1,
  995. }, {
  996. "temp": 32.2,
  997. }}, {{
  998. "temp": 30.9,
  999. }},
  1000. },
  1001. M: map[string]interface{}{
  1002. "op_4_project_0_exceptions_total": int64(0),
  1003. "op_4_project_0_process_latency_us": int64(0),
  1004. "op_4_project_0_records_in_total": int64(4),
  1005. "op_4_project_0_records_out_total": int64(4),
  1006. "sink_mockSink_0_exceptions_total": int64(0),
  1007. "sink_mockSink_0_records_in_total": int64(4),
  1008. "sink_mockSink_0_records_out_total": int64(4),
  1009. "source_sessionDemoE_0_exceptions_total": int64(0),
  1010. "source_sessionDemoE_0_records_in_total": int64(12),
  1011. "source_sessionDemoE_0_records_out_total": int64(12),
  1012. "op_3_window_0_exceptions_total": int64(0),
  1013. "op_3_window_0_process_latency_us": int64(0),
  1014. "op_3_window_0_records_in_total": int64(10),
  1015. "op_3_window_0_records_out_total": int64(4),
  1016. },
  1017. },
  1018. {
  1019. Name: `TestEventWindowRule6`,
  1020. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  1021. R: [][]map[string]interface{}{
  1022. {{
  1023. "m": 25.5,
  1024. "c": float64(1),
  1025. }}, {{
  1026. "m": 25.5,
  1027. "c": float64(1),
  1028. }}, {{
  1029. "m": 28.1,
  1030. "c": float64(1),
  1031. }}, {{
  1032. "m": 28.1,
  1033. "c": float64(2),
  1034. }}, {{
  1035. "m": 27.4,
  1036. "c": float64(2),
  1037. }},
  1038. },
  1039. M: map[string]interface{}{
  1040. "op_6_project_0_exceptions_total": int64(0),
  1041. "op_6_project_0_process_latency_us": int64(0),
  1042. "op_6_project_0_records_in_total": int64(5),
  1043. "op_6_project_0_records_out_total": int64(5),
  1044. "sink_mockSink_0_exceptions_total": int64(0),
  1045. "sink_mockSink_0_records_in_total": int64(5),
  1046. "sink_mockSink_0_records_out_total": int64(5),
  1047. "source_demoE_0_exceptions_total": int64(0),
  1048. "source_demoE_0_records_in_total": int64(6),
  1049. "source_demoE_0_records_out_total": int64(6),
  1050. "source_demo1E_0_exceptions_total": int64(0),
  1051. "source_demo1E_0_records_in_total": int64(6),
  1052. "source_demo1E_0_records_out_total": int64(6),
  1053. "op_4_window_0_exceptions_total": int64(0),
  1054. "op_4_window_0_records_in_total": int64(9),
  1055. "op_4_window_0_records_out_total": int64(5),
  1056. "op_5_join_0_exceptions_total": int64(0),
  1057. "op_5_join_0_process_latency_us": int64(0),
  1058. "op_5_join_0_records_in_total": int64(5),
  1059. "op_5_join_0_records_out_total": int64(5),
  1060. },
  1061. },
  1062. {
  1063. Name: `TestEventWindowRule7`,
  1064. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1065. R: [][]map[string]interface{}{
  1066. {{
  1067. "error": "error in preprocessor: field color type mismatch: cannot convert int(2) to string",
  1068. }},
  1069. {{
  1070. "color": "red",
  1071. "size": float64(3),
  1072. "ts": float64(1541152486013),
  1073. }},
  1074. {{
  1075. "color": "red",
  1076. "size": float64(3),
  1077. "ts": float64(1541152486013),
  1078. }},
  1079. {{
  1080. "color": "yellow",
  1081. "size": float64(4),
  1082. "ts": float64(1541152488442),
  1083. }},
  1084. {{
  1085. "color": "yellow",
  1086. "size": float64(4),
  1087. "ts": float64(1541152488442),
  1088. }, {
  1089. "color": "red",
  1090. "size": float64(1),
  1091. "ts": float64(1541152489252),
  1092. }},
  1093. {{
  1094. "color": "red",
  1095. "size": float64(1),
  1096. "ts": float64(1541152489252),
  1097. }},
  1098. },
  1099. M: map[string]interface{}{
  1100. "op_4_project_0_exceptions_total": int64(1),
  1101. "op_4_project_0_process_latency_us": int64(0),
  1102. "op_4_project_0_records_in_total": int64(5),
  1103. "op_4_project_0_records_out_total": int64(5),
  1104. "sink_mockSink_0_exceptions_total": int64(0),
  1105. "sink_mockSink_0_records_in_total": int64(6),
  1106. "sink_mockSink_0_records_out_total": int64(6),
  1107. "source_demoErr_0_exceptions_total": int64(1),
  1108. "source_demoErr_0_records_in_total": int64(6),
  1109. "source_demoErr_0_records_out_total": int64(5),
  1110. "op_3_window_0_exceptions_total": int64(1),
  1111. "op_3_window_0_process_latency_us": int64(0),
  1112. "op_3_window_0_records_in_total": int64(3),
  1113. "op_3_window_0_records_out_total": int64(5),
  1114. },
  1115. },
  1116. {
  1117. Name: `TestEventWindowRule8`,
  1118. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1119. R: [][]map[string]interface{}{
  1120. {{
  1121. "temp": 25.5,
  1122. "window_start": float64(1541152486013),
  1123. "window_end": float64(1541152487013),
  1124. }}, {{
  1125. "temp": 28.1,
  1126. "window_start": float64(1541152487932),
  1127. "window_end": float64(1541152490000),
  1128. }, {
  1129. "temp": 27.4,
  1130. "window_start": float64(1541152487932),
  1131. "window_end": float64(1541152490000),
  1132. }, {
  1133. "temp": 25.5,
  1134. "window_start": float64(1541152487932),
  1135. "window_end": float64(1541152490000),
  1136. }}, {{
  1137. "temp": 26.2,
  1138. "window_start": float64(1541152490000),
  1139. "window_end": float64(1541152494000),
  1140. }, {
  1141. "temp": 26.8,
  1142. "window_start": float64(1541152490000),
  1143. "window_end": float64(1541152494000),
  1144. }, {
  1145. "temp": 28.9,
  1146. "window_start": float64(1541152490000),
  1147. "window_end": float64(1541152494000),
  1148. }, {
  1149. "temp": 29.1,
  1150. "window_start": float64(1541152490000),
  1151. "window_end": float64(1541152494000),
  1152. }, {
  1153. "temp": 32.2,
  1154. "window_start": float64(1541152490000),
  1155. "window_end": float64(1541152494000),
  1156. }}, {{
  1157. "temp": 30.9,
  1158. "window_start": float64(1541152494000),
  1159. "window_end": float64(1541152495112),
  1160. }},
  1161. },
  1162. M: map[string]interface{}{
  1163. "op_4_project_0_exceptions_total": int64(0),
  1164. "op_4_project_0_process_latency_us": int64(0),
  1165. "op_4_project_0_records_in_total": int64(4),
  1166. "op_4_project_0_records_out_total": int64(4),
  1167. "sink_mockSink_0_exceptions_total": int64(0),
  1168. "sink_mockSink_0_records_in_total": int64(4),
  1169. "sink_mockSink_0_records_out_total": int64(4),
  1170. "source_sessionDemoE_0_exceptions_total": int64(0),
  1171. "source_sessionDemoE_0_records_in_total": int64(12),
  1172. "source_sessionDemoE_0_records_out_total": int64(12),
  1173. "op_3_window_0_exceptions_total": int64(0),
  1174. "op_3_window_0_process_latency_us": int64(0),
  1175. "op_3_window_0_records_in_total": int64(10),
  1176. "op_3_window_0_records_out_total": int64(4),
  1177. },
  1178. },
  1179. {
  1180. Name: `TestEventWindowRule9`,
  1181. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1182. R: [][]map[string]interface{}{
  1183. {{
  1184. "color": "red",
  1185. "window_start": float64(1541152485013),
  1186. "window_end": float64(1541152487000),
  1187. }},
  1188. {{
  1189. "color": "red",
  1190. "window_start": float64(1541152486000),
  1191. "window_end": float64(1541152488000),
  1192. }, {
  1193. "color": "blue",
  1194. "window_start": float64(1541152486000),
  1195. "window_end": float64(1541152488000),
  1196. }},
  1197. {{
  1198. "color": "blue",
  1199. "window_start": float64(1541152487000),
  1200. "window_end": float64(1541152489000),
  1201. }, {
  1202. "color": "yellow",
  1203. "window_start": float64(1541152487000),
  1204. "window_end": float64(1541152489000),
  1205. }},
  1206. {{
  1207. "color": "yellow",
  1208. "window_start": float64(1541152488000),
  1209. "window_end": float64(1541152490000),
  1210. }, {
  1211. "color": "red",
  1212. "window_start": float64(1541152488000),
  1213. "window_end": float64(1541152490000),
  1214. }},
  1215. {{
  1216. "color": "red",
  1217. "window_start": float64(1541152489000),
  1218. "window_end": float64(1541152491000),
  1219. }},
  1220. },
  1221. M: map[string]interface{}{
  1222. "op_4_project_0_exceptions_total": int64(0),
  1223. "op_4_project_0_process_latency_us": int64(0),
  1224. "op_4_project_0_records_in_total": int64(5),
  1225. "op_4_project_0_records_out_total": int64(5),
  1226. "sink_mockSink_0_exceptions_total": int64(0),
  1227. "sink_mockSink_0_records_in_total": int64(5),
  1228. "sink_mockSink_0_records_out_total": int64(5),
  1229. "source_demoE_0_exceptions_total": int64(0),
  1230. "source_demoE_0_records_in_total": int64(6),
  1231. "source_demoE_0_records_out_total": int64(6),
  1232. "op_3_window_0_exceptions_total": int64(0),
  1233. "op_3_window_0_process_latency_us": int64(0),
  1234. "op_3_window_0_records_in_total": int64(4),
  1235. "op_3_window_0_records_out_total": int64(5),
  1236. },
  1237. },
  1238. }
  1239. HandleStream(true, streamList, t)
  1240. options := []*api.RuleOption{
  1241. {
  1242. BufferLength: 100,
  1243. SendError: true,
  1244. IsEventTime: true,
  1245. LateTol: 1000,
  1246. }, {
  1247. BufferLength: 100,
  1248. SendError: true,
  1249. Qos: api.AtLeastOnce,
  1250. CheckpointInterval: 5000,
  1251. IsEventTime: true,
  1252. LateTol: 1000,
  1253. }, {
  1254. BufferLength: 100,
  1255. SendError: true,
  1256. Qos: api.ExactlyOnce,
  1257. CheckpointInterval: 5000,
  1258. IsEventTime: true,
  1259. LateTol: 1000,
  1260. },
  1261. }
  1262. for j, opt := range options {
  1263. DoRuleTest(t, tests, j, opt, 10)
  1264. }
  1265. }
  1266. func TestWindowError(t *testing.T) {
  1267. // Reset
  1268. streamList := []string{"ldemo", "ldemo1"}
  1269. HandleStream(false, streamList, t)
  1270. tests := []RuleTest{
  1271. {
  1272. Name: `TestWindowErrorRule1`,
  1273. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1274. R: [][]map[string]interface{}{
  1275. {{
  1276. "error": "run Select error: invalid operation string(string) * int64(3)",
  1277. }}, {{
  1278. "kuiper_field_0": float64(6),
  1279. }, {}},
  1280. },
  1281. M: map[string]interface{}{
  1282. "op_3_project_0_exceptions_total": int64(1),
  1283. "op_3_project_0_process_latency_us": int64(0),
  1284. "op_3_project_0_records_in_total": int64(2),
  1285. "op_3_project_0_records_out_total": int64(1),
  1286. "sink_mockSink_0_exceptions_total": int64(0),
  1287. "sink_mockSink_0_records_in_total": int64(2),
  1288. "sink_mockSink_0_records_out_total": int64(2),
  1289. "source_ldemo_0_exceptions_total": int64(0),
  1290. "source_ldemo_0_records_in_total": int64(5),
  1291. "source_ldemo_0_records_out_total": int64(5),
  1292. "op_2_window_0_exceptions_total": int64(0),
  1293. "op_2_window_0_process_latency_us": int64(0),
  1294. "op_2_window_0_records_in_total": int64(5),
  1295. "op_2_window_0_records_out_total": int64(2),
  1296. },
  1297. }, {
  1298. Name: `TestWindowErrorRule2`,
  1299. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1300. R: [][]map[string]interface{}{
  1301. {{
  1302. "error": "run Where error: invalid operation string(string) > int64(2)",
  1303. }}, {{
  1304. "color": "red",
  1305. "ts": float64(1541152486013),
  1306. }}, {{
  1307. "ts": float64(1541152487632),
  1308. }}, {}, {},
  1309. },
  1310. M: map[string]interface{}{
  1311. "op_4_project_0_exceptions_total": int64(1),
  1312. "op_4_project_0_process_latency_us": int64(0),
  1313. "op_4_project_0_records_in_total": int64(4),
  1314. "op_4_project_0_records_out_total": int64(4),
  1315. "sink_mockSink_0_exceptions_total": int64(0),
  1316. "sink_mockSink_0_records_in_total": int64(5),
  1317. "sink_mockSink_0_records_out_total": int64(5),
  1318. "source_ldemo_0_exceptions_total": int64(0),
  1319. "source_ldemo_0_records_in_total": int64(5),
  1320. "source_ldemo_0_records_out_total": int64(5),
  1321. "op_3_window_0_exceptions_total": int64(1),
  1322. "op_3_window_0_process_latency_us": int64(0),
  1323. "op_3_window_0_records_in_total": int64(3),
  1324. "op_3_window_0_records_out_total": int64(4),
  1325. "op_2_filter_0_exceptions_total": int64(1),
  1326. "op_2_filter_0_process_latency_us": int64(0),
  1327. "op_2_filter_0_records_in_total": int64(5),
  1328. "op_2_filter_0_records_out_total": int64(2),
  1329. },
  1330. }, {
  1331. Name: `TestWindowErrorRule3`,
  1332. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1333. R: [][]map[string]interface{}{
  1334. {{
  1335. "color": "red",
  1336. "temp": 25.5,
  1337. "ts": float64(1541152486013),
  1338. }}, {{
  1339. "color": "red",
  1340. "temp": 25.5,
  1341. "ts": float64(1541152486013),
  1342. }}, {{
  1343. "color": "red",
  1344. "temp": 25.5,
  1345. "ts": float64(1541152486013),
  1346. }}, {{
  1347. "temp": 28.1,
  1348. "ts": float64(1541152487632),
  1349. }}, {{
  1350. "temp": 28.1,
  1351. "ts": float64(1541152487632),
  1352. }}, {{
  1353. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1354. }}, {{
  1355. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1356. }}, {{
  1357. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1358. }},
  1359. },
  1360. M: map[string]interface{}{
  1361. "op_5_project_0_exceptions_total": int64(3),
  1362. "op_5_project_0_process_latency_us": int64(0),
  1363. "op_5_project_0_records_in_total": int64(5),
  1364. "op_5_project_0_records_out_total": int64(5),
  1365. "sink_mockSink_0_exceptions_total": int64(0),
  1366. "sink_mockSink_0_records_in_total": int64(8),
  1367. "sink_mockSink_0_records_out_total": int64(8),
  1368. "source_ldemo_0_exceptions_total": int64(0),
  1369. "source_ldemo_0_records_in_total": int64(5),
  1370. "source_ldemo_0_records_out_total": int64(5),
  1371. "source_ldemo1_0_exceptions_total": int64(0),
  1372. "source_ldemo1_0_records_in_total": int64(5),
  1373. "source_ldemo1_0_records_out_total": int64(5),
  1374. "op_3_window_0_exceptions_total": int64(0),
  1375. "op_3_window_0_process_latency_us": int64(0),
  1376. "op_3_window_0_records_in_total": int64(10),
  1377. "op_3_window_0_records_out_total": int64(10),
  1378. "op_4_join_0_exceptions_total": int64(3),
  1379. "op_4_join_0_process_latency_us": int64(0),
  1380. "op_4_join_0_records_in_total": int64(10),
  1381. "op_4_join_0_records_out_total": int64(5),
  1382. },
  1383. }, {
  1384. Name: `TestWindowErrorRule4`,
  1385. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1386. R: [][]map[string]interface{}{
  1387. {{
  1388. "color": "red",
  1389. }}, {{
  1390. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1391. }}, {{
  1392. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1393. }}, {{
  1394. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1395. }}, {{
  1396. "color": float64(49),
  1397. }, {}},
  1398. },
  1399. M: map[string]interface{}{
  1400. "op_6_project_0_exceptions_total": int64(3),
  1401. "op_6_project_0_process_latency_us": int64(0),
  1402. "op_6_project_0_records_in_total": int64(2),
  1403. "op_6_project_0_records_out_total": int64(2),
  1404. "sink_mockSink_0_exceptions_total": int64(0),
  1405. "sink_mockSink_0_records_in_total": int64(5),
  1406. "sink_mockSink_0_records_out_total": int64(5),
  1407. "source_ldemo_0_exceptions_total": int64(0),
  1408. "source_ldemo_0_records_in_total": int64(5),
  1409. "source_ldemo_0_records_out_total": int64(5),
  1410. "op_2_window_0_exceptions_total": int64(0),
  1411. "op_2_window_0_process_latency_us": int64(0),
  1412. "op_2_window_0_records_in_total": int64(5),
  1413. "op_2_window_0_records_out_total": int64(5),
  1414. "op_3_aggregate_0_exceptions_total": int64(0),
  1415. "op_3_aggregate_0_process_latency_us": int64(0),
  1416. "op_3_aggregate_0_records_in_total": int64(5),
  1417. "op_3_aggregate_0_records_out_total": int64(5),
  1418. "op_4_having_0_exceptions_total": int64(3),
  1419. "op_4_having_0_process_latency_us": int64(0),
  1420. "op_4_having_0_records_in_total": int64(5),
  1421. "op_4_having_0_records_out_total": int64(2),
  1422. },
  1423. }, {
  1424. Name: `TestWindowErrorRule5`,
  1425. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1426. R: [][]map[string]interface{}{
  1427. {{
  1428. "error": "run Order By error: incompatible types for comparison: int and string",
  1429. }}, {{
  1430. "size": float64(3),
  1431. }}, {{
  1432. "color": float64(49),
  1433. "size": float64(2),
  1434. }}, {{
  1435. "color": "red",
  1436. }},
  1437. },
  1438. M: map[string]interface{}{
  1439. "op_4_project_0_exceptions_total": int64(1),
  1440. "op_4_project_0_process_latency_us": int64(0),
  1441. "op_4_project_0_records_in_total": int64(3),
  1442. "op_4_project_0_records_out_total": int64(3),
  1443. "sink_mockSink_0_exceptions_total": int64(0),
  1444. "sink_mockSink_0_records_in_total": int64(4),
  1445. "sink_mockSink_0_records_out_total": int64(4),
  1446. "source_ldemo_0_exceptions_total": int64(0),
  1447. "source_ldemo_0_records_in_total": int64(5),
  1448. "source_ldemo_0_records_out_total": int64(5),
  1449. "op_2_window_0_exceptions_total": int64(0),
  1450. "op_2_window_0_process_latency_us": int64(0),
  1451. "op_2_window_0_records_in_total": int64(5),
  1452. "op_2_window_0_records_out_total": int64(4),
  1453. "op_3_order_0_exceptions_total": int64(1),
  1454. "op_3_order_0_process_latency_us": int64(0),
  1455. "op_3_order_0_records_in_total": int64(4),
  1456. "op_3_order_0_records_out_total": int64(3),
  1457. },
  1458. },
  1459. }
  1460. HandleStream(true, streamList, t)
  1461. DoRuleTest(t, tests, 0, &api.RuleOption{
  1462. BufferLength: 100,
  1463. SendError: true,
  1464. }, 0)
  1465. }
  1466. func TestEventSlidingWindow(t *testing.T) {
  1467. // Reset
  1468. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  1469. HandleStream(false, streamList, t)
  1470. tests := []RuleTest{
  1471. {
  1472. Name: `TestEventWindowRuleDelay`,
  1473. Sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 1,1) FILTER (where size = 3)`,
  1474. R: [][]map[string]interface{}{
  1475. {{
  1476. "color": "red",
  1477. }},
  1478. },
  1479. M: map[string]interface{}{
  1480. "source_demoE_0_exceptions_total": int64(0),
  1481. "source_demoE_0_records_in_total": int64(6),
  1482. "source_demoE_0_records_out_total": int64(6),
  1483. },
  1484. },
  1485. }
  1486. HandleStream(true, streamList, t)
  1487. options := []*api.RuleOption{
  1488. {
  1489. BufferLength: 100,
  1490. SendError: true,
  1491. Qos: api.AtLeastOnce,
  1492. CheckpointInterval: 5000,
  1493. IsEventTime: true,
  1494. LateTol: 1000,
  1495. },
  1496. {
  1497. BufferLength: 100,
  1498. SendError: true,
  1499. Qos: api.ExactlyOnce,
  1500. CheckpointInterval: 5000,
  1501. IsEventTime: true,
  1502. LateTol: 1000,
  1503. },
  1504. {
  1505. BufferLength: 100,
  1506. SendError: true,
  1507. IsEventTime: true,
  1508. LateTol: 1000,
  1509. },
  1510. }
  1511. for j, opt := range options {
  1512. DoRuleTest(t, tests, j, opt, 10)
  1513. }
  1514. }