window_rule_test.go 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "github.com/lf-edge/ekuiper/pkg/api"
  17. "testing"
  18. )
  19. func TestWindow(t *testing.T) {
  20. //Reset
  21. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  22. HandleStream(false, streamList, t)
  23. var tests = []RuleTest{
  24. {
  25. Name: `TestWindowRule1`,
  26. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  27. R: [][]map[string]interface{}{
  28. {{
  29. "color": "red",
  30. "size": float64(3),
  31. "ts": float64(1541152486013),
  32. }, {
  33. "color": "blue",
  34. "size": float64(6),
  35. "ts": float64(1541152486822),
  36. }},
  37. {{
  38. "color": "red",
  39. "size": float64(3),
  40. "ts": float64(1541152486013),
  41. }, {
  42. "color": "blue",
  43. "size": float64(6),
  44. "ts": float64(1541152486822),
  45. }, {
  46. "color": "blue",
  47. "size": float64(2),
  48. "ts": float64(1541152487632),
  49. }},
  50. {{
  51. "color": "blue",
  52. "size": float64(2),
  53. "ts": float64(1541152487632),
  54. }, {
  55. "color": "yellow",
  56. "size": float64(4),
  57. "ts": float64(1541152488442),
  58. }},
  59. {{
  60. "color": "yellow",
  61. "size": float64(4),
  62. "ts": float64(1541152488442),
  63. }, {
  64. "color": "red",
  65. "size": float64(1),
  66. "ts": float64(1541152489252),
  67. }},
  68. },
  69. M: map[string]interface{}{
  70. "op_3_project_0_exceptions_total": int64(0),
  71. "op_3_project_0_process_latency_us": int64(0),
  72. "op_3_project_0_records_in_total": int64(4),
  73. "op_3_project_0_records_out_total": int64(4),
  74. "sink_mockSink_0_exceptions_total": int64(0),
  75. "sink_mockSink_0_records_in_total": int64(4),
  76. "sink_mockSink_0_records_out_total": int64(4),
  77. "source_demo_0_exceptions_total": int64(0),
  78. "source_demo_0_records_in_total": int64(5),
  79. "source_demo_0_records_out_total": int64(5),
  80. "op_2_window_0_exceptions_total": int64(0),
  81. "op_2_window_0_process_latency_us": int64(0),
  82. "op_2_window_0_records_in_total": int64(5),
  83. "op_2_window_0_records_out_total": int64(4),
  84. },
  85. }, {
  86. Name: `TestWindowRule2`,
  87. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  88. R: [][]map[string]interface{}{
  89. {{
  90. "color": "red",
  91. "ts": float64(1541152486013),
  92. }, {
  93. "color": "blue",
  94. "ts": float64(1541152486822),
  95. }},
  96. {{
  97. "color": "yellow",
  98. "ts": float64(1541152488442),
  99. }},
  100. },
  101. M: map[string]interface{}{
  102. "op_4_project_0_exceptions_total": int64(0),
  103. "op_4_project_0_process_latency_us": int64(0),
  104. "op_4_project_0_records_in_total": int64(2),
  105. "op_4_project_0_records_out_total": int64(2),
  106. "sink_mockSink_0_exceptions_total": int64(0),
  107. "sink_mockSink_0_records_in_total": int64(2),
  108. "sink_mockSink_0_records_out_total": int64(2),
  109. "source_demo_0_exceptions_total": int64(0),
  110. "source_demo_0_records_in_total": int64(5),
  111. "source_demo_0_records_out_total": int64(5),
  112. "op_3_window_0_exceptions_total": int64(0),
  113. "op_3_window_0_process_latency_us": int64(0),
  114. "op_3_window_0_records_in_total": int64(3),
  115. "op_3_window_0_records_out_total": int64(2),
  116. "op_2_filter_0_exceptions_total": int64(0),
  117. "op_2_filter_0_process_latency_us": int64(0),
  118. "op_2_filter_0_records_in_total": int64(5),
  119. "op_2_filter_0_records_out_total": int64(3),
  120. },
  121. }, {
  122. Name: `TestWindowRule3`,
  123. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  124. R: [][]map[string]interface{}{
  125. {{
  126. "color": "red",
  127. "temp": 25.5,
  128. "ts1": float64(1541152486013),
  129. "ts2": float64(1541152486013),
  130. "diff": float64(0),
  131. }}, {{
  132. "color": "red",
  133. "temp": 25.5,
  134. "ts1": float64(1541152486013),
  135. "ts2": float64(1541152486013),
  136. "diff": float64(0),
  137. }}, {{
  138. "color": "red",
  139. "temp": 25.5,
  140. "ts1": float64(1541152486013),
  141. "ts2": float64(1541152486013),
  142. "diff": float64(0),
  143. }}, {{
  144. "color": "blue",
  145. "temp": 28.1,
  146. "ts1": float64(1541152487632),
  147. "ts2": float64(1541152487632),
  148. "diff": float64(0),
  149. }}, {{
  150. "color": "blue",
  151. "temp": 28.1,
  152. "ts1": float64(1541152487632),
  153. "ts2": float64(1541152487632),
  154. "diff": float64(0),
  155. }}, {{
  156. "color": "blue",
  157. "temp": 28.1,
  158. "ts1": float64(1541152487632),
  159. "ts2": float64(1541152487632),
  160. "diff": float64(0),
  161. }, {
  162. "color": "yellow",
  163. "temp": 27.4,
  164. "ts1": float64(1541152488442),
  165. "ts2": float64(1541152488442),
  166. "diff": float64(0),
  167. }}, {{
  168. "color": "yellow",
  169. "temp": 27.4,
  170. "ts1": float64(1541152488442),
  171. "ts2": float64(1541152488442),
  172. "diff": float64(0),
  173. }}, {{
  174. "color": "yellow",
  175. "temp": 27.4,
  176. "ts1": float64(1541152488442),
  177. "ts2": float64(1541152488442),
  178. "diff": float64(0),
  179. }, {
  180. "color": "red",
  181. "temp": 25.5,
  182. "ts1": float64(1541152489252),
  183. "ts2": float64(1541152489252),
  184. "diff": float64(0),
  185. }},
  186. },
  187. M: map[string]interface{}{
  188. "op_5_project_0_exceptions_total": int64(0),
  189. "op_5_project_0_process_latency_us": int64(0),
  190. "op_5_project_0_records_in_total": int64(8),
  191. "op_5_project_0_records_out_total": int64(8),
  192. "sink_mockSink_0_exceptions_total": int64(0),
  193. "sink_mockSink_0_records_in_total": int64(8),
  194. "sink_mockSink_0_records_out_total": int64(8),
  195. "source_demo_0_exceptions_total": int64(0),
  196. "source_demo_0_records_in_total": int64(5),
  197. "source_demo_0_records_out_total": int64(5),
  198. "source_demo1_0_exceptions_total": int64(0),
  199. "source_demo1_0_records_in_total": int64(5),
  200. "source_demo1_0_records_out_total": int64(5),
  201. "op_3_window_0_exceptions_total": int64(0),
  202. "op_3_window_0_process_latency_us": int64(0),
  203. "op_3_window_0_records_in_total": int64(10),
  204. "op_3_window_0_records_out_total": int64(10),
  205. "op_4_join_0_exceptions_total": int64(0),
  206. "op_4_join_0_process_latency_us": int64(0),
  207. "op_4_join_0_records_in_total": int64(10),
  208. "op_4_join_0_records_out_total": int64(8),
  209. },
  210. T: &api.PrintableTopo{
  211. Sources: []string{"source_demo", "source_demo1"},
  212. Edges: map[string][]string{
  213. "source_demo": {"op_3_window"},
  214. "source_demo1": {"op_3_window"},
  215. "op_3_window": {"op_4_join"},
  216. "op_4_join": {"op_5_project"},
  217. "op_5_project": {"sink_mockSink"},
  218. },
  219. },
  220. }, {
  221. Name: `TestWindowRule4`,
  222. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  223. R: [][]map[string]interface{}{
  224. {{
  225. "color": "red",
  226. "c": float64(1),
  227. }}, {{
  228. "color": "blue",
  229. "c": float64(1),
  230. }, {
  231. "color": "red",
  232. "c": float64(1),
  233. }}, {{
  234. "color": "blue",
  235. "c": float64(2),
  236. }, {
  237. "color": "red",
  238. "c": float64(1),
  239. }}, {{
  240. "color": "blue",
  241. "c": float64(2),
  242. }, {
  243. "color": "yellow",
  244. "c": float64(1),
  245. }}, {{
  246. "color": "blue",
  247. "c": float64(1),
  248. }, {
  249. "color": "red",
  250. "c": float64(1),
  251. }, {
  252. "color": "yellow",
  253. "c": float64(1),
  254. }},
  255. },
  256. M: map[string]interface{}{
  257. "op_5_project_0_exceptions_total": int64(0),
  258. "op_5_project_0_process_latency_us": int64(0),
  259. "op_5_project_0_records_in_total": int64(5),
  260. "op_5_project_0_records_out_total": int64(5),
  261. "sink_mockSink_0_exceptions_total": int64(0),
  262. "sink_mockSink_0_records_in_total": int64(5),
  263. "sink_mockSink_0_records_out_total": int64(5),
  264. "source_demo_0_exceptions_total": int64(0),
  265. "source_demo_0_records_in_total": int64(5),
  266. "source_demo_0_records_out_total": int64(5),
  267. "op_2_window_0_exceptions_total": int64(0),
  268. "op_2_window_0_process_latency_us": int64(0),
  269. "op_2_window_0_records_in_total": int64(5),
  270. "op_2_window_0_records_out_total": int64(5),
  271. "op_3_aggregate_0_exceptions_total": int64(0),
  272. "op_3_aggregate_0_process_latency_us": int64(0),
  273. "op_3_aggregate_0_records_in_total": int64(5),
  274. "op_3_aggregate_0_records_out_total": int64(5),
  275. "op_4_order_0_exceptions_total": int64(0),
  276. "op_4_order_0_process_latency_us": int64(0),
  277. "op_4_order_0_records_in_total": int64(5),
  278. "op_4_order_0_records_out_total": int64(5),
  279. },
  280. }, {
  281. Name: `TestWindowRule5`,
  282. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  283. R: [][]map[string]interface{}{
  284. {{
  285. "count": float64(2),
  286. "ws": float64(1541152486013),
  287. "window_end": float64(1541152487823), // timeout
  288. }}, {{
  289. "count": float64(3),
  290. "ws": float64(1541152487932),
  291. "window_end": float64(1541152490000), // tick
  292. }}, {{
  293. "count": float64(5),
  294. "ws": float64(1541152490000),
  295. "window_end": float64(1541152494000), // tick
  296. }}, {{
  297. "count": float64(1),
  298. "ws": float64(1541152494000),
  299. "window_end": float64(1541152495112), // timeout
  300. }},
  301. },
  302. M: map[string]interface{}{
  303. "op_3_project_0_exceptions_total": int64(0),
  304. "op_3_project_0_process_latency_us": int64(0),
  305. "op_3_project_0_records_in_total": int64(4),
  306. "op_3_project_0_records_out_total": int64(4),
  307. "sink_mockSink_0_exceptions_total": int64(0),
  308. "sink_mockSink_0_records_in_total": int64(4),
  309. "sink_mockSink_0_records_out_total": int64(4),
  310. "source_sessionDemo_0_exceptions_total": int64(0),
  311. "source_sessionDemo_0_records_in_total": int64(11),
  312. "source_sessionDemo_0_records_out_total": int64(11),
  313. "op_2_window_0_exceptions_total": int64(0),
  314. "op_2_window_0_process_latency_us": int64(0),
  315. "op_2_window_0_records_in_total": int64(11),
  316. "op_2_window_0_records_out_total": int64(4),
  317. },
  318. }, {
  319. Name: `TestWindowRule6`,
  320. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  321. R: [][]map[string]interface{}{
  322. {{
  323. "temp": 25.5,
  324. "c": float64(1),
  325. "window_start": float64(1541152485115),
  326. "window_end": float64(1541152486115),
  327. }}, {{
  328. "temp": 25.5,
  329. "c": float64(1),
  330. "window_start": float64(1541152485822),
  331. "window_end": float64(1541152486822),
  332. }}, {{
  333. "temp": 25.5,
  334. "c": float64(1),
  335. "window_start": float64(1541152485903),
  336. "window_end": float64(1541152486903),
  337. }}, {{
  338. "temp": 28.1,
  339. "c": float64(1),
  340. "window_start": float64(1541152486702),
  341. "window_end": float64(1541152487702),
  342. }}, {{
  343. "temp": 28.1,
  344. "c": float64(1),
  345. "window_start": float64(1541152487442),
  346. "window_end": float64(1541152488442),
  347. }}, {{
  348. "temp": 55.5,
  349. "c": float64(2),
  350. "window_start": float64(1541152487605),
  351. "window_end": float64(1541152488605),
  352. }}, {{
  353. "temp": 27.4,
  354. "c": float64(1),
  355. "window_start": float64(1541152488252),
  356. "window_end": float64(1541152489252),
  357. }}, {{
  358. "temp": 52.9,
  359. "c": float64(2),
  360. "window_start": float64(1541152488305),
  361. "window_end": float64(1541152489305),
  362. }},
  363. },
  364. M: map[string]interface{}{
  365. "op_5_project_0_exceptions_total": int64(0),
  366. "op_5_project_0_process_latency_us": int64(0),
  367. "op_5_project_0_records_in_total": int64(8),
  368. "op_5_project_0_records_out_total": int64(8),
  369. "sink_mockSink_0_exceptions_total": int64(0),
  370. "sink_mockSink_0_records_in_total": int64(8),
  371. "sink_mockSink_0_records_out_total": int64(8),
  372. "source_demo_0_exceptions_total": int64(0),
  373. "source_demo_0_records_in_total": int64(5),
  374. "source_demo_0_records_out_total": int64(5),
  375. "source_demo1_0_exceptions_total": int64(0),
  376. "source_demo1_0_records_in_total": int64(5),
  377. "source_demo1_0_records_out_total": int64(5),
  378. "op_3_window_0_exceptions_total": int64(0),
  379. "op_3_window_0_process_latency_us": int64(0),
  380. "op_3_window_0_records_in_total": int64(10),
  381. "op_3_window_0_records_out_total": int64(10),
  382. "op_4_join_0_exceptions_total": int64(0),
  383. "op_4_join_0_process_latency_us": int64(0),
  384. "op_4_join_0_records_in_total": int64(10),
  385. "op_4_join_0_records_out_total": int64(8),
  386. },
  387. }, {
  388. Name: `TestWindowRule7`,
  389. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  390. R: [][]map[string]interface{}{
  391. {{
  392. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  393. }},
  394. {{
  395. "color": "blue",
  396. "size": float64(6),
  397. "ts": float64(1541152486822),
  398. }},
  399. {{
  400. "color": "blue",
  401. "size": float64(6),
  402. "ts": float64(1541152486822),
  403. }, {
  404. "color": "blue",
  405. "size": float64(2),
  406. "ts": float64(1541152487632),
  407. }},
  408. {{
  409. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  410. }},
  411. {{
  412. "color": "blue",
  413. "size": float64(2),
  414. "ts": float64(1541152487632),
  415. }},
  416. {{
  417. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  418. }},
  419. },
  420. M: map[string]interface{}{
  421. "op_3_project_0_exceptions_total": int64(3),
  422. "op_3_project_0_process_latency_us": int64(0),
  423. "op_3_project_0_records_in_total": int64(6),
  424. "op_3_project_0_records_out_total": int64(3),
  425. "sink_mockSink_0_exceptions_total": int64(0),
  426. "sink_mockSink_0_records_in_total": int64(6),
  427. "sink_mockSink_0_records_out_total": int64(6),
  428. "source_demoError_0_exceptions_total": int64(3),
  429. "source_demoError_0_records_in_total": int64(5),
  430. "source_demoError_0_records_out_total": int64(5),
  431. "op_2_window_0_exceptions_total": int64(3),
  432. "op_2_window_0_process_latency_us": int64(0),
  433. "op_2_window_0_records_in_total": int64(5),
  434. "op_2_window_0_records_out_total": int64(3),
  435. },
  436. }, {
  437. Name: `TestWindowRule8`,
  438. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  439. R: [][]map[string]interface{}{
  440. {{
  441. "color": "red",
  442. "ts": float64(1541152486013),
  443. "c": float64(2),
  444. "window_start": float64(1541152486000),
  445. "window_end": float64(1541152487000),
  446. }},
  447. },
  448. M: map[string]interface{}{
  449. "op_5_project_0_exceptions_total": int64(0),
  450. "op_5_project_0_process_latency_us": int64(0),
  451. "op_5_project_0_records_in_total": int64(1),
  452. "op_5_project_0_records_out_total": int64(1),
  453. "sink_mockSink_0_exceptions_total": int64(0),
  454. "sink_mockSink_0_records_in_total": int64(1),
  455. "sink_mockSink_0_records_out_total": int64(1),
  456. "source_demo_0_exceptions_total": int64(0),
  457. "source_demo_0_records_in_total": int64(5),
  458. "source_demo_0_records_out_total": int64(5),
  459. "op_3_window_0_exceptions_total": int64(0),
  460. "op_3_window_0_process_latency_us": int64(0),
  461. "op_3_window_0_records_in_total": int64(3),
  462. "op_3_window_0_records_out_total": int64(2),
  463. "op_2_filter_0_exceptions_total": int64(0),
  464. "op_2_filter_0_process_latency_us": int64(0),
  465. "op_2_filter_0_records_in_total": int64(5),
  466. "op_2_filter_0_records_out_total": int64(3),
  467. "op_4_having_0_exceptions_total": int64(0),
  468. "op_4_having_0_process_latency_us": int64(0),
  469. "op_4_having_0_records_in_total": int64(2),
  470. "op_4_having_0_records_out_total": int64(1),
  471. },
  472. }, {
  473. Name: `TestWindowRule9`,
  474. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  475. R: [][]map[string]interface{}{
  476. {{
  477. "color": "red",
  478. "window_start": float64(1541152485000),
  479. "window_end": float64(1541152487000),
  480. }, {
  481. "color": "blue",
  482. "window_start": float64(1541152485000),
  483. "window_end": float64(1541152487000),
  484. }},
  485. {{
  486. "color": "red",
  487. "window_start": float64(1541152486000),
  488. "window_end": float64(1541152488000),
  489. }, {
  490. "color": "blue",
  491. "window_start": float64(1541152486000),
  492. "window_end": float64(1541152488000),
  493. }},
  494. {{
  495. "color": "yellow",
  496. "window_start": float64(1541152487000),
  497. "window_end": float64(1541152489000),
  498. }},
  499. {{
  500. "color": "yellow",
  501. "window_start": float64(1541152488000),
  502. "window_end": float64(1541152490000),
  503. }},
  504. },
  505. M: map[string]interface{}{
  506. "op_4_project_0_exceptions_total": int64(0),
  507. "op_4_project_0_process_latency_us": int64(0),
  508. "op_4_project_0_records_in_total": int64(4),
  509. "op_4_project_0_records_out_total": int64(4),
  510. "sink_mockSink_0_exceptions_total": int64(0),
  511. "sink_mockSink_0_records_in_total": int64(4),
  512. "sink_mockSink_0_records_out_total": int64(4),
  513. "source_demo_0_exceptions_total": int64(0),
  514. "source_demo_0_records_in_total": int64(5),
  515. "source_demo_0_records_out_total": int64(5),
  516. "op_3_window_0_exceptions_total": int64(0),
  517. "op_3_window_0_process_latency_us": int64(0),
  518. "op_3_window_0_records_in_total": int64(3),
  519. "op_3_window_0_records_out_total": int64(4),
  520. },
  521. }, {
  522. Name: `TestCountWindowRule1`,
  523. Sql: `SELECT collect(*)[0]->color as c, window_end() as we FROM demo GROUP BY COUNTWINDOW(3)`,
  524. R: [][]map[string]interface{}{
  525. {{
  526. "c": "red",
  527. "we": 1.541152487632e+12,
  528. }},
  529. },
  530. M: map[string]interface{}{
  531. "op_3_project_0_exceptions_total": int64(0),
  532. "op_3_project_0_process_latency_us": int64(0),
  533. "op_3_project_0_records_in_total": int64(1),
  534. "op_3_project_0_records_out_total": int64(1),
  535. "sink_mockSink_0_exceptions_total": int64(0),
  536. "sink_mockSink_0_records_in_total": int64(1),
  537. "sink_mockSink_0_records_out_total": int64(1),
  538. "source_demo_0_exceptions_total": int64(0),
  539. "source_demo_0_records_in_total": int64(5),
  540. "source_demo_0_records_out_total": int64(5),
  541. "op_2_window_0_exceptions_total": int64(0),
  542. "op_2_window_0_process_latency_us": int64(0),
  543. "op_2_window_0_records_in_total": int64(5),
  544. "op_2_window_0_records_out_total": int64(1),
  545. },
  546. }, {
  547. Name: `TestWindowRule10`,
  548. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  549. R: [][]map[string]interface{}{
  550. {{
  551. "c": "red",
  552. }}, {{
  553. "c": "blue",
  554. }}, {{}}, {{
  555. "c": "yellow",
  556. }}, {{}},
  557. },
  558. M: map[string]interface{}{
  559. "op_3_project_0_exceptions_total": int64(0),
  560. "op_3_project_0_process_latency_us": int64(0),
  561. "op_3_project_0_records_in_total": int64(5),
  562. "op_3_project_0_records_out_total": int64(5),
  563. "sink_mockSink_0_exceptions_total": int64(0),
  564. "sink_mockSink_0_records_in_total": int64(5),
  565. "sink_mockSink_0_records_out_total": int64(5),
  566. "source_demo_0_exceptions_total": int64(0),
  567. "source_demo_0_records_in_total": int64(5),
  568. "source_demo_0_records_out_total": int64(5),
  569. "op_2_window_0_exceptions_total": int64(0),
  570. "op_2_window_0_process_latency_us": int64(0),
  571. "op_2_window_0_records_in_total": int64(5),
  572. "op_2_window_0_records_out_total": int64(5),
  573. },
  574. }, {
  575. Name: `TestWindowRule11`,
  576. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  577. R: [][]map[string]interface{}{
  578. {{
  579. "color": "red",
  580. "name": "name1",
  581. "window_start": float64(1541152486000),
  582. "window_end": float64(1541152487000),
  583. }},
  584. },
  585. M: map[string]interface{}{
  586. "op_2_window_0_exceptions_total": int64(0),
  587. "op_2_window_0_process_latency_us": int64(0),
  588. "op_2_window_0_records_in_total": int64(5),
  589. "op_2_window_0_records_out_total": int64(4),
  590. "op_4_join_aligner_0_records_in_total": int64(5),
  591. "op_4_join_aligner_0_records_out_total": int64(4),
  592. "op_5_join_0_exceptions_total": int64(0),
  593. "op_5_join_0_records_in_total": int64(4),
  594. "op_5_join_0_records_out_total": int64(1),
  595. "op_6_project_0_exceptions_total": int64(0),
  596. "op_6_project_0_records_in_total": int64(1),
  597. "op_6_project_0_records_out_total": int64(1),
  598. "sink_mockSink_0_exceptions_total": int64(0),
  599. "sink_mockSink_0_records_in_total": int64(1),
  600. "sink_mockSink_0_records_out_total": int64(1),
  601. "source_demo_0_exceptions_total": int64(0),
  602. "source_demo_0_records_in_total": int64(5),
  603. "source_demo_0_records_out_total": int64(5),
  604. "source_table1_0_exceptions_total": int64(0),
  605. "source_table1_0_records_in_total": int64(4),
  606. "source_table1_0_records_out_total": int64(1),
  607. },
  608. },
  609. }
  610. HandleStream(true, streamList, t)
  611. options := []*api.RuleOption{
  612. {
  613. BufferLength: 100,
  614. SendError: true,
  615. }, {
  616. BufferLength: 100,
  617. SendError: true,
  618. Qos: api.AtLeastOnce,
  619. CheckpointInterval: 5000,
  620. }, {
  621. BufferLength: 100,
  622. SendError: true,
  623. Qos: api.ExactlyOnce,
  624. CheckpointInterval: 5000,
  625. },
  626. }
  627. for j, opt := range options {
  628. DoRuleTest(t, tests, j, opt, 15)
  629. }
  630. }
  631. func TestEventWindow(t *testing.T) {
  632. //Reset
  633. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  634. HandleStream(false, streamList, t)
  635. var tests = []RuleTest{
  636. {
  637. Name: `TestEventWindowRule1`,
  638. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  639. R: [][]map[string]interface{}{
  640. {{
  641. "color": "red",
  642. "size": float64(3),
  643. "ts": float64(1541152486013),
  644. }},
  645. {{
  646. "color": "red",
  647. "size": float64(3),
  648. "ts": float64(1541152486013),
  649. }, {
  650. "color": "blue",
  651. "size": float64(2),
  652. "ts": float64(1541152487632),
  653. }},
  654. {{
  655. "color": "blue",
  656. "size": float64(2),
  657. "ts": float64(1541152487632),
  658. }, {
  659. "color": "yellow",
  660. "size": float64(4),
  661. "ts": float64(1541152488442),
  662. }}, {{
  663. "color": "yellow",
  664. "size": float64(4),
  665. "ts": float64(1541152488442),
  666. }, {
  667. "color": "red",
  668. "size": float64(1),
  669. "ts": float64(1541152489252),
  670. }}, {{
  671. "color": "red",
  672. "size": float64(1),
  673. "ts": float64(1541152489252),
  674. }},
  675. },
  676. M: map[string]interface{}{
  677. "op_3_project_0_exceptions_total": int64(0),
  678. "op_3_project_0_process_latency_us": int64(0),
  679. "op_3_project_0_records_in_total": int64(5),
  680. "op_3_project_0_records_out_total": int64(5),
  681. "sink_mockSink_0_exceptions_total": int64(0),
  682. "sink_mockSink_0_records_in_total": int64(5),
  683. "sink_mockSink_0_records_out_total": int64(5),
  684. "source_demoE_0_exceptions_total": int64(0),
  685. "source_demoE_0_records_in_total": int64(6),
  686. "source_demoE_0_records_out_total": int64(6),
  687. "op_2_window_0_exceptions_total": int64(0),
  688. "op_2_window_0_process_latency_us": int64(0),
  689. "op_2_window_0_records_in_total": int64(6),
  690. "op_2_window_0_records_out_total": int64(5),
  691. },
  692. }, {
  693. Name: `TestEventWindowRule2`,
  694. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  695. R: [][]map[string]interface{}{
  696. {{
  697. "window_start": float64(1541152486000),
  698. "window_end": float64(1541152487000),
  699. "color": "red",
  700. "ts": float64(1541152486013),
  701. }},
  702. {{
  703. "window_start": float64(1541152488000),
  704. "window_end": float64(1541152489000),
  705. "color": "yellow",
  706. "ts": float64(1541152488442),
  707. }},
  708. },
  709. M: map[string]interface{}{
  710. "op_4_project_0_exceptions_total": int64(0),
  711. "op_4_project_0_process_latency_us": int64(0),
  712. "op_4_project_0_records_in_total": int64(2),
  713. "op_4_project_0_records_out_total": int64(2),
  714. "sink_mockSink_0_exceptions_total": int64(0),
  715. "sink_mockSink_0_records_in_total": int64(2),
  716. "sink_mockSink_0_records_out_total": int64(2),
  717. "source_demoE_0_exceptions_total": int64(0),
  718. "source_demoE_0_records_in_total": int64(6),
  719. "source_demoE_0_records_out_total": int64(6),
  720. "op_2_window_0_exceptions_total": int64(0),
  721. "op_2_window_0_process_latency_us": int64(0),
  722. "op_2_window_0_records_in_total": int64(6),
  723. "op_2_window_0_records_out_total": int64(4),
  724. "op_3_filter_0_exceptions_total": int64(0),
  725. "op_3_filter_0_process_latency_us": int64(0),
  726. "op_3_filter_0_records_in_total": int64(4),
  727. "op_3_filter_0_records_out_total": int64(2),
  728. },
  729. }, {
  730. Name: `TestEventWindowRule3`,
  731. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  732. R: [][]map[string]interface{}{
  733. {{
  734. "color": "red",
  735. "temp": 25.5,
  736. "ts": float64(1541152486013),
  737. }}, {{
  738. "color": "red",
  739. "temp": 25.5,
  740. "ts": float64(1541152486013),
  741. }}, {{
  742. "color": "blue",
  743. "temp": 28.1,
  744. "ts": float64(1541152487632),
  745. }}, {{
  746. "color": "blue",
  747. "temp": 28.1,
  748. "ts": float64(1541152487632),
  749. }, {
  750. "color": "yellow",
  751. "temp": 27.4,
  752. "ts": float64(1541152488442),
  753. }}, {{
  754. "color": "yellow",
  755. "temp": 27.4,
  756. "ts": float64(1541152488442),
  757. }, {
  758. "color": "red",
  759. "temp": 25.5,
  760. "ts": float64(1541152489252),
  761. }},
  762. },
  763. M: map[string]interface{}{
  764. "op_5_project_0_exceptions_total": int64(0),
  765. "op_5_project_0_process_latency_us": int64(0),
  766. "op_5_project_0_records_in_total": int64(5),
  767. "op_5_project_0_records_out_total": int64(5),
  768. "sink_mockSink_0_exceptions_total": int64(0),
  769. "sink_mockSink_0_records_in_total": int64(5),
  770. "sink_mockSink_0_records_out_total": int64(5),
  771. "source_demoE_0_exceptions_total": int64(0),
  772. "source_demoE_0_records_in_total": int64(6),
  773. "source_demoE_0_records_out_total": int64(6),
  774. "source_demo1E_0_exceptions_total": int64(0),
  775. "source_demo1E_0_records_in_total": int64(6),
  776. "source_demo1E_0_records_out_total": int64(6),
  777. "op_3_window_0_exceptions_total": int64(0),
  778. "op_3_window_0_process_latency_us": int64(0),
  779. "op_3_window_0_records_in_total": int64(12),
  780. "op_3_window_0_records_out_total": int64(5),
  781. "op_4_join_0_exceptions_total": int64(0),
  782. "op_4_join_0_process_latency_us": int64(0),
  783. "op_4_join_0_records_in_total": int64(5),
  784. "op_4_join_0_records_out_total": int64(5),
  785. },
  786. }, {
  787. Name: `TestEventWindowRule4`,
  788. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  789. R: [][]map[string]interface{}{
  790. {{
  791. "color": "red",
  792. "ws": float64(1541152484013),
  793. "we": float64(1541152486013),
  794. }}, {{
  795. "color": "blue",
  796. "ws": float64(1541152485632),
  797. "we": float64(1541152487632),
  798. }, {
  799. "color": "red",
  800. "ws": float64(1541152485632),
  801. "we": float64(1541152487632),
  802. }}, {{
  803. "color": "blue",
  804. "ws": float64(1541152486442),
  805. "we": float64(1541152488442),
  806. }, {
  807. "color": "yellow",
  808. "ws": float64(1541152486442),
  809. "we": float64(1541152488442),
  810. }}, {{
  811. "color": "blue",
  812. "ws": float64(1541152487252),
  813. "we": float64(1541152489252),
  814. }, {
  815. "color": "red",
  816. "ws": float64(1541152487252),
  817. "we": float64(1541152489252),
  818. }, {
  819. "color": "yellow",
  820. "ws": float64(1541152487252),
  821. "we": float64(1541152489252),
  822. }},
  823. },
  824. M: map[string]interface{}{
  825. "op_5_project_0_exceptions_total": int64(0),
  826. "op_5_project_0_process_latency_us": int64(0),
  827. "op_5_project_0_records_in_total": int64(4),
  828. "op_5_project_0_records_out_total": int64(4),
  829. "sink_mockSink_0_exceptions_total": int64(0),
  830. "sink_mockSink_0_records_in_total": int64(4),
  831. "sink_mockSink_0_records_out_total": int64(4),
  832. "source_demoE_0_exceptions_total": int64(0),
  833. "source_demoE_0_records_in_total": int64(6),
  834. "source_demoE_0_records_out_total": int64(6),
  835. "op_2_window_0_exceptions_total": int64(0),
  836. "op_2_window_0_process_latency_us": int64(0),
  837. "op_2_window_0_records_in_total": int64(6),
  838. "op_2_window_0_records_out_total": int64(4),
  839. "op_3_aggregate_0_exceptions_total": int64(0),
  840. "op_3_aggregate_0_process_latency_us": int64(0),
  841. "op_3_aggregate_0_records_in_total": int64(4),
  842. "op_3_aggregate_0_records_out_total": int64(4),
  843. "op_4_order_0_exceptions_total": int64(0),
  844. "op_4_order_0_process_latency_us": int64(0),
  845. "op_4_order_0_records_in_total": int64(4),
  846. "op_4_order_0_records_out_total": int64(4),
  847. },
  848. }, {
  849. Name: `TestEventWindowRule5`,
  850. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  851. R: [][]map[string]interface{}{
  852. {{
  853. "temp": 25.5,
  854. }}, {{
  855. "temp": 28.1,
  856. }, {
  857. "temp": 27.4,
  858. }, {
  859. "temp": 25.5,
  860. }}, {{
  861. "temp": 26.2,
  862. }, {
  863. "temp": 26.8,
  864. }, {
  865. "temp": 28.9,
  866. }, {
  867. "temp": 29.1,
  868. }, {
  869. "temp": 32.2,
  870. }}, {{
  871. "temp": 30.9,
  872. }},
  873. },
  874. M: map[string]interface{}{
  875. "op_3_project_0_exceptions_total": int64(0),
  876. "op_3_project_0_process_latency_us": int64(0),
  877. "op_3_project_0_records_in_total": int64(4),
  878. "op_3_project_0_records_out_total": int64(4),
  879. "sink_mockSink_0_exceptions_total": int64(0),
  880. "sink_mockSink_0_records_in_total": int64(4),
  881. "sink_mockSink_0_records_out_total": int64(4),
  882. "source_sessionDemoE_0_exceptions_total": int64(0),
  883. "source_sessionDemoE_0_records_in_total": int64(12),
  884. "source_sessionDemoE_0_records_out_total": int64(12),
  885. "op_2_window_0_exceptions_total": int64(0),
  886. "op_2_window_0_process_latency_us": int64(0),
  887. "op_2_window_0_records_in_total": int64(12),
  888. "op_2_window_0_records_out_total": int64(4),
  889. },
  890. }, {
  891. Name: `TestEventWindowRule6`,
  892. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  893. R: [][]map[string]interface{}{
  894. {{
  895. "m": 25.5,
  896. "c": float64(1),
  897. }}, {{
  898. "m": 25.5,
  899. "c": float64(1),
  900. }}, {{
  901. "m": 28.1,
  902. "c": float64(1),
  903. }}, {{
  904. "m": 28.1,
  905. "c": float64(2),
  906. }}, {{
  907. "m": 27.4,
  908. "c": float64(2),
  909. }},
  910. },
  911. M: map[string]interface{}{
  912. "op_5_project_0_exceptions_total": int64(0),
  913. "op_5_project_0_process_latency_us": int64(0),
  914. "op_5_project_0_records_in_total": int64(5),
  915. "op_5_project_0_records_out_total": int64(5),
  916. "sink_mockSink_0_exceptions_total": int64(0),
  917. "sink_mockSink_0_records_in_total": int64(5),
  918. "sink_mockSink_0_records_out_total": int64(5),
  919. "source_demoE_0_exceptions_total": int64(0),
  920. "source_demoE_0_records_in_total": int64(6),
  921. "source_demoE_0_records_out_total": int64(6),
  922. "source_demo1E_0_exceptions_total": int64(0),
  923. "source_demo1E_0_records_in_total": int64(6),
  924. "source_demo1E_0_records_out_total": int64(6),
  925. "op_3_window_0_exceptions_total": int64(0),
  926. "op_3_window_0_records_in_total": int64(12),
  927. "op_3_window_0_records_out_total": int64(5),
  928. "op_4_join_0_exceptions_total": int64(0),
  929. "op_4_join_0_process_latency_us": int64(0),
  930. "op_4_join_0_records_in_total": int64(5),
  931. "op_4_join_0_records_out_total": int64(5),
  932. },
  933. }, {
  934. Name: `TestEventWindowRule7`,
  935. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  936. R: [][]map[string]interface{}{
  937. {{
  938. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  939. }},
  940. {{
  941. "color": "red",
  942. "size": float64(3),
  943. "ts": float64(1541152486013),
  944. }},
  945. {{
  946. "color": "red",
  947. "size": float64(3),
  948. "ts": float64(1541152486013),
  949. }},
  950. {{
  951. "color": "yellow",
  952. "size": float64(4),
  953. "ts": float64(1541152488442),
  954. }}, {{
  955. "color": "yellow",
  956. "size": float64(4),
  957. "ts": float64(1541152488442),
  958. }, {
  959. "color": "red",
  960. "size": float64(1),
  961. "ts": float64(1541152489252),
  962. }}, {{
  963. "color": "red",
  964. "size": float64(1),
  965. "ts": float64(1541152489252),
  966. }},
  967. },
  968. M: map[string]interface{}{
  969. "op_3_project_0_exceptions_total": int64(1),
  970. "op_3_project_0_process_latency_us": int64(0),
  971. "op_3_project_0_records_in_total": int64(6),
  972. "op_3_project_0_records_out_total": int64(5),
  973. "sink_mockSink_0_exceptions_total": int64(0),
  974. "sink_mockSink_0_records_in_total": int64(6),
  975. "sink_mockSink_0_records_out_total": int64(6),
  976. "source_demoErr_0_exceptions_total": int64(1),
  977. "source_demoErr_0_records_in_total": int64(6),
  978. "source_demoErr_0_records_out_total": int64(6),
  979. "op_2_window_0_exceptions_total": int64(1),
  980. "op_2_window_0_process_latency_us": int64(0),
  981. "op_2_window_0_records_in_total": int64(6),
  982. "op_2_window_0_records_out_total": int64(5),
  983. },
  984. }, {
  985. Name: `TestEventWindowRule8`,
  986. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  987. R: [][]map[string]interface{}{
  988. {{
  989. "temp": 25.5,
  990. "window_start": float64(1541152486013),
  991. "window_end": float64(1541152487013),
  992. }}, {{
  993. "temp": 28.1,
  994. "window_start": float64(1541152487932),
  995. "window_end": float64(1541152490000),
  996. }, {
  997. "temp": 27.4,
  998. "window_start": float64(1541152487932),
  999. "window_end": float64(1541152490000),
  1000. }, {
  1001. "temp": 25.5,
  1002. "window_start": float64(1541152487932),
  1003. "window_end": float64(1541152490000),
  1004. }}, {{
  1005. "temp": 26.2,
  1006. "window_start": float64(1541152490000),
  1007. "window_end": float64(1541152494000),
  1008. }, {
  1009. "temp": 26.8,
  1010. "window_start": float64(1541152490000),
  1011. "window_end": float64(1541152494000),
  1012. }, {
  1013. "temp": 28.9,
  1014. "window_start": float64(1541152490000),
  1015. "window_end": float64(1541152494000),
  1016. }, {
  1017. "temp": 29.1,
  1018. "window_start": float64(1541152490000),
  1019. "window_end": float64(1541152494000),
  1020. }, {
  1021. "temp": 32.2,
  1022. "window_start": float64(1541152490000),
  1023. "window_end": float64(1541152494000),
  1024. }}, {{
  1025. "temp": 30.9,
  1026. "window_start": float64(1541152494000),
  1027. "window_end": float64(1541152495112),
  1028. }},
  1029. },
  1030. M: map[string]interface{}{
  1031. "op_3_project_0_exceptions_total": int64(0),
  1032. "op_3_project_0_process_latency_us": int64(0),
  1033. "op_3_project_0_records_in_total": int64(4),
  1034. "op_3_project_0_records_out_total": int64(4),
  1035. "sink_mockSink_0_exceptions_total": int64(0),
  1036. "sink_mockSink_0_records_in_total": int64(4),
  1037. "sink_mockSink_0_records_out_total": int64(4),
  1038. "source_sessionDemoE_0_exceptions_total": int64(0),
  1039. "source_sessionDemoE_0_records_in_total": int64(12),
  1040. "source_sessionDemoE_0_records_out_total": int64(12),
  1041. "op_2_window_0_exceptions_total": int64(0),
  1042. "op_2_window_0_process_latency_us": int64(0),
  1043. "op_2_window_0_records_in_total": int64(12),
  1044. "op_2_window_0_records_out_total": int64(4),
  1045. },
  1046. }, {
  1047. Name: `TestEventWindowRule9`,
  1048. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1049. R: [][]map[string]interface{}{
  1050. {{
  1051. "color": "red",
  1052. "window_start": float64(1541152485000),
  1053. "window_end": float64(1541152487000),
  1054. }},
  1055. {{
  1056. "color": "red",
  1057. "window_start": float64(1541152486000),
  1058. "window_end": float64(1541152488000),
  1059. }, {
  1060. "color": "blue",
  1061. "window_start": float64(1541152486000),
  1062. "window_end": float64(1541152488000),
  1063. }},
  1064. {{
  1065. "color": "blue",
  1066. "window_start": float64(1541152487000),
  1067. "window_end": float64(1541152489000),
  1068. }, {
  1069. "color": "yellow",
  1070. "window_start": float64(1541152487000),
  1071. "window_end": float64(1541152489000),
  1072. }}, {{
  1073. "color": "yellow",
  1074. "window_start": float64(1541152488000),
  1075. "window_end": float64(1541152490000),
  1076. }, {
  1077. "color": "red",
  1078. "window_start": float64(1541152488000),
  1079. "window_end": float64(1541152490000),
  1080. }}, {{
  1081. "color": "red",
  1082. "window_start": float64(1541152489000),
  1083. "window_end": float64(1541152491000),
  1084. }},
  1085. },
  1086. M: map[string]interface{}{
  1087. "op_3_project_0_exceptions_total": int64(0),
  1088. "op_3_project_0_process_latency_us": int64(0),
  1089. "op_3_project_0_records_in_total": int64(5),
  1090. "op_3_project_0_records_out_total": int64(5),
  1091. "sink_mockSink_0_exceptions_total": int64(0),
  1092. "sink_mockSink_0_records_in_total": int64(5),
  1093. "sink_mockSink_0_records_out_total": int64(5),
  1094. "source_demoE_0_exceptions_total": int64(0),
  1095. "source_demoE_0_records_in_total": int64(6),
  1096. "source_demoE_0_records_out_total": int64(6),
  1097. "op_2_window_0_exceptions_total": int64(0),
  1098. "op_2_window_0_process_latency_us": int64(0),
  1099. "op_2_window_0_records_in_total": int64(6),
  1100. "op_2_window_0_records_out_total": int64(5),
  1101. },
  1102. },
  1103. }
  1104. HandleStream(true, streamList, t)
  1105. options := []*api.RuleOption{
  1106. {
  1107. BufferLength: 100,
  1108. SendError: true,
  1109. IsEventTime: true,
  1110. LateTol: 1000,
  1111. }, {
  1112. BufferLength: 100,
  1113. SendError: true,
  1114. Qos: api.AtLeastOnce,
  1115. CheckpointInterval: 5000,
  1116. IsEventTime: true,
  1117. LateTol: 1000,
  1118. }, {
  1119. BufferLength: 100,
  1120. SendError: true,
  1121. Qos: api.ExactlyOnce,
  1122. CheckpointInterval: 5000,
  1123. IsEventTime: true,
  1124. LateTol: 1000,
  1125. },
  1126. }
  1127. for j, opt := range options {
  1128. DoRuleTest(t, tests, j, opt, 10)
  1129. }
  1130. }
  1131. func TestWindowError(t *testing.T) {
  1132. //Reset
  1133. streamList := []string{"ldemo", "ldemo1"}
  1134. HandleStream(false, streamList, t)
  1135. var tests = []RuleTest{
  1136. {
  1137. Name: `TestWindowErrorRule1`,
  1138. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1139. R: [][]map[string]interface{}{
  1140. {{
  1141. "error": "run Select error: invalid operation string(string) * int64(3)",
  1142. }}, {{
  1143. "kuiper_field_0": float64(6),
  1144. }, {}},
  1145. },
  1146. M: map[string]interface{}{
  1147. "op_3_project_0_exceptions_total": int64(1),
  1148. "op_3_project_0_process_latency_us": int64(0),
  1149. "op_3_project_0_records_in_total": int64(2),
  1150. "op_3_project_0_records_out_total": int64(1),
  1151. "sink_mockSink_0_exceptions_total": int64(0),
  1152. "sink_mockSink_0_records_in_total": int64(2),
  1153. "sink_mockSink_0_records_out_total": int64(2),
  1154. "source_ldemo_0_exceptions_total": int64(0),
  1155. "source_ldemo_0_records_in_total": int64(5),
  1156. "source_ldemo_0_records_out_total": int64(5),
  1157. "op_2_window_0_exceptions_total": int64(0),
  1158. "op_2_window_0_process_latency_us": int64(0),
  1159. "op_2_window_0_records_in_total": int64(5),
  1160. "op_2_window_0_records_out_total": int64(2),
  1161. },
  1162. }, {
  1163. Name: `TestWindowErrorRule2`,
  1164. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1165. R: [][]map[string]interface{}{
  1166. {{
  1167. "error": "run Where error: invalid operation string(string) > int64(2)",
  1168. }}, {{
  1169. "color": "red",
  1170. "ts": float64(1541152486013),
  1171. }}, {{
  1172. "ts": float64(1541152487632),
  1173. }},
  1174. },
  1175. M: map[string]interface{}{
  1176. "op_4_project_0_exceptions_total": int64(1),
  1177. "op_4_project_0_process_latency_us": int64(0),
  1178. "op_4_project_0_records_in_total": int64(3),
  1179. "op_4_project_0_records_out_total": int64(2),
  1180. "sink_mockSink_0_exceptions_total": int64(0),
  1181. "sink_mockSink_0_records_in_total": int64(3),
  1182. "sink_mockSink_0_records_out_total": int64(3),
  1183. "source_ldemo_0_exceptions_total": int64(0),
  1184. "source_ldemo_0_records_in_total": int64(5),
  1185. "source_ldemo_0_records_out_total": int64(5),
  1186. "op_3_window_0_exceptions_total": int64(1),
  1187. "op_3_window_0_process_latency_us": int64(0),
  1188. "op_3_window_0_records_in_total": int64(3),
  1189. "op_3_window_0_records_out_total": int64(2),
  1190. "op_2_filter_0_exceptions_total": int64(1),
  1191. "op_2_filter_0_process_latency_us": int64(0),
  1192. "op_2_filter_0_records_in_total": int64(5),
  1193. "op_2_filter_0_records_out_total": int64(2),
  1194. },
  1195. }, {
  1196. Name: `TestWindowErrorRule3`,
  1197. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1198. R: [][]map[string]interface{}{
  1199. {{
  1200. "color": "red",
  1201. "temp": 25.5,
  1202. "ts": float64(1541152486013),
  1203. }}, {{
  1204. "color": "red",
  1205. "temp": 25.5,
  1206. "ts": float64(1541152486013),
  1207. }}, {{
  1208. "color": "red",
  1209. "temp": 25.5,
  1210. "ts": float64(1541152486013),
  1211. }}, {{
  1212. "temp": 28.1,
  1213. "ts": float64(1541152487632),
  1214. }}, {{
  1215. "temp": 28.1,
  1216. "ts": float64(1541152487632),
  1217. }}, {{
  1218. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1219. }}, {{
  1220. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1221. }}, {{
  1222. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1223. }},
  1224. },
  1225. M: map[string]interface{}{
  1226. "op_5_project_0_exceptions_total": int64(3),
  1227. "op_5_project_0_process_latency_us": int64(0),
  1228. "op_5_project_0_records_in_total": int64(8),
  1229. "op_5_project_0_records_out_total": int64(5),
  1230. "sink_mockSink_0_exceptions_total": int64(0),
  1231. "sink_mockSink_0_records_in_total": int64(8),
  1232. "sink_mockSink_0_records_out_total": int64(8),
  1233. "source_ldemo_0_exceptions_total": int64(0),
  1234. "source_ldemo_0_records_in_total": int64(5),
  1235. "source_ldemo_0_records_out_total": int64(5),
  1236. "source_ldemo1_0_exceptions_total": int64(0),
  1237. "source_ldemo1_0_records_in_total": int64(5),
  1238. "source_ldemo1_0_records_out_total": int64(5),
  1239. "op_3_window_0_exceptions_total": int64(0),
  1240. "op_3_window_0_process_latency_us": int64(0),
  1241. "op_3_window_0_records_in_total": int64(10),
  1242. "op_3_window_0_records_out_total": int64(10),
  1243. "op_4_join_0_exceptions_total": int64(3),
  1244. "op_4_join_0_process_latency_us": int64(0),
  1245. "op_4_join_0_records_in_total": int64(10),
  1246. "op_4_join_0_records_out_total": int64(5),
  1247. },
  1248. }, {
  1249. Name: `TestWindowErrorRule4`,
  1250. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1251. R: [][]map[string]interface{}{
  1252. {{
  1253. "color": "red",
  1254. }}, {{
  1255. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1256. }}, {{
  1257. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1258. }}, {{
  1259. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1260. }}, {{
  1261. "color": float64(49),
  1262. }, {}},
  1263. },
  1264. M: map[string]interface{}{
  1265. "op_6_project_0_exceptions_total": int64(3),
  1266. "op_6_project_0_process_latency_us": int64(0),
  1267. "op_6_project_0_records_in_total": int64(5),
  1268. "op_6_project_0_records_out_total": int64(2),
  1269. "sink_mockSink_0_exceptions_total": int64(0),
  1270. "sink_mockSink_0_records_in_total": int64(5),
  1271. "sink_mockSink_0_records_out_total": int64(5),
  1272. "source_ldemo_0_exceptions_total": int64(0),
  1273. "source_ldemo_0_records_in_total": int64(5),
  1274. "source_ldemo_0_records_out_total": int64(5),
  1275. "op_2_window_0_exceptions_total": int64(0),
  1276. "op_2_window_0_process_latency_us": int64(0),
  1277. "op_2_window_0_records_in_total": int64(5),
  1278. "op_2_window_0_records_out_total": int64(5),
  1279. "op_3_aggregate_0_exceptions_total": int64(0),
  1280. "op_3_aggregate_0_process_latency_us": int64(0),
  1281. "op_3_aggregate_0_records_in_total": int64(5),
  1282. "op_3_aggregate_0_records_out_total": int64(5),
  1283. "op_4_having_0_exceptions_total": int64(3),
  1284. "op_4_having_0_process_latency_us": int64(0),
  1285. "op_4_having_0_records_in_total": int64(5),
  1286. "op_4_having_0_records_out_total": int64(2),
  1287. },
  1288. }, {
  1289. Name: `TestWindowErrorRule5`,
  1290. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1291. R: [][]map[string]interface{}{
  1292. {{
  1293. "error": "run Order By error: incompatible types for comparison: int and string",
  1294. }}, {{
  1295. "size": float64(3),
  1296. }}, {{
  1297. "color": float64(49),
  1298. "size": float64(2),
  1299. }}, {{
  1300. "color": "red",
  1301. }},
  1302. },
  1303. M: map[string]interface{}{
  1304. "op_4_project_0_exceptions_total": int64(1),
  1305. "op_4_project_0_process_latency_us": int64(0),
  1306. "op_4_project_0_records_in_total": int64(4),
  1307. "op_4_project_0_records_out_total": int64(3),
  1308. "sink_mockSink_0_exceptions_total": int64(0),
  1309. "sink_mockSink_0_records_in_total": int64(4),
  1310. "sink_mockSink_0_records_out_total": int64(4),
  1311. "source_ldemo_0_exceptions_total": int64(0),
  1312. "source_ldemo_0_records_in_total": int64(5),
  1313. "source_ldemo_0_records_out_total": int64(5),
  1314. "op_2_window_0_exceptions_total": int64(0),
  1315. "op_2_window_0_process_latency_us": int64(0),
  1316. "op_2_window_0_records_in_total": int64(5),
  1317. "op_2_window_0_records_out_total": int64(4),
  1318. "op_3_order_0_exceptions_total": int64(1),
  1319. "op_3_order_0_process_latency_us": int64(0),
  1320. "op_3_order_0_records_in_total": int64(4),
  1321. "op_3_order_0_records_out_total": int64(3),
  1322. },
  1323. },
  1324. }
  1325. HandleStream(true, streamList, t)
  1326. DoRuleTest(t, tests, 0, &api.RuleOption{
  1327. BufferLength: 100,
  1328. SendError: true,
  1329. }, 0)
  1330. }