window_rule_test.go 46 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/topo"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "testing"
  19. )
  20. func TestWindow(t *testing.T) {
  21. //Reset
  22. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  23. HandleStream(false, streamList, t)
  24. var tests = []RuleTest{
  25. {
  26. Name: `TestWindowRule1`,
  27. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  28. R: [][]map[string]interface{}{
  29. {{
  30. "color": "red",
  31. "size": float64(3),
  32. "ts": float64(1541152486013),
  33. }, {
  34. "color": "blue",
  35. "size": float64(6),
  36. "ts": float64(1541152486822),
  37. }},
  38. {{
  39. "color": "red",
  40. "size": float64(3),
  41. "ts": float64(1541152486013),
  42. }, {
  43. "color": "blue",
  44. "size": float64(6),
  45. "ts": float64(1541152486822),
  46. }, {
  47. "color": "blue",
  48. "size": float64(2),
  49. "ts": float64(1541152487632),
  50. }},
  51. {{
  52. "color": "blue",
  53. "size": float64(2),
  54. "ts": float64(1541152487632),
  55. }, {
  56. "color": "yellow",
  57. "size": float64(4),
  58. "ts": float64(1541152488442),
  59. }},
  60. {{
  61. "color": "yellow",
  62. "size": float64(4),
  63. "ts": float64(1541152488442),
  64. }, {
  65. "color": "red",
  66. "size": float64(1),
  67. "ts": float64(1541152489252),
  68. }},
  69. },
  70. M: map[string]interface{}{
  71. "op_3_project_0_exceptions_total": int64(0),
  72. "op_3_project_0_process_latency_us": int64(0),
  73. "op_3_project_0_records_in_total": int64(4),
  74. "op_3_project_0_records_out_total": int64(4),
  75. "sink_mockSink_0_exceptions_total": int64(0),
  76. "sink_mockSink_0_records_in_total": int64(4),
  77. "sink_mockSink_0_records_out_total": int64(4),
  78. "source_demo_0_exceptions_total": int64(0),
  79. "source_demo_0_records_in_total": int64(5),
  80. "source_demo_0_records_out_total": int64(5),
  81. "op_2_window_0_exceptions_total": int64(0),
  82. "op_2_window_0_process_latency_us": int64(0),
  83. "op_2_window_0_records_in_total": int64(5),
  84. "op_2_window_0_records_out_total": int64(4),
  85. },
  86. }, {
  87. Name: `TestWindowRule2`,
  88. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  89. R: [][]map[string]interface{}{
  90. {{
  91. "color": "red",
  92. "ts": float64(1541152486013),
  93. }, {
  94. "color": "blue",
  95. "ts": float64(1541152486822),
  96. }},
  97. {{
  98. "color": "yellow",
  99. "ts": float64(1541152488442),
  100. }},
  101. },
  102. M: map[string]interface{}{
  103. "op_4_project_0_exceptions_total": int64(0),
  104. "op_4_project_0_process_latency_us": int64(0),
  105. "op_4_project_0_records_in_total": int64(2),
  106. "op_4_project_0_records_out_total": int64(2),
  107. "sink_mockSink_0_exceptions_total": int64(0),
  108. "sink_mockSink_0_records_in_total": int64(2),
  109. "sink_mockSink_0_records_out_total": int64(2),
  110. "source_demo_0_exceptions_total": int64(0),
  111. "source_demo_0_records_in_total": int64(5),
  112. "source_demo_0_records_out_total": int64(5),
  113. "op_3_window_0_exceptions_total": int64(0),
  114. "op_3_window_0_process_latency_us": int64(0),
  115. "op_3_window_0_records_in_total": int64(3),
  116. "op_3_window_0_records_out_total": int64(2),
  117. "op_2_filter_0_exceptions_total": int64(0),
  118. "op_2_filter_0_process_latency_us": int64(0),
  119. "op_2_filter_0_records_in_total": int64(5),
  120. "op_2_filter_0_records_out_total": int64(3),
  121. },
  122. }, {
  123. Name: `TestWindowRule3`,
  124. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  125. R: [][]map[string]interface{}{
  126. {{
  127. "color": "red",
  128. "temp": 25.5,
  129. "ts1": float64(1541152486013),
  130. "ts2": float64(1541152486013),
  131. "diff": float64(0),
  132. }}, {{
  133. "color": "red",
  134. "temp": 25.5,
  135. "ts1": float64(1541152486013),
  136. "ts2": float64(1541152486013),
  137. "diff": float64(0),
  138. }}, {{
  139. "color": "red",
  140. "temp": 25.5,
  141. "ts1": float64(1541152486013),
  142. "ts2": float64(1541152486013),
  143. "diff": float64(0),
  144. }}, {{
  145. "color": "blue",
  146. "temp": 28.1,
  147. "ts1": float64(1541152487632),
  148. "ts2": float64(1541152487632),
  149. "diff": float64(0),
  150. }}, {{
  151. "color": "blue",
  152. "temp": 28.1,
  153. "ts1": float64(1541152487632),
  154. "ts2": float64(1541152487632),
  155. "diff": float64(0),
  156. }}, {{
  157. "color": "blue",
  158. "temp": 28.1,
  159. "ts1": float64(1541152487632),
  160. "ts2": float64(1541152487632),
  161. "diff": float64(0),
  162. }, {
  163. "color": "yellow",
  164. "temp": 27.4,
  165. "ts1": float64(1541152488442),
  166. "ts2": float64(1541152488442),
  167. "diff": float64(0),
  168. }}, {{
  169. "color": "yellow",
  170. "temp": 27.4,
  171. "ts1": float64(1541152488442),
  172. "ts2": float64(1541152488442),
  173. "diff": float64(0),
  174. }}, {{
  175. "color": "yellow",
  176. "temp": 27.4,
  177. "ts1": float64(1541152488442),
  178. "ts2": float64(1541152488442),
  179. "diff": float64(0),
  180. }, {
  181. "color": "red",
  182. "temp": 25.5,
  183. "ts1": float64(1541152489252),
  184. "ts2": float64(1541152489252),
  185. "diff": float64(0),
  186. }},
  187. },
  188. M: map[string]interface{}{
  189. "op_5_project_0_exceptions_total": int64(0),
  190. "op_5_project_0_process_latency_us": int64(0),
  191. "op_5_project_0_records_in_total": int64(8),
  192. "op_5_project_0_records_out_total": int64(8),
  193. "sink_mockSink_0_exceptions_total": int64(0),
  194. "sink_mockSink_0_records_in_total": int64(8),
  195. "sink_mockSink_0_records_out_total": int64(8),
  196. "source_demo_0_exceptions_total": int64(0),
  197. "source_demo_0_records_in_total": int64(5),
  198. "source_demo_0_records_out_total": int64(5),
  199. "source_demo1_0_exceptions_total": int64(0),
  200. "source_demo1_0_records_in_total": int64(5),
  201. "source_demo1_0_records_out_total": int64(5),
  202. "op_3_window_0_exceptions_total": int64(0),
  203. "op_3_window_0_process_latency_us": int64(0),
  204. "op_3_window_0_records_in_total": int64(10),
  205. "op_3_window_0_records_out_total": int64(10),
  206. "op_4_join_0_exceptions_total": int64(0),
  207. "op_4_join_0_process_latency_us": int64(0),
  208. "op_4_join_0_records_in_total": int64(10),
  209. "op_4_join_0_records_out_total": int64(8),
  210. },
  211. T: &topo.PrintableTopo{
  212. Sources: []string{"source_demo", "source_demo1"},
  213. Edges: map[string][]string{
  214. "source_demo": {"op_3_window"},
  215. "source_demo1": {"op_3_window"},
  216. "op_3_window": {"op_4_join"},
  217. "op_4_join": {"op_5_project"},
  218. "op_5_project": {"sink_mockSink"},
  219. },
  220. },
  221. }, {
  222. Name: `TestWindowRule4`,
  223. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  224. R: [][]map[string]interface{}{
  225. {{
  226. "color": "red",
  227. "c": float64(1),
  228. }}, {{
  229. "color": "blue",
  230. "c": float64(1),
  231. }, {
  232. "color": "red",
  233. "c": float64(1),
  234. }}, {{
  235. "color": "blue",
  236. "c": float64(2),
  237. }, {
  238. "color": "red",
  239. "c": float64(1),
  240. }}, {{
  241. "color": "blue",
  242. "c": float64(2),
  243. }, {
  244. "color": "yellow",
  245. "c": float64(1),
  246. }}, {{
  247. "color": "blue",
  248. "c": float64(1),
  249. }, {
  250. "color": "red",
  251. "c": float64(1),
  252. }, {
  253. "color": "yellow",
  254. "c": float64(1),
  255. }},
  256. },
  257. M: map[string]interface{}{
  258. "op_5_project_0_exceptions_total": int64(0),
  259. "op_5_project_0_process_latency_us": int64(0),
  260. "op_5_project_0_records_in_total": int64(5),
  261. "op_5_project_0_records_out_total": int64(5),
  262. "sink_mockSink_0_exceptions_total": int64(0),
  263. "sink_mockSink_0_records_in_total": int64(5),
  264. "sink_mockSink_0_records_out_total": int64(5),
  265. "source_demo_0_exceptions_total": int64(0),
  266. "source_demo_0_records_in_total": int64(5),
  267. "source_demo_0_records_out_total": int64(5),
  268. "op_2_window_0_exceptions_total": int64(0),
  269. "op_2_window_0_process_latency_us": int64(0),
  270. "op_2_window_0_records_in_total": int64(5),
  271. "op_2_window_0_records_out_total": int64(5),
  272. "op_3_aggregate_0_exceptions_total": int64(0),
  273. "op_3_aggregate_0_process_latency_us": int64(0),
  274. "op_3_aggregate_0_records_in_total": int64(5),
  275. "op_3_aggregate_0_records_out_total": int64(5),
  276. "op_4_order_0_exceptions_total": int64(0),
  277. "op_4_order_0_process_latency_us": int64(0),
  278. "op_4_order_0_records_in_total": int64(5),
  279. "op_4_order_0_records_out_total": int64(5),
  280. },
  281. }, {
  282. Name: `TestWindowRule5`,
  283. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  284. R: [][]map[string]interface{}{
  285. {{
  286. "count": float64(2),
  287. "ws": float64(1541152486013),
  288. "window_end": float64(1541152487823), // timeout
  289. }}, {{
  290. "count": float64(3),
  291. "ws": float64(1541152487932),
  292. "window_end": float64(1541152490000), // tick
  293. }}, {{
  294. "count": float64(5),
  295. "ws": float64(1541152490000),
  296. "window_end": float64(1541152494000), // tick
  297. }}, {{
  298. "count": float64(1),
  299. "ws": float64(1541152494000),
  300. "window_end": float64(1541152495112), // timeout
  301. }},
  302. },
  303. M: map[string]interface{}{
  304. "op_3_project_0_exceptions_total": int64(0),
  305. "op_3_project_0_process_latency_us": int64(0),
  306. "op_3_project_0_records_in_total": int64(4),
  307. "op_3_project_0_records_out_total": int64(4),
  308. "sink_mockSink_0_exceptions_total": int64(0),
  309. "sink_mockSink_0_records_in_total": int64(4),
  310. "sink_mockSink_0_records_out_total": int64(4),
  311. "source_sessionDemo_0_exceptions_total": int64(0),
  312. "source_sessionDemo_0_records_in_total": int64(11),
  313. "source_sessionDemo_0_records_out_total": int64(11),
  314. "op_2_window_0_exceptions_total": int64(0),
  315. "op_2_window_0_process_latency_us": int64(0),
  316. "op_2_window_0_records_in_total": int64(11),
  317. "op_2_window_0_records_out_total": int64(4),
  318. },
  319. }, {
  320. Name: `TestWindowRule6`,
  321. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  322. R: [][]map[string]interface{}{
  323. {{
  324. "temp": 25.5,
  325. "c": float64(1),
  326. "window_start": float64(1541152485115),
  327. "window_end": float64(1541152486115),
  328. }}, {{
  329. "temp": 25.5,
  330. "c": float64(1),
  331. "window_start": float64(1541152485822),
  332. "window_end": float64(1541152486822),
  333. }}, {{
  334. "temp": 25.5,
  335. "c": float64(1),
  336. "window_start": float64(1541152485903),
  337. "window_end": float64(1541152486903),
  338. }}, {{
  339. "temp": 28.1,
  340. "c": float64(1),
  341. "window_start": float64(1541152486702),
  342. "window_end": float64(1541152487702),
  343. }}, {{
  344. "temp": 28.1,
  345. "c": float64(1),
  346. "window_start": float64(1541152487442),
  347. "window_end": float64(1541152488442),
  348. }}, {{
  349. "temp": 55.5,
  350. "c": float64(2),
  351. "window_start": float64(1541152487605),
  352. "window_end": float64(1541152488605),
  353. }}, {{
  354. "temp": 27.4,
  355. "c": float64(1),
  356. "window_start": float64(1541152488252),
  357. "window_end": float64(1541152489252),
  358. }}, {{
  359. "temp": 52.9,
  360. "c": float64(2),
  361. "window_start": float64(1541152488305),
  362. "window_end": float64(1541152489305),
  363. }},
  364. },
  365. M: map[string]interface{}{
  366. "op_5_project_0_exceptions_total": int64(0),
  367. "op_5_project_0_process_latency_us": int64(0),
  368. "op_5_project_0_records_in_total": int64(8),
  369. "op_5_project_0_records_out_total": int64(8),
  370. "sink_mockSink_0_exceptions_total": int64(0),
  371. "sink_mockSink_0_records_in_total": int64(8),
  372. "sink_mockSink_0_records_out_total": int64(8),
  373. "source_demo_0_exceptions_total": int64(0),
  374. "source_demo_0_records_in_total": int64(5),
  375. "source_demo_0_records_out_total": int64(5),
  376. "source_demo1_0_exceptions_total": int64(0),
  377. "source_demo1_0_records_in_total": int64(5),
  378. "source_demo1_0_records_out_total": int64(5),
  379. "op_3_window_0_exceptions_total": int64(0),
  380. "op_3_window_0_process_latency_us": int64(0),
  381. "op_3_window_0_records_in_total": int64(10),
  382. "op_3_window_0_records_out_total": int64(10),
  383. "op_4_join_0_exceptions_total": int64(0),
  384. "op_4_join_0_process_latency_us": int64(0),
  385. "op_4_join_0_records_in_total": int64(10),
  386. "op_4_join_0_records_out_total": int64(8),
  387. },
  388. }, {
  389. Name: `TestWindowRule7`,
  390. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  391. R: [][]map[string]interface{}{
  392. {{
  393. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  394. }},
  395. {{
  396. "color": "blue",
  397. "size": float64(6),
  398. "ts": float64(1541152486822),
  399. }},
  400. {{
  401. "color": "blue",
  402. "size": float64(6),
  403. "ts": float64(1541152486822),
  404. }, {
  405. "color": "blue",
  406. "size": float64(2),
  407. "ts": float64(1541152487632),
  408. }},
  409. {{
  410. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  411. }},
  412. {{
  413. "color": "blue",
  414. "size": float64(2),
  415. "ts": float64(1541152487632),
  416. }},
  417. {{
  418. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  419. }},
  420. },
  421. M: map[string]interface{}{
  422. "op_3_project_0_exceptions_total": int64(3),
  423. "op_3_project_0_process_latency_us": int64(0),
  424. "op_3_project_0_records_in_total": int64(6),
  425. "op_3_project_0_records_out_total": int64(3),
  426. "sink_mockSink_0_exceptions_total": int64(0),
  427. "sink_mockSink_0_records_in_total": int64(6),
  428. "sink_mockSink_0_records_out_total": int64(6),
  429. "source_demoError_0_exceptions_total": int64(3),
  430. "source_demoError_0_records_in_total": int64(5),
  431. "source_demoError_0_records_out_total": int64(5),
  432. "op_2_window_0_exceptions_total": int64(3),
  433. "op_2_window_0_process_latency_us": int64(0),
  434. "op_2_window_0_records_in_total": int64(5),
  435. "op_2_window_0_records_out_total": int64(3),
  436. },
  437. }, {
  438. Name: `TestWindowRule8`,
  439. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  440. R: [][]map[string]interface{}{
  441. {{
  442. "color": "red",
  443. "ts": float64(1541152486013),
  444. "c": float64(2),
  445. "window_start": float64(1541152486000),
  446. "window_end": float64(1541152487000),
  447. }},
  448. },
  449. M: map[string]interface{}{
  450. "op_5_project_0_exceptions_total": int64(0),
  451. "op_5_project_0_process_latency_us": int64(0),
  452. "op_5_project_0_records_in_total": int64(1),
  453. "op_5_project_0_records_out_total": int64(1),
  454. "sink_mockSink_0_exceptions_total": int64(0),
  455. "sink_mockSink_0_records_in_total": int64(1),
  456. "sink_mockSink_0_records_out_total": int64(1),
  457. "source_demo_0_exceptions_total": int64(0),
  458. "source_demo_0_records_in_total": int64(5),
  459. "source_demo_0_records_out_total": int64(5),
  460. "op_3_window_0_exceptions_total": int64(0),
  461. "op_3_window_0_process_latency_us": int64(0),
  462. "op_3_window_0_records_in_total": int64(3),
  463. "op_3_window_0_records_out_total": int64(2),
  464. "op_2_filter_0_exceptions_total": int64(0),
  465. "op_2_filter_0_process_latency_us": int64(0),
  466. "op_2_filter_0_records_in_total": int64(5),
  467. "op_2_filter_0_records_out_total": int64(3),
  468. "op_4_having_0_exceptions_total": int64(0),
  469. "op_4_having_0_process_latency_us": int64(0),
  470. "op_4_having_0_records_in_total": int64(2),
  471. "op_4_having_0_records_out_total": int64(1),
  472. },
  473. }, {
  474. Name: `TestWindowRule9`,
  475. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  476. R: [][]map[string]interface{}{
  477. {{
  478. "color": "red",
  479. "window_start": float64(1541152485000),
  480. "window_end": float64(1541152487000),
  481. }, {
  482. "color": "blue",
  483. "window_start": float64(1541152485000),
  484. "window_end": float64(1541152487000),
  485. }},
  486. {{
  487. "color": "red",
  488. "window_start": float64(1541152486000),
  489. "window_end": float64(1541152488000),
  490. }, {
  491. "color": "blue",
  492. "window_start": float64(1541152486000),
  493. "window_end": float64(1541152488000),
  494. }},
  495. {{
  496. "color": "yellow",
  497. "window_start": float64(1541152487000),
  498. "window_end": float64(1541152489000),
  499. }},
  500. {{
  501. "color": "yellow",
  502. "window_start": float64(1541152488000),
  503. "window_end": float64(1541152490000),
  504. }},
  505. },
  506. M: map[string]interface{}{
  507. "op_4_project_0_exceptions_total": int64(0),
  508. "op_4_project_0_process_latency_us": int64(0),
  509. "op_4_project_0_records_in_total": int64(4),
  510. "op_4_project_0_records_out_total": int64(4),
  511. "sink_mockSink_0_exceptions_total": int64(0),
  512. "sink_mockSink_0_records_in_total": int64(4),
  513. "sink_mockSink_0_records_out_total": int64(4),
  514. "source_demo_0_exceptions_total": int64(0),
  515. "source_demo_0_records_in_total": int64(5),
  516. "source_demo_0_records_out_total": int64(5),
  517. "op_3_window_0_exceptions_total": int64(0),
  518. "op_3_window_0_process_latency_us": int64(0),
  519. "op_3_window_0_records_in_total": int64(3),
  520. "op_3_window_0_records_out_total": int64(4),
  521. },
  522. }, {
  523. Name: `TestCountWindowRule1`,
  524. Sql: `SELECT collect(*)[0]->color as c FROM demo GROUP BY COUNTWINDOW(3)`,
  525. R: [][]map[string]interface{}{
  526. {{
  527. "c": "red",
  528. }},
  529. },
  530. M: map[string]interface{}{
  531. "op_3_project_0_exceptions_total": int64(0),
  532. "op_3_project_0_process_latency_us": int64(0),
  533. "op_3_project_0_records_in_total": int64(1),
  534. "op_3_project_0_records_out_total": int64(1),
  535. "sink_mockSink_0_exceptions_total": int64(0),
  536. "sink_mockSink_0_records_in_total": int64(1),
  537. "sink_mockSink_0_records_out_total": int64(1),
  538. "source_demo_0_exceptions_total": int64(0),
  539. "source_demo_0_records_in_total": int64(5),
  540. "source_demo_0_records_out_total": int64(5),
  541. "op_2_window_0_exceptions_total": int64(0),
  542. "op_2_window_0_process_latency_us": int64(0),
  543. "op_2_window_0_records_in_total": int64(5),
  544. "op_2_window_0_records_out_total": int64(1),
  545. },
  546. }, {
  547. Name: `TestWindowRule10`,
  548. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  549. R: [][]map[string]interface{}{
  550. {{
  551. "c": "red",
  552. }}, {{
  553. "c": "blue",
  554. }}, {{}}, {{
  555. "c": "yellow",
  556. }}, {{}},
  557. },
  558. M: map[string]interface{}{
  559. "op_3_project_0_exceptions_total": int64(0),
  560. "op_3_project_0_process_latency_us": int64(0),
  561. "op_3_project_0_records_in_total": int64(5),
  562. "op_3_project_0_records_out_total": int64(5),
  563. "sink_mockSink_0_exceptions_total": int64(0),
  564. "sink_mockSink_0_records_in_total": int64(5),
  565. "sink_mockSink_0_records_out_total": int64(5),
  566. "source_demo_0_exceptions_total": int64(0),
  567. "source_demo_0_records_in_total": int64(5),
  568. "source_demo_0_records_out_total": int64(5),
  569. "op_2_window_0_exceptions_total": int64(0),
  570. "op_2_window_0_process_latency_us": int64(0),
  571. "op_2_window_0_records_in_total": int64(5),
  572. "op_2_window_0_records_out_total": int64(5),
  573. },
  574. }, {
  575. Name: `TestWindowRule11`,
  576. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  577. R: [][]map[string]interface{}{
  578. {{
  579. "color": "red",
  580. "name": "name1",
  581. "window_start": float64(1541152486000),
  582. "window_end": float64(1541152487000),
  583. }},
  584. },
  585. M: map[string]interface{}{
  586. //"op_4_project_0_exceptions_total": int64(0),
  587. //"op_4_project_0_process_latency_us": int64(0),
  588. //"op_4_project_0_records_in_total": int64(2),
  589. //"op_4_project_0_records_out_total": int64(2),
  590. "op_3_window_0_exceptions_total": int64(0),
  591. "op_3_window_0_process_latency_us": int64(0),
  592. "op_3_window_0_records_in_total": int64(3),
  593. "op_3_window_0_records_out_total": int64(2),
  594. "op_2_filter_0_exceptions_total": int64(0),
  595. "op_2_filter_0_process_latency_us": int64(0),
  596. "op_2_filter_0_records_in_total": int64(5),
  597. "op_2_filter_0_records_out_total": int64(3),
  598. "op_5_filter_0_exceptions_total": int64(0),
  599. "op_5_filter_0_records_in_total": int64(1),
  600. "op_5_filter_0_records_out_total": int64(1),
  601. "op_6_join_aligner_0_records_in_total": int64(3),
  602. "op_6_join_aligner_0_records_out_total": int64(2),
  603. "op_7_join_0_exceptions_total": int64(0),
  604. "op_7_join_0_records_in_total": int64(2),
  605. "op_7_join_0_records_out_total": int64(1),
  606. "op_8_project_0_exceptions_total": int64(0),
  607. "op_8_project_0_records_in_total": int64(1),
  608. "op_8_project_0_records_out_total": int64(1),
  609. "sink_mockSink_0_exceptions_total": int64(0),
  610. "sink_mockSink_0_records_in_total": int64(1),
  611. "sink_mockSink_0_records_out_total": int64(1),
  612. "source_demo_0_exceptions_total": int64(0),
  613. "source_demo_0_records_in_total": int64(5),
  614. "source_demo_0_records_out_total": int64(5),
  615. "source_table1_0_exceptions_total": int64(0),
  616. "source_table1_0_records_in_total": int64(4),
  617. "source_table1_0_records_out_total": int64(1),
  618. },
  619. },
  620. }
  621. HandleStream(true, streamList, t)
  622. options := []*api.RuleOption{
  623. {
  624. BufferLength: 100,
  625. SendError: true,
  626. }, {
  627. BufferLength: 100,
  628. SendError: true,
  629. Qos: api.AtLeastOnce,
  630. CheckpointInterval: 5000,
  631. }, {
  632. BufferLength: 100,
  633. SendError: true,
  634. Qos: api.ExactlyOnce,
  635. CheckpointInterval: 5000,
  636. },
  637. }
  638. for j, opt := range options {
  639. DoRuleTest(t, tests, j, opt, 15)
  640. }
  641. }
  642. func TestEventWindow(t *testing.T) {
  643. //Reset
  644. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  645. HandleStream(false, streamList, t)
  646. var tests = []RuleTest{
  647. {
  648. Name: `TestEventWindowRule1`,
  649. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  650. R: [][]map[string]interface{}{
  651. {{
  652. "color": "red",
  653. "size": float64(3),
  654. "ts": float64(1541152486013),
  655. }},
  656. {{
  657. "color": "red",
  658. "size": float64(3),
  659. "ts": float64(1541152486013),
  660. }, {
  661. "color": "blue",
  662. "size": float64(2),
  663. "ts": float64(1541152487632),
  664. }},
  665. {{
  666. "color": "blue",
  667. "size": float64(2),
  668. "ts": float64(1541152487632),
  669. }, {
  670. "color": "yellow",
  671. "size": float64(4),
  672. "ts": float64(1541152488442),
  673. }}, {{
  674. "color": "yellow",
  675. "size": float64(4),
  676. "ts": float64(1541152488442),
  677. }, {
  678. "color": "red",
  679. "size": float64(1),
  680. "ts": float64(1541152489252),
  681. }}, {{
  682. "color": "red",
  683. "size": float64(1),
  684. "ts": float64(1541152489252),
  685. }},
  686. },
  687. M: map[string]interface{}{
  688. "op_3_project_0_exceptions_total": int64(0),
  689. "op_3_project_0_process_latency_us": int64(0),
  690. "op_3_project_0_records_in_total": int64(5),
  691. "op_3_project_0_records_out_total": int64(5),
  692. "sink_mockSink_0_exceptions_total": int64(0),
  693. "sink_mockSink_0_records_in_total": int64(5),
  694. "sink_mockSink_0_records_out_total": int64(5),
  695. "source_demoE_0_exceptions_total": int64(0),
  696. "source_demoE_0_records_in_total": int64(6),
  697. "source_demoE_0_records_out_total": int64(6),
  698. "op_2_window_0_exceptions_total": int64(0),
  699. "op_2_window_0_process_latency_us": int64(0),
  700. "op_2_window_0_records_in_total": int64(6),
  701. "op_2_window_0_records_out_total": int64(5),
  702. },
  703. }, {
  704. Name: `TestEventWindowRule2`,
  705. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  706. R: [][]map[string]interface{}{
  707. {{
  708. "window_start": float64(1541152486000),
  709. "window_end": float64(1541152487000),
  710. "color": "red",
  711. "ts": float64(1541152486013),
  712. }},
  713. {{
  714. "window_start": float64(1541152488000),
  715. "window_end": float64(1541152489000),
  716. "color": "yellow",
  717. "ts": float64(1541152488442),
  718. }},
  719. },
  720. M: map[string]interface{}{
  721. "op_4_project_0_exceptions_total": int64(0),
  722. "op_4_project_0_process_latency_us": int64(0),
  723. "op_4_project_0_records_in_total": int64(2),
  724. "op_4_project_0_records_out_total": int64(2),
  725. "sink_mockSink_0_exceptions_total": int64(0),
  726. "sink_mockSink_0_records_in_total": int64(2),
  727. "sink_mockSink_0_records_out_total": int64(2),
  728. "source_demoE_0_exceptions_total": int64(0),
  729. "source_demoE_0_records_in_total": int64(6),
  730. "source_demoE_0_records_out_total": int64(6),
  731. "op_2_window_0_exceptions_total": int64(0),
  732. "op_2_window_0_process_latency_us": int64(0),
  733. "op_2_window_0_records_in_total": int64(6),
  734. "op_2_window_0_records_out_total": int64(4),
  735. "op_3_filter_0_exceptions_total": int64(0),
  736. "op_3_filter_0_process_latency_us": int64(0),
  737. "op_3_filter_0_records_in_total": int64(4),
  738. "op_3_filter_0_records_out_total": int64(2),
  739. },
  740. }, {
  741. Name: `TestEventWindowRule3`,
  742. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  743. R: [][]map[string]interface{}{
  744. {{
  745. "color": "red",
  746. "temp": 25.5,
  747. "ts": float64(1541152486013),
  748. }}, {{
  749. "color": "red",
  750. "temp": 25.5,
  751. "ts": float64(1541152486013),
  752. }}, {{
  753. "color": "blue",
  754. "temp": 28.1,
  755. "ts": float64(1541152487632),
  756. }}, {{
  757. "color": "blue",
  758. "temp": 28.1,
  759. "ts": float64(1541152487632),
  760. }, {
  761. "color": "yellow",
  762. "temp": 27.4,
  763. "ts": float64(1541152488442),
  764. }}, {{
  765. "color": "yellow",
  766. "temp": 27.4,
  767. "ts": float64(1541152488442),
  768. }, {
  769. "color": "red",
  770. "temp": 25.5,
  771. "ts": float64(1541152489252),
  772. }},
  773. },
  774. M: map[string]interface{}{
  775. "op_5_project_0_exceptions_total": int64(0),
  776. "op_5_project_0_process_latency_us": int64(0),
  777. "op_5_project_0_records_in_total": int64(5),
  778. "op_5_project_0_records_out_total": int64(5),
  779. "sink_mockSink_0_exceptions_total": int64(0),
  780. "sink_mockSink_0_records_in_total": int64(5),
  781. "sink_mockSink_0_records_out_total": int64(5),
  782. "source_demoE_0_exceptions_total": int64(0),
  783. "source_demoE_0_records_in_total": int64(6),
  784. "source_demoE_0_records_out_total": int64(6),
  785. "source_demo1E_0_exceptions_total": int64(0),
  786. "source_demo1E_0_records_in_total": int64(6),
  787. "source_demo1E_0_records_out_total": int64(6),
  788. "op_3_window_0_exceptions_total": int64(0),
  789. "op_3_window_0_process_latency_us": int64(0),
  790. "op_3_window_0_records_in_total": int64(12),
  791. "op_3_window_0_records_out_total": int64(5),
  792. "op_4_join_0_exceptions_total": int64(0),
  793. "op_4_join_0_process_latency_us": int64(0),
  794. "op_4_join_0_records_in_total": int64(5),
  795. "op_4_join_0_records_out_total": int64(5),
  796. },
  797. }, {
  798. Name: `TestEventWindowRule4`,
  799. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  800. R: [][]map[string]interface{}{
  801. {{
  802. "color": "red",
  803. "ws": float64(1541152484013),
  804. "we": float64(1541152486013),
  805. }}, {{
  806. "color": "blue",
  807. "ws": float64(1541152485632),
  808. "we": float64(1541152487632),
  809. }, {
  810. "color": "red",
  811. "ws": float64(1541152485632),
  812. "we": float64(1541152487632),
  813. }}, {{
  814. "color": "blue",
  815. "ws": float64(1541152486442),
  816. "we": float64(1541152488442),
  817. }, {
  818. "color": "yellow",
  819. "ws": float64(1541152486442),
  820. "we": float64(1541152488442),
  821. }}, {{
  822. "color": "blue",
  823. "ws": float64(1541152487252),
  824. "we": float64(1541152489252),
  825. }, {
  826. "color": "red",
  827. "ws": float64(1541152487252),
  828. "we": float64(1541152489252),
  829. }, {
  830. "color": "yellow",
  831. "ws": float64(1541152487252),
  832. "we": float64(1541152489252),
  833. }},
  834. },
  835. M: map[string]interface{}{
  836. "op_5_project_0_exceptions_total": int64(0),
  837. "op_5_project_0_process_latency_us": int64(0),
  838. "op_5_project_0_records_in_total": int64(4),
  839. "op_5_project_0_records_out_total": int64(4),
  840. "sink_mockSink_0_exceptions_total": int64(0),
  841. "sink_mockSink_0_records_in_total": int64(4),
  842. "sink_mockSink_0_records_out_total": int64(4),
  843. "source_demoE_0_exceptions_total": int64(0),
  844. "source_demoE_0_records_in_total": int64(6),
  845. "source_demoE_0_records_out_total": int64(6),
  846. "op_2_window_0_exceptions_total": int64(0),
  847. "op_2_window_0_process_latency_us": int64(0),
  848. "op_2_window_0_records_in_total": int64(6),
  849. "op_2_window_0_records_out_total": int64(4),
  850. "op_3_aggregate_0_exceptions_total": int64(0),
  851. "op_3_aggregate_0_process_latency_us": int64(0),
  852. "op_3_aggregate_0_records_in_total": int64(4),
  853. "op_3_aggregate_0_records_out_total": int64(4),
  854. "op_4_order_0_exceptions_total": int64(0),
  855. "op_4_order_0_process_latency_us": int64(0),
  856. "op_4_order_0_records_in_total": int64(4),
  857. "op_4_order_0_records_out_total": int64(4),
  858. },
  859. }, {
  860. Name: `TestEventWindowRule5`,
  861. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  862. R: [][]map[string]interface{}{
  863. {{
  864. "temp": 25.5,
  865. }}, {{
  866. "temp": 28.1,
  867. }, {
  868. "temp": 27.4,
  869. }, {
  870. "temp": 25.5,
  871. }}, {{
  872. "temp": 26.2,
  873. }, {
  874. "temp": 26.8,
  875. }, {
  876. "temp": 28.9,
  877. }, {
  878. "temp": 29.1,
  879. }, {
  880. "temp": 32.2,
  881. }}, {{
  882. "temp": 30.9,
  883. }},
  884. },
  885. M: map[string]interface{}{
  886. "op_3_project_0_exceptions_total": int64(0),
  887. "op_3_project_0_process_latency_us": int64(0),
  888. "op_3_project_0_records_in_total": int64(4),
  889. "op_3_project_0_records_out_total": int64(4),
  890. "sink_mockSink_0_exceptions_total": int64(0),
  891. "sink_mockSink_0_records_in_total": int64(4),
  892. "sink_mockSink_0_records_out_total": int64(4),
  893. "source_sessionDemoE_0_exceptions_total": int64(0),
  894. "source_sessionDemoE_0_records_in_total": int64(12),
  895. "source_sessionDemoE_0_records_out_total": int64(12),
  896. "op_2_window_0_exceptions_total": int64(0),
  897. "op_2_window_0_process_latency_us": int64(0),
  898. "op_2_window_0_records_in_total": int64(12),
  899. "op_2_window_0_records_out_total": int64(4),
  900. },
  901. }, {
  902. Name: `TestEventWindowRule6`,
  903. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  904. R: [][]map[string]interface{}{
  905. {{
  906. "m": 25.5,
  907. "c": float64(1),
  908. }}, {{
  909. "m": 25.5,
  910. "c": float64(1),
  911. }}, {{
  912. "m": 28.1,
  913. "c": float64(1),
  914. }}, {{
  915. "m": 28.1,
  916. "c": float64(2),
  917. }}, {{
  918. "m": 27.4,
  919. "c": float64(2),
  920. }},
  921. },
  922. M: map[string]interface{}{
  923. "op_5_project_0_exceptions_total": int64(0),
  924. "op_5_project_0_process_latency_us": int64(0),
  925. "op_5_project_0_records_in_total": int64(5),
  926. "op_5_project_0_records_out_total": int64(5),
  927. "sink_mockSink_0_exceptions_total": int64(0),
  928. "sink_mockSink_0_records_in_total": int64(5),
  929. "sink_mockSink_0_records_out_total": int64(5),
  930. "source_demoE_0_exceptions_total": int64(0),
  931. "source_demoE_0_records_in_total": int64(6),
  932. "source_demoE_0_records_out_total": int64(6),
  933. "source_demo1E_0_exceptions_total": int64(0),
  934. "source_demo1E_0_records_in_total": int64(6),
  935. "source_demo1E_0_records_out_total": int64(6),
  936. "op_3_window_0_exceptions_total": int64(0),
  937. "op_3_window_0_records_in_total": int64(12),
  938. "op_3_window_0_records_out_total": int64(5),
  939. "op_4_join_0_exceptions_total": int64(0),
  940. "op_4_join_0_process_latency_us": int64(0),
  941. "op_4_join_0_records_in_total": int64(5),
  942. "op_4_join_0_records_out_total": int64(5),
  943. },
  944. }, {
  945. Name: `TestEventWindowRule7`,
  946. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  947. R: [][]map[string]interface{}{
  948. {{
  949. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  950. }},
  951. {{
  952. "color": "red",
  953. "size": float64(3),
  954. "ts": float64(1541152486013),
  955. }},
  956. {{
  957. "color": "red",
  958. "size": float64(3),
  959. "ts": float64(1541152486013),
  960. }},
  961. {{
  962. "color": "yellow",
  963. "size": float64(4),
  964. "ts": float64(1541152488442),
  965. }}, {{
  966. "color": "yellow",
  967. "size": float64(4),
  968. "ts": float64(1541152488442),
  969. }, {
  970. "color": "red",
  971. "size": float64(1),
  972. "ts": float64(1541152489252),
  973. }}, {{
  974. "color": "red",
  975. "size": float64(1),
  976. "ts": float64(1541152489252),
  977. }},
  978. },
  979. M: map[string]interface{}{
  980. "op_3_project_0_exceptions_total": int64(1),
  981. "op_3_project_0_process_latency_us": int64(0),
  982. "op_3_project_0_records_in_total": int64(6),
  983. "op_3_project_0_records_out_total": int64(5),
  984. "sink_mockSink_0_exceptions_total": int64(0),
  985. "sink_mockSink_0_records_in_total": int64(6),
  986. "sink_mockSink_0_records_out_total": int64(6),
  987. "source_demoErr_0_exceptions_total": int64(1),
  988. "source_demoErr_0_records_in_total": int64(6),
  989. "source_demoErr_0_records_out_total": int64(6),
  990. "op_2_window_0_exceptions_total": int64(1),
  991. "op_2_window_0_process_latency_us": int64(0),
  992. "op_2_window_0_records_in_total": int64(6),
  993. "op_2_window_0_records_out_total": int64(5),
  994. },
  995. }, {
  996. Name: `TestEventWindowRule8`,
  997. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  998. R: [][]map[string]interface{}{
  999. {{
  1000. "temp": 25.5,
  1001. "window_start": float64(1541152486013),
  1002. "window_end": float64(1541152487013),
  1003. }}, {{
  1004. "temp": 28.1,
  1005. "window_start": float64(1541152487932),
  1006. "window_end": float64(1541152490000),
  1007. }, {
  1008. "temp": 27.4,
  1009. "window_start": float64(1541152487932),
  1010. "window_end": float64(1541152490000),
  1011. }, {
  1012. "temp": 25.5,
  1013. "window_start": float64(1541152487932),
  1014. "window_end": float64(1541152490000),
  1015. }}, {{
  1016. "temp": 26.2,
  1017. "window_start": float64(1541152490000),
  1018. "window_end": float64(1541152494000),
  1019. }, {
  1020. "temp": 26.8,
  1021. "window_start": float64(1541152490000),
  1022. "window_end": float64(1541152494000),
  1023. }, {
  1024. "temp": 28.9,
  1025. "window_start": float64(1541152490000),
  1026. "window_end": float64(1541152494000),
  1027. }, {
  1028. "temp": 29.1,
  1029. "window_start": float64(1541152490000),
  1030. "window_end": float64(1541152494000),
  1031. }, {
  1032. "temp": 32.2,
  1033. "window_start": float64(1541152490000),
  1034. "window_end": float64(1541152494000),
  1035. }}, {{
  1036. "temp": 30.9,
  1037. "window_start": float64(1541152494000),
  1038. "window_end": float64(1541152495112),
  1039. }},
  1040. },
  1041. M: map[string]interface{}{
  1042. "op_3_project_0_exceptions_total": int64(0),
  1043. "op_3_project_0_process_latency_us": int64(0),
  1044. "op_3_project_0_records_in_total": int64(4),
  1045. "op_3_project_0_records_out_total": int64(4),
  1046. "sink_mockSink_0_exceptions_total": int64(0),
  1047. "sink_mockSink_0_records_in_total": int64(4),
  1048. "sink_mockSink_0_records_out_total": int64(4),
  1049. "source_sessionDemoE_0_exceptions_total": int64(0),
  1050. "source_sessionDemoE_0_records_in_total": int64(12),
  1051. "source_sessionDemoE_0_records_out_total": int64(12),
  1052. "op_2_window_0_exceptions_total": int64(0),
  1053. "op_2_window_0_process_latency_us": int64(0),
  1054. "op_2_window_0_records_in_total": int64(12),
  1055. "op_2_window_0_records_out_total": int64(4),
  1056. },
  1057. }, {
  1058. Name: `TestEventWindowRule9`,
  1059. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1060. R: [][]map[string]interface{}{
  1061. {{
  1062. "color": "red",
  1063. "window_start": float64(1541152485000),
  1064. "window_end": float64(1541152487000),
  1065. }},
  1066. {{
  1067. "color": "red",
  1068. "window_start": float64(1541152486000),
  1069. "window_end": float64(1541152488000),
  1070. }, {
  1071. "color": "blue",
  1072. "window_start": float64(1541152486000),
  1073. "window_end": float64(1541152488000),
  1074. }},
  1075. {{
  1076. "color": "blue",
  1077. "window_start": float64(1541152487000),
  1078. "window_end": float64(1541152489000),
  1079. }, {
  1080. "color": "yellow",
  1081. "window_start": float64(1541152487000),
  1082. "window_end": float64(1541152489000),
  1083. }}, {{
  1084. "color": "yellow",
  1085. "window_start": float64(1541152488000),
  1086. "window_end": float64(1541152490000),
  1087. }, {
  1088. "color": "red",
  1089. "window_start": float64(1541152488000),
  1090. "window_end": float64(1541152490000),
  1091. }}, {{
  1092. "color": "red",
  1093. "window_start": float64(1541152489000),
  1094. "window_end": float64(1541152491000),
  1095. }},
  1096. },
  1097. M: map[string]interface{}{
  1098. "op_3_project_0_exceptions_total": int64(0),
  1099. "op_3_project_0_process_latency_us": int64(0),
  1100. "op_3_project_0_records_in_total": int64(5),
  1101. "op_3_project_0_records_out_total": int64(5),
  1102. "sink_mockSink_0_exceptions_total": int64(0),
  1103. "sink_mockSink_0_records_in_total": int64(5),
  1104. "sink_mockSink_0_records_out_total": int64(5),
  1105. "source_demoE_0_exceptions_total": int64(0),
  1106. "source_demoE_0_records_in_total": int64(6),
  1107. "source_demoE_0_records_out_total": int64(6),
  1108. "op_2_window_0_exceptions_total": int64(0),
  1109. "op_2_window_0_process_latency_us": int64(0),
  1110. "op_2_window_0_records_in_total": int64(6),
  1111. "op_2_window_0_records_out_total": int64(5),
  1112. },
  1113. },
  1114. }
  1115. HandleStream(true, streamList, t)
  1116. options := []*api.RuleOption{
  1117. {
  1118. BufferLength: 100,
  1119. SendError: true,
  1120. IsEventTime: true,
  1121. LateTol: 1000,
  1122. }, {
  1123. BufferLength: 100,
  1124. SendError: true,
  1125. Qos: api.AtLeastOnce,
  1126. CheckpointInterval: 5000,
  1127. IsEventTime: true,
  1128. LateTol: 1000,
  1129. }, {
  1130. BufferLength: 100,
  1131. SendError: true,
  1132. Qos: api.ExactlyOnce,
  1133. CheckpointInterval: 5000,
  1134. IsEventTime: true,
  1135. LateTol: 1000,
  1136. },
  1137. }
  1138. for j, opt := range options {
  1139. DoRuleTest(t, tests, j, opt, 10)
  1140. }
  1141. }
  1142. func TestWindowError(t *testing.T) {
  1143. //Reset
  1144. streamList := []string{"ldemo", "ldemo1"}
  1145. HandleStream(false, streamList, t)
  1146. var tests = []RuleTest{
  1147. {
  1148. Name: `TestWindowErrorRule1`,
  1149. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1150. R: [][]map[string]interface{}{
  1151. {{
  1152. "error": "run Select error: invalid operation string(string) * int64(3)",
  1153. }}, {{
  1154. "kuiper_field_0": float64(6),
  1155. }, {}},
  1156. },
  1157. M: map[string]interface{}{
  1158. "op_3_project_0_exceptions_total": int64(1),
  1159. "op_3_project_0_process_latency_us": int64(0),
  1160. "op_3_project_0_records_in_total": int64(2),
  1161. "op_3_project_0_records_out_total": int64(1),
  1162. "sink_mockSink_0_exceptions_total": int64(0),
  1163. "sink_mockSink_0_records_in_total": int64(2),
  1164. "sink_mockSink_0_records_out_total": int64(2),
  1165. "source_ldemo_0_exceptions_total": int64(0),
  1166. "source_ldemo_0_records_in_total": int64(5),
  1167. "source_ldemo_0_records_out_total": int64(5),
  1168. "op_2_window_0_exceptions_total": int64(0),
  1169. "op_2_window_0_process_latency_us": int64(0),
  1170. "op_2_window_0_records_in_total": int64(5),
  1171. "op_2_window_0_records_out_total": int64(2),
  1172. },
  1173. }, {
  1174. Name: `TestWindowErrorRule2`,
  1175. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1176. R: [][]map[string]interface{}{
  1177. {{
  1178. "error": "run Where error: invalid operation string(string) > int64(2)",
  1179. }}, {{
  1180. "color": "red",
  1181. "ts": float64(1541152486013),
  1182. }}, {{
  1183. "ts": float64(1541152487632),
  1184. }},
  1185. },
  1186. M: map[string]interface{}{
  1187. "op_4_project_0_exceptions_total": int64(1),
  1188. "op_4_project_0_process_latency_us": int64(0),
  1189. "op_4_project_0_records_in_total": int64(3),
  1190. "op_4_project_0_records_out_total": int64(2),
  1191. "sink_mockSink_0_exceptions_total": int64(0),
  1192. "sink_mockSink_0_records_in_total": int64(3),
  1193. "sink_mockSink_0_records_out_total": int64(3),
  1194. "source_ldemo_0_exceptions_total": int64(0),
  1195. "source_ldemo_0_records_in_total": int64(5),
  1196. "source_ldemo_0_records_out_total": int64(5),
  1197. "op_3_window_0_exceptions_total": int64(1),
  1198. "op_3_window_0_process_latency_us": int64(0),
  1199. "op_3_window_0_records_in_total": int64(3),
  1200. "op_3_window_0_records_out_total": int64(2),
  1201. "op_2_filter_0_exceptions_total": int64(1),
  1202. "op_2_filter_0_process_latency_us": int64(0),
  1203. "op_2_filter_0_records_in_total": int64(5),
  1204. "op_2_filter_0_records_out_total": int64(2),
  1205. },
  1206. }, {
  1207. Name: `TestWindowErrorRule3`,
  1208. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1209. R: [][]map[string]interface{}{
  1210. {{
  1211. "color": "red",
  1212. "temp": 25.5,
  1213. "ts": float64(1541152486013),
  1214. }}, {{
  1215. "color": "red",
  1216. "temp": 25.5,
  1217. "ts": float64(1541152486013),
  1218. }}, {{
  1219. "color": "red",
  1220. "temp": 25.5,
  1221. "ts": float64(1541152486013),
  1222. }}, {{
  1223. "temp": 28.1,
  1224. "ts": float64(1541152487632),
  1225. }}, {{
  1226. "temp": 28.1,
  1227. "ts": float64(1541152487632),
  1228. }}, {{
  1229. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1230. }}, {{
  1231. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1232. }}, {{
  1233. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1234. }},
  1235. },
  1236. M: map[string]interface{}{
  1237. "op_5_project_0_exceptions_total": int64(3),
  1238. "op_5_project_0_process_latency_us": int64(0),
  1239. "op_5_project_0_records_in_total": int64(8),
  1240. "op_5_project_0_records_out_total": int64(5),
  1241. "sink_mockSink_0_exceptions_total": int64(0),
  1242. "sink_mockSink_0_records_in_total": int64(8),
  1243. "sink_mockSink_0_records_out_total": int64(8),
  1244. "source_ldemo_0_exceptions_total": int64(0),
  1245. "source_ldemo_0_records_in_total": int64(5),
  1246. "source_ldemo_0_records_out_total": int64(5),
  1247. "source_ldemo1_0_exceptions_total": int64(0),
  1248. "source_ldemo1_0_records_in_total": int64(5),
  1249. "source_ldemo1_0_records_out_total": int64(5),
  1250. "op_3_window_0_exceptions_total": int64(0),
  1251. "op_3_window_0_process_latency_us": int64(0),
  1252. "op_3_window_0_records_in_total": int64(10),
  1253. "op_3_window_0_records_out_total": int64(10),
  1254. "op_4_join_0_exceptions_total": int64(3),
  1255. "op_4_join_0_process_latency_us": int64(0),
  1256. "op_4_join_0_records_in_total": int64(10),
  1257. "op_4_join_0_records_out_total": int64(5),
  1258. },
  1259. }, {
  1260. Name: `TestWindowErrorRule4`,
  1261. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1262. R: [][]map[string]interface{}{
  1263. {{
  1264. "color": "red",
  1265. }}, {{
  1266. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1267. }}, {{
  1268. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1269. }}, {{
  1270. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1271. }}, {{
  1272. "color": float64(49),
  1273. }, {}},
  1274. },
  1275. M: map[string]interface{}{
  1276. "op_6_project_0_exceptions_total": int64(3),
  1277. "op_6_project_0_process_latency_us": int64(0),
  1278. "op_6_project_0_records_in_total": int64(5),
  1279. "op_6_project_0_records_out_total": int64(2),
  1280. "sink_mockSink_0_exceptions_total": int64(0),
  1281. "sink_mockSink_0_records_in_total": int64(5),
  1282. "sink_mockSink_0_records_out_total": int64(5),
  1283. "source_ldemo_0_exceptions_total": int64(0),
  1284. "source_ldemo_0_records_in_total": int64(5),
  1285. "source_ldemo_0_records_out_total": int64(5),
  1286. "op_2_window_0_exceptions_total": int64(0),
  1287. "op_2_window_0_process_latency_us": int64(0),
  1288. "op_2_window_0_records_in_total": int64(5),
  1289. "op_2_window_0_records_out_total": int64(5),
  1290. "op_3_aggregate_0_exceptions_total": int64(0),
  1291. "op_3_aggregate_0_process_latency_us": int64(0),
  1292. "op_3_aggregate_0_records_in_total": int64(5),
  1293. "op_3_aggregate_0_records_out_total": int64(5),
  1294. "op_4_having_0_exceptions_total": int64(3),
  1295. "op_4_having_0_process_latency_us": int64(0),
  1296. "op_4_having_0_records_in_total": int64(5),
  1297. "op_4_having_0_records_out_total": int64(2),
  1298. },
  1299. }, {
  1300. Name: `TestWindowErrorRule5`,
  1301. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1302. R: [][]map[string]interface{}{
  1303. {{
  1304. "error": "run Order By error: incompatible types for comparison: int and string",
  1305. }}, {{
  1306. "size": float64(3),
  1307. }}, {{
  1308. "color": float64(49),
  1309. "size": float64(2),
  1310. }}, {{
  1311. "color": "red",
  1312. }},
  1313. },
  1314. M: map[string]interface{}{
  1315. "op_4_project_0_exceptions_total": int64(1),
  1316. "op_4_project_0_process_latency_us": int64(0),
  1317. "op_4_project_0_records_in_total": int64(4),
  1318. "op_4_project_0_records_out_total": int64(3),
  1319. "sink_mockSink_0_exceptions_total": int64(0),
  1320. "sink_mockSink_0_records_in_total": int64(4),
  1321. "sink_mockSink_0_records_out_total": int64(4),
  1322. "source_ldemo_0_exceptions_total": int64(0),
  1323. "source_ldemo_0_records_in_total": int64(5),
  1324. "source_ldemo_0_records_out_total": int64(5),
  1325. "op_2_window_0_exceptions_total": int64(0),
  1326. "op_2_window_0_process_latency_us": int64(0),
  1327. "op_2_window_0_records_in_total": int64(5),
  1328. "op_2_window_0_records_out_total": int64(4),
  1329. "op_3_order_0_exceptions_total": int64(1),
  1330. "op_3_order_0_process_latency_us": int64(0),
  1331. "op_3_order_0_records_in_total": int64(4),
  1332. "op_3_order_0_records_out_total": int64(3),
  1333. },
  1334. },
  1335. }
  1336. HandleStream(true, streamList, t)
  1337. DoRuleTest(t, tests, 0, &api.RuleOption{
  1338. BufferLength: 100,
  1339. SendError: true,
  1340. }, 0)
  1341. }