window_rule_test.go 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "testing"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. )
  19. func TestWindow(t *testing.T) {
  20. // Reset
  21. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  22. HandleStream(false, streamList, t)
  23. tests := []RuleTest{
  24. {
  25. Name: `TestWindowRule1`,
  26. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  27. R: [][]map[string]interface{}{
  28. {{
  29. "color": "red",
  30. "size": float64(3),
  31. "ts": float64(1541152486013),
  32. }, {
  33. "color": "blue",
  34. "size": float64(6),
  35. "ts": float64(1541152486822),
  36. }},
  37. {{
  38. "color": "red",
  39. "size": float64(3),
  40. "ts": float64(1541152486013),
  41. }, {
  42. "color": "blue",
  43. "size": float64(6),
  44. "ts": float64(1541152486822),
  45. }, {
  46. "color": "blue",
  47. "size": float64(2),
  48. "ts": float64(1541152487632),
  49. }},
  50. {{
  51. "color": "blue",
  52. "size": float64(2),
  53. "ts": float64(1541152487632),
  54. }, {
  55. "color": "yellow",
  56. "size": float64(4),
  57. "ts": float64(1541152488442),
  58. }},
  59. {{
  60. "color": "yellow",
  61. "size": float64(4),
  62. "ts": float64(1541152488442),
  63. }, {
  64. "color": "red",
  65. "size": float64(1),
  66. "ts": float64(1541152489252),
  67. }},
  68. },
  69. M: map[string]interface{}{
  70. "op_3_project_0_exceptions_total": int64(0),
  71. "op_3_project_0_process_latency_us": int64(0),
  72. "op_3_project_0_records_in_total": int64(4),
  73. "op_3_project_0_records_out_total": int64(4),
  74. "sink_mockSink_0_exceptions_total": int64(0),
  75. "sink_mockSink_0_records_in_total": int64(4),
  76. "sink_mockSink_0_records_out_total": int64(4),
  77. "source_demo_0_exceptions_total": int64(0),
  78. "source_demo_0_records_in_total": int64(5),
  79. "source_demo_0_records_out_total": int64(5),
  80. "op_2_window_0_exceptions_total": int64(0),
  81. "op_2_window_0_process_latency_us": int64(0),
  82. "op_2_window_0_records_in_total": int64(5),
  83. "op_2_window_0_records_out_total": int64(4),
  84. },
  85. }, {
  86. Name: `TestWindowRule2`,
  87. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  88. R: [][]map[string]interface{}{
  89. {{
  90. "color": "red",
  91. "ts": float64(1541152486013),
  92. }, {
  93. "color": "blue",
  94. "ts": float64(1541152486822),
  95. }},
  96. {},
  97. {{
  98. "color": "yellow",
  99. "ts": float64(1541152488442),
  100. }},
  101. {},
  102. },
  103. M: map[string]interface{}{
  104. "op_4_project_0_exceptions_total": int64(0),
  105. "op_4_project_0_process_latency_us": int64(0),
  106. "op_4_project_0_records_in_total": int64(4),
  107. "op_4_project_0_records_out_total": int64(4),
  108. "sink_mockSink_0_exceptions_total": int64(0),
  109. "sink_mockSink_0_records_in_total": int64(4),
  110. "sink_mockSink_0_records_out_total": int64(4),
  111. "source_demo_0_exceptions_total": int64(0),
  112. "source_demo_0_records_in_total": int64(5),
  113. "source_demo_0_records_out_total": int64(5),
  114. "op_3_window_0_exceptions_total": int64(0),
  115. "op_3_window_0_process_latency_us": int64(0),
  116. "op_3_window_0_records_in_total": int64(3),
  117. "op_3_window_0_records_out_total": int64(4),
  118. "op_2_filter_0_exceptions_total": int64(0),
  119. "op_2_filter_0_process_latency_us": int64(0),
  120. "op_2_filter_0_records_in_total": int64(5),
  121. "op_2_filter_0_records_out_total": int64(3),
  122. },
  123. }, {
  124. Name: `TestWindowRule3`,
  125. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  126. R: [][]map[string]interface{}{
  127. {{
  128. "color": "red",
  129. "temp": 25.5,
  130. "ts1": float64(1541152486013),
  131. "ts2": float64(1541152486013),
  132. "diff": float64(0),
  133. }}, {{
  134. "color": "red",
  135. "temp": 25.5,
  136. "ts1": float64(1541152486013),
  137. "ts2": float64(1541152486013),
  138. "diff": float64(0),
  139. }}, {{
  140. "color": "red",
  141. "temp": 25.5,
  142. "ts1": float64(1541152486013),
  143. "ts2": float64(1541152486013),
  144. "diff": float64(0),
  145. }}, {{
  146. "color": "blue",
  147. "temp": 28.1,
  148. "ts1": float64(1541152487632),
  149. "ts2": float64(1541152487632),
  150. "diff": float64(0),
  151. }}, {{
  152. "color": "blue",
  153. "temp": 28.1,
  154. "ts1": float64(1541152487632),
  155. "ts2": float64(1541152487632),
  156. "diff": float64(0),
  157. }}, {{
  158. "color": "blue",
  159. "temp": 28.1,
  160. "ts1": float64(1541152487632),
  161. "ts2": float64(1541152487632),
  162. "diff": float64(0),
  163. }, {
  164. "color": "yellow",
  165. "temp": 27.4,
  166. "ts1": float64(1541152488442),
  167. "ts2": float64(1541152488442),
  168. "diff": float64(0),
  169. }}, {{
  170. "color": "yellow",
  171. "temp": 27.4,
  172. "ts1": float64(1541152488442),
  173. "ts2": float64(1541152488442),
  174. "diff": float64(0),
  175. }}, {{
  176. "color": "yellow",
  177. "temp": 27.4,
  178. "ts1": float64(1541152488442),
  179. "ts2": float64(1541152488442),
  180. "diff": float64(0),
  181. }, {
  182. "color": "red",
  183. "temp": 25.5,
  184. "ts1": float64(1541152489252),
  185. "ts2": float64(1541152489252),
  186. "diff": float64(0),
  187. }},
  188. },
  189. M: map[string]interface{}{
  190. "op_5_project_0_exceptions_total": int64(0),
  191. "op_5_project_0_process_latency_us": int64(0),
  192. "op_5_project_0_records_in_total": int64(8),
  193. "op_5_project_0_records_out_total": int64(8),
  194. "sink_mockSink_0_exceptions_total": int64(0),
  195. "sink_mockSink_0_records_in_total": int64(8),
  196. "sink_mockSink_0_records_out_total": int64(8),
  197. "source_demo_0_exceptions_total": int64(0),
  198. "source_demo_0_records_in_total": int64(5),
  199. "source_demo_0_records_out_total": int64(5),
  200. "source_demo1_0_exceptions_total": int64(0),
  201. "source_demo1_0_records_in_total": int64(5),
  202. "source_demo1_0_records_out_total": int64(5),
  203. "op_3_window_0_exceptions_total": int64(0),
  204. "op_3_window_0_process_latency_us": int64(0),
  205. "op_3_window_0_records_in_total": int64(10),
  206. "op_3_window_0_records_out_total": int64(10),
  207. "op_4_join_0_exceptions_total": int64(0),
  208. "op_4_join_0_process_latency_us": int64(0),
  209. "op_4_join_0_records_in_total": int64(10),
  210. "op_4_join_0_records_out_total": int64(8),
  211. },
  212. T: &api.PrintableTopo{
  213. Sources: []string{"source_demo", "source_demo1"},
  214. Edges: map[string][]interface{}{
  215. "source_demo": {"op_3_window"},
  216. "source_demo1": {"op_3_window"},
  217. "op_3_window": {"op_4_join"},
  218. "op_4_join": {"op_5_project"},
  219. "op_5_project": {"sink_mockSink"},
  220. },
  221. },
  222. }, {
  223. Name: `TestWindowRule4`,
  224. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  225. R: [][]map[string]interface{}{
  226. {{
  227. "color": "red",
  228. "c": float64(1),
  229. }}, {{
  230. "color": "blue",
  231. "c": float64(1),
  232. }, {
  233. "color": "red",
  234. "c": float64(1),
  235. }}, {{
  236. "color": "blue",
  237. "c": float64(2),
  238. }, {
  239. "color": "red",
  240. "c": float64(1),
  241. }}, {{
  242. "color": "blue",
  243. "c": float64(2),
  244. }, {
  245. "color": "yellow",
  246. "c": float64(1),
  247. }}, {{
  248. "color": "blue",
  249. "c": float64(1),
  250. }, {
  251. "color": "red",
  252. "c": float64(1),
  253. }, {
  254. "color": "yellow",
  255. "c": float64(1),
  256. }},
  257. },
  258. M: map[string]interface{}{
  259. "op_5_project_0_exceptions_total": int64(0),
  260. "op_5_project_0_process_latency_us": int64(0),
  261. "op_5_project_0_records_in_total": int64(5),
  262. "op_5_project_0_records_out_total": int64(5),
  263. "sink_mockSink_0_exceptions_total": int64(0),
  264. "sink_mockSink_0_records_in_total": int64(5),
  265. "sink_mockSink_0_records_out_total": int64(5),
  266. "source_demo_0_exceptions_total": int64(0),
  267. "source_demo_0_records_in_total": int64(5),
  268. "source_demo_0_records_out_total": int64(5),
  269. "op_2_window_0_exceptions_total": int64(0),
  270. "op_2_window_0_process_latency_us": int64(0),
  271. "op_2_window_0_records_in_total": int64(5),
  272. "op_2_window_0_records_out_total": int64(5),
  273. "op_3_aggregate_0_exceptions_total": int64(0),
  274. "op_3_aggregate_0_process_latency_us": int64(0),
  275. "op_3_aggregate_0_records_in_total": int64(5),
  276. "op_3_aggregate_0_records_out_total": int64(5),
  277. "op_4_order_0_exceptions_total": int64(0),
  278. "op_4_order_0_process_latency_us": int64(0),
  279. "op_4_order_0_records_in_total": int64(5),
  280. "op_4_order_0_records_out_total": int64(5),
  281. },
  282. }, {
  283. Name: `TestWindowRule5`,
  284. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  285. R: [][]map[string]interface{}{
  286. {{
  287. "count": float64(2),
  288. "ws": float64(1541152486013),
  289. "window_end": float64(1541152487823), // timeout
  290. }}, {{
  291. "count": float64(3),
  292. "ws": float64(1541152487932),
  293. "window_end": float64(1541152490000), // tick
  294. }}, {{
  295. "count": float64(5),
  296. "ws": float64(1541152490000),
  297. "window_end": float64(1541152494000), // tick
  298. }}, {{
  299. "count": float64(1),
  300. "ws": float64(1541152494000),
  301. "window_end": float64(1541152495112), // timeout
  302. }},
  303. },
  304. M: map[string]interface{}{
  305. "op_3_project_0_exceptions_total": int64(0),
  306. "op_3_project_0_process_latency_us": int64(0),
  307. "op_3_project_0_records_in_total": int64(4),
  308. "op_3_project_0_records_out_total": int64(4),
  309. "sink_mockSink_0_exceptions_total": int64(0),
  310. "sink_mockSink_0_records_in_total": int64(4),
  311. "sink_mockSink_0_records_out_total": int64(4),
  312. "source_sessionDemo_0_exceptions_total": int64(0),
  313. "source_sessionDemo_0_records_in_total": int64(11),
  314. "source_sessionDemo_0_records_out_total": int64(11),
  315. "op_2_window_0_exceptions_total": int64(0),
  316. "op_2_window_0_process_latency_us": int64(0),
  317. "op_2_window_0_records_in_total": int64(11),
  318. "op_2_window_0_records_out_total": int64(4),
  319. },
  320. }, {
  321. Name: `TestWindowRule6`,
  322. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  323. R: [][]map[string]interface{}{
  324. {{
  325. "temp": 25.5,
  326. "c": float64(1),
  327. "window_start": float64(1541152485115),
  328. "window_end": float64(1541152486115),
  329. }}, {{
  330. "temp": 25.5,
  331. "c": float64(1),
  332. "window_start": float64(1541152485822),
  333. "window_end": float64(1541152486822),
  334. }}, {{
  335. "temp": 25.5,
  336. "c": float64(1),
  337. "window_start": float64(1541152485903),
  338. "window_end": float64(1541152486903),
  339. }}, {{
  340. "temp": 28.1,
  341. "c": float64(1),
  342. "window_start": float64(1541152486702),
  343. "window_end": float64(1541152487702),
  344. }}, {{
  345. "temp": 28.1,
  346. "c": float64(1),
  347. "window_start": float64(1541152487442),
  348. "window_end": float64(1541152488442),
  349. }}, {{
  350. "temp": 55.5,
  351. "c": float64(2),
  352. "window_start": float64(1541152487605),
  353. "window_end": float64(1541152488605),
  354. }}, {{
  355. "temp": 27.4,
  356. "c": float64(1),
  357. "window_start": float64(1541152488252),
  358. "window_end": float64(1541152489252),
  359. }}, {{
  360. "temp": 52.9,
  361. "c": float64(2),
  362. "window_start": float64(1541152488305),
  363. "window_end": float64(1541152489305),
  364. }},
  365. },
  366. M: map[string]interface{}{
  367. "op_5_project_0_exceptions_total": int64(0),
  368. "op_5_project_0_process_latency_us": int64(0),
  369. "op_5_project_0_records_in_total": int64(8),
  370. "op_5_project_0_records_out_total": int64(8),
  371. "sink_mockSink_0_exceptions_total": int64(0),
  372. "sink_mockSink_0_records_in_total": int64(8),
  373. "sink_mockSink_0_records_out_total": int64(8),
  374. "source_demo_0_exceptions_total": int64(0),
  375. "source_demo_0_records_in_total": int64(5),
  376. "source_demo_0_records_out_total": int64(5),
  377. "source_demo1_0_exceptions_total": int64(0),
  378. "source_demo1_0_records_in_total": int64(5),
  379. "source_demo1_0_records_out_total": int64(5),
  380. "op_3_window_0_exceptions_total": int64(0),
  381. "op_3_window_0_process_latency_us": int64(0),
  382. "op_3_window_0_records_in_total": int64(10),
  383. "op_3_window_0_records_out_total": int64(10),
  384. "op_4_join_0_exceptions_total": int64(0),
  385. "op_4_join_0_process_latency_us": int64(0),
  386. "op_4_join_0_records_in_total": int64(10),
  387. "op_4_join_0_records_out_total": int64(8),
  388. },
  389. }, {
  390. Name: `TestWindowRule7`,
  391. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  392. R: [][]map[string]interface{}{
  393. {{
  394. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  395. }},
  396. {{
  397. "color": "blue",
  398. "size": float64(6),
  399. "ts": float64(1541152486822),
  400. }},
  401. {{
  402. "color": "blue",
  403. "size": float64(6),
  404. "ts": float64(1541152486822),
  405. }, {
  406. "color": "blue",
  407. "size": float64(2),
  408. "ts": float64(1541152487632),
  409. }},
  410. {{
  411. "error": "error in preprocessor: field color type mismatch: cannot convert int(7) to string",
  412. }},
  413. {{
  414. "color": "blue",
  415. "size": float64(2),
  416. "ts": float64(1541152487632),
  417. }},
  418. {{
  419. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  420. }},
  421. {},
  422. },
  423. M: map[string]interface{}{
  424. "op_3_project_0_exceptions_total": int64(3),
  425. "op_3_project_0_process_latency_us": int64(0),
  426. "op_3_project_0_records_in_total": int64(7),
  427. "op_3_project_0_records_out_total": int64(4),
  428. "sink_mockSink_0_exceptions_total": int64(0),
  429. "sink_mockSink_0_records_in_total": int64(7),
  430. "sink_mockSink_0_records_out_total": int64(7),
  431. "source_demoError_0_exceptions_total": int64(3),
  432. "source_demoError_0_records_in_total": int64(5),
  433. "source_demoError_0_records_out_total": int64(5),
  434. "op_2_window_0_exceptions_total": int64(3),
  435. "op_2_window_0_process_latency_us": int64(0),
  436. "op_2_window_0_records_in_total": int64(5),
  437. "op_2_window_0_records_out_total": int64(4),
  438. },
  439. }, {
  440. Name: `TestWindowRule8`,
  441. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  442. R: [][]map[string]interface{}{
  443. {{
  444. "color": "red",
  445. "ts": float64(1541152486013),
  446. "c": float64(2),
  447. "window_start": float64(1541152486000),
  448. "window_end": float64(1541152487000),
  449. }},
  450. },
  451. M: map[string]interface{}{
  452. "op_5_project_0_exceptions_total": int64(0),
  453. "op_5_project_0_process_latency_us": int64(0),
  454. "op_5_project_0_records_in_total": int64(1),
  455. "op_5_project_0_records_out_total": int64(1),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(1),
  458. "sink_mockSink_0_records_out_total": int64(1),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "op_3_window_0_exceptions_total": int64(0),
  463. "op_3_window_0_process_latency_us": int64(0),
  464. "op_3_window_0_records_in_total": int64(3),
  465. "op_3_window_0_records_out_total": int64(4),
  466. "op_2_filter_0_exceptions_total": int64(0),
  467. "op_2_filter_0_process_latency_us": int64(0),
  468. "op_2_filter_0_records_in_total": int64(5),
  469. "op_2_filter_0_records_out_total": int64(3),
  470. "op_4_having_0_exceptions_total": int64(0),
  471. "op_4_having_0_process_latency_us": int64(0),
  472. "op_4_having_0_records_in_total": int64(4),
  473. "op_4_having_0_records_out_total": int64(1),
  474. },
  475. }, {
  476. Name: `TestWindowRule9`,
  477. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  478. R: [][]map[string]interface{}{
  479. {{
  480. "color": "red",
  481. "window_start": float64(1541152485000),
  482. "window_end": float64(1541152487000),
  483. }, {
  484. "color": "blue",
  485. "window_start": float64(1541152485000),
  486. "window_end": float64(1541152487000),
  487. }},
  488. {{
  489. "color": "red",
  490. "window_start": float64(1541152486000),
  491. "window_end": float64(1541152488000),
  492. }, {
  493. "color": "blue",
  494. "window_start": float64(1541152486000),
  495. "window_end": float64(1541152488000),
  496. }},
  497. {{
  498. "color": "yellow",
  499. "window_start": float64(1541152487000),
  500. "window_end": float64(1541152489000),
  501. }},
  502. {{
  503. "color": "yellow",
  504. "window_start": float64(1541152488000),
  505. "window_end": float64(1541152490000),
  506. }},
  507. },
  508. M: map[string]interface{}{
  509. "op_4_project_0_exceptions_total": int64(0),
  510. "op_4_project_0_process_latency_us": int64(0),
  511. "op_4_project_0_records_in_total": int64(4),
  512. "op_4_project_0_records_out_total": int64(4),
  513. "sink_mockSink_0_exceptions_total": int64(0),
  514. "sink_mockSink_0_records_in_total": int64(4),
  515. "sink_mockSink_0_records_out_total": int64(4),
  516. "source_demo_0_exceptions_total": int64(0),
  517. "source_demo_0_records_in_total": int64(5),
  518. "source_demo_0_records_out_total": int64(5),
  519. "op_3_window_0_exceptions_total": int64(0),
  520. "op_3_window_0_process_latency_us": int64(0),
  521. "op_3_window_0_records_in_total": int64(3),
  522. "op_3_window_0_records_out_total": int64(4),
  523. },
  524. }, {
  525. Name: `TestCountWindowRule1`,
  526. Sql: `SELECT collect(*)[0]->color as c, window_end() as we FROM demo GROUP BY COUNTWINDOW(3)`,
  527. R: [][]map[string]interface{}{
  528. {{
  529. "c": "red",
  530. "we": 1.541152487632e+12,
  531. }},
  532. },
  533. M: map[string]interface{}{
  534. "op_3_project_0_exceptions_total": int64(0),
  535. "op_3_project_0_process_latency_us": int64(0),
  536. "op_3_project_0_records_in_total": int64(1),
  537. "op_3_project_0_records_out_total": int64(1),
  538. "sink_mockSink_0_exceptions_total": int64(0),
  539. "sink_mockSink_0_records_in_total": int64(1),
  540. "sink_mockSink_0_records_out_total": int64(1),
  541. "source_demo_0_exceptions_total": int64(0),
  542. "source_demo_0_records_in_total": int64(5),
  543. "source_demo_0_records_out_total": int64(5),
  544. "op_2_window_0_exceptions_total": int64(0),
  545. "op_2_window_0_process_latency_us": int64(0),
  546. "op_2_window_0_records_in_total": int64(5),
  547. "op_2_window_0_records_out_total": int64(1),
  548. },
  549. }, {
  550. Name: `TestWindowRule10`,
  551. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  552. R: [][]map[string]interface{}{
  553. {{
  554. "c": "red",
  555. }}, {{
  556. "c": "blue",
  557. }}, {{}}, {{
  558. "c": "yellow",
  559. }}, {{}},
  560. },
  561. M: map[string]interface{}{
  562. "op_3_project_0_exceptions_total": int64(0),
  563. "op_3_project_0_process_latency_us": int64(0),
  564. "op_3_project_0_records_in_total": int64(5),
  565. "op_3_project_0_records_out_total": int64(5),
  566. "sink_mockSink_0_exceptions_total": int64(0),
  567. "sink_mockSink_0_records_in_total": int64(5),
  568. "sink_mockSink_0_records_out_total": int64(5),
  569. "source_demo_0_exceptions_total": int64(0),
  570. "source_demo_0_records_in_total": int64(5),
  571. "source_demo_0_records_out_total": int64(5),
  572. "op_2_window_0_exceptions_total": int64(0),
  573. "op_2_window_0_process_latency_us": int64(0),
  574. "op_2_window_0_records_in_total": int64(5),
  575. "op_2_window_0_records_out_total": int64(5),
  576. },
  577. }, {
  578. Name: `TestWindowRule11`,
  579. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  580. R: [][]map[string]interface{}{
  581. {{
  582. "color": "red",
  583. "name": "name1",
  584. "window_start": float64(1541152486000),
  585. "window_end": float64(1541152487000),
  586. }},
  587. },
  588. M: map[string]interface{}{
  589. "op_2_window_0_exceptions_total": int64(0),
  590. "op_2_window_0_process_latency_us": int64(0),
  591. "op_2_window_0_records_in_total": int64(5),
  592. "op_2_window_0_records_out_total": int64(4),
  593. "op_4_join_aligner_0_records_in_total": int64(5),
  594. "op_4_join_aligner_0_records_out_total": int64(4),
  595. "op_5_join_0_exceptions_total": int64(0),
  596. "op_5_join_0_records_in_total": int64(4),
  597. "op_5_join_0_records_out_total": int64(1),
  598. "op_6_project_0_exceptions_total": int64(0),
  599. "op_6_project_0_records_in_total": int64(1),
  600. "op_6_project_0_records_out_total": int64(1),
  601. "sink_mockSink_0_exceptions_total": int64(0),
  602. "sink_mockSink_0_records_in_total": int64(1),
  603. "sink_mockSink_0_records_out_total": int64(1),
  604. "source_demo_0_exceptions_total": int64(0),
  605. "source_demo_0_records_in_total": int64(5),
  606. "source_demo_0_records_out_total": int64(5),
  607. "source_table1_0_exceptions_total": int64(0),
  608. "source_table1_0_records_in_total": int64(4),
  609. "source_table1_0_records_out_total": int64(1),
  610. },
  611. }, {
  612. Name: `TestWindowRule12`,
  613. Sql: `SELECT collect(size) as allSize FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1), color ORDER BY color`,
  614. R: [][]map[string]interface{}{
  615. {{
  616. "allSize": []interface{}{float64(6)},
  617. }, {
  618. "allSize": []interface{}{float64(3)},
  619. }},
  620. {{
  621. "allSize": []interface{}{float64(6), float64(2)},
  622. }, {
  623. "allSize": []interface{}{float64(3)},
  624. }},
  625. {{
  626. "allSize": []interface{}{float64(2)},
  627. }, {
  628. "allSize": []interface{}{float64(4)},
  629. }},
  630. {{
  631. "allSize": []interface{}{float64(1)},
  632. }, {
  633. "allSize": []interface{}{float64(4)},
  634. }},
  635. },
  636. M: map[string]interface{}{
  637. "sink_mockSink_0_exceptions_total": int64(0),
  638. "sink_mockSink_0_records_in_total": int64(4),
  639. "sink_mockSink_0_records_out_total": int64(4),
  640. "source_demo_0_exceptions_total": int64(0),
  641. "source_demo_0_records_in_total": int64(5),
  642. "source_demo_0_records_out_total": int64(5),
  643. "op_2_window_0_exceptions_total": int64(0),
  644. "op_2_window_0_process_latency_us": int64(0),
  645. "op_2_window_0_records_in_total": int64(5),
  646. "op_2_window_0_records_out_total": int64(4),
  647. },
  648. },
  649. }
  650. HandleStream(true, streamList, t)
  651. options := []*api.RuleOption{
  652. {
  653. BufferLength: 100,
  654. SendError: true,
  655. },
  656. {
  657. BufferLength: 100,
  658. SendError: true,
  659. Qos: api.AtLeastOnce,
  660. CheckpointInterval: 5000,
  661. },
  662. {
  663. BufferLength: 100,
  664. SendError: true,
  665. Qos: api.ExactlyOnce,
  666. CheckpointInterval: 5000,
  667. },
  668. }
  669. for j, opt := range options {
  670. DoRuleTest(t, tests, j, opt, 15)
  671. }
  672. }
  673. func TestEventWindow(t *testing.T) {
  674. // Reset
  675. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  676. HandleStream(false, streamList, t)
  677. tests := []RuleTest{
  678. {
  679. Name: `TestEventWindowRule1`,
  680. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  681. R: [][]map[string]interface{}{
  682. {{
  683. "color": "red",
  684. "size": float64(3),
  685. "ts": float64(1541152486013),
  686. }},
  687. {{
  688. "color": "red",
  689. "size": float64(3),
  690. "ts": float64(1541152486013),
  691. }, {
  692. "color": "blue",
  693. "size": float64(2),
  694. "ts": float64(1541152487632),
  695. }},
  696. {{
  697. "color": "blue",
  698. "size": float64(2),
  699. "ts": float64(1541152487632),
  700. }, {
  701. "color": "yellow",
  702. "size": float64(4),
  703. "ts": float64(1541152488442),
  704. }},
  705. {{
  706. "color": "yellow",
  707. "size": float64(4),
  708. "ts": float64(1541152488442),
  709. }, {
  710. "color": "red",
  711. "size": float64(1),
  712. "ts": float64(1541152489252),
  713. }},
  714. {{
  715. "color": "red",
  716. "size": float64(1),
  717. "ts": float64(1541152489252),
  718. }},
  719. },
  720. M: map[string]interface{}{
  721. "op_3_project_0_exceptions_total": int64(0),
  722. "op_3_project_0_process_latency_us": int64(0),
  723. "op_3_project_0_records_in_total": int64(5),
  724. "op_3_project_0_records_out_total": int64(5),
  725. "sink_mockSink_0_exceptions_total": int64(0),
  726. "sink_mockSink_0_records_in_total": int64(5),
  727. "sink_mockSink_0_records_out_total": int64(5),
  728. "source_demoE_0_exceptions_total": int64(0),
  729. "source_demoE_0_records_in_total": int64(6),
  730. "source_demoE_0_records_out_total": int64(6),
  731. "op_2_window_0_exceptions_total": int64(0),
  732. "op_2_window_0_process_latency_us": int64(0),
  733. "op_2_window_0_records_in_total": int64(6),
  734. "op_2_window_0_records_out_total": int64(5),
  735. },
  736. },
  737. {
  738. Name: `TestEventWindowRule2`,
  739. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  740. R: [][]map[string]interface{}{
  741. {{
  742. "window_start": float64(1541152486013),
  743. "window_end": float64(1541152487000),
  744. "color": "red",
  745. "ts": float64(1541152486013),
  746. }},
  747. {{
  748. "window_start": float64(1541152488000),
  749. "window_end": float64(1541152489000),
  750. "color": "yellow",
  751. "ts": float64(1541152488442),
  752. }},
  753. },
  754. M: map[string]interface{}{
  755. "op_4_project_0_exceptions_total": int64(0),
  756. "op_4_project_0_process_latency_us": int64(0),
  757. "op_4_project_0_records_in_total": int64(2),
  758. "op_4_project_0_records_out_total": int64(2),
  759. "sink_mockSink_0_exceptions_total": int64(0),
  760. "sink_mockSink_0_records_in_total": int64(2),
  761. "sink_mockSink_0_records_out_total": int64(2),
  762. "source_demoE_0_exceptions_total": int64(0),
  763. "source_demoE_0_records_in_total": int64(6),
  764. "source_demoE_0_records_out_total": int64(6),
  765. "op_2_window_0_exceptions_total": int64(0),
  766. "op_2_window_0_process_latency_us": int64(0),
  767. "op_2_window_0_records_in_total": int64(6),
  768. "op_2_window_0_records_out_total": int64(5),
  769. "op_3_filter_0_exceptions_total": int64(0),
  770. "op_3_filter_0_process_latency_us": int64(0),
  771. "op_3_filter_0_records_in_total": int64(5),
  772. "op_3_filter_0_records_out_total": int64(2),
  773. },
  774. },
  775. {
  776. Name: `TestEventWindowRule3`,
  777. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  778. R: [][]map[string]interface{}{
  779. {{
  780. "color": "red",
  781. "temp": 25.5,
  782. "ts": float64(1541152486013),
  783. }}, {{
  784. "color": "red",
  785. "temp": 25.5,
  786. "ts": float64(1541152486013),
  787. }}, {{
  788. "color": "blue",
  789. "temp": 28.1,
  790. "ts": float64(1541152487632),
  791. }}, {{
  792. "color": "blue",
  793. "temp": 28.1,
  794. "ts": float64(1541152487632),
  795. }, {
  796. "color": "yellow",
  797. "temp": 27.4,
  798. "ts": float64(1541152488442),
  799. }}, {{
  800. "color": "yellow",
  801. "temp": 27.4,
  802. "ts": float64(1541152488442),
  803. }, {
  804. "color": "red",
  805. "temp": 25.5,
  806. "ts": float64(1541152489252),
  807. }},
  808. },
  809. M: map[string]interface{}{
  810. "op_5_project_0_exceptions_total": int64(0),
  811. "op_5_project_0_process_latency_us": int64(0),
  812. "op_5_project_0_records_in_total": int64(5),
  813. "op_5_project_0_records_out_total": int64(5),
  814. "sink_mockSink_0_exceptions_total": int64(0),
  815. "sink_mockSink_0_records_in_total": int64(5),
  816. "sink_mockSink_0_records_out_total": int64(5),
  817. "source_demoE_0_exceptions_total": int64(0),
  818. "source_demoE_0_records_in_total": int64(6),
  819. "source_demoE_0_records_out_total": int64(6),
  820. "source_demo1E_0_exceptions_total": int64(0),
  821. "source_demo1E_0_records_in_total": int64(6),
  822. "source_demo1E_0_records_out_total": int64(6),
  823. "op_3_window_0_exceptions_total": int64(0),
  824. "op_3_window_0_process_latency_us": int64(0),
  825. "op_3_window_0_records_in_total": int64(12),
  826. "op_3_window_0_records_out_total": int64(5),
  827. "op_4_join_0_exceptions_total": int64(0),
  828. "op_4_join_0_process_latency_us": int64(0),
  829. "op_4_join_0_records_in_total": int64(5),
  830. "op_4_join_0_records_out_total": int64(5),
  831. },
  832. },
  833. {
  834. Name: `TestEventWindowRule4`,
  835. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  836. R: [][]map[string]interface{}{
  837. {{
  838. "color": "red",
  839. "ws": float64(1541152484013),
  840. "we": float64(1541152486013),
  841. }}, {{
  842. "color": "blue",
  843. "ws": float64(1541152485632),
  844. "we": float64(1541152487632),
  845. }, {
  846. "color": "red",
  847. "ws": float64(1541152485632),
  848. "we": float64(1541152487632),
  849. }}, {{
  850. "color": "blue",
  851. "ws": float64(1541152486442),
  852. "we": float64(1541152488442),
  853. }, {
  854. "color": "yellow",
  855. "ws": float64(1541152486442),
  856. "we": float64(1541152488442),
  857. }}, {{
  858. "color": "blue",
  859. "ws": float64(1541152487252),
  860. "we": float64(1541152489252),
  861. }, {
  862. "color": "red",
  863. "ws": float64(1541152487252),
  864. "we": float64(1541152489252),
  865. }, {
  866. "color": "yellow",
  867. "ws": float64(1541152487252),
  868. "we": float64(1541152489252),
  869. }},
  870. },
  871. M: map[string]interface{}{
  872. "op_5_project_0_exceptions_total": int64(0),
  873. "op_5_project_0_process_latency_us": int64(0),
  874. "op_5_project_0_records_in_total": int64(4),
  875. "op_5_project_0_records_out_total": int64(4),
  876. "sink_mockSink_0_exceptions_total": int64(0),
  877. "sink_mockSink_0_records_in_total": int64(4),
  878. "sink_mockSink_0_records_out_total": int64(4),
  879. "source_demoE_0_exceptions_total": int64(0),
  880. "source_demoE_0_records_in_total": int64(6),
  881. "source_demoE_0_records_out_total": int64(6),
  882. "op_2_window_0_exceptions_total": int64(0),
  883. "op_2_window_0_process_latency_us": int64(0),
  884. "op_2_window_0_records_in_total": int64(6),
  885. "op_2_window_0_records_out_total": int64(4),
  886. "op_3_aggregate_0_exceptions_total": int64(0),
  887. "op_3_aggregate_0_process_latency_us": int64(0),
  888. "op_3_aggregate_0_records_in_total": int64(4),
  889. "op_3_aggregate_0_records_out_total": int64(4),
  890. "op_4_order_0_exceptions_total": int64(0),
  891. "op_4_order_0_process_latency_us": int64(0),
  892. "op_4_order_0_records_in_total": int64(4),
  893. "op_4_order_0_records_out_total": int64(4),
  894. },
  895. },
  896. {
  897. Name: `TestEventWindowRule5`,
  898. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  899. R: [][]map[string]interface{}{
  900. {{
  901. "temp": 25.5,
  902. }}, {{
  903. "temp": 28.1,
  904. }, {
  905. "temp": 27.4,
  906. }, {
  907. "temp": 25.5,
  908. }}, {{
  909. "temp": 26.2,
  910. }, {
  911. "temp": 26.8,
  912. }, {
  913. "temp": 28.9,
  914. }, {
  915. "temp": 29.1,
  916. }, {
  917. "temp": 32.2,
  918. }}, {{
  919. "temp": 30.9,
  920. }},
  921. },
  922. M: map[string]interface{}{
  923. "op_3_project_0_exceptions_total": int64(0),
  924. "op_3_project_0_process_latency_us": int64(0),
  925. "op_3_project_0_records_in_total": int64(4),
  926. "op_3_project_0_records_out_total": int64(4),
  927. "sink_mockSink_0_exceptions_total": int64(0),
  928. "sink_mockSink_0_records_in_total": int64(4),
  929. "sink_mockSink_0_records_out_total": int64(4),
  930. "source_sessionDemoE_0_exceptions_total": int64(0),
  931. "source_sessionDemoE_0_records_in_total": int64(12),
  932. "source_sessionDemoE_0_records_out_total": int64(12),
  933. "op_2_window_0_exceptions_total": int64(0),
  934. "op_2_window_0_process_latency_us": int64(0),
  935. "op_2_window_0_records_in_total": int64(12),
  936. "op_2_window_0_records_out_total": int64(4),
  937. },
  938. },
  939. {
  940. Name: `TestEventWindowRule6`,
  941. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  942. R: [][]map[string]interface{}{
  943. {{
  944. "m": 25.5,
  945. "c": float64(1),
  946. }}, {{
  947. "m": 25.5,
  948. "c": float64(1),
  949. }}, {{
  950. "m": 28.1,
  951. "c": float64(1),
  952. }}, {{
  953. "m": 28.1,
  954. "c": float64(2),
  955. }}, {{
  956. "m": 27.4,
  957. "c": float64(2),
  958. }},
  959. },
  960. M: map[string]interface{}{
  961. "op_5_project_0_exceptions_total": int64(0),
  962. "op_5_project_0_process_latency_us": int64(0),
  963. "op_5_project_0_records_in_total": int64(5),
  964. "op_5_project_0_records_out_total": int64(5),
  965. "sink_mockSink_0_exceptions_total": int64(0),
  966. "sink_mockSink_0_records_in_total": int64(5),
  967. "sink_mockSink_0_records_out_total": int64(5),
  968. "source_demoE_0_exceptions_total": int64(0),
  969. "source_demoE_0_records_in_total": int64(6),
  970. "source_demoE_0_records_out_total": int64(6),
  971. "source_demo1E_0_exceptions_total": int64(0),
  972. "source_demo1E_0_records_in_total": int64(6),
  973. "source_demo1E_0_records_out_total": int64(6),
  974. "op_3_window_0_exceptions_total": int64(0),
  975. "op_3_window_0_records_in_total": int64(12),
  976. "op_3_window_0_records_out_total": int64(5),
  977. "op_4_join_0_exceptions_total": int64(0),
  978. "op_4_join_0_process_latency_us": int64(0),
  979. "op_4_join_0_records_in_total": int64(5),
  980. "op_4_join_0_records_out_total": int64(5),
  981. },
  982. },
  983. {
  984. Name: `TestEventWindowRule7`,
  985. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  986. R: [][]map[string]interface{}{
  987. {{
  988. "error": "error in preprocessor: field color type mismatch: cannot convert int(2) to string",
  989. }},
  990. {{
  991. "color": "red",
  992. "size": float64(3),
  993. "ts": float64(1541152486013),
  994. }},
  995. {{
  996. "color": "red",
  997. "size": float64(3),
  998. "ts": float64(1541152486013),
  999. }},
  1000. {{
  1001. "color": "yellow",
  1002. "size": float64(4),
  1003. "ts": float64(1541152488442),
  1004. }},
  1005. {{
  1006. "color": "yellow",
  1007. "size": float64(4),
  1008. "ts": float64(1541152488442),
  1009. }, {
  1010. "color": "red",
  1011. "size": float64(1),
  1012. "ts": float64(1541152489252),
  1013. }},
  1014. {{
  1015. "color": "red",
  1016. "size": float64(1),
  1017. "ts": float64(1541152489252),
  1018. }},
  1019. },
  1020. M: map[string]interface{}{
  1021. "op_3_project_0_exceptions_total": int64(1),
  1022. "op_3_project_0_process_latency_us": int64(0),
  1023. "op_3_project_0_records_in_total": int64(6),
  1024. "op_3_project_0_records_out_total": int64(5),
  1025. "sink_mockSink_0_exceptions_total": int64(0),
  1026. "sink_mockSink_0_records_in_total": int64(6),
  1027. "sink_mockSink_0_records_out_total": int64(6),
  1028. "source_demoErr_0_exceptions_total": int64(1),
  1029. "source_demoErr_0_records_in_total": int64(6),
  1030. "source_demoErr_0_records_out_total": int64(6),
  1031. "op_2_window_0_exceptions_total": int64(1),
  1032. "op_2_window_0_process_latency_us": int64(0),
  1033. "op_2_window_0_records_in_total": int64(6),
  1034. "op_2_window_0_records_out_total": int64(5),
  1035. },
  1036. },
  1037. {
  1038. Name: `TestEventWindowRule8`,
  1039. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1040. R: [][]map[string]interface{}{
  1041. {{
  1042. "temp": 25.5,
  1043. "window_start": float64(1541152486013),
  1044. "window_end": float64(1541152487013),
  1045. }}, {{
  1046. "temp": 28.1,
  1047. "window_start": float64(1541152487932),
  1048. "window_end": float64(1541152490000),
  1049. }, {
  1050. "temp": 27.4,
  1051. "window_start": float64(1541152487932),
  1052. "window_end": float64(1541152490000),
  1053. }, {
  1054. "temp": 25.5,
  1055. "window_start": float64(1541152487932),
  1056. "window_end": float64(1541152490000),
  1057. }}, {{
  1058. "temp": 26.2,
  1059. "window_start": float64(1541152490000),
  1060. "window_end": float64(1541152494000),
  1061. }, {
  1062. "temp": 26.8,
  1063. "window_start": float64(1541152490000),
  1064. "window_end": float64(1541152494000),
  1065. }, {
  1066. "temp": 28.9,
  1067. "window_start": float64(1541152490000),
  1068. "window_end": float64(1541152494000),
  1069. }, {
  1070. "temp": 29.1,
  1071. "window_start": float64(1541152490000),
  1072. "window_end": float64(1541152494000),
  1073. }, {
  1074. "temp": 32.2,
  1075. "window_start": float64(1541152490000),
  1076. "window_end": float64(1541152494000),
  1077. }}, {{
  1078. "temp": 30.9,
  1079. "window_start": float64(1541152494000),
  1080. "window_end": float64(1541152495112),
  1081. }},
  1082. },
  1083. M: map[string]interface{}{
  1084. "op_3_project_0_exceptions_total": int64(0),
  1085. "op_3_project_0_process_latency_us": int64(0),
  1086. "op_3_project_0_records_in_total": int64(4),
  1087. "op_3_project_0_records_out_total": int64(4),
  1088. "sink_mockSink_0_exceptions_total": int64(0),
  1089. "sink_mockSink_0_records_in_total": int64(4),
  1090. "sink_mockSink_0_records_out_total": int64(4),
  1091. "source_sessionDemoE_0_exceptions_total": int64(0),
  1092. "source_sessionDemoE_0_records_in_total": int64(12),
  1093. "source_sessionDemoE_0_records_out_total": int64(12),
  1094. "op_2_window_0_exceptions_total": int64(0),
  1095. "op_2_window_0_process_latency_us": int64(0),
  1096. "op_2_window_0_records_in_total": int64(12),
  1097. "op_2_window_0_records_out_total": int64(4),
  1098. },
  1099. },
  1100. {
  1101. Name: `TestEventWindowRule9`,
  1102. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1103. R: [][]map[string]interface{}{
  1104. {{
  1105. "color": "red",
  1106. "window_start": float64(1541152485013),
  1107. "window_end": float64(1541152487000),
  1108. }},
  1109. {{
  1110. "color": "red",
  1111. "window_start": float64(1541152486000),
  1112. "window_end": float64(1541152488000),
  1113. }, {
  1114. "color": "blue",
  1115. "window_start": float64(1541152486000),
  1116. "window_end": float64(1541152488000),
  1117. }},
  1118. {{
  1119. "color": "blue",
  1120. "window_start": float64(1541152487000),
  1121. "window_end": float64(1541152489000),
  1122. }, {
  1123. "color": "yellow",
  1124. "window_start": float64(1541152487000),
  1125. "window_end": float64(1541152489000),
  1126. }},
  1127. {{
  1128. "color": "yellow",
  1129. "window_start": float64(1541152488000),
  1130. "window_end": float64(1541152490000),
  1131. }, {
  1132. "color": "red",
  1133. "window_start": float64(1541152488000),
  1134. "window_end": float64(1541152490000),
  1135. }},
  1136. {{
  1137. "color": "red",
  1138. "window_start": float64(1541152489000),
  1139. "window_end": float64(1541152491000),
  1140. }},
  1141. },
  1142. M: map[string]interface{}{
  1143. "op_3_project_0_exceptions_total": int64(0),
  1144. "op_3_project_0_process_latency_us": int64(0),
  1145. "op_3_project_0_records_in_total": int64(5),
  1146. "op_3_project_0_records_out_total": int64(5),
  1147. "sink_mockSink_0_exceptions_total": int64(0),
  1148. "sink_mockSink_0_records_in_total": int64(5),
  1149. "sink_mockSink_0_records_out_total": int64(5),
  1150. "source_demoE_0_exceptions_total": int64(0),
  1151. "source_demoE_0_records_in_total": int64(6),
  1152. "source_demoE_0_records_out_total": int64(6),
  1153. "op_2_window_0_exceptions_total": int64(0),
  1154. "op_2_window_0_process_latency_us": int64(0),
  1155. "op_2_window_0_records_in_total": int64(6),
  1156. "op_2_window_0_records_out_total": int64(5),
  1157. },
  1158. },
  1159. }
  1160. HandleStream(true, streamList, t)
  1161. options := []*api.RuleOption{
  1162. {
  1163. BufferLength: 100,
  1164. SendError: true,
  1165. IsEventTime: true,
  1166. LateTol: 1000,
  1167. }, {
  1168. BufferLength: 100,
  1169. SendError: true,
  1170. Qos: api.AtLeastOnce,
  1171. CheckpointInterval: 5000,
  1172. IsEventTime: true,
  1173. LateTol: 1000,
  1174. }, {
  1175. BufferLength: 100,
  1176. SendError: true,
  1177. Qos: api.ExactlyOnce,
  1178. CheckpointInterval: 5000,
  1179. IsEventTime: true,
  1180. LateTol: 1000,
  1181. },
  1182. }
  1183. for j, opt := range options {
  1184. DoRuleTest(t, tests, j, opt, 10)
  1185. }
  1186. }
  1187. func TestWindowError(t *testing.T) {
  1188. // Reset
  1189. streamList := []string{"ldemo", "ldemo1"}
  1190. HandleStream(false, streamList, t)
  1191. tests := []RuleTest{
  1192. {
  1193. Name: `TestWindowErrorRule1`,
  1194. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1195. R: [][]map[string]interface{}{
  1196. {{
  1197. "error": "run Select error: invalid operation string(string) * int64(3)",
  1198. }}, {{
  1199. "kuiper_field_0": float64(6),
  1200. }, {}},
  1201. },
  1202. M: map[string]interface{}{
  1203. "op_3_project_0_exceptions_total": int64(1),
  1204. "op_3_project_0_process_latency_us": int64(0),
  1205. "op_3_project_0_records_in_total": int64(2),
  1206. "op_3_project_0_records_out_total": int64(1),
  1207. "sink_mockSink_0_exceptions_total": int64(0),
  1208. "sink_mockSink_0_records_in_total": int64(2),
  1209. "sink_mockSink_0_records_out_total": int64(2),
  1210. "source_ldemo_0_exceptions_total": int64(0),
  1211. "source_ldemo_0_records_in_total": int64(5),
  1212. "source_ldemo_0_records_out_total": int64(5),
  1213. "op_2_window_0_exceptions_total": int64(0),
  1214. "op_2_window_0_process_latency_us": int64(0),
  1215. "op_2_window_0_records_in_total": int64(5),
  1216. "op_2_window_0_records_out_total": int64(2),
  1217. },
  1218. }, {
  1219. Name: `TestWindowErrorRule2`,
  1220. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1221. R: [][]map[string]interface{}{
  1222. {{
  1223. "error": "run Where error: invalid operation string(string) > int64(2)",
  1224. }}, {{
  1225. "color": "red",
  1226. "ts": float64(1541152486013),
  1227. }}, {{
  1228. "ts": float64(1541152487632),
  1229. }}, {}, {},
  1230. },
  1231. M: map[string]interface{}{
  1232. "op_4_project_0_exceptions_total": int64(1),
  1233. "op_4_project_0_process_latency_us": int64(0),
  1234. "op_4_project_0_records_in_total": int64(5),
  1235. "op_4_project_0_records_out_total": int64(4),
  1236. "sink_mockSink_0_exceptions_total": int64(0),
  1237. "sink_mockSink_0_records_in_total": int64(5),
  1238. "sink_mockSink_0_records_out_total": int64(5),
  1239. "source_ldemo_0_exceptions_total": int64(0),
  1240. "source_ldemo_0_records_in_total": int64(5),
  1241. "source_ldemo_0_records_out_total": int64(5),
  1242. "op_3_window_0_exceptions_total": int64(1),
  1243. "op_3_window_0_process_latency_us": int64(0),
  1244. "op_3_window_0_records_in_total": int64(3),
  1245. "op_3_window_0_records_out_total": int64(4),
  1246. "op_2_filter_0_exceptions_total": int64(1),
  1247. "op_2_filter_0_process_latency_us": int64(0),
  1248. "op_2_filter_0_records_in_total": int64(5),
  1249. "op_2_filter_0_records_out_total": int64(2),
  1250. },
  1251. }, {
  1252. Name: `TestWindowErrorRule3`,
  1253. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1254. R: [][]map[string]interface{}{
  1255. {{
  1256. "color": "red",
  1257. "temp": 25.5,
  1258. "ts": float64(1541152486013),
  1259. }}, {{
  1260. "color": "red",
  1261. "temp": 25.5,
  1262. "ts": float64(1541152486013),
  1263. }}, {{
  1264. "color": "red",
  1265. "temp": 25.5,
  1266. "ts": float64(1541152486013),
  1267. }}, {{
  1268. "temp": 28.1,
  1269. "ts": float64(1541152487632),
  1270. }}, {{
  1271. "temp": 28.1,
  1272. "ts": float64(1541152487632),
  1273. }}, {{
  1274. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1275. }}, {{
  1276. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1277. }}, {{
  1278. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1279. }},
  1280. },
  1281. M: map[string]interface{}{
  1282. "op_5_project_0_exceptions_total": int64(3),
  1283. "op_5_project_0_process_latency_us": int64(0),
  1284. "op_5_project_0_records_in_total": int64(8),
  1285. "op_5_project_0_records_out_total": int64(5),
  1286. "sink_mockSink_0_exceptions_total": int64(0),
  1287. "sink_mockSink_0_records_in_total": int64(8),
  1288. "sink_mockSink_0_records_out_total": int64(8),
  1289. "source_ldemo_0_exceptions_total": int64(0),
  1290. "source_ldemo_0_records_in_total": int64(5),
  1291. "source_ldemo_0_records_out_total": int64(5),
  1292. "source_ldemo1_0_exceptions_total": int64(0),
  1293. "source_ldemo1_0_records_in_total": int64(5),
  1294. "source_ldemo1_0_records_out_total": int64(5),
  1295. "op_3_window_0_exceptions_total": int64(0),
  1296. "op_3_window_0_process_latency_us": int64(0),
  1297. "op_3_window_0_records_in_total": int64(10),
  1298. "op_3_window_0_records_out_total": int64(10),
  1299. "op_4_join_0_exceptions_total": int64(3),
  1300. "op_4_join_0_process_latency_us": int64(0),
  1301. "op_4_join_0_records_in_total": int64(10),
  1302. "op_4_join_0_records_out_total": int64(5),
  1303. },
  1304. }, {
  1305. Name: `TestWindowErrorRule4`,
  1306. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1307. R: [][]map[string]interface{}{
  1308. {{
  1309. "color": "red",
  1310. }}, {{
  1311. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1312. }}, {{
  1313. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1314. }}, {{
  1315. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1316. }}, {{
  1317. "color": float64(49),
  1318. }, {}},
  1319. },
  1320. M: map[string]interface{}{
  1321. "op_6_project_0_exceptions_total": int64(3),
  1322. "op_6_project_0_process_latency_us": int64(0),
  1323. "op_6_project_0_records_in_total": int64(5),
  1324. "op_6_project_0_records_out_total": int64(2),
  1325. "sink_mockSink_0_exceptions_total": int64(0),
  1326. "sink_mockSink_0_records_in_total": int64(5),
  1327. "sink_mockSink_0_records_out_total": int64(5),
  1328. "source_ldemo_0_exceptions_total": int64(0),
  1329. "source_ldemo_0_records_in_total": int64(5),
  1330. "source_ldemo_0_records_out_total": int64(5),
  1331. "op_2_window_0_exceptions_total": int64(0),
  1332. "op_2_window_0_process_latency_us": int64(0),
  1333. "op_2_window_0_records_in_total": int64(5),
  1334. "op_2_window_0_records_out_total": int64(5),
  1335. "op_3_aggregate_0_exceptions_total": int64(0),
  1336. "op_3_aggregate_0_process_latency_us": int64(0),
  1337. "op_3_aggregate_0_records_in_total": int64(5),
  1338. "op_3_aggregate_0_records_out_total": int64(5),
  1339. "op_4_having_0_exceptions_total": int64(3),
  1340. "op_4_having_0_process_latency_us": int64(0),
  1341. "op_4_having_0_records_in_total": int64(5),
  1342. "op_4_having_0_records_out_total": int64(2),
  1343. },
  1344. }, {
  1345. Name: `TestWindowErrorRule5`,
  1346. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1347. R: [][]map[string]interface{}{
  1348. {{
  1349. "error": "run Order By error: incompatible types for comparison: int and string",
  1350. }}, {{
  1351. "size": float64(3),
  1352. }}, {{
  1353. "color": float64(49),
  1354. "size": float64(2),
  1355. }}, {{
  1356. "color": "red",
  1357. }},
  1358. },
  1359. M: map[string]interface{}{
  1360. "op_4_project_0_exceptions_total": int64(1),
  1361. "op_4_project_0_process_latency_us": int64(0),
  1362. "op_4_project_0_records_in_total": int64(4),
  1363. "op_4_project_0_records_out_total": int64(3),
  1364. "sink_mockSink_0_exceptions_total": int64(0),
  1365. "sink_mockSink_0_records_in_total": int64(4),
  1366. "sink_mockSink_0_records_out_total": int64(4),
  1367. "source_ldemo_0_exceptions_total": int64(0),
  1368. "source_ldemo_0_records_in_total": int64(5),
  1369. "source_ldemo_0_records_out_total": int64(5),
  1370. "op_2_window_0_exceptions_total": int64(0),
  1371. "op_2_window_0_process_latency_us": int64(0),
  1372. "op_2_window_0_records_in_total": int64(5),
  1373. "op_2_window_0_records_out_total": int64(4),
  1374. "op_3_order_0_exceptions_total": int64(1),
  1375. "op_3_order_0_process_latency_us": int64(0),
  1376. "op_3_order_0_records_in_total": int64(4),
  1377. "op_3_order_0_records_out_total": int64(3),
  1378. },
  1379. },
  1380. }
  1381. HandleStream(true, streamList, t)
  1382. DoRuleTest(t, tests, 0, &api.RuleOption{
  1383. BufferLength: 100,
  1384. SendError: true,
  1385. }, 0)
  1386. }