rule_test.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "testing"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. )
  21. func TestLimitSQL(t *testing.T) {
  22. // Reset
  23. streamList := []string{"demo", "demoArr", "demoArr2"}
  24. HandleStream(false, streamList, t)
  25. var r [][]map[string]interface{}
  26. tests := []RuleTest{
  27. {
  28. Name: "TestLimitSQL01",
  29. Sql: `SELECT unnest(demoArr2.arr) as col, demo.size FROM demo inner join demoArr2 on demo.size = demoArr2.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  30. R: [][]map[string]interface{}{
  31. {
  32. {
  33. "col": float64(1),
  34. "size": float64(1),
  35. },
  36. },
  37. },
  38. },
  39. {
  40. Name: "TestLimitSQL0",
  41. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  42. R: [][]map[string]interface{}{
  43. {
  44. {
  45. "col": float64(1),
  46. "size": float64(1),
  47. },
  48. },
  49. },
  50. },
  51. {
  52. Name: "TestLimitSQL1",
  53. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  54. R: r,
  55. },
  56. {
  57. Name: "TestLimitSQL2",
  58. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  59. R: [][]map[string]interface{}{
  60. {
  61. {
  62. "size": float64(1),
  63. },
  64. },
  65. },
  66. },
  67. {
  68. Name: "TestLimitSQL3",
  69. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  70. R: r,
  71. },
  72. }
  73. // Data setup
  74. HandleStream(true, streamList, t)
  75. options := []*api.RuleOption{
  76. {
  77. BufferLength: 100,
  78. SendError: true,
  79. },
  80. {
  81. BufferLength: 100,
  82. SendError: true,
  83. Qos: api.AtLeastOnce,
  84. CheckpointInterval: 5000,
  85. },
  86. {
  87. BufferLength: 100,
  88. SendError: true,
  89. Qos: api.ExactlyOnce,
  90. CheckpointInterval: 5000,
  91. },
  92. }
  93. for j, opt := range options {
  94. DoRuleTest(t, tests, j, opt, 0)
  95. }
  96. }
  97. func TestSRFSQL(t *testing.T) {
  98. // Reset
  99. streamList := []string{"demo", "demoArr"}
  100. HandleStream(false, streamList, t)
  101. tests := []RuleTest{
  102. {
  103. Name: "TestSingleSQLRule25",
  104. Sql: "SELECT unnest(a) from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  105. R: [][]map[string]interface{}{
  106. {
  107. {
  108. "error": "the argument for the unnest function should be array",
  109. },
  110. },
  111. },
  112. },
  113. {
  114. Name: "TestSingleSQLRule24",
  115. Sql: "Select unnest(a) from demoArr;",
  116. R: [][]map[string]interface{}{
  117. {
  118. {
  119. "error": "the argument for the unnest function should be array",
  120. },
  121. },
  122. },
  123. },
  124. {
  125. Name: "TestSingleSQLRule21",
  126. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1);`,
  127. R: [][]map[string]interface{}{
  128. {
  129. {
  130. "col": float64(1),
  131. "size": float64(1),
  132. },
  133. {
  134. "col": float64(2),
  135. "size": float64(1),
  136. },
  137. {
  138. "col": float64(3),
  139. "size": float64(1),
  140. },
  141. },
  142. },
  143. },
  144. {
  145. Name: "TestSingleSQLRule22",
  146. Sql: `SELECT unnest(arr3) as col,y From demoArr group by y, SESSIONWINDOW(ss, 2, 1);`,
  147. R: [][]map[string]interface{}{
  148. {
  149. {
  150. "col": float64(1),
  151. "y": float64(2),
  152. },
  153. {
  154. "col": float64(2),
  155. "y": float64(2),
  156. },
  157. {
  158. "col": float64(3),
  159. "y": float64(2),
  160. },
  161. },
  162. },
  163. },
  164. {
  165. Name: "TestSingleSQLRule23",
  166. Sql: "SELECT unnest(arr3) as col,a from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  167. R: [][]map[string]interface{}{
  168. {
  169. {
  170. "col": float64(1),
  171. "a": float64(6),
  172. },
  173. {
  174. "col": float64(2),
  175. "a": float64(6),
  176. },
  177. {
  178. "col": float64(3),
  179. "a": float64(6),
  180. },
  181. },
  182. },
  183. },
  184. {
  185. Name: `TestSingleSQLRule18`,
  186. Sql: `SELECT unnest(arr2) FROM demoArr where x=1`,
  187. R: [][]map[string]interface{}{
  188. {
  189. {
  190. "a": float64(1),
  191. "b": float64(2),
  192. },
  193. },
  194. {
  195. {
  196. "a": float64(3),
  197. "b": float64(4),
  198. },
  199. },
  200. },
  201. },
  202. // The mapping schema created by unnest function will cover the original column if they have the same column name
  203. {
  204. Name: `TestSingleSQLRule19`,
  205. Sql: `SELECT unnest(arr2),a FROM demoArr where x=1`,
  206. R: [][]map[string]interface{}{
  207. {
  208. {
  209. "a": float64(1),
  210. "b": float64(2),
  211. },
  212. },
  213. {
  214. {
  215. "a": float64(3),
  216. "b": float64(4),
  217. },
  218. },
  219. },
  220. },
  221. {
  222. Name: `TestSingleSQLRule20`,
  223. Sql: `SELECT unnest(arr3) as col FROM demoArr where x=1`,
  224. R: [][]map[string]interface{}{
  225. {
  226. {
  227. "col": float64(1),
  228. },
  229. },
  230. {
  231. {
  232. "col": float64(2),
  233. },
  234. },
  235. {
  236. {
  237. "col": float64(3),
  238. },
  239. },
  240. },
  241. },
  242. {
  243. Name: `TestSingleSQLRule21`,
  244. Sql: `SELECT unnest(arr2),x FROM demoArr where x=1`,
  245. R: [][]map[string]interface{}{
  246. {
  247. {
  248. "a": float64(1),
  249. "b": float64(2),
  250. "x": float64(1),
  251. },
  252. },
  253. {
  254. {
  255. "a": float64(3),
  256. "b": float64(4),
  257. "x": float64(1),
  258. },
  259. },
  260. },
  261. },
  262. }
  263. // Data setup
  264. HandleStream(true, streamList, t)
  265. options := []*api.RuleOption{
  266. {
  267. BufferLength: 100,
  268. SendError: true,
  269. }, {
  270. BufferLength: 100,
  271. SendError: true,
  272. Qos: api.AtLeastOnce,
  273. CheckpointInterval: 5000,
  274. }, {
  275. BufferLength: 100,
  276. SendError: true,
  277. Qos: api.ExactlyOnce,
  278. CheckpointInterval: 5000,
  279. },
  280. }
  281. for j, opt := range options {
  282. DoRuleTest(t, tests, j, opt, 0)
  283. }
  284. }
  285. func TestSingleSQL(t *testing.T) {
  286. // Reset
  287. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable", "demoArr"}
  288. HandleStream(false, streamList, t)
  289. // Data setup
  290. tests := []RuleTest{
  291. {
  292. Name: `TestSingleSQLRule0`,
  293. Sql: `SELECT arr[x:y+1] as col1 FROM demoArr where x=1`,
  294. R: [][]map[string]interface{}{
  295. {{
  296. "col1": []interface{}{
  297. float64(2), float64(3),
  298. },
  299. }},
  300. },
  301. },
  302. {
  303. Name: `TestSingleSQLRule1`,
  304. Sql: `SELECT *, upper(color), event_time() FROM demo`,
  305. R: [][]map[string]interface{}{
  306. {{
  307. "color": "red",
  308. "size": float64(3),
  309. "ts": float64(1541152486013),
  310. "upper": "RED",
  311. "event_time": float64(1541152486013),
  312. }},
  313. {{
  314. "color": "blue",
  315. "size": float64(6),
  316. "ts": float64(1541152486822),
  317. "upper": "BLUE",
  318. "event_time": float64(1541152486822),
  319. }},
  320. {{
  321. "color": "blue",
  322. "size": float64(2),
  323. "ts": float64(1541152487632),
  324. "upper": "BLUE",
  325. "event_time": float64(1541152487632),
  326. }},
  327. {{
  328. "color": "yellow",
  329. "size": float64(4),
  330. "ts": float64(1541152488442),
  331. "upper": "YELLOW",
  332. "event_time": float64(1541152488442),
  333. }},
  334. {{
  335. "color": "red",
  336. "size": float64(1),
  337. "ts": float64(1541152489252),
  338. "upper": "RED",
  339. "event_time": float64(1541152489252),
  340. }},
  341. },
  342. M: map[string]interface{}{
  343. "op_2_project_0_exceptions_total": int64(0),
  344. "op_2_project_0_process_latency_us": int64(0),
  345. "op_2_project_0_records_in_total": int64(5),
  346. "op_2_project_0_records_out_total": int64(5),
  347. "sink_mockSink_0_exceptions_total": int64(0),
  348. "sink_mockSink_0_records_in_total": int64(5),
  349. "sink_mockSink_0_records_out_total": int64(5),
  350. "source_demo_0_exceptions_total": int64(0),
  351. "source_demo_0_records_in_total": int64(5),
  352. "source_demo_0_records_out_total": int64(5),
  353. },
  354. T: &api.PrintableTopo{
  355. Sources: []string{"source_demo"},
  356. Edges: map[string][]interface{}{
  357. "source_demo": {"op_2_project"},
  358. "op_2_project": {"sink_mockSink"},
  359. },
  360. },
  361. },
  362. {
  363. Name: `TestSingleSQLRule2`,
  364. Sql: `SELECT color, ts, last_hit_count() + 1 as lc FROM demo where size > 3`,
  365. R: [][]map[string]interface{}{
  366. {{
  367. "color": "blue",
  368. "ts": float64(1541152486822),
  369. "lc": float64(1),
  370. }},
  371. {{
  372. "color": "yellow",
  373. "ts": float64(1541152488442),
  374. "lc": float64(2),
  375. }},
  376. },
  377. M: map[string]interface{}{
  378. "op_3_project_0_exceptions_total": int64(0),
  379. "op_3_project_0_process_latency_us": int64(0),
  380. "op_3_project_0_records_in_total": int64(2),
  381. "op_3_project_0_records_out_total": int64(2),
  382. "sink_mockSink_0_exceptions_total": int64(0),
  383. "sink_mockSink_0_records_in_total": int64(2),
  384. "sink_mockSink_0_records_out_total": int64(2),
  385. "source_demo_0_exceptions_total": int64(0),
  386. "source_demo_0_records_in_total": int64(5),
  387. "source_demo_0_records_out_total": int64(5),
  388. "op_2_filter_0_exceptions_total": int64(0),
  389. "op_2_filter_0_process_latency_us": int64(0),
  390. "op_2_filter_0_records_in_total": int64(5),
  391. "op_2_filter_0_records_out_total": int64(2),
  392. },
  393. },
  394. {
  395. Name: `TestSingleSQLRule3`,
  396. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  397. R: [][]map[string]interface{}{
  398. {{
  399. "Int8": float64(6),
  400. "ts": float64(1541152486822),
  401. }},
  402. {{
  403. "Int8": float64(4),
  404. "ts": float64(1541152488442),
  405. }},
  406. },
  407. M: map[string]interface{}{
  408. "op_3_project_0_exceptions_total": int64(0),
  409. "op_3_project_0_process_latency_us": int64(0),
  410. "op_3_project_0_records_in_total": int64(2),
  411. "op_3_project_0_records_out_total": int64(2),
  412. "sink_mockSink_0_exceptions_total": int64(0),
  413. "sink_mockSink_0_records_in_total": int64(2),
  414. "sink_mockSink_0_records_out_total": int64(2),
  415. "source_demo_0_exceptions_total": int64(0),
  416. "source_demo_0_records_in_total": int64(5),
  417. "source_demo_0_records_out_total": int64(5),
  418. "op_2_filter_0_exceptions_total": int64(0),
  419. "op_2_filter_0_process_latency_us": int64(0),
  420. "op_2_filter_0_records_in_total": int64(5),
  421. "op_2_filter_0_records_out_total": int64(2),
  422. },
  423. },
  424. {
  425. Name: `TestSingleSQLRule4`,
  426. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  427. R: [][]map[string]interface{}{
  428. {{
  429. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  430. }},
  431. {{
  432. "Int8": float64(6),
  433. "ts": float64(1541152486822),
  434. }},
  435. {{
  436. "Int8": float64(4),
  437. "ts": float64(1541152488442),
  438. }},
  439. {{
  440. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  441. }},
  442. },
  443. M: map[string]interface{}{
  444. "op_3_project_0_exceptions_total": int64(2),
  445. "op_3_project_0_process_latency_us": int64(0),
  446. "op_3_project_0_records_in_total": int64(2),
  447. "op_3_project_0_records_out_total": int64(2),
  448. "sink_mockSink_0_exceptions_total": int64(0),
  449. "sink_mockSink_0_records_in_total": int64(4),
  450. "sink_mockSink_0_records_out_total": int64(4),
  451. "source_demoError_0_exceptions_total": int64(2),
  452. "source_demoError_0_records_in_total": int64(5),
  453. "source_demoError_0_records_out_total": int64(3),
  454. "op_2_filter_0_exceptions_total": int64(2),
  455. "op_2_filter_0_process_latency_us": int64(0),
  456. "op_2_filter_0_records_in_total": int64(3),
  457. "op_2_filter_0_records_out_total": int64(2),
  458. },
  459. },
  460. {
  461. Name: `TestSingleSQLRule5`,
  462. Sql: `SELECT meta(topic) as m, ts FROM demo WHERE last_hit_count() < 4`,
  463. R: [][]map[string]interface{}{
  464. {{
  465. "m": "mock",
  466. "ts": float64(1541152486013),
  467. }},
  468. {{
  469. "m": "mock",
  470. "ts": float64(1541152486822),
  471. }},
  472. {{
  473. "m": "mock",
  474. "ts": float64(1541152487632),
  475. }},
  476. {{
  477. "m": "mock",
  478. "ts": float64(1541152488442),
  479. }},
  480. },
  481. M: map[string]interface{}{
  482. "sink_mockSink_0_exceptions_total": int64(0),
  483. "sink_mockSink_0_records_in_total": int64(4),
  484. "sink_mockSink_0_records_out_total": int64(4),
  485. "source_demo_0_exceptions_total": int64(0),
  486. "source_demo_0_records_in_total": int64(5),
  487. "source_demo_0_records_out_total": int64(5),
  488. },
  489. },
  490. {
  491. Name: `TestSingleSQLRule6`,
  492. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  493. R: [][]map[string]interface{}{
  494. {{
  495. "color": "blue",
  496. "ts": float64(1541152486822),
  497. }},
  498. {{
  499. "color": "yellow",
  500. "ts": float64(1541152488442),
  501. }},
  502. },
  503. M: map[string]interface{}{
  504. "op_3_project_0_exceptions_total": int64(0),
  505. "op_3_project_0_process_latency_us": int64(0),
  506. "op_3_project_0_records_in_total": int64(2),
  507. "op_3_project_0_records_out_total": int64(2),
  508. "sink_mockSink_0_exceptions_total": int64(0),
  509. "sink_mockSink_0_records_in_total": int64(2),
  510. "sink_mockSink_0_records_out_total": int64(2),
  511. "source_demo_0_exceptions_total": int64(0),
  512. "source_demo_0_records_in_total": int64(5),
  513. "source_demo_0_records_out_total": int64(5),
  514. "op_2_filter_0_exceptions_total": int64(0),
  515. "op_2_filter_0_process_latency_us": int64(0),
  516. "op_2_filter_0_records_in_total": int64(5),
  517. "op_2_filter_0_records_out_total": int64(2),
  518. },
  519. },
  520. {
  521. Name: `TestSingleSQLRule7`,
  522. Sql: "SELECT `from` FROM demo1",
  523. R: [][]map[string]interface{}{
  524. {{
  525. "from": "device1",
  526. }},
  527. {{
  528. "from": "device2",
  529. }},
  530. {{
  531. "from": "device3",
  532. }},
  533. {{
  534. "from": "device1",
  535. }},
  536. {{
  537. "from": "device3",
  538. }},
  539. },
  540. M: map[string]interface{}{
  541. "op_2_project_0_exceptions_total": int64(0),
  542. "op_2_project_0_process_latency_us": int64(0),
  543. "op_2_project_0_records_in_total": int64(5),
  544. "op_2_project_0_records_out_total": int64(5),
  545. "sink_mockSink_0_exceptions_total": int64(0),
  546. "sink_mockSink_0_records_in_total": int64(5),
  547. "sink_mockSink_0_records_out_total": int64(5),
  548. "source_demo1_0_exceptions_total": int64(0),
  549. "source_demo1_0_records_in_total": int64(5),
  550. "source_demo1_0_records_out_total": int64(5),
  551. },
  552. },
  553. {
  554. Name: `TestSingleSQLRule8`,
  555. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  556. R: [][]map[string]interface{}{
  557. {{
  558. "temp": float64(25.5),
  559. "hum": float64(65),
  560. "from": "device1",
  561. "ts": float64(1541152486013),
  562. }},
  563. {{
  564. "temp": float64(27.4),
  565. "hum": float64(80),
  566. "from": "device1",
  567. "ts": float64(1541152488442),
  568. }},
  569. },
  570. M: map[string]interface{}{
  571. "op_3_project_0_exceptions_total": int64(0),
  572. "op_3_project_0_process_latency_us": int64(0),
  573. "op_3_project_0_records_in_total": int64(2),
  574. "op_3_project_0_records_out_total": int64(2),
  575. "op_2_filter_0_exceptions_total": int64(0),
  576. "op_2_filter_0_process_latency_us": int64(0),
  577. "op_2_filter_0_records_in_total": int64(5),
  578. "op_2_filter_0_records_out_total": int64(2),
  579. "sink_mockSink_0_exceptions_total": int64(0),
  580. "sink_mockSink_0_records_in_total": int64(2),
  581. "sink_mockSink_0_records_out_total": int64(2),
  582. "source_demo1_0_exceptions_total": int64(0),
  583. "source_demo1_0_records_in_total": int64(5),
  584. "source_demo1_0_records_out_total": int64(5),
  585. },
  586. },
  587. {
  588. Name: `TestSingleSQLRule9`,
  589. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  590. R: [][]map[string]interface{}{
  591. {{
  592. "color": "red",
  593. "s": "M",
  594. "ts": float64(1541152486013),
  595. }},
  596. {{
  597. "color": "blue",
  598. "s": "L",
  599. "ts": float64(1541152486822),
  600. }},
  601. {{
  602. "color": "blue",
  603. "s": "M",
  604. "ts": float64(1541152487632),
  605. }},
  606. {{
  607. "color": "yellow",
  608. "s": "L",
  609. "ts": float64(1541152488442),
  610. }},
  611. {{
  612. "color": "red",
  613. "s": "S",
  614. "ts": float64(1541152489252),
  615. }},
  616. },
  617. M: map[string]interface{}{
  618. "op_2_project_0_exceptions_total": int64(0),
  619. "op_2_project_0_process_latency_us": int64(0),
  620. "op_2_project_0_records_in_total": int64(5),
  621. "op_2_project_0_records_out_total": int64(5),
  622. "sink_mockSink_0_exceptions_total": int64(0),
  623. "sink_mockSink_0_records_in_total": int64(5),
  624. "sink_mockSink_0_records_out_total": int64(5),
  625. "source_demo_0_exceptions_total": int64(0),
  626. "source_demo_0_records_in_total": int64(5),
  627. "source_demo_0_records_out_total": int64(5),
  628. },
  629. T: &api.PrintableTopo{
  630. Sources: []string{"source_demo"},
  631. Edges: map[string][]interface{}{
  632. "source_demo": {"op_2_project"},
  633. "op_2_project": {"sink_mockSink"},
  634. },
  635. },
  636. },
  637. {
  638. Name: `TestSingleSQLRule10`,
  639. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  640. R: [][]map[string]interface{}{
  641. {{
  642. "id": float64(1541152486013),
  643. "name": "name1",
  644. "color": "red",
  645. "size": float64(3),
  646. "ts": float64(1541152486013),
  647. }},
  648. {{
  649. "id": float64(1541152487632),
  650. "name": "name2",
  651. "color": "blue",
  652. "size": float64(2),
  653. "ts": float64(1541152487632),
  654. }},
  655. {{
  656. "id": float64(1541152489252),
  657. "name": "name3",
  658. "color": "red",
  659. "size": float64(1),
  660. "ts": float64(1541152489252),
  661. }},
  662. },
  663. W: 15,
  664. M: map[string]interface{}{
  665. "op_3_join_aligner_0_records_in_total": int64(6),
  666. "op_3_join_aligner_0_records_out_total": int64(5),
  667. "op_4_join_0_exceptions_total": int64(0),
  668. "op_4_join_0_records_in_total": int64(5),
  669. "op_4_join_0_records_out_total": int64(3),
  670. "op_5_project_0_exceptions_total": int64(0),
  671. "op_5_project_0_records_in_total": int64(3),
  672. "op_5_project_0_records_out_total": int64(3),
  673. "sink_mockSink_0_exceptions_total": int64(0),
  674. "sink_mockSink_0_records_in_total": int64(3),
  675. "sink_mockSink_0_records_out_total": int64(3),
  676. "source_demo_0_exceptions_total": int64(0),
  677. "source_demo_0_records_in_total": int64(5),
  678. "source_demo_0_records_out_total": int64(5),
  679. "source_table1_0_exceptions_total": int64(0),
  680. "source_table1_0_records_in_total": int64(4),
  681. "source_table1_0_records_out_total": int64(1),
  682. },
  683. },
  684. {
  685. Name: `TestSingleSQLRule11`,
  686. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  687. R: [][]map[string]interface{}{
  688. {{
  689. "device": "device2",
  690. }},
  691. {{
  692. "device": "device4",
  693. }},
  694. {{
  695. "device": "device5",
  696. }},
  697. },
  698. M: map[string]interface{}{
  699. "op_3_join_aligner_0_records_in_total": int64(10),
  700. "op_3_join_aligner_0_records_out_total": int64(5),
  701. "op_4_join_0_exceptions_total": int64(0),
  702. "op_4_join_0_records_in_total": int64(5),
  703. "op_4_join_0_records_out_total": int64(3),
  704. "op_5_project_0_exceptions_total": int64(0),
  705. "op_5_project_0_records_in_total": int64(3),
  706. "op_5_project_0_records_out_total": int64(3),
  707. "sink_mockSink_0_exceptions_total": int64(0),
  708. "sink_mockSink_0_records_in_total": int64(3),
  709. "sink_mockSink_0_records_out_total": int64(3),
  710. "source_demo_0_exceptions_total": int64(0),
  711. "source_demo_0_records_in_total": int64(5),
  712. "source_demo_0_records_out_total": int64(5),
  713. "source_demoTable_0_exceptions_total": int64(0),
  714. "source_demoTable_0_records_in_total": int64(5),
  715. "source_demoTable_0_records_out_total": int64(5),
  716. },
  717. },
  718. {
  719. Name: `TestSingleSQLRule12`,
  720. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  721. R: [][]map[string]interface{}{
  722. {{
  723. "table1Id": float64(1541152486013),
  724. "demoTs": float64(1541152486013),
  725. }},
  726. {{
  727. "table1Id": float64(1541152487632),
  728. "demoTs": float64(1541152487632),
  729. }},
  730. {{
  731. "table1Id": float64(1541152489252),
  732. "demoTs": float64(1541152489252),
  733. }},
  734. },
  735. W: 15,
  736. M: map[string]interface{}{
  737. "op_3_join_aligner_0_records_in_total": int64(6),
  738. "op_3_join_aligner_0_records_out_total": int64(5),
  739. "op_4_join_0_exceptions_total": int64(0),
  740. "op_4_join_0_records_in_total": int64(5),
  741. "op_4_join_0_records_out_total": int64(3),
  742. "op_5_project_0_exceptions_total": int64(0),
  743. "op_5_project_0_records_in_total": int64(3),
  744. "op_5_project_0_records_out_total": int64(3),
  745. "sink_mockSink_0_exceptions_total": int64(0),
  746. "sink_mockSink_0_records_in_total": int64(3),
  747. "sink_mockSink_0_records_out_total": int64(3),
  748. "source_demo_0_exceptions_total": int64(0),
  749. "source_demo_0_records_in_total": int64(5),
  750. "source_demo_0_records_out_total": int64(5),
  751. "source_table1_0_exceptions_total": int64(0),
  752. "source_table1_0_records_in_total": int64(4),
  753. "source_table1_0_records_out_total": int64(1),
  754. },
  755. },
  756. {
  757. Name: `TestChanged13`,
  758. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  759. R: [][]map[string]interface{}{
  760. {{
  761. "tt_color": "red",
  762. "tt_size": float64(3),
  763. }},
  764. {{
  765. "tt_color": "blue",
  766. "tt_size": float64(6),
  767. }},
  768. {{
  769. "tt_size": float64(2),
  770. }},
  771. {{
  772. "tt_color": "yellow",
  773. "tt_size": float64(4),
  774. }},
  775. {{
  776. "tt_color": "red",
  777. "tt_size": float64(1),
  778. }},
  779. },
  780. M: map[string]interface{}{
  781. "op_2_project_0_exceptions_total": int64(0),
  782. "op_2_project_0_process_latency_us": int64(0),
  783. "op_2_project_0_records_in_total": int64(5),
  784. "op_2_project_0_records_out_total": int64(5),
  785. "sink_mockSink_0_exceptions_total": int64(0),
  786. "sink_mockSink_0_records_in_total": int64(5),
  787. "sink_mockSink_0_records_out_total": int64(5),
  788. "source_demo_0_exceptions_total": int64(0),
  789. "source_demo_0_records_in_total": int64(5),
  790. "source_demo_0_records_out_total": int64(5),
  791. },
  792. },
  793. {
  794. Name: `TestAliasOrderBy14`,
  795. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  796. R: [][]map[string]interface{}{
  797. {
  798. {
  799. "color": "blue",
  800. "c": float64(2),
  801. },
  802. {
  803. "color": "yellow",
  804. "c": float64(1),
  805. },
  806. },
  807. },
  808. M: map[string]interface{}{
  809. "op_6_project_0_exceptions_total": int64(0),
  810. "op_6_project_0_process_latency_us": int64(0),
  811. "op_6_project_0_records_in_total": int64(1),
  812. "op_6_project_0_records_out_total": int64(1),
  813. "sink_mockSink_0_exceptions_total": int64(0),
  814. "sink_mockSink_0_records_in_total": int64(1),
  815. "sink_mockSink_0_records_out_total": int64(1),
  816. "source_demo_0_exceptions_total": int64(0),
  817. "source_demo_0_records_in_total": int64(5),
  818. "source_demo_0_records_out_total": int64(5),
  819. },
  820. },
  821. {
  822. Name: `TestSingleSQLRule17`,
  823. Sql: `SELECT arr[x:4] as col1 FROM demoArr where x=1`,
  824. R: [][]map[string]interface{}{
  825. {{
  826. "col1": []interface{}{
  827. float64(2), float64(3), float64(4),
  828. },
  829. }},
  830. },
  831. },
  832. {
  833. Name: `TestSingleSQLRule16`,
  834. Sql: `SELECT arr[1:y] as col1 FROM demoArr where x=1`,
  835. R: [][]map[string]interface{}{
  836. {{
  837. "col1": []interface{}{
  838. float64(2),
  839. },
  840. }},
  841. },
  842. },
  843. {
  844. Name: `TestSingleSQLRule15`,
  845. Sql: `SELECT arr[1] as col1 FROM demoArr where x=1`,
  846. R: [][]map[string]interface{}{
  847. {{
  848. "col1": float64(2),
  849. }},
  850. },
  851. },
  852. {
  853. Name: `TestLagAlias`,
  854. Sql: "SELECT lag(size) as lastSize, lag(had_changed(true,size)), size, lastSize/size as changeRate FROM demo WHERE size > 2",
  855. R: [][]map[string]interface{}{
  856. {{
  857. "size": float64(3),
  858. }},
  859. {{
  860. "lastSize": float64(3),
  861. "size": float64(6),
  862. "lag": true,
  863. "changeRate": float64(0),
  864. }},
  865. {{
  866. "lastSize": float64(2),
  867. "size": float64(4),
  868. "lag": true,
  869. "changeRate": float64(0),
  870. }},
  871. },
  872. M: map[string]interface{}{
  873. "sink_mockSink_0_exceptions_total": int64(0),
  874. "sink_mockSink_0_records_in_total": int64(3),
  875. "sink_mockSink_0_records_out_total": int64(3),
  876. "source_demo_0_exceptions_total": int64(0),
  877. "source_demo_0_records_in_total": int64(5),
  878. "source_demo_0_records_out_total": int64(5),
  879. },
  880. },
  881. {
  882. Name: `TestLagPartition`,
  883. Sql: "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
  884. R: [][]map[string]interface{}{
  885. {{
  886. "color": "red",
  887. "size": float64(3),
  888. }},
  889. {{
  890. "color": "blue",
  891. "size": float64(6),
  892. }},
  893. {{
  894. "color": "blue",
  895. "lastSize": float64(6),
  896. "size": float64(2),
  897. "changeRate": float64(3),
  898. }},
  899. {{
  900. "color": "yellow",
  901. "size": float64(4),
  902. }},
  903. {{
  904. "color": "red",
  905. "lastSize": float64(3),
  906. "size": float64(1),
  907. "changeRate": float64(3),
  908. }},
  909. },
  910. M: map[string]interface{}{
  911. "sink_mockSink_0_exceptions_total": int64(0),
  912. "sink_mockSink_0_records_in_total": int64(5),
  913. "sink_mockSink_0_records_out_total": int64(5),
  914. "source_demo_0_exceptions_total": int64(0),
  915. "source_demo_0_records_in_total": int64(5),
  916. "source_demo_0_records_out_total": int64(5),
  917. },
  918. },
  919. }
  920. HandleStream(true, streamList, t)
  921. options := []*api.RuleOption{
  922. {
  923. BufferLength: 100,
  924. SendError: true,
  925. }, {
  926. BufferLength: 100,
  927. SendError: true,
  928. Qos: api.AtLeastOnce,
  929. CheckpointInterval: 5000,
  930. }, {
  931. BufferLength: 100,
  932. SendError: true,
  933. Qos: api.ExactlyOnce,
  934. CheckpointInterval: 5000,
  935. },
  936. }
  937. for j, opt := range options {
  938. DoRuleTest(t, tests, j, opt, 0)
  939. }
  940. }
  941. func TestSingleSQLWithEventTime(t *testing.T) {
  942. // Reset
  943. streamList := []string{"demoE"}
  944. HandleStream(false, streamList, t)
  945. // Data setup
  946. tests := []RuleTest{
  947. {
  948. Name: `TestSingleSQLRule1`,
  949. Sql: `SELECT *, upper(color) FROM demoE`,
  950. R: [][]map[string]interface{}{
  951. {{
  952. "color": "red",
  953. "size": float64(3),
  954. "ts": float64(1541152486013),
  955. "upper": "RED",
  956. }},
  957. {{
  958. "color": "blue",
  959. "size": float64(2),
  960. "ts": float64(1541152487632),
  961. "upper": "BLUE",
  962. }},
  963. {{
  964. "color": "yellow",
  965. "size": float64(4),
  966. "ts": float64(1541152488442),
  967. "upper": "YELLOW",
  968. }},
  969. {{
  970. "color": "red",
  971. "size": float64(1),
  972. "ts": float64(1541152489252),
  973. "upper": "RED",
  974. }},
  975. },
  976. M: map[string]interface{}{
  977. "op_3_project_0_exceptions_total": int64(0),
  978. "op_3_project_0_process_latency_us": int64(0),
  979. "op_3_project_0_records_in_total": int64(4),
  980. "op_3_project_0_records_out_total": int64(4),
  981. "sink_mockSink_0_exceptions_total": int64(0),
  982. "sink_mockSink_0_records_in_total": int64(4),
  983. "sink_mockSink_0_records_out_total": int64(4),
  984. "source_demoE_0_exceptions_total": int64(0),
  985. "source_demoE_0_records_in_total": int64(6),
  986. "source_demoE_0_records_out_total": int64(6),
  987. },
  988. T: &api.PrintableTopo{
  989. Sources: []string{"source_demoE"},
  990. Edges: map[string][]interface{}{
  991. "source_demoE": {"op_2_watermark"},
  992. "op_2_watermark": {"op_3_project"},
  993. "op_3_project": {"sink_mockSink"},
  994. },
  995. },
  996. },
  997. {
  998. Name: `TestStateFunc`,
  999. Sql: `SELECT *, last_hit_time() as lt, last_hit_count() as lc, event_time() as et FROM demoE WHERE size < 3 AND lc < 2`,
  1000. R: [][]map[string]interface{}{
  1001. {{
  1002. "color": "blue",
  1003. "size": float64(2),
  1004. "ts": float64(1541152487632),
  1005. "lc": float64(0),
  1006. "lt": float64(0),
  1007. "et": float64(1541152487632),
  1008. }},
  1009. {{
  1010. "color": "red",
  1011. "size": float64(1),
  1012. "ts": float64(1541152489252),
  1013. "lc": float64(1),
  1014. "lt": float64(1541152487632),
  1015. "et": float64(1541152489252),
  1016. }},
  1017. },
  1018. M: map[string]interface{}{
  1019. "sink_mockSink_0_exceptions_total": int64(0),
  1020. "sink_mockSink_0_records_in_total": int64(2),
  1021. "sink_mockSink_0_records_out_total": int64(2),
  1022. "source_demoE_0_exceptions_total": int64(0),
  1023. "source_demoE_0_records_in_total": int64(6),
  1024. "source_demoE_0_records_out_total": int64(6),
  1025. },
  1026. },
  1027. {
  1028. Name: `TestChanged`,
  1029. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demoE",
  1030. R: [][]map[string]interface{}{
  1031. {{
  1032. "tt_color": "red",
  1033. "tt_size": float64(3),
  1034. }},
  1035. {{
  1036. "tt_color": "blue",
  1037. "tt_size": float64(2),
  1038. }},
  1039. {{
  1040. "tt_color": "yellow",
  1041. "tt_size": float64(4),
  1042. }},
  1043. {{
  1044. "tt_color": "red",
  1045. "tt_size": float64(1),
  1046. }},
  1047. },
  1048. M: map[string]interface{}{
  1049. "op_3_project_0_exceptions_total": int64(0),
  1050. "op_3_project_0_process_latency_us": int64(0),
  1051. "op_3_project_0_records_in_total": int64(4),
  1052. "op_3_project_0_records_out_total": int64(4),
  1053. "sink_mockSink_0_exceptions_total": int64(0),
  1054. "sink_mockSink_0_records_in_total": int64(4),
  1055. "sink_mockSink_0_records_out_total": int64(4),
  1056. "source_demoE_0_exceptions_total": int64(0),
  1057. "source_demoE_0_records_in_total": int64(6),
  1058. "source_demoE_0_records_out_total": int64(6),
  1059. },
  1060. },
  1061. }
  1062. HandleStream(true, streamList, t)
  1063. options := []*api.RuleOption{
  1064. {
  1065. BufferLength: 100,
  1066. SendError: true,
  1067. IsEventTime: true,
  1068. LateTol: 1000,
  1069. }, {
  1070. BufferLength: 100,
  1071. SendError: true,
  1072. Qos: api.AtLeastOnce,
  1073. CheckpointInterval: 5000,
  1074. IsEventTime: true,
  1075. LateTol: 1000,
  1076. }, {
  1077. BufferLength: 100,
  1078. SendError: true,
  1079. Qos: api.ExactlyOnce,
  1080. CheckpointInterval: 5000,
  1081. IsEventTime: true,
  1082. LateTol: 1000,
  1083. },
  1084. }
  1085. for j, opt := range options {
  1086. DoRuleTest(t, tests, j, opt, 0)
  1087. }
  1088. }
  1089. func TestSingleSQLError(t *testing.T) {
  1090. // Reset
  1091. streamList := []string{"ldemo"}
  1092. HandleStream(false, streamList, t)
  1093. // Data setup
  1094. tests := []RuleTest{
  1095. {
  1096. Name: `TestSingleSQLErrorRule1`,
  1097. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1098. R: [][]map[string]interface{}{
  1099. {{
  1100. "color": "red",
  1101. "ts": float64(1541152486013),
  1102. }},
  1103. {{
  1104. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1105. }},
  1106. {{
  1107. "ts": float64(1541152487632),
  1108. }},
  1109. },
  1110. M: map[string]interface{}{
  1111. "op_3_project_0_exceptions_total": int64(1),
  1112. "op_3_project_0_process_latency_us": int64(0),
  1113. "op_3_project_0_records_in_total": int64(2),
  1114. "op_3_project_0_records_out_total": int64(2),
  1115. "sink_mockSink_0_exceptions_total": int64(0),
  1116. "sink_mockSink_0_records_in_total": int64(3),
  1117. "sink_mockSink_0_records_out_total": int64(3),
  1118. "source_ldemo_0_exceptions_total": int64(0),
  1119. "source_ldemo_0_records_in_total": int64(5),
  1120. "source_ldemo_0_records_out_total": int64(5),
  1121. "op_2_filter_0_exceptions_total": int64(1),
  1122. "op_2_filter_0_process_latency_us": int64(0),
  1123. "op_2_filter_0_records_in_total": int64(5),
  1124. "op_2_filter_0_records_out_total": int64(2),
  1125. },
  1126. }, {
  1127. Name: `TestSingleSQLErrorRule2`,
  1128. Sql: `SELECT size * 5 FROM ldemo`,
  1129. R: [][]map[string]interface{}{
  1130. {{
  1131. "kuiper_field_0": float64(15),
  1132. }},
  1133. {{
  1134. "error": "run Select error: invalid operation string(string) * int64(5)",
  1135. }},
  1136. {{
  1137. "kuiper_field_0": float64(15),
  1138. }},
  1139. {{
  1140. "kuiper_field_0": float64(10),
  1141. }},
  1142. {{}},
  1143. },
  1144. M: map[string]interface{}{
  1145. "op_2_project_0_exceptions_total": int64(1),
  1146. "op_2_project_0_process_latency_us": int64(0),
  1147. "op_2_project_0_records_in_total": int64(5),
  1148. "op_2_project_0_records_out_total": int64(4),
  1149. "sink_mockSink_0_exceptions_total": int64(0),
  1150. "sink_mockSink_0_records_in_total": int64(5),
  1151. "sink_mockSink_0_records_out_total": int64(5),
  1152. "source_ldemo_0_exceptions_total": int64(0),
  1153. "source_ldemo_0_records_in_total": int64(5),
  1154. "source_ldemo_0_records_out_total": int64(5),
  1155. },
  1156. },
  1157. }
  1158. HandleStream(true, streamList, t)
  1159. DoRuleTest(t, tests, 0, &api.RuleOption{
  1160. BufferLength: 100,
  1161. SendError: true,
  1162. }, 0)
  1163. }
  1164. func TestSingleSQLOmitError(t *testing.T) {
  1165. // Reset
  1166. streamList := []string{"ldemo"}
  1167. HandleStream(false, streamList, t)
  1168. // Data setup
  1169. tests := []RuleTest{
  1170. {
  1171. Name: `TestSingleSQLErrorRule1`,
  1172. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1173. R: [][]map[string]interface{}{
  1174. {{
  1175. "color": "red",
  1176. "ts": float64(1541152486013),
  1177. }},
  1178. {{
  1179. "ts": float64(1541152487632),
  1180. }},
  1181. },
  1182. M: map[string]interface{}{
  1183. "op_3_project_0_exceptions_total": int64(0),
  1184. "op_3_project_0_process_latency_us": int64(0),
  1185. "op_3_project_0_records_in_total": int64(2),
  1186. "op_3_project_0_records_out_total": int64(2),
  1187. "sink_mockSink_0_exceptions_total": int64(0),
  1188. "sink_mockSink_0_records_in_total": int64(2),
  1189. "sink_mockSink_0_records_out_total": int64(2),
  1190. "source_ldemo_0_exceptions_total": int64(0),
  1191. "source_ldemo_0_records_in_total": int64(5),
  1192. "source_ldemo_0_records_out_total": int64(5),
  1193. "op_2_filter_0_exceptions_total": int64(1),
  1194. "op_2_filter_0_process_latency_us": int64(0),
  1195. "op_2_filter_0_records_in_total": int64(5),
  1196. "op_2_filter_0_records_out_total": int64(2),
  1197. },
  1198. }, {
  1199. Name: `TestSingleSQLErrorRule2`,
  1200. Sql: `SELECT size * 5 FROM ldemo`,
  1201. R: [][]map[string]interface{}{
  1202. {{
  1203. "kuiper_field_0": float64(15),
  1204. }},
  1205. {{
  1206. "kuiper_field_0": float64(15),
  1207. }},
  1208. {{
  1209. "kuiper_field_0": float64(10),
  1210. }},
  1211. {{}},
  1212. },
  1213. M: map[string]interface{}{
  1214. "op_2_project_0_exceptions_total": int64(1),
  1215. "op_2_project_0_process_latency_us": int64(0),
  1216. "op_2_project_0_records_in_total": int64(5),
  1217. "op_2_project_0_records_out_total": int64(4),
  1218. "sink_mockSink_0_exceptions_total": int64(0),
  1219. "sink_mockSink_0_records_in_total": int64(4),
  1220. "sink_mockSink_0_records_out_total": int64(4),
  1221. "source_ldemo_0_exceptions_total": int64(0),
  1222. "source_ldemo_0_records_in_total": int64(5),
  1223. "source_ldemo_0_records_out_total": int64(5),
  1224. },
  1225. },
  1226. }
  1227. HandleStream(true, streamList, t)
  1228. DoRuleTest(t, tests, 0, &api.RuleOption{
  1229. BufferLength: 100,
  1230. SendError: false,
  1231. }, 0)
  1232. }
  1233. func TestSingleSQLTemplate(t *testing.T) {
  1234. // Reset
  1235. streamList := []string{"demo"}
  1236. HandleStream(false, streamList, t)
  1237. // Data setup
  1238. tests := []RuleTest{
  1239. {
  1240. Name: `TestSingleSQLTemplateRule1`,
  1241. Sql: `SELECT * FROM demo`,
  1242. R: []map[string]interface{}{
  1243. {
  1244. "c": "red",
  1245. "wrapper": "w1",
  1246. },
  1247. {
  1248. "c": "blue",
  1249. "wrapper": "w1",
  1250. },
  1251. {
  1252. "c": "blue",
  1253. "wrapper": "w1",
  1254. },
  1255. {
  1256. "c": "yellow",
  1257. "wrapper": "w1",
  1258. },
  1259. {
  1260. "c": "red",
  1261. "wrapper": "w1",
  1262. },
  1263. },
  1264. M: map[string]interface{}{
  1265. "op_2_project_0_exceptions_total": int64(0),
  1266. "op_2_project_0_process_latency_us": int64(0),
  1267. "op_2_project_0_records_in_total": int64(5),
  1268. "op_2_project_0_records_out_total": int64(5),
  1269. "sink_mockSink_0_exceptions_total": int64(0),
  1270. "sink_mockSink_0_records_in_total": int64(5),
  1271. "sink_mockSink_0_records_out_total": int64(5),
  1272. "source_demo_0_exceptions_total": int64(0),
  1273. "source_demo_0_records_in_total": int64(5),
  1274. "source_demo_0_records_out_total": int64(5),
  1275. },
  1276. },
  1277. }
  1278. HandleStream(true, streamList, t)
  1279. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1280. BufferLength: 100,
  1281. SendError: true,
  1282. }, 0, map[string]interface{}{
  1283. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  1284. "sendSingle": true,
  1285. }, func(result [][]byte) interface{} {
  1286. var maps []map[string]interface{}
  1287. for _, v := range result {
  1288. var mapRes map[string]interface{}
  1289. err := json.Unmarshal(v, &mapRes)
  1290. if err != nil {
  1291. t.Errorf("Failed to parse the input into map")
  1292. continue
  1293. }
  1294. maps = append(maps, mapRes)
  1295. }
  1296. return maps
  1297. })
  1298. }
  1299. func TestNoneSingleSQLTemplate(t *testing.T) {
  1300. // Reset
  1301. streamList := []string{"demo"}
  1302. HandleStream(false, streamList, t)
  1303. // Data setup
  1304. tests := []RuleTest{
  1305. {
  1306. Name: `TestNoneSingleSQLTemplateRule1`,
  1307. Sql: `SELECT * FROM demo`,
  1308. R: [][]byte{
  1309. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1310. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1311. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1312. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1313. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1314. },
  1315. M: map[string]interface{}{
  1316. "op_2_project_0_exceptions_total": int64(0),
  1317. "op_2_project_0_process_latency_us": int64(0),
  1318. "op_2_project_0_records_in_total": int64(5),
  1319. "op_2_project_0_records_out_total": int64(5),
  1320. "sink_mockSink_0_exceptions_total": int64(0),
  1321. "sink_mockSink_0_records_in_total": int64(5),
  1322. "sink_mockSink_0_records_out_total": int64(5),
  1323. "source_demo_0_exceptions_total": int64(0),
  1324. "source_demo_0_records_in_total": int64(5),
  1325. "source_demo_0_records_out_total": int64(5),
  1326. },
  1327. },
  1328. }
  1329. HandleStream(true, streamList, t)
  1330. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1331. BufferLength: 100,
  1332. SendError: true,
  1333. }, 0, map[string]interface{}{
  1334. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1335. }, func(result [][]byte) interface{} {
  1336. return result
  1337. })
  1338. }
  1339. func TestSingleSQLForBinary(t *testing.T) {
  1340. // Reset
  1341. streamList := []string{"binDemo"}
  1342. HandleStream(false, streamList, t)
  1343. // Data setup
  1344. tests := []RuleTest{
  1345. {
  1346. Name: `TestSingleSQLRule1`,
  1347. Sql: `SELECT * FROM binDemo`,
  1348. R: [][]map[string]interface{}{
  1349. {{
  1350. "self": mocknode.Image,
  1351. }},
  1352. },
  1353. W: 50,
  1354. M: map[string]interface{}{
  1355. "op_2_project_0_exceptions_total": int64(0),
  1356. "op_2_project_0_process_latency_us": int64(0),
  1357. "op_2_project_0_records_in_total": int64(1),
  1358. "op_2_project_0_records_out_total": int64(1),
  1359. "sink_mockSink_0_exceptions_total": int64(0),
  1360. "sink_mockSink_0_records_in_total": int64(1),
  1361. "sink_mockSink_0_records_out_total": int64(1),
  1362. "source_binDemo_0_exceptions_total": int64(0),
  1363. "source_binDemo_0_records_in_total": int64(1),
  1364. "source_binDemo_0_records_out_total": int64(1),
  1365. },
  1366. },
  1367. }
  1368. HandleStream(true, streamList, t)
  1369. options := []*api.RuleOption{
  1370. {
  1371. BufferLength: 100,
  1372. SendError: true,
  1373. }, {
  1374. BufferLength: 100,
  1375. SendError: true,
  1376. Qos: api.AtLeastOnce,
  1377. CheckpointInterval: 5000,
  1378. }, {
  1379. BufferLength: 100,
  1380. SendError: true,
  1381. Qos: api.ExactlyOnce,
  1382. CheckpointInterval: 5000,
  1383. },
  1384. }
  1385. byteFunc := func(result [][]byte) interface{} {
  1386. var maps [][]map[string]interface{}
  1387. for _, v := range result {
  1388. var mapRes []map[string][]byte
  1389. err := json.Unmarshal(v, &mapRes)
  1390. if err != nil {
  1391. panic("Failed to parse the input into map")
  1392. }
  1393. mapInt := make([]map[string]interface{}, len(mapRes))
  1394. for i, mv := range mapRes {
  1395. mapInt[i] = make(map[string]interface{})
  1396. // assume only one key
  1397. for k, v := range mv {
  1398. mapInt[i][k] = v
  1399. }
  1400. }
  1401. maps = append(maps, mapInt)
  1402. }
  1403. return maps
  1404. }
  1405. for j, opt := range options {
  1406. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  1407. }
  1408. }
  1409. func TestWindowSQL(t *testing.T) {
  1410. // Reset
  1411. streamList := []string{"demoE"}
  1412. HandleStream(false, streamList, t)
  1413. tests := []RuleTest{
  1414. {
  1415. Name: "TestHoppingWindowSQL1",
  1416. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 3, 5)`,
  1417. R: [][]map[string]interface{}{
  1418. {
  1419. {
  1420. "color": "blue",
  1421. "size": float64(2),
  1422. },
  1423. {
  1424. "color": "red",
  1425. "size": float64(1),
  1426. },
  1427. },
  1428. },
  1429. },
  1430. {
  1431. Name: "TestHoppingWindowSQL2",
  1432. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 1, 2)`,
  1433. R: [][]map[string]interface{}{
  1434. {
  1435. {
  1436. "color": "blue",
  1437. "size": float64(2),
  1438. },
  1439. },
  1440. {
  1441. {
  1442. "color": "red",
  1443. "size": float64(1),
  1444. },
  1445. },
  1446. {},
  1447. },
  1448. },
  1449. {
  1450. Name: "TestHoppingWindowSQL3",
  1451. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 2, 5)`,
  1452. R: [][]map[string]interface{}{
  1453. {
  1454. {
  1455. "color": "red",
  1456. "size": float64(1),
  1457. },
  1458. },
  1459. },
  1460. },
  1461. }
  1462. // Data setup
  1463. HandleStream(true, streamList, t)
  1464. options := []*api.RuleOption{
  1465. {
  1466. BufferLength: 100,
  1467. SendError: true,
  1468. Qos: api.AtLeastOnce,
  1469. CheckpointInterval: 5000,
  1470. IsEventTime: true,
  1471. },
  1472. {
  1473. BufferLength: 100,
  1474. SendError: true,
  1475. Qos: api.ExactlyOnce,
  1476. CheckpointInterval: 5000,
  1477. IsEventTime: true,
  1478. },
  1479. }
  1480. for j, opt := range options {
  1481. DoRuleTest(t, tests, j, opt, 0)
  1482. }
  1483. }
  1484. func TestAliasSQL(t *testing.T) {
  1485. streamList := []string{"demo"}
  1486. HandleStream(false, streamList, t)
  1487. tests := []RuleTest{
  1488. {
  1489. Name: "TestAliasSQL1",
  1490. Sql: `select size as a, a + 1 as b from demo`,
  1491. R: [][]map[string]interface{}{
  1492. {
  1493. {
  1494. "a": float64(3),
  1495. "b": float64(4),
  1496. },
  1497. },
  1498. {
  1499. {
  1500. "a": float64(6),
  1501. "b": float64(7),
  1502. },
  1503. },
  1504. {
  1505. {
  1506. "a": float64(2),
  1507. "b": float64(3),
  1508. },
  1509. },
  1510. {
  1511. {
  1512. "a": float64(4),
  1513. "b": float64(5),
  1514. },
  1515. },
  1516. {
  1517. {
  1518. "a": float64(1),
  1519. "b": float64(2),
  1520. },
  1521. },
  1522. },
  1523. },
  1524. {
  1525. Name: "TestAliasSQL2",
  1526. Sql: `select a + 1 as b, size as a from demo`,
  1527. R: [][]map[string]interface{}{
  1528. {
  1529. {
  1530. "a": float64(3),
  1531. "b": float64(4),
  1532. },
  1533. },
  1534. {
  1535. {
  1536. "a": float64(6),
  1537. "b": float64(7),
  1538. },
  1539. },
  1540. {
  1541. {
  1542. "a": float64(2),
  1543. "b": float64(3),
  1544. },
  1545. },
  1546. {
  1547. {
  1548. "a": float64(4),
  1549. "b": float64(5),
  1550. },
  1551. },
  1552. {
  1553. {
  1554. "a": float64(1),
  1555. "b": float64(2),
  1556. },
  1557. },
  1558. },
  1559. },
  1560. }
  1561. // Data setup
  1562. HandleStream(true, streamList, t)
  1563. options := []*api.RuleOption{
  1564. {
  1565. BufferLength: 100,
  1566. SendError: true,
  1567. Qos: api.AtLeastOnce,
  1568. CheckpointInterval: 5000,
  1569. },
  1570. {
  1571. BufferLength: 100,
  1572. SendError: true,
  1573. Qos: api.ExactlyOnce,
  1574. CheckpointInterval: 5000,
  1575. },
  1576. }
  1577. for j, opt := range options {
  1578. DoRuleTest(t, tests, j, opt, 0)
  1579. }
  1580. }