window_rule_test.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "testing"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. )
  19. func TestWindow(t *testing.T) {
  20. // Reset
  21. streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
  22. HandleStream(false, streamList, t)
  23. tests := []RuleTest{
  24. {
  25. Name: `TestWindowRule1`,
  26. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  27. R: [][]map[string]interface{}{
  28. {{
  29. "color": "red",
  30. "size": float64(3),
  31. "ts": float64(1541152486013),
  32. }, {
  33. "color": "blue",
  34. "size": float64(6),
  35. "ts": float64(1541152486822),
  36. }},
  37. {{
  38. "color": "red",
  39. "size": float64(3),
  40. "ts": float64(1541152486013),
  41. }, {
  42. "color": "blue",
  43. "size": float64(6),
  44. "ts": float64(1541152486822),
  45. }, {
  46. "color": "blue",
  47. "size": float64(2),
  48. "ts": float64(1541152487632),
  49. }},
  50. {{
  51. "color": "blue",
  52. "size": float64(2),
  53. "ts": float64(1541152487632),
  54. }, {
  55. "color": "yellow",
  56. "size": float64(4),
  57. "ts": float64(1541152488442),
  58. }},
  59. {{
  60. "color": "yellow",
  61. "size": float64(4),
  62. "ts": float64(1541152488442),
  63. }, {
  64. "color": "red",
  65. "size": float64(1),
  66. "ts": float64(1541152489252),
  67. }},
  68. },
  69. M: map[string]interface{}{
  70. "op_3_project_0_exceptions_total": int64(0),
  71. "op_3_project_0_process_latency_us": int64(0),
  72. "op_3_project_0_records_in_total": int64(4),
  73. "op_3_project_0_records_out_total": int64(4),
  74. "sink_mockSink_0_exceptions_total": int64(0),
  75. "sink_mockSink_0_records_in_total": int64(4),
  76. "sink_mockSink_0_records_out_total": int64(4),
  77. "source_demo_0_exceptions_total": int64(0),
  78. "source_demo_0_records_in_total": int64(5),
  79. "source_demo_0_records_out_total": int64(5),
  80. "op_2_window_0_exceptions_total": int64(0),
  81. "op_2_window_0_process_latency_us": int64(0),
  82. "op_2_window_0_records_in_total": int64(5),
  83. "op_2_window_0_records_out_total": int64(4),
  84. },
  85. }, {
  86. Name: `TestWindowRule2`,
  87. Sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  88. R: [][]map[string]interface{}{
  89. {{
  90. "color": "red",
  91. "ts": float64(1541152486013),
  92. }, {
  93. "color": "blue",
  94. "ts": float64(1541152486822),
  95. }},
  96. {},
  97. {{
  98. "color": "yellow",
  99. "ts": float64(1541152488442),
  100. }},
  101. {},
  102. },
  103. M: map[string]interface{}{
  104. "op_4_project_0_exceptions_total": int64(0),
  105. "op_4_project_0_process_latency_us": int64(0),
  106. "op_4_project_0_records_in_total": int64(4),
  107. "op_4_project_0_records_out_total": int64(4),
  108. "sink_mockSink_0_exceptions_total": int64(0),
  109. "sink_mockSink_0_records_in_total": int64(4),
  110. "sink_mockSink_0_records_out_total": int64(4),
  111. "source_demo_0_exceptions_total": int64(0),
  112. "source_demo_0_records_in_total": int64(5),
  113. "source_demo_0_records_out_total": int64(5),
  114. "op_3_window_0_exceptions_total": int64(0),
  115. "op_3_window_0_process_latency_us": int64(0),
  116. "op_3_window_0_records_in_total": int64(3),
  117. "op_3_window_0_records_out_total": int64(4),
  118. "op_2_filter_0_exceptions_total": int64(0),
  119. "op_2_filter_0_process_latency_us": int64(0),
  120. "op_2_filter_0_records_in_total": int64(5),
  121. "op_2_filter_0_records_out_total": int64(3),
  122. },
  123. }, {
  124. Name: `TestWindowRule3`,
  125. Sql: `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
  126. R: [][]map[string]interface{}{
  127. {{
  128. "color": "red",
  129. "temp": 25.5,
  130. "ts1": float64(1541152486013),
  131. "ts2": float64(1541152486013),
  132. "diff": float64(0),
  133. }}, {{
  134. "color": "red",
  135. "temp": 25.5,
  136. "ts1": float64(1541152486013),
  137. "ts2": float64(1541152486013),
  138. "diff": float64(0),
  139. }}, {{
  140. "color": "red",
  141. "temp": 25.5,
  142. "ts1": float64(1541152486013),
  143. "ts2": float64(1541152486013),
  144. "diff": float64(0),
  145. }}, {{
  146. "color": "blue",
  147. "temp": 28.1,
  148. "ts1": float64(1541152487632),
  149. "ts2": float64(1541152487632),
  150. "diff": float64(0),
  151. }}, {{
  152. "color": "blue",
  153. "temp": 28.1,
  154. "ts1": float64(1541152487632),
  155. "ts2": float64(1541152487632),
  156. "diff": float64(0),
  157. }}, {{
  158. "color": "blue",
  159. "temp": 28.1,
  160. "ts1": float64(1541152487632),
  161. "ts2": float64(1541152487632),
  162. "diff": float64(0),
  163. }, {
  164. "color": "yellow",
  165. "temp": 27.4,
  166. "ts1": float64(1541152488442),
  167. "ts2": float64(1541152488442),
  168. "diff": float64(0),
  169. }}, {{
  170. "color": "yellow",
  171. "temp": 27.4,
  172. "ts1": float64(1541152488442),
  173. "ts2": float64(1541152488442),
  174. "diff": float64(0),
  175. }}, {{
  176. "color": "yellow",
  177. "temp": 27.4,
  178. "ts1": float64(1541152488442),
  179. "ts2": float64(1541152488442),
  180. "diff": float64(0),
  181. }, {
  182. "color": "red",
  183. "temp": 25.5,
  184. "ts1": float64(1541152489252),
  185. "ts2": float64(1541152489252),
  186. "diff": float64(0),
  187. }},
  188. },
  189. M: map[string]interface{}{
  190. "op_5_project_0_exceptions_total": int64(0),
  191. "op_5_project_0_process_latency_us": int64(0),
  192. "op_5_project_0_records_in_total": int64(8),
  193. "op_5_project_0_records_out_total": int64(8),
  194. "sink_mockSink_0_exceptions_total": int64(0),
  195. "sink_mockSink_0_records_in_total": int64(8),
  196. "sink_mockSink_0_records_out_total": int64(8),
  197. "source_demo_0_exceptions_total": int64(0),
  198. "source_demo_0_records_in_total": int64(5),
  199. "source_demo_0_records_out_total": int64(5),
  200. "source_demo1_0_exceptions_total": int64(0),
  201. "source_demo1_0_records_in_total": int64(5),
  202. "source_demo1_0_records_out_total": int64(5),
  203. "op_3_window_0_exceptions_total": int64(0),
  204. "op_3_window_0_process_latency_us": int64(0),
  205. "op_3_window_0_records_in_total": int64(10),
  206. "op_3_window_0_records_out_total": int64(10),
  207. "op_4_join_0_exceptions_total": int64(0),
  208. "op_4_join_0_process_latency_us": int64(0),
  209. "op_4_join_0_records_in_total": int64(10),
  210. "op_4_join_0_records_out_total": int64(8),
  211. },
  212. T: &api.PrintableTopo{
  213. Sources: []string{"source_demo", "source_demo1"},
  214. Edges: map[string][]interface{}{
  215. "source_demo": {"op_3_window"},
  216. "source_demo1": {"op_3_window"},
  217. "op_3_window": {"op_4_join"},
  218. "op_4_join": {"op_5_project"},
  219. "op_5_project": {"sink_mockSink"},
  220. },
  221. },
  222. }, {
  223. Name: `TestWindowRule4`,
  224. Sql: `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  225. R: [][]map[string]interface{}{
  226. {{
  227. "color": "red",
  228. "c": float64(1),
  229. }}, {{
  230. "color": "blue",
  231. "c": float64(1),
  232. }, {
  233. "color": "red",
  234. "c": float64(1),
  235. }}, {{
  236. "color": "blue",
  237. "c": float64(2),
  238. }, {
  239. "color": "red",
  240. "c": float64(1),
  241. }}, {{
  242. "color": "blue",
  243. "c": float64(2),
  244. }, {
  245. "color": "yellow",
  246. "c": float64(1),
  247. }}, {{
  248. "color": "blue",
  249. "c": float64(1),
  250. }, {
  251. "color": "red",
  252. "c": float64(1),
  253. }, {
  254. "color": "yellow",
  255. "c": float64(1),
  256. }},
  257. },
  258. M: map[string]interface{}{
  259. "op_5_project_0_exceptions_total": int64(0),
  260. "op_5_project_0_process_latency_us": int64(0),
  261. "op_5_project_0_records_in_total": int64(5),
  262. "op_5_project_0_records_out_total": int64(5),
  263. "sink_mockSink_0_exceptions_total": int64(0),
  264. "sink_mockSink_0_records_in_total": int64(5),
  265. "sink_mockSink_0_records_out_total": int64(5),
  266. "source_demo_0_exceptions_total": int64(0),
  267. "source_demo_0_records_in_total": int64(5),
  268. "source_demo_0_records_out_total": int64(5),
  269. "op_2_window_0_exceptions_total": int64(0),
  270. "op_2_window_0_process_latency_us": int64(0),
  271. "op_2_window_0_records_in_total": int64(5),
  272. "op_2_window_0_records_out_total": int64(5),
  273. "op_3_aggregate_0_exceptions_total": int64(0),
  274. "op_3_aggregate_0_process_latency_us": int64(0),
  275. "op_3_aggregate_0_records_in_total": int64(5),
  276. "op_3_aggregate_0_records_out_total": int64(5),
  277. "op_4_order_0_exceptions_total": int64(0),
  278. "op_4_order_0_process_latency_us": int64(0),
  279. "op_4_order_0_records_in_total": int64(5),
  280. "op_4_order_0_records_out_total": int64(5),
  281. },
  282. }, {
  283. Name: `TestWindowRule5`,
  284. Sql: `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  285. R: [][]map[string]interface{}{
  286. {{
  287. "count": float64(2),
  288. "ws": float64(1541152486013),
  289. "window_end": float64(1541152487823), // timeout
  290. }}, {{
  291. "count": float64(3),
  292. "ws": float64(1541152487932),
  293. "window_end": float64(1541152490000), // tick
  294. }}, {{
  295. "count": float64(5),
  296. "ws": float64(1541152490000),
  297. "window_end": float64(1541152494000), // tick
  298. }}, {{
  299. "count": float64(1),
  300. "ws": float64(1541152494000),
  301. "window_end": float64(1541152495112), // timeout
  302. }},
  303. },
  304. M: map[string]interface{}{
  305. "op_3_project_0_exceptions_total": int64(0),
  306. "op_3_project_0_process_latency_us": int64(0),
  307. "op_3_project_0_records_in_total": int64(4),
  308. "op_3_project_0_records_out_total": int64(4),
  309. "sink_mockSink_0_exceptions_total": int64(0),
  310. "sink_mockSink_0_records_in_total": int64(4),
  311. "sink_mockSink_0_records_out_total": int64(4),
  312. "source_sessionDemo_0_exceptions_total": int64(0),
  313. "source_sessionDemo_0_records_in_total": int64(11),
  314. "source_sessionDemo_0_records_out_total": int64(11),
  315. "op_2_window_0_exceptions_total": int64(0),
  316. "op_2_window_0_process_latency_us": int64(0),
  317. "op_2_window_0_records_in_total": int64(11),
  318. "op_2_window_0_records_out_total": int64(4),
  319. },
  320. }, {
  321. Name: `TestWindowRule6`,
  322. Sql: `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  323. R: [][]map[string]interface{}{
  324. {{
  325. "temp": 25.5,
  326. "c": float64(1),
  327. "window_start": float64(1541152485115),
  328. "window_end": float64(1541152486115),
  329. }}, {{
  330. "temp": 25.5,
  331. "c": float64(1),
  332. "window_start": float64(1541152485822),
  333. "window_end": float64(1541152486822),
  334. }}, {{
  335. "temp": 25.5,
  336. "c": float64(1),
  337. "window_start": float64(1541152485903),
  338. "window_end": float64(1541152486903),
  339. }}, {{
  340. "temp": 28.1,
  341. "c": float64(1),
  342. "window_start": float64(1541152486702),
  343. "window_end": float64(1541152487702),
  344. }}, {{
  345. "temp": 28.1,
  346. "c": float64(1),
  347. "window_start": float64(1541152487442),
  348. "window_end": float64(1541152488442),
  349. }}, {{
  350. "temp": 55.5,
  351. "c": float64(2),
  352. "window_start": float64(1541152487605),
  353. "window_end": float64(1541152488605),
  354. }}, {{
  355. "temp": 27.4,
  356. "c": float64(1),
  357. "window_start": float64(1541152488252),
  358. "window_end": float64(1541152489252),
  359. }}, {{
  360. "temp": 52.9,
  361. "c": float64(2),
  362. "window_start": float64(1541152488305),
  363. "window_end": float64(1541152489305),
  364. }},
  365. },
  366. M: map[string]interface{}{
  367. "op_5_project_0_exceptions_total": int64(0),
  368. "op_5_project_0_process_latency_us": int64(0),
  369. "op_5_project_0_records_in_total": int64(8),
  370. "op_5_project_0_records_out_total": int64(8),
  371. "sink_mockSink_0_exceptions_total": int64(0),
  372. "sink_mockSink_0_records_in_total": int64(8),
  373. "sink_mockSink_0_records_out_total": int64(8),
  374. "source_demo_0_exceptions_total": int64(0),
  375. "source_demo_0_records_in_total": int64(5),
  376. "source_demo_0_records_out_total": int64(5),
  377. "source_demo1_0_exceptions_total": int64(0),
  378. "source_demo1_0_records_in_total": int64(5),
  379. "source_demo1_0_records_out_total": int64(5),
  380. "op_3_window_0_exceptions_total": int64(0),
  381. "op_3_window_0_process_latency_us": int64(0),
  382. "op_3_window_0_records_in_total": int64(10),
  383. "op_3_window_0_records_out_total": int64(10),
  384. "op_4_join_0_exceptions_total": int64(0),
  385. "op_4_join_0_process_latency_us": int64(0),
  386. "op_4_join_0_records_in_total": int64(10),
  387. "op_4_join_0_records_out_total": int64(8),
  388. },
  389. }, {
  390. Name: `TestWindowRule7`,
  391. Sql: `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  392. R: [][]map[string]interface{}{
  393. {{
  394. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  395. }},
  396. {{
  397. "color": "blue",
  398. "size": float64(6),
  399. "ts": float64(1541152486822),
  400. }},
  401. {{
  402. "color": "blue",
  403. "size": float64(6),
  404. "ts": float64(1541152486822),
  405. }, {
  406. "color": "blue",
  407. "size": float64(2),
  408. "ts": float64(1541152487632),
  409. }},
  410. {{
  411. "error": "error in preprocessor: field color type mismatch: cannot convert int(7) to string",
  412. }},
  413. {{
  414. "color": "blue",
  415. "size": float64(2),
  416. "ts": float64(1541152487632),
  417. }},
  418. {{
  419. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  420. }},
  421. {},
  422. },
  423. M: map[string]interface{}{
  424. "op_3_project_0_exceptions_total": int64(3),
  425. "op_3_project_0_process_latency_us": int64(0),
  426. "op_3_project_0_records_in_total": int64(7),
  427. "op_3_project_0_records_out_total": int64(4),
  428. "sink_mockSink_0_exceptions_total": int64(0),
  429. "sink_mockSink_0_records_in_total": int64(7),
  430. "sink_mockSink_0_records_out_total": int64(7),
  431. "source_demoError_0_exceptions_total": int64(3),
  432. "source_demoError_0_records_in_total": int64(5),
  433. "source_demoError_0_records_out_total": int64(5),
  434. "op_2_window_0_exceptions_total": int64(3),
  435. "op_2_window_0_process_latency_us": int64(0),
  436. "op_2_window_0_records_in_total": int64(5),
  437. "op_2_window_0_records_out_total": int64(4),
  438. },
  439. }, {
  440. Name: `TestWindowRule8`,
  441. Sql: `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  442. R: [][]map[string]interface{}{
  443. {{
  444. "color": "red",
  445. "ts": float64(1541152486013),
  446. "c": float64(2),
  447. "window_start": float64(1541152486000),
  448. "window_end": float64(1541152487000),
  449. }},
  450. },
  451. M: map[string]interface{}{
  452. "op_5_project_0_exceptions_total": int64(0),
  453. "op_5_project_0_process_latency_us": int64(0),
  454. "op_5_project_0_records_in_total": int64(1),
  455. "op_5_project_0_records_out_total": int64(1),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(1),
  458. "sink_mockSink_0_records_out_total": int64(1),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "op_3_window_0_exceptions_total": int64(0),
  463. "op_3_window_0_process_latency_us": int64(0),
  464. "op_3_window_0_records_in_total": int64(3),
  465. "op_3_window_0_records_out_total": int64(4),
  466. "op_2_filter_0_exceptions_total": int64(0),
  467. "op_2_filter_0_process_latency_us": int64(0),
  468. "op_2_filter_0_records_in_total": int64(5),
  469. "op_2_filter_0_records_out_total": int64(3),
  470. "op_4_having_0_exceptions_total": int64(0),
  471. "op_4_having_0_process_latency_us": int64(0),
  472. "op_4_having_0_records_in_total": int64(4),
  473. "op_4_having_0_records_out_total": int64(1),
  474. },
  475. }, {
  476. Name: `TestWindowRule9`,
  477. Sql: `SELECT color, window_start(), window_end() FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1) FILTER( WHERE size > 2)`,
  478. R: [][]map[string]interface{}{
  479. {{
  480. "color": "red",
  481. "window_start": float64(1541152485000),
  482. "window_end": float64(1541152487000),
  483. }, {
  484. "color": "blue",
  485. "window_start": float64(1541152485000),
  486. "window_end": float64(1541152487000),
  487. }},
  488. {{
  489. "color": "red",
  490. "window_start": float64(1541152486000),
  491. "window_end": float64(1541152488000),
  492. }, {
  493. "color": "blue",
  494. "window_start": float64(1541152486000),
  495. "window_end": float64(1541152488000),
  496. }},
  497. {{
  498. "color": "yellow",
  499. "window_start": float64(1541152487000),
  500. "window_end": float64(1541152489000),
  501. }},
  502. {{
  503. "color": "yellow",
  504. "window_start": float64(1541152488000),
  505. "window_end": float64(1541152490000),
  506. }},
  507. },
  508. M: map[string]interface{}{
  509. "op_4_project_0_exceptions_total": int64(0),
  510. "op_4_project_0_process_latency_us": int64(0),
  511. "op_4_project_0_records_in_total": int64(4),
  512. "op_4_project_0_records_out_total": int64(4),
  513. "sink_mockSink_0_exceptions_total": int64(0),
  514. "sink_mockSink_0_records_in_total": int64(4),
  515. "sink_mockSink_0_records_out_total": int64(4),
  516. "source_demo_0_exceptions_total": int64(0),
  517. "source_demo_0_records_in_total": int64(5),
  518. "source_demo_0_records_out_total": int64(5),
  519. "op_3_window_0_exceptions_total": int64(0),
  520. "op_3_window_0_process_latency_us": int64(0),
  521. "op_3_window_0_records_in_total": int64(3),
  522. "op_3_window_0_records_out_total": int64(4),
  523. },
  524. }, {
  525. Name: `TestCountWindowRule1`,
  526. Sql: `SELECT collect(*)[0]->color as c, window_end() as we FROM demo GROUP BY COUNTWINDOW(3)`,
  527. R: [][]map[string]interface{}{
  528. {{
  529. "c": "red",
  530. "we": 1.541152487632e+12,
  531. }},
  532. },
  533. M: map[string]interface{}{
  534. "op_3_project_0_exceptions_total": int64(0),
  535. "op_3_project_0_process_latency_us": int64(0),
  536. "op_3_project_0_records_in_total": int64(1),
  537. "op_3_project_0_records_out_total": int64(1),
  538. "sink_mockSink_0_exceptions_total": int64(0),
  539. "sink_mockSink_0_records_in_total": int64(1),
  540. "sink_mockSink_0_records_out_total": int64(1),
  541. "source_demo_0_exceptions_total": int64(0),
  542. "source_demo_0_records_in_total": int64(5),
  543. "source_demo_0_records_out_total": int64(5),
  544. "op_2_window_0_exceptions_total": int64(0),
  545. "op_2_window_0_process_latency_us": int64(0),
  546. "op_2_window_0_records_in_total": int64(5),
  547. "op_2_window_0_records_out_total": int64(1),
  548. },
  549. }, {
  550. Name: `TestWindowRule10`,
  551. Sql: `SELECT deduplicate(color, false)->color as c FROM demo GROUP BY SlidingWindow(hh, 1)`,
  552. R: [][]map[string]interface{}{
  553. {{
  554. "c": "red",
  555. }}, {{
  556. "c": "blue",
  557. }}, {{}}, {{
  558. "c": "yellow",
  559. }}, {{}},
  560. },
  561. M: map[string]interface{}{
  562. "op_3_project_0_exceptions_total": int64(0),
  563. "op_3_project_0_process_latency_us": int64(0),
  564. "op_3_project_0_records_in_total": int64(5),
  565. "op_3_project_0_records_out_total": int64(5),
  566. "sink_mockSink_0_exceptions_total": int64(0),
  567. "sink_mockSink_0_records_in_total": int64(5),
  568. "sink_mockSink_0_records_out_total": int64(5),
  569. "source_demo_0_exceptions_total": int64(0),
  570. "source_demo_0_records_in_total": int64(5),
  571. "source_demo_0_records_out_total": int64(5),
  572. "op_2_window_0_exceptions_total": int64(0),
  573. "op_2_window_0_process_latency_us": int64(0),
  574. "op_2_window_0_records_in_total": int64(5),
  575. "op_2_window_0_records_out_total": int64(5),
  576. },
  577. }, {
  578. Name: `TestWindowRule11`,
  579. Sql: `SELECT color, name, window_start(), window_end() FROM demo INNER JOIN table1 on demo.ts = table1.id where demo.size > 2 and table1.size > 1 GROUP BY tumblingwindow(ss, 1)`,
  580. R: [][]map[string]interface{}{
  581. {{
  582. "color": "red",
  583. "name": "name1",
  584. "window_start": float64(1541152486000),
  585. "window_end": float64(1541152487000),
  586. }},
  587. },
  588. M: map[string]interface{}{
  589. "op_2_window_0_exceptions_total": int64(0),
  590. "op_2_window_0_process_latency_us": int64(0),
  591. "op_2_window_0_records_in_total": int64(5),
  592. "op_2_window_0_records_out_total": int64(4),
  593. "op_4_join_aligner_0_records_in_total": int64(5),
  594. "op_4_join_aligner_0_records_out_total": int64(4),
  595. "op_5_join_0_exceptions_total": int64(0),
  596. "op_5_join_0_records_in_total": int64(4),
  597. "op_5_join_0_records_out_total": int64(1),
  598. "op_6_project_0_exceptions_total": int64(0),
  599. "op_6_project_0_records_in_total": int64(1),
  600. "op_6_project_0_records_out_total": int64(1),
  601. "sink_mockSink_0_exceptions_total": int64(0),
  602. "sink_mockSink_0_records_in_total": int64(1),
  603. "sink_mockSink_0_records_out_total": int64(1),
  604. "source_demo_0_exceptions_total": int64(0),
  605. "source_demo_0_records_in_total": int64(5),
  606. "source_demo_0_records_out_total": int64(5),
  607. "source_table1_0_exceptions_total": int64(0),
  608. "source_table1_0_records_in_total": int64(4),
  609. "source_table1_0_records_out_total": int64(1),
  610. },
  611. }, {
  612. Name: `TestWindowRule12`,
  613. Sql: `SELECT collect(size) as allSize FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1), color ORDER BY color`,
  614. R: [][]map[string]interface{}{
  615. {{
  616. "allSize": []interface{}{float64(6)},
  617. }, {
  618. "allSize": []interface{}{float64(3)},
  619. }},
  620. {{
  621. "allSize": []interface{}{float64(6), float64(2)},
  622. }, {
  623. "allSize": []interface{}{float64(3)},
  624. }},
  625. {{
  626. "allSize": []interface{}{float64(2)},
  627. }, {
  628. "allSize": []interface{}{float64(4)},
  629. }},
  630. {{
  631. "allSize": []interface{}{float64(1)},
  632. }, {
  633. "allSize": []interface{}{float64(4)},
  634. }},
  635. },
  636. M: map[string]interface{}{
  637. "sink_mockSink_0_exceptions_total": int64(0),
  638. "sink_mockSink_0_records_in_total": int64(4),
  639. "sink_mockSink_0_records_out_total": int64(4),
  640. "source_demo_0_exceptions_total": int64(0),
  641. "source_demo_0_records_in_total": int64(5),
  642. "source_demo_0_records_out_total": int64(5),
  643. "op_2_window_0_exceptions_total": int64(0),
  644. "op_2_window_0_process_latency_us": int64(0),
  645. "op_2_window_0_records_in_total": int64(5),
  646. "op_2_window_0_records_out_total": int64(4),
  647. },
  648. },
  649. }
  650. HandleStream(true, streamList, t)
  651. options := []*api.RuleOption{
  652. {
  653. BufferLength: 100,
  654. SendError: true,
  655. }, {
  656. BufferLength: 100,
  657. SendError: true,
  658. Qos: api.AtLeastOnce,
  659. CheckpointInterval: 5000,
  660. }, {
  661. BufferLength: 100,
  662. SendError: true,
  663. Qos: api.ExactlyOnce,
  664. CheckpointInterval: 5000,
  665. },
  666. }
  667. for j, opt := range options {
  668. DoRuleTest(t, tests, j, opt, 15)
  669. }
  670. }
  671. func TestEventWindow(t *testing.T) {
  672. // Reset
  673. streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
  674. HandleStream(false, streamList, t)
  675. tests := []RuleTest{
  676. {
  677. Name: `TestEventWindowRule1`,
  678. Sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  679. R: [][]map[string]interface{}{
  680. {{
  681. "color": "red",
  682. "size": float64(3),
  683. "ts": float64(1541152486013),
  684. }},
  685. {{
  686. "color": "red",
  687. "size": float64(3),
  688. "ts": float64(1541152486013),
  689. }, {
  690. "color": "blue",
  691. "size": float64(2),
  692. "ts": float64(1541152487632),
  693. }},
  694. {{
  695. "color": "blue",
  696. "size": float64(2),
  697. "ts": float64(1541152487632),
  698. }, {
  699. "color": "yellow",
  700. "size": float64(4),
  701. "ts": float64(1541152488442),
  702. }},
  703. {{
  704. "color": "yellow",
  705. "size": float64(4),
  706. "ts": float64(1541152488442),
  707. }, {
  708. "color": "red",
  709. "size": float64(1),
  710. "ts": float64(1541152489252),
  711. }},
  712. {{
  713. "color": "red",
  714. "size": float64(1),
  715. "ts": float64(1541152489252),
  716. }},
  717. },
  718. M: map[string]interface{}{
  719. "op_3_project_0_exceptions_total": int64(0),
  720. "op_3_project_0_process_latency_us": int64(0),
  721. "op_3_project_0_records_in_total": int64(5),
  722. "op_3_project_0_records_out_total": int64(5),
  723. "sink_mockSink_0_exceptions_total": int64(0),
  724. "sink_mockSink_0_records_in_total": int64(5),
  725. "sink_mockSink_0_records_out_total": int64(5),
  726. "source_demoE_0_exceptions_total": int64(0),
  727. "source_demoE_0_records_in_total": int64(6),
  728. "source_demoE_0_records_out_total": int64(6),
  729. "op_2_window_0_exceptions_total": int64(0),
  730. "op_2_window_0_process_latency_us": int64(0),
  731. "op_2_window_0_records_in_total": int64(6),
  732. "op_2_window_0_records_out_total": int64(5),
  733. },
  734. }, {
  735. Name: `TestEventWindowRule2`,
  736. Sql: `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  737. R: [][]map[string]interface{}{
  738. {{
  739. "window_start": float64(1541152486000),
  740. "window_end": float64(1541152487000),
  741. "color": "red",
  742. "ts": float64(1541152486013),
  743. }},
  744. {{
  745. "window_start": float64(1541152488000),
  746. "window_end": float64(1541152489000),
  747. "color": "yellow",
  748. "ts": float64(1541152488442),
  749. }},
  750. },
  751. M: map[string]interface{}{
  752. "op_4_project_0_exceptions_total": int64(0),
  753. "op_4_project_0_process_latency_us": int64(0),
  754. "op_4_project_0_records_in_total": int64(2),
  755. "op_4_project_0_records_out_total": int64(2),
  756. "sink_mockSink_0_exceptions_total": int64(0),
  757. "sink_mockSink_0_records_in_total": int64(2),
  758. "sink_mockSink_0_records_out_total": int64(2),
  759. "source_demoE_0_exceptions_total": int64(0),
  760. "source_demoE_0_records_in_total": int64(6),
  761. "source_demoE_0_records_out_total": int64(6),
  762. "op_2_window_0_exceptions_total": int64(0),
  763. "op_2_window_0_process_latency_us": int64(0),
  764. "op_2_window_0_records_in_total": int64(6),
  765. "op_2_window_0_records_out_total": int64(5),
  766. "op_3_filter_0_exceptions_total": int64(0),
  767. "op_3_filter_0_process_latency_us": int64(0),
  768. "op_3_filter_0_records_in_total": int64(5),
  769. "op_3_filter_0_records_out_total": int64(2),
  770. },
  771. }, {
  772. Name: `TestEventWindowRule3`,
  773. Sql: `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  774. R: [][]map[string]interface{}{
  775. {{
  776. "color": "red",
  777. "temp": 25.5,
  778. "ts": float64(1541152486013),
  779. }}, {{
  780. "color": "red",
  781. "temp": 25.5,
  782. "ts": float64(1541152486013),
  783. }}, {{
  784. "color": "blue",
  785. "temp": 28.1,
  786. "ts": float64(1541152487632),
  787. }}, {{
  788. "color": "blue",
  789. "temp": 28.1,
  790. "ts": float64(1541152487632),
  791. }, {
  792. "color": "yellow",
  793. "temp": 27.4,
  794. "ts": float64(1541152488442),
  795. }}, {{
  796. "color": "yellow",
  797. "temp": 27.4,
  798. "ts": float64(1541152488442),
  799. }, {
  800. "color": "red",
  801. "temp": 25.5,
  802. "ts": float64(1541152489252),
  803. }},
  804. },
  805. M: map[string]interface{}{
  806. "op_5_project_0_exceptions_total": int64(0),
  807. "op_5_project_0_process_latency_us": int64(0),
  808. "op_5_project_0_records_in_total": int64(5),
  809. "op_5_project_0_records_out_total": int64(5),
  810. "sink_mockSink_0_exceptions_total": int64(0),
  811. "sink_mockSink_0_records_in_total": int64(5),
  812. "sink_mockSink_0_records_out_total": int64(5),
  813. "source_demoE_0_exceptions_total": int64(0),
  814. "source_demoE_0_records_in_total": int64(6),
  815. "source_demoE_0_records_out_total": int64(6),
  816. "source_demo1E_0_exceptions_total": int64(0),
  817. "source_demo1E_0_records_in_total": int64(6),
  818. "source_demo1E_0_records_out_total": int64(6),
  819. "op_3_window_0_exceptions_total": int64(0),
  820. "op_3_window_0_process_latency_us": int64(0),
  821. "op_3_window_0_records_in_total": int64(12),
  822. "op_3_window_0_records_out_total": int64(5),
  823. "op_4_join_0_exceptions_total": int64(0),
  824. "op_4_join_0_process_latency_us": int64(0),
  825. "op_4_join_0_records_in_total": int64(5),
  826. "op_4_join_0_records_out_total": int64(5),
  827. },
  828. }, {
  829. Name: `TestEventWindowRule4`,
  830. Sql: `SELECT window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  831. R: [][]map[string]interface{}{
  832. {{
  833. "color": "red",
  834. "ws": float64(1541152484013),
  835. "we": float64(1541152486013),
  836. }}, {{
  837. "color": "blue",
  838. "ws": float64(1541152485632),
  839. "we": float64(1541152487632),
  840. }, {
  841. "color": "red",
  842. "ws": float64(1541152485632),
  843. "we": float64(1541152487632),
  844. }}, {{
  845. "color": "blue",
  846. "ws": float64(1541152486442),
  847. "we": float64(1541152488442),
  848. }, {
  849. "color": "yellow",
  850. "ws": float64(1541152486442),
  851. "we": float64(1541152488442),
  852. }}, {{
  853. "color": "blue",
  854. "ws": float64(1541152487252),
  855. "we": float64(1541152489252),
  856. }, {
  857. "color": "red",
  858. "ws": float64(1541152487252),
  859. "we": float64(1541152489252),
  860. }, {
  861. "color": "yellow",
  862. "ws": float64(1541152487252),
  863. "we": float64(1541152489252),
  864. }},
  865. },
  866. M: map[string]interface{}{
  867. "op_5_project_0_exceptions_total": int64(0),
  868. "op_5_project_0_process_latency_us": int64(0),
  869. "op_5_project_0_records_in_total": int64(4),
  870. "op_5_project_0_records_out_total": int64(4),
  871. "sink_mockSink_0_exceptions_total": int64(0),
  872. "sink_mockSink_0_records_in_total": int64(4),
  873. "sink_mockSink_0_records_out_total": int64(4),
  874. "source_demoE_0_exceptions_total": int64(0),
  875. "source_demoE_0_records_in_total": int64(6),
  876. "source_demoE_0_records_out_total": int64(6),
  877. "op_2_window_0_exceptions_total": int64(0),
  878. "op_2_window_0_process_latency_us": int64(0),
  879. "op_2_window_0_records_in_total": int64(6),
  880. "op_2_window_0_records_out_total": int64(4),
  881. "op_3_aggregate_0_exceptions_total": int64(0),
  882. "op_3_aggregate_0_process_latency_us": int64(0),
  883. "op_3_aggregate_0_records_in_total": int64(4),
  884. "op_3_aggregate_0_records_out_total": int64(4),
  885. "op_4_order_0_exceptions_total": int64(0),
  886. "op_4_order_0_process_latency_us": int64(0),
  887. "op_4_order_0_records_in_total": int64(4),
  888. "op_4_order_0_records_out_total": int64(4),
  889. },
  890. }, {
  891. Name: `TestEventWindowRule5`,
  892. Sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  893. R: [][]map[string]interface{}{
  894. {{
  895. "temp": 25.5,
  896. }}, {{
  897. "temp": 28.1,
  898. }, {
  899. "temp": 27.4,
  900. }, {
  901. "temp": 25.5,
  902. }}, {{
  903. "temp": 26.2,
  904. }, {
  905. "temp": 26.8,
  906. }, {
  907. "temp": 28.9,
  908. }, {
  909. "temp": 29.1,
  910. }, {
  911. "temp": 32.2,
  912. }}, {{
  913. "temp": 30.9,
  914. }},
  915. },
  916. M: map[string]interface{}{
  917. "op_3_project_0_exceptions_total": int64(0),
  918. "op_3_project_0_process_latency_us": int64(0),
  919. "op_3_project_0_records_in_total": int64(4),
  920. "op_3_project_0_records_out_total": int64(4),
  921. "sink_mockSink_0_exceptions_total": int64(0),
  922. "sink_mockSink_0_records_in_total": int64(4),
  923. "sink_mockSink_0_records_out_total": int64(4),
  924. "source_sessionDemoE_0_exceptions_total": int64(0),
  925. "source_sessionDemoE_0_records_in_total": int64(12),
  926. "source_sessionDemoE_0_records_out_total": int64(12),
  927. "op_2_window_0_exceptions_total": int64(0),
  928. "op_2_window_0_process_latency_us": int64(0),
  929. "op_2_window_0_records_in_total": int64(12),
  930. "op_2_window_0_records_out_total": int64(4),
  931. },
  932. }, {
  933. Name: `TestEventWindowRule6`,
  934. Sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  935. R: [][]map[string]interface{}{
  936. {{
  937. "m": 25.5,
  938. "c": float64(1),
  939. }}, {{
  940. "m": 25.5,
  941. "c": float64(1),
  942. }}, {{
  943. "m": 28.1,
  944. "c": float64(1),
  945. }}, {{
  946. "m": 28.1,
  947. "c": float64(2),
  948. }}, {{
  949. "m": 27.4,
  950. "c": float64(2),
  951. }},
  952. },
  953. M: map[string]interface{}{
  954. "op_5_project_0_exceptions_total": int64(0),
  955. "op_5_project_0_process_latency_us": int64(0),
  956. "op_5_project_0_records_in_total": int64(5),
  957. "op_5_project_0_records_out_total": int64(5),
  958. "sink_mockSink_0_exceptions_total": int64(0),
  959. "sink_mockSink_0_records_in_total": int64(5),
  960. "sink_mockSink_0_records_out_total": int64(5),
  961. "source_demoE_0_exceptions_total": int64(0),
  962. "source_demoE_0_records_in_total": int64(6),
  963. "source_demoE_0_records_out_total": int64(6),
  964. "source_demo1E_0_exceptions_total": int64(0),
  965. "source_demo1E_0_records_in_total": int64(6),
  966. "source_demo1E_0_records_out_total": int64(6),
  967. "op_3_window_0_exceptions_total": int64(0),
  968. "op_3_window_0_records_in_total": int64(12),
  969. "op_3_window_0_records_out_total": int64(5),
  970. "op_4_join_0_exceptions_total": int64(0),
  971. "op_4_join_0_process_latency_us": int64(0),
  972. "op_4_join_0_records_in_total": int64(5),
  973. "op_4_join_0_records_out_total": int64(5),
  974. },
  975. }, {
  976. Name: `TestEventWindowRule7`,
  977. Sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  978. R: [][]map[string]interface{}{
  979. {{
  980. "error": "error in preprocessor: field color type mismatch: cannot convert int(2) to string",
  981. }},
  982. {{
  983. "color": "red",
  984. "size": float64(3),
  985. "ts": float64(1541152486013),
  986. }},
  987. {{
  988. "color": "red",
  989. "size": float64(3),
  990. "ts": float64(1541152486013),
  991. }},
  992. {{
  993. "color": "yellow",
  994. "size": float64(4),
  995. "ts": float64(1541152488442),
  996. }},
  997. {{
  998. "color": "yellow",
  999. "size": float64(4),
  1000. "ts": float64(1541152488442),
  1001. }, {
  1002. "color": "red",
  1003. "size": float64(1),
  1004. "ts": float64(1541152489252),
  1005. }},
  1006. {{
  1007. "color": "red",
  1008. "size": float64(1),
  1009. "ts": float64(1541152489252),
  1010. }},
  1011. },
  1012. M: map[string]interface{}{
  1013. "op_3_project_0_exceptions_total": int64(1),
  1014. "op_3_project_0_process_latency_us": int64(0),
  1015. "op_3_project_0_records_in_total": int64(6),
  1016. "op_3_project_0_records_out_total": int64(5),
  1017. "sink_mockSink_0_exceptions_total": int64(0),
  1018. "sink_mockSink_0_records_in_total": int64(6),
  1019. "sink_mockSink_0_records_out_total": int64(6),
  1020. "source_demoErr_0_exceptions_total": int64(1),
  1021. "source_demoErr_0_records_in_total": int64(6),
  1022. "source_demoErr_0_records_out_total": int64(6),
  1023. "op_2_window_0_exceptions_total": int64(1),
  1024. "op_2_window_0_process_latency_us": int64(0),
  1025. "op_2_window_0_records_in_total": int64(6),
  1026. "op_2_window_0_records_out_total": int64(5),
  1027. },
  1028. }, {
  1029. Name: `TestEventWindowRule8`,
  1030. Sql: `SELECT temp, window_start(), window_end() FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  1031. R: [][]map[string]interface{}{
  1032. {{
  1033. "temp": 25.5,
  1034. "window_start": float64(1541152486013),
  1035. "window_end": float64(1541152487013),
  1036. }}, {{
  1037. "temp": 28.1,
  1038. "window_start": float64(1541152487932),
  1039. "window_end": float64(1541152490000),
  1040. }, {
  1041. "temp": 27.4,
  1042. "window_start": float64(1541152487932),
  1043. "window_end": float64(1541152490000),
  1044. }, {
  1045. "temp": 25.5,
  1046. "window_start": float64(1541152487932),
  1047. "window_end": float64(1541152490000),
  1048. }}, {{
  1049. "temp": 26.2,
  1050. "window_start": float64(1541152490000),
  1051. "window_end": float64(1541152494000),
  1052. }, {
  1053. "temp": 26.8,
  1054. "window_start": float64(1541152490000),
  1055. "window_end": float64(1541152494000),
  1056. }, {
  1057. "temp": 28.9,
  1058. "window_start": float64(1541152490000),
  1059. "window_end": float64(1541152494000),
  1060. }, {
  1061. "temp": 29.1,
  1062. "window_start": float64(1541152490000),
  1063. "window_end": float64(1541152494000),
  1064. }, {
  1065. "temp": 32.2,
  1066. "window_start": float64(1541152490000),
  1067. "window_end": float64(1541152494000),
  1068. }}, {{
  1069. "temp": 30.9,
  1070. "window_start": float64(1541152494000),
  1071. "window_end": float64(1541152495112),
  1072. }},
  1073. },
  1074. M: map[string]interface{}{
  1075. "op_3_project_0_exceptions_total": int64(0),
  1076. "op_3_project_0_process_latency_us": int64(0),
  1077. "op_3_project_0_records_in_total": int64(4),
  1078. "op_3_project_0_records_out_total": int64(4),
  1079. "sink_mockSink_0_exceptions_total": int64(0),
  1080. "sink_mockSink_0_records_in_total": int64(4),
  1081. "sink_mockSink_0_records_out_total": int64(4),
  1082. "source_sessionDemoE_0_exceptions_total": int64(0),
  1083. "source_sessionDemoE_0_records_in_total": int64(12),
  1084. "source_sessionDemoE_0_records_out_total": int64(12),
  1085. "op_2_window_0_exceptions_total": int64(0),
  1086. "op_2_window_0_process_latency_us": int64(0),
  1087. "op_2_window_0_records_in_total": int64(12),
  1088. "op_2_window_0_records_out_total": int64(4),
  1089. },
  1090. }, {
  1091. Name: `TestEventWindowRule9`,
  1092. Sql: `SELECT window_end(), color, window_start() FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1093. R: [][]map[string]interface{}{
  1094. {{
  1095. "color": "red",
  1096. "window_start": float64(1541152485000),
  1097. "window_end": float64(1541152487000),
  1098. }},
  1099. {{
  1100. "color": "red",
  1101. "window_start": float64(1541152486000),
  1102. "window_end": float64(1541152488000),
  1103. }, {
  1104. "color": "blue",
  1105. "window_start": float64(1541152486000),
  1106. "window_end": float64(1541152488000),
  1107. }},
  1108. {{
  1109. "color": "blue",
  1110. "window_start": float64(1541152487000),
  1111. "window_end": float64(1541152489000),
  1112. }, {
  1113. "color": "yellow",
  1114. "window_start": float64(1541152487000),
  1115. "window_end": float64(1541152489000),
  1116. }},
  1117. {{
  1118. "color": "yellow",
  1119. "window_start": float64(1541152488000),
  1120. "window_end": float64(1541152490000),
  1121. }, {
  1122. "color": "red",
  1123. "window_start": float64(1541152488000),
  1124. "window_end": float64(1541152490000),
  1125. }},
  1126. {{
  1127. "color": "red",
  1128. "window_start": float64(1541152489000),
  1129. "window_end": float64(1541152491000),
  1130. }},
  1131. },
  1132. M: map[string]interface{}{
  1133. "op_3_project_0_exceptions_total": int64(0),
  1134. "op_3_project_0_process_latency_us": int64(0),
  1135. "op_3_project_0_records_in_total": int64(5),
  1136. "op_3_project_0_records_out_total": int64(5),
  1137. "sink_mockSink_0_exceptions_total": int64(0),
  1138. "sink_mockSink_0_records_in_total": int64(5),
  1139. "sink_mockSink_0_records_out_total": int64(5),
  1140. "source_demoE_0_exceptions_total": int64(0),
  1141. "source_demoE_0_records_in_total": int64(6),
  1142. "source_demoE_0_records_out_total": int64(6),
  1143. "op_2_window_0_exceptions_total": int64(0),
  1144. "op_2_window_0_process_latency_us": int64(0),
  1145. "op_2_window_0_records_in_total": int64(6),
  1146. "op_2_window_0_records_out_total": int64(5),
  1147. },
  1148. },
  1149. }
  1150. HandleStream(true, streamList, t)
  1151. options := []*api.RuleOption{
  1152. {
  1153. BufferLength: 100,
  1154. SendError: true,
  1155. IsEventTime: true,
  1156. LateTol: 1000,
  1157. }, {
  1158. BufferLength: 100,
  1159. SendError: true,
  1160. Qos: api.AtLeastOnce,
  1161. CheckpointInterval: 5000,
  1162. IsEventTime: true,
  1163. LateTol: 1000,
  1164. }, {
  1165. BufferLength: 100,
  1166. SendError: true,
  1167. Qos: api.ExactlyOnce,
  1168. CheckpointInterval: 5000,
  1169. IsEventTime: true,
  1170. LateTol: 1000,
  1171. },
  1172. }
  1173. for j, opt := range options {
  1174. DoRuleTest(t, tests, j, opt, 10)
  1175. }
  1176. }
  1177. func TestWindowError(t *testing.T) {
  1178. // Reset
  1179. streamList := []string{"ldemo", "ldemo1"}
  1180. HandleStream(false, streamList, t)
  1181. tests := []RuleTest{
  1182. {
  1183. Name: `TestWindowErrorRule1`,
  1184. Sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1185. R: [][]map[string]interface{}{
  1186. {{
  1187. "error": "run Select error: invalid operation string(string) * int64(3)",
  1188. }}, {{
  1189. "kuiper_field_0": float64(6),
  1190. }, {}},
  1191. },
  1192. M: map[string]interface{}{
  1193. "op_3_project_0_exceptions_total": int64(1),
  1194. "op_3_project_0_process_latency_us": int64(0),
  1195. "op_3_project_0_records_in_total": int64(2),
  1196. "op_3_project_0_records_out_total": int64(1),
  1197. "sink_mockSink_0_exceptions_total": int64(0),
  1198. "sink_mockSink_0_records_in_total": int64(2),
  1199. "sink_mockSink_0_records_out_total": int64(2),
  1200. "source_ldemo_0_exceptions_total": int64(0),
  1201. "source_ldemo_0_records_in_total": int64(5),
  1202. "source_ldemo_0_records_out_total": int64(5),
  1203. "op_2_window_0_exceptions_total": int64(0),
  1204. "op_2_window_0_process_latency_us": int64(0),
  1205. "op_2_window_0_records_in_total": int64(5),
  1206. "op_2_window_0_records_out_total": int64(2),
  1207. },
  1208. }, {
  1209. Name: `TestWindowErrorRule2`,
  1210. Sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1211. R: [][]map[string]interface{}{
  1212. {{
  1213. "error": "run Where error: invalid operation string(string) > int64(2)",
  1214. }}, {{
  1215. "color": "red",
  1216. "ts": float64(1541152486013),
  1217. }}, {{
  1218. "ts": float64(1541152487632),
  1219. }}, {}, {},
  1220. },
  1221. M: map[string]interface{}{
  1222. "op_4_project_0_exceptions_total": int64(1),
  1223. "op_4_project_0_process_latency_us": int64(0),
  1224. "op_4_project_0_records_in_total": int64(5),
  1225. "op_4_project_0_records_out_total": int64(4),
  1226. "sink_mockSink_0_exceptions_total": int64(0),
  1227. "sink_mockSink_0_records_in_total": int64(5),
  1228. "sink_mockSink_0_records_out_total": int64(5),
  1229. "source_ldemo_0_exceptions_total": int64(0),
  1230. "source_ldemo_0_records_in_total": int64(5),
  1231. "source_ldemo_0_records_out_total": int64(5),
  1232. "op_3_window_0_exceptions_total": int64(1),
  1233. "op_3_window_0_process_latency_us": int64(0),
  1234. "op_3_window_0_records_in_total": int64(3),
  1235. "op_3_window_0_records_out_total": int64(4),
  1236. "op_2_filter_0_exceptions_total": int64(1),
  1237. "op_2_filter_0_process_latency_us": int64(0),
  1238. "op_2_filter_0_records_in_total": int64(5),
  1239. "op_2_filter_0_records_out_total": int64(2),
  1240. },
  1241. }, {
  1242. Name: `TestWindowErrorRule3`,
  1243. Sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1244. R: [][]map[string]interface{}{
  1245. {{
  1246. "color": "red",
  1247. "temp": 25.5,
  1248. "ts": float64(1541152486013),
  1249. }}, {{
  1250. "color": "red",
  1251. "temp": 25.5,
  1252. "ts": float64(1541152486013),
  1253. }}, {{
  1254. "color": "red",
  1255. "temp": 25.5,
  1256. "ts": float64(1541152486013),
  1257. }}, {{
  1258. "temp": 28.1,
  1259. "ts": float64(1541152487632),
  1260. }}, {{
  1261. "temp": 28.1,
  1262. "ts": float64(1541152487632),
  1263. }}, {{
  1264. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1265. }}, {{
  1266. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1267. }}, {{
  1268. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1269. }},
  1270. },
  1271. M: map[string]interface{}{
  1272. "op_5_project_0_exceptions_total": int64(3),
  1273. "op_5_project_0_process_latency_us": int64(0),
  1274. "op_5_project_0_records_in_total": int64(8),
  1275. "op_5_project_0_records_out_total": int64(5),
  1276. "sink_mockSink_0_exceptions_total": int64(0),
  1277. "sink_mockSink_0_records_in_total": int64(8),
  1278. "sink_mockSink_0_records_out_total": int64(8),
  1279. "source_ldemo_0_exceptions_total": int64(0),
  1280. "source_ldemo_0_records_in_total": int64(5),
  1281. "source_ldemo_0_records_out_total": int64(5),
  1282. "source_ldemo1_0_exceptions_total": int64(0),
  1283. "source_ldemo1_0_records_in_total": int64(5),
  1284. "source_ldemo1_0_records_out_total": int64(5),
  1285. "op_3_window_0_exceptions_total": int64(0),
  1286. "op_3_window_0_process_latency_us": int64(0),
  1287. "op_3_window_0_records_in_total": int64(10),
  1288. "op_3_window_0_records_out_total": int64(10),
  1289. "op_4_join_0_exceptions_total": int64(3),
  1290. "op_4_join_0_process_latency_us": int64(0),
  1291. "op_4_join_0_records_in_total": int64(10),
  1292. "op_4_join_0_records_out_total": int64(5),
  1293. },
  1294. }, {
  1295. Name: `TestWindowErrorRule4`,
  1296. Sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having collect(size)[0] >= 2 order by color`,
  1297. R: [][]map[string]interface{}{
  1298. {{
  1299. "color": "red",
  1300. }}, {{
  1301. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1302. }}, {{
  1303. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1304. }}, {{
  1305. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1306. }}, {{
  1307. "color": float64(49),
  1308. }, {}},
  1309. },
  1310. M: map[string]interface{}{
  1311. "op_6_project_0_exceptions_total": int64(3),
  1312. "op_6_project_0_process_latency_us": int64(0),
  1313. "op_6_project_0_records_in_total": int64(5),
  1314. "op_6_project_0_records_out_total": int64(2),
  1315. "sink_mockSink_0_exceptions_total": int64(0),
  1316. "sink_mockSink_0_records_in_total": int64(5),
  1317. "sink_mockSink_0_records_out_total": int64(5),
  1318. "source_ldemo_0_exceptions_total": int64(0),
  1319. "source_ldemo_0_records_in_total": int64(5),
  1320. "source_ldemo_0_records_out_total": int64(5),
  1321. "op_2_window_0_exceptions_total": int64(0),
  1322. "op_2_window_0_process_latency_us": int64(0),
  1323. "op_2_window_0_records_in_total": int64(5),
  1324. "op_2_window_0_records_out_total": int64(5),
  1325. "op_3_aggregate_0_exceptions_total": int64(0),
  1326. "op_3_aggregate_0_process_latency_us": int64(0),
  1327. "op_3_aggregate_0_records_in_total": int64(5),
  1328. "op_3_aggregate_0_records_out_total": int64(5),
  1329. "op_4_having_0_exceptions_total": int64(3),
  1330. "op_4_having_0_process_latency_us": int64(0),
  1331. "op_4_having_0_records_in_total": int64(5),
  1332. "op_4_having_0_records_out_total": int64(2),
  1333. },
  1334. }, {
  1335. Name: `TestWindowErrorRule5`,
  1336. Sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1337. R: [][]map[string]interface{}{
  1338. {{
  1339. "error": "run Order By error: incompatible types for comparison: int and string",
  1340. }}, {{
  1341. "size": float64(3),
  1342. }}, {{
  1343. "color": float64(49),
  1344. "size": float64(2),
  1345. }}, {{
  1346. "color": "red",
  1347. }},
  1348. },
  1349. M: map[string]interface{}{
  1350. "op_4_project_0_exceptions_total": int64(1),
  1351. "op_4_project_0_process_latency_us": int64(0),
  1352. "op_4_project_0_records_in_total": int64(4),
  1353. "op_4_project_0_records_out_total": int64(3),
  1354. "sink_mockSink_0_exceptions_total": int64(0),
  1355. "sink_mockSink_0_records_in_total": int64(4),
  1356. "sink_mockSink_0_records_out_total": int64(4),
  1357. "source_ldemo_0_exceptions_total": int64(0),
  1358. "source_ldemo_0_records_in_total": int64(5),
  1359. "source_ldemo_0_records_out_total": int64(5),
  1360. "op_2_window_0_exceptions_total": int64(0),
  1361. "op_2_window_0_process_latency_us": int64(0),
  1362. "op_2_window_0_records_in_total": int64(5),
  1363. "op_2_window_0_records_out_total": int64(4),
  1364. "op_3_order_0_exceptions_total": int64(1),
  1365. "op_3_order_0_process_latency_us": int64(0),
  1366. "op_3_order_0_records_in_total": int64(4),
  1367. "op_3_order_0_records_out_total": int64(3),
  1368. },
  1369. },
  1370. }
  1371. HandleStream(true, streamList, t)
  1372. DoRuleTest(t, tests, 0, &api.RuleOption{
  1373. BufferLength: 100,
  1374. SendError: true,
  1375. }, 0)
  1376. }