rule_test.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "testing"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. )
  21. func TestLimitSQL(t *testing.T) {
  22. // Reset
  23. streamList := []string{"demo", "demoArr"}
  24. HandleStream(false, streamList, t)
  25. var r [][]map[string]interface{}
  26. tests := []RuleTest{
  27. {
  28. Name: "TestLimitSQL0",
  29. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  30. R: [][]map[string]interface{}{
  31. {
  32. {
  33. "col": float64(1),
  34. "size": float64(1),
  35. },
  36. },
  37. },
  38. },
  39. {
  40. Name: "TestLimitSQL1",
  41. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  42. R: r,
  43. },
  44. {
  45. Name: "TestLimitSQL2",
  46. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  47. R: [][]map[string]interface{}{
  48. {
  49. {
  50. "size": float64(1),
  51. },
  52. },
  53. },
  54. },
  55. {
  56. Name: "TestLimitSQL3",
  57. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  58. R: r,
  59. },
  60. }
  61. // Data setup
  62. HandleStream(true, streamList, t)
  63. options := []*api.RuleOption{
  64. {
  65. BufferLength: 100,
  66. SendError: true,
  67. },
  68. {
  69. BufferLength: 100,
  70. SendError: true,
  71. Qos: api.AtLeastOnce,
  72. CheckpointInterval: 5000,
  73. },
  74. {
  75. BufferLength: 100,
  76. SendError: true,
  77. Qos: api.ExactlyOnce,
  78. CheckpointInterval: 5000,
  79. },
  80. }
  81. for j, opt := range options {
  82. DoRuleTest(t, tests, j, opt, 0)
  83. }
  84. }
  85. func TestSRFSQL(t *testing.T) {
  86. // Reset
  87. streamList := []string{"demo", "demoArr"}
  88. HandleStream(false, streamList, t)
  89. tests := []RuleTest{
  90. {
  91. Name: "TestSingleSQLRule25",
  92. Sql: "SELECT unnest(a) from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  93. R: [][]map[string]interface{}{
  94. {
  95. {
  96. "error": "the argument for the unnest function should be array",
  97. },
  98. },
  99. },
  100. },
  101. {
  102. Name: "TestSingleSQLRule24",
  103. Sql: "Select unnest(a) from demoArr;",
  104. R: [][]map[string]interface{}{
  105. {
  106. {
  107. "error": "the argument for the unnest function should be array",
  108. },
  109. },
  110. },
  111. },
  112. {
  113. Name: "TestSingleSQLRule21",
  114. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1);`,
  115. R: [][]map[string]interface{}{
  116. {
  117. {
  118. "col": float64(1),
  119. "size": float64(1),
  120. },
  121. {
  122. "col": float64(2),
  123. "size": float64(1),
  124. },
  125. {
  126. "col": float64(3),
  127. "size": float64(1),
  128. },
  129. },
  130. },
  131. },
  132. {
  133. Name: "TestSingleSQLRule22",
  134. Sql: `SELECT unnest(arr3) as col,y From demoArr group by y, SESSIONWINDOW(ss, 2, 1);`,
  135. R: [][]map[string]interface{}{
  136. {
  137. {
  138. "col": float64(1),
  139. "y": float64(2),
  140. },
  141. {
  142. "col": float64(2),
  143. "y": float64(2),
  144. },
  145. {
  146. "col": float64(3),
  147. "y": float64(2),
  148. },
  149. },
  150. },
  151. },
  152. {
  153. Name: "TestSingleSQLRule23",
  154. Sql: "SELECT unnest(arr3) as col,a from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  155. R: [][]map[string]interface{}{
  156. {
  157. {
  158. "col": float64(1),
  159. "a": float64(6),
  160. },
  161. {
  162. "col": float64(2),
  163. "a": float64(6),
  164. },
  165. {
  166. "col": float64(3),
  167. "a": float64(6),
  168. },
  169. },
  170. },
  171. },
  172. {
  173. Name: `TestSingleSQLRule18`,
  174. Sql: `SELECT unnest(arr2) FROM demoArr where x=1`,
  175. R: [][]map[string]interface{}{
  176. {
  177. {
  178. "a": float64(1),
  179. "b": float64(2),
  180. },
  181. },
  182. {
  183. {
  184. "a": float64(3),
  185. "b": float64(4),
  186. },
  187. },
  188. },
  189. },
  190. // The mapping schema created by unnest function will cover the original column if they have the same column name
  191. {
  192. Name: `TestSingleSQLRule19`,
  193. Sql: `SELECT unnest(arr2),a FROM demoArr where x=1`,
  194. R: [][]map[string]interface{}{
  195. {
  196. {
  197. "a": float64(1),
  198. "b": float64(2),
  199. },
  200. },
  201. {
  202. {
  203. "a": float64(3),
  204. "b": float64(4),
  205. },
  206. },
  207. },
  208. },
  209. {
  210. Name: `TestSingleSQLRule20`,
  211. Sql: `SELECT unnest(arr3) as col FROM demoArr where x=1`,
  212. R: [][]map[string]interface{}{
  213. {
  214. {
  215. "col": float64(1),
  216. },
  217. },
  218. {
  219. {
  220. "col": float64(2),
  221. },
  222. },
  223. {
  224. {
  225. "col": float64(3),
  226. },
  227. },
  228. },
  229. },
  230. {
  231. Name: `TestSingleSQLRule21`,
  232. Sql: `SELECT unnest(arr2),x FROM demoArr where x=1`,
  233. R: [][]map[string]interface{}{
  234. {
  235. {
  236. "a": float64(1),
  237. "b": float64(2),
  238. "x": float64(1),
  239. },
  240. },
  241. {
  242. {
  243. "a": float64(3),
  244. "b": float64(4),
  245. "x": float64(1),
  246. },
  247. },
  248. },
  249. },
  250. }
  251. // Data setup
  252. HandleStream(true, streamList, t)
  253. options := []*api.RuleOption{
  254. {
  255. BufferLength: 100,
  256. SendError: true,
  257. }, {
  258. BufferLength: 100,
  259. SendError: true,
  260. Qos: api.AtLeastOnce,
  261. CheckpointInterval: 5000,
  262. }, {
  263. BufferLength: 100,
  264. SendError: true,
  265. Qos: api.ExactlyOnce,
  266. CheckpointInterval: 5000,
  267. },
  268. }
  269. for j, opt := range options {
  270. DoRuleTest(t, tests, j, opt, 0)
  271. }
  272. }
  273. func TestSingleSQL(t *testing.T) {
  274. // Reset
  275. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable", "demoArr"}
  276. HandleStream(false, streamList, t)
  277. // Data setup
  278. tests := []RuleTest{
  279. {
  280. Name: `TestSingleSQLRule0`,
  281. Sql: `SELECT arr[x:y+1] as col1 FROM demoArr where x=1`,
  282. R: [][]map[string]interface{}{
  283. {{
  284. "col1": []interface{}{
  285. float64(2), float64(3),
  286. },
  287. }},
  288. },
  289. },
  290. {
  291. Name: `TestSingleSQLRule1`,
  292. Sql: `SELECT *, upper(color), event_time() FROM demo`,
  293. R: [][]map[string]interface{}{
  294. {{
  295. "color": "red",
  296. "size": float64(3),
  297. "ts": float64(1541152486013),
  298. "upper": "RED",
  299. "event_time": float64(1541152486013),
  300. }},
  301. {{
  302. "color": "blue",
  303. "size": float64(6),
  304. "ts": float64(1541152486822),
  305. "upper": "BLUE",
  306. "event_time": float64(1541152486822),
  307. }},
  308. {{
  309. "color": "blue",
  310. "size": float64(2),
  311. "ts": float64(1541152487632),
  312. "upper": "BLUE",
  313. "event_time": float64(1541152487632),
  314. }},
  315. {{
  316. "color": "yellow",
  317. "size": float64(4),
  318. "ts": float64(1541152488442),
  319. "upper": "YELLOW",
  320. "event_time": float64(1541152488442),
  321. }},
  322. {{
  323. "color": "red",
  324. "size": float64(1),
  325. "ts": float64(1541152489252),
  326. "upper": "RED",
  327. "event_time": float64(1541152489252),
  328. }},
  329. },
  330. M: map[string]interface{}{
  331. "op_2_project_0_exceptions_total": int64(0),
  332. "op_2_project_0_process_latency_us": int64(0),
  333. "op_2_project_0_records_in_total": int64(5),
  334. "op_2_project_0_records_out_total": int64(5),
  335. "sink_mockSink_0_exceptions_total": int64(0),
  336. "sink_mockSink_0_records_in_total": int64(5),
  337. "sink_mockSink_0_records_out_total": int64(5),
  338. "source_demo_0_exceptions_total": int64(0),
  339. "source_demo_0_records_in_total": int64(5),
  340. "source_demo_0_records_out_total": int64(5),
  341. },
  342. T: &api.PrintableTopo{
  343. Sources: []string{"source_demo"},
  344. Edges: map[string][]interface{}{
  345. "source_demo": {"op_2_project"},
  346. "op_2_project": {"sink_mockSink"},
  347. },
  348. },
  349. },
  350. {
  351. Name: `TestSingleSQLRule2`,
  352. Sql: `SELECT color, ts, last_hit_count() + 1 as lc FROM demo where size > 3`,
  353. R: [][]map[string]interface{}{
  354. {{
  355. "color": "blue",
  356. "ts": float64(1541152486822),
  357. "lc": float64(1),
  358. }},
  359. {{
  360. "color": "yellow",
  361. "ts": float64(1541152488442),
  362. "lc": float64(2),
  363. }},
  364. },
  365. M: map[string]interface{}{
  366. "op_3_project_0_exceptions_total": int64(0),
  367. "op_3_project_0_process_latency_us": int64(0),
  368. "op_3_project_0_records_in_total": int64(2),
  369. "op_3_project_0_records_out_total": int64(2),
  370. "sink_mockSink_0_exceptions_total": int64(0),
  371. "sink_mockSink_0_records_in_total": int64(2),
  372. "sink_mockSink_0_records_out_total": int64(2),
  373. "source_demo_0_exceptions_total": int64(0),
  374. "source_demo_0_records_in_total": int64(5),
  375. "source_demo_0_records_out_total": int64(5),
  376. "op_2_filter_0_exceptions_total": int64(0),
  377. "op_2_filter_0_process_latency_us": int64(0),
  378. "op_2_filter_0_records_in_total": int64(5),
  379. "op_2_filter_0_records_out_total": int64(2),
  380. },
  381. },
  382. {
  383. Name: `TestSingleSQLRule3`,
  384. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  385. R: [][]map[string]interface{}{
  386. {{
  387. "Int8": float64(6),
  388. "ts": float64(1541152486822),
  389. }},
  390. {{
  391. "Int8": float64(4),
  392. "ts": float64(1541152488442),
  393. }},
  394. },
  395. M: map[string]interface{}{
  396. "op_3_project_0_exceptions_total": int64(0),
  397. "op_3_project_0_process_latency_us": int64(0),
  398. "op_3_project_0_records_in_total": int64(2),
  399. "op_3_project_0_records_out_total": int64(2),
  400. "sink_mockSink_0_exceptions_total": int64(0),
  401. "sink_mockSink_0_records_in_total": int64(2),
  402. "sink_mockSink_0_records_out_total": int64(2),
  403. "source_demo_0_exceptions_total": int64(0),
  404. "source_demo_0_records_in_total": int64(5),
  405. "source_demo_0_records_out_total": int64(5),
  406. "op_2_filter_0_exceptions_total": int64(0),
  407. "op_2_filter_0_process_latency_us": int64(0),
  408. "op_2_filter_0_records_in_total": int64(5),
  409. "op_2_filter_0_records_out_total": int64(2),
  410. },
  411. },
  412. {
  413. Name: `TestSingleSQLRule4`,
  414. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  415. R: [][]map[string]interface{}{
  416. {{
  417. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  418. }},
  419. {{
  420. "Int8": float64(6),
  421. "ts": float64(1541152486822),
  422. }},
  423. {{
  424. "Int8": float64(4),
  425. "ts": float64(1541152488442),
  426. }},
  427. {{
  428. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  429. }},
  430. },
  431. M: map[string]interface{}{
  432. "op_3_project_0_exceptions_total": int64(2),
  433. "op_3_project_0_process_latency_us": int64(0),
  434. "op_3_project_0_records_in_total": int64(2),
  435. "op_3_project_0_records_out_total": int64(2),
  436. "sink_mockSink_0_exceptions_total": int64(0),
  437. "sink_mockSink_0_records_in_total": int64(4),
  438. "sink_mockSink_0_records_out_total": int64(4),
  439. "source_demoError_0_exceptions_total": int64(2),
  440. "source_demoError_0_records_in_total": int64(5),
  441. "source_demoError_0_records_out_total": int64(3),
  442. "op_2_filter_0_exceptions_total": int64(2),
  443. "op_2_filter_0_process_latency_us": int64(0),
  444. "op_2_filter_0_records_in_total": int64(3),
  445. "op_2_filter_0_records_out_total": int64(2),
  446. },
  447. },
  448. {
  449. Name: `TestSingleSQLRule5`,
  450. Sql: `SELECT meta(topic) as m, ts FROM demo WHERE last_hit_count() < 4`,
  451. R: [][]map[string]interface{}{
  452. {{
  453. "m": "mock",
  454. "ts": float64(1541152486013),
  455. }},
  456. {{
  457. "m": "mock",
  458. "ts": float64(1541152486822),
  459. }},
  460. {{
  461. "m": "mock",
  462. "ts": float64(1541152487632),
  463. }},
  464. {{
  465. "m": "mock",
  466. "ts": float64(1541152488442),
  467. }},
  468. },
  469. M: map[string]interface{}{
  470. "sink_mockSink_0_exceptions_total": int64(0),
  471. "sink_mockSink_0_records_in_total": int64(4),
  472. "sink_mockSink_0_records_out_total": int64(4),
  473. "source_demo_0_exceptions_total": int64(0),
  474. "source_demo_0_records_in_total": int64(5),
  475. "source_demo_0_records_out_total": int64(5),
  476. },
  477. },
  478. {
  479. Name: `TestSingleSQLRule6`,
  480. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  481. R: [][]map[string]interface{}{
  482. {{
  483. "color": "blue",
  484. "ts": float64(1541152486822),
  485. }},
  486. {{
  487. "color": "yellow",
  488. "ts": float64(1541152488442),
  489. }},
  490. },
  491. M: map[string]interface{}{
  492. "op_3_project_0_exceptions_total": int64(0),
  493. "op_3_project_0_process_latency_us": int64(0),
  494. "op_3_project_0_records_in_total": int64(2),
  495. "op_3_project_0_records_out_total": int64(2),
  496. "sink_mockSink_0_exceptions_total": int64(0),
  497. "sink_mockSink_0_records_in_total": int64(2),
  498. "sink_mockSink_0_records_out_total": int64(2),
  499. "source_demo_0_exceptions_total": int64(0),
  500. "source_demo_0_records_in_total": int64(5),
  501. "source_demo_0_records_out_total": int64(5),
  502. "op_2_filter_0_exceptions_total": int64(0),
  503. "op_2_filter_0_process_latency_us": int64(0),
  504. "op_2_filter_0_records_in_total": int64(5),
  505. "op_2_filter_0_records_out_total": int64(2),
  506. },
  507. },
  508. {
  509. Name: `TestSingleSQLRule7`,
  510. Sql: "SELECT `from` FROM demo1",
  511. R: [][]map[string]interface{}{
  512. {{
  513. "from": "device1",
  514. }},
  515. {{
  516. "from": "device2",
  517. }},
  518. {{
  519. "from": "device3",
  520. }},
  521. {{
  522. "from": "device1",
  523. }},
  524. {{
  525. "from": "device3",
  526. }},
  527. },
  528. M: map[string]interface{}{
  529. "op_2_project_0_exceptions_total": int64(0),
  530. "op_2_project_0_process_latency_us": int64(0),
  531. "op_2_project_0_records_in_total": int64(5),
  532. "op_2_project_0_records_out_total": int64(5),
  533. "sink_mockSink_0_exceptions_total": int64(0),
  534. "sink_mockSink_0_records_in_total": int64(5),
  535. "sink_mockSink_0_records_out_total": int64(5),
  536. "source_demo1_0_exceptions_total": int64(0),
  537. "source_demo1_0_records_in_total": int64(5),
  538. "source_demo1_0_records_out_total": int64(5),
  539. },
  540. },
  541. {
  542. Name: `TestSingleSQLRule8`,
  543. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  544. R: [][]map[string]interface{}{
  545. {{
  546. "temp": float64(25.5),
  547. "hum": float64(65),
  548. "from": "device1",
  549. "ts": float64(1541152486013),
  550. }},
  551. {{
  552. "temp": float64(27.4),
  553. "hum": float64(80),
  554. "from": "device1",
  555. "ts": float64(1541152488442),
  556. }},
  557. },
  558. M: map[string]interface{}{
  559. "op_3_project_0_exceptions_total": int64(0),
  560. "op_3_project_0_process_latency_us": int64(0),
  561. "op_3_project_0_records_in_total": int64(2),
  562. "op_3_project_0_records_out_total": int64(2),
  563. "op_2_filter_0_exceptions_total": int64(0),
  564. "op_2_filter_0_process_latency_us": int64(0),
  565. "op_2_filter_0_records_in_total": int64(5),
  566. "op_2_filter_0_records_out_total": int64(2),
  567. "sink_mockSink_0_exceptions_total": int64(0),
  568. "sink_mockSink_0_records_in_total": int64(2),
  569. "sink_mockSink_0_records_out_total": int64(2),
  570. "source_demo1_0_exceptions_total": int64(0),
  571. "source_demo1_0_records_in_total": int64(5),
  572. "source_demo1_0_records_out_total": int64(5),
  573. },
  574. },
  575. {
  576. Name: `TestSingleSQLRule9`,
  577. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  578. R: [][]map[string]interface{}{
  579. {{
  580. "color": "red",
  581. "s": "M",
  582. "ts": float64(1541152486013),
  583. }},
  584. {{
  585. "color": "blue",
  586. "s": "L",
  587. "ts": float64(1541152486822),
  588. }},
  589. {{
  590. "color": "blue",
  591. "s": "M",
  592. "ts": float64(1541152487632),
  593. }},
  594. {{
  595. "color": "yellow",
  596. "s": "L",
  597. "ts": float64(1541152488442),
  598. }},
  599. {{
  600. "color": "red",
  601. "s": "S",
  602. "ts": float64(1541152489252),
  603. }},
  604. },
  605. M: map[string]interface{}{
  606. "op_2_project_0_exceptions_total": int64(0),
  607. "op_2_project_0_process_latency_us": int64(0),
  608. "op_2_project_0_records_in_total": int64(5),
  609. "op_2_project_0_records_out_total": int64(5),
  610. "sink_mockSink_0_exceptions_total": int64(0),
  611. "sink_mockSink_0_records_in_total": int64(5),
  612. "sink_mockSink_0_records_out_total": int64(5),
  613. "source_demo_0_exceptions_total": int64(0),
  614. "source_demo_0_records_in_total": int64(5),
  615. "source_demo_0_records_out_total": int64(5),
  616. },
  617. T: &api.PrintableTopo{
  618. Sources: []string{"source_demo"},
  619. Edges: map[string][]interface{}{
  620. "source_demo": {"op_2_project"},
  621. "op_2_project": {"sink_mockSink"},
  622. },
  623. },
  624. },
  625. {
  626. Name: `TestSingleSQLRule10`,
  627. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  628. R: [][]map[string]interface{}{
  629. {{
  630. "id": float64(1541152486013),
  631. "name": "name1",
  632. "color": "red",
  633. "size": float64(3),
  634. "ts": float64(1541152486013),
  635. }},
  636. {{
  637. "id": float64(1541152487632),
  638. "name": "name2",
  639. "color": "blue",
  640. "size": float64(2),
  641. "ts": float64(1541152487632),
  642. }},
  643. {{
  644. "id": float64(1541152489252),
  645. "name": "name3",
  646. "color": "red",
  647. "size": float64(1),
  648. "ts": float64(1541152489252),
  649. }},
  650. },
  651. W: 15,
  652. M: map[string]interface{}{
  653. "op_3_join_aligner_0_records_in_total": int64(6),
  654. "op_3_join_aligner_0_records_out_total": int64(5),
  655. "op_4_join_0_exceptions_total": int64(0),
  656. "op_4_join_0_records_in_total": int64(5),
  657. "op_4_join_0_records_out_total": int64(3),
  658. "op_5_project_0_exceptions_total": int64(0),
  659. "op_5_project_0_records_in_total": int64(3),
  660. "op_5_project_0_records_out_total": int64(3),
  661. "sink_mockSink_0_exceptions_total": int64(0),
  662. "sink_mockSink_0_records_in_total": int64(3),
  663. "sink_mockSink_0_records_out_total": int64(3),
  664. "source_demo_0_exceptions_total": int64(0),
  665. "source_demo_0_records_in_total": int64(5),
  666. "source_demo_0_records_out_total": int64(5),
  667. "source_table1_0_exceptions_total": int64(0),
  668. "source_table1_0_records_in_total": int64(4),
  669. "source_table1_0_records_out_total": int64(1),
  670. },
  671. },
  672. {
  673. Name: `TestSingleSQLRule11`,
  674. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  675. R: [][]map[string]interface{}{
  676. {{
  677. "device": "device2",
  678. }},
  679. {{
  680. "device": "device4",
  681. }},
  682. {{
  683. "device": "device5",
  684. }},
  685. },
  686. M: map[string]interface{}{
  687. "op_3_join_aligner_0_records_in_total": int64(10),
  688. "op_3_join_aligner_0_records_out_total": int64(5),
  689. "op_4_join_0_exceptions_total": int64(0),
  690. "op_4_join_0_records_in_total": int64(5),
  691. "op_4_join_0_records_out_total": int64(3),
  692. "op_5_project_0_exceptions_total": int64(0),
  693. "op_5_project_0_records_in_total": int64(3),
  694. "op_5_project_0_records_out_total": int64(3),
  695. "sink_mockSink_0_exceptions_total": int64(0),
  696. "sink_mockSink_0_records_in_total": int64(3),
  697. "sink_mockSink_0_records_out_total": int64(3),
  698. "source_demo_0_exceptions_total": int64(0),
  699. "source_demo_0_records_in_total": int64(5),
  700. "source_demo_0_records_out_total": int64(5),
  701. "source_demoTable_0_exceptions_total": int64(0),
  702. "source_demoTable_0_records_in_total": int64(5),
  703. "source_demoTable_0_records_out_total": int64(5),
  704. },
  705. },
  706. {
  707. Name: `TestSingleSQLRule12`,
  708. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  709. R: [][]map[string]interface{}{
  710. {{
  711. "table1Id": float64(1541152486013),
  712. "demoTs": float64(1541152486013),
  713. }},
  714. {{
  715. "table1Id": float64(1541152487632),
  716. "demoTs": float64(1541152487632),
  717. }},
  718. {{
  719. "table1Id": float64(1541152489252),
  720. "demoTs": float64(1541152489252),
  721. }},
  722. },
  723. W: 15,
  724. M: map[string]interface{}{
  725. "op_3_join_aligner_0_records_in_total": int64(6),
  726. "op_3_join_aligner_0_records_out_total": int64(5),
  727. "op_4_join_0_exceptions_total": int64(0),
  728. "op_4_join_0_records_in_total": int64(5),
  729. "op_4_join_0_records_out_total": int64(3),
  730. "op_5_project_0_exceptions_total": int64(0),
  731. "op_5_project_0_records_in_total": int64(3),
  732. "op_5_project_0_records_out_total": int64(3),
  733. "sink_mockSink_0_exceptions_total": int64(0),
  734. "sink_mockSink_0_records_in_total": int64(3),
  735. "sink_mockSink_0_records_out_total": int64(3),
  736. "source_demo_0_exceptions_total": int64(0),
  737. "source_demo_0_records_in_total": int64(5),
  738. "source_demo_0_records_out_total": int64(5),
  739. "source_table1_0_exceptions_total": int64(0),
  740. "source_table1_0_records_in_total": int64(4),
  741. "source_table1_0_records_out_total": int64(1),
  742. },
  743. },
  744. {
  745. Name: `TestChanged13`,
  746. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  747. R: [][]map[string]interface{}{
  748. {{
  749. "tt_color": "red",
  750. "tt_size": float64(3),
  751. }},
  752. {{
  753. "tt_color": "blue",
  754. "tt_size": float64(6),
  755. }},
  756. {{
  757. "tt_size": float64(2),
  758. }},
  759. {{
  760. "tt_color": "yellow",
  761. "tt_size": float64(4),
  762. }},
  763. {{
  764. "tt_color": "red",
  765. "tt_size": float64(1),
  766. }},
  767. },
  768. M: map[string]interface{}{
  769. "op_2_project_0_exceptions_total": int64(0),
  770. "op_2_project_0_process_latency_us": int64(0),
  771. "op_2_project_0_records_in_total": int64(5),
  772. "op_2_project_0_records_out_total": int64(5),
  773. "sink_mockSink_0_exceptions_total": int64(0),
  774. "sink_mockSink_0_records_in_total": int64(5),
  775. "sink_mockSink_0_records_out_total": int64(5),
  776. "source_demo_0_exceptions_total": int64(0),
  777. "source_demo_0_records_in_total": int64(5),
  778. "source_demo_0_records_out_total": int64(5),
  779. },
  780. },
  781. {
  782. Name: `TestAliasOrderBy14`,
  783. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  784. R: [][]map[string]interface{}{
  785. {
  786. {
  787. "color": "blue",
  788. "c": float64(2),
  789. },
  790. {
  791. "color": "yellow",
  792. "c": float64(1),
  793. },
  794. },
  795. },
  796. M: map[string]interface{}{
  797. "op_6_project_0_exceptions_total": int64(0),
  798. "op_6_project_0_process_latency_us": int64(0),
  799. "op_6_project_0_records_in_total": int64(1),
  800. "op_6_project_0_records_out_total": int64(1),
  801. "sink_mockSink_0_exceptions_total": int64(0),
  802. "sink_mockSink_0_records_in_total": int64(1),
  803. "sink_mockSink_0_records_out_total": int64(1),
  804. "source_demo_0_exceptions_total": int64(0),
  805. "source_demo_0_records_in_total": int64(5),
  806. "source_demo_0_records_out_total": int64(5),
  807. },
  808. },
  809. {
  810. Name: `TestSingleSQLRule17`,
  811. Sql: `SELECT arr[x:4] as col1 FROM demoArr where x=1`,
  812. R: [][]map[string]interface{}{
  813. {{
  814. "col1": []interface{}{
  815. float64(2), float64(3), float64(4),
  816. },
  817. }},
  818. },
  819. },
  820. {
  821. Name: `TestSingleSQLRule16`,
  822. Sql: `SELECT arr[1:y] as col1 FROM demoArr where x=1`,
  823. R: [][]map[string]interface{}{
  824. {{
  825. "col1": []interface{}{
  826. float64(2),
  827. },
  828. }},
  829. },
  830. },
  831. {
  832. Name: `TestSingleSQLRule15`,
  833. Sql: `SELECT arr[1] as col1 FROM demoArr where x=1`,
  834. R: [][]map[string]interface{}{
  835. {{
  836. "col1": float64(2),
  837. }},
  838. },
  839. },
  840. {
  841. Name: `TestLagAlias`,
  842. Sql: "SELECT lag(size) as lastSize, lag(had_changed(true,size)), size, lastSize/size as changeRate FROM demo WHERE size > 2",
  843. R: [][]map[string]interface{}{
  844. {{
  845. "size": float64(3),
  846. }},
  847. {{
  848. "lastSize": float64(3),
  849. "size": float64(6),
  850. "lag": true,
  851. "changeRate": float64(0),
  852. }},
  853. {{
  854. "lastSize": float64(2),
  855. "size": float64(4),
  856. "lag": true,
  857. "changeRate": float64(0),
  858. }},
  859. },
  860. M: map[string]interface{}{
  861. "sink_mockSink_0_exceptions_total": int64(0),
  862. "sink_mockSink_0_records_in_total": int64(3),
  863. "sink_mockSink_0_records_out_total": int64(3),
  864. "source_demo_0_exceptions_total": int64(0),
  865. "source_demo_0_records_in_total": int64(5),
  866. "source_demo_0_records_out_total": int64(5),
  867. },
  868. },
  869. {
  870. Name: `TestLagPartition`,
  871. Sql: "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
  872. R: [][]map[string]interface{}{
  873. {{
  874. "color": "red",
  875. "size": float64(3),
  876. }},
  877. {{
  878. "color": "blue",
  879. "size": float64(6),
  880. }},
  881. {{
  882. "color": "blue",
  883. "lastSize": float64(6),
  884. "size": float64(2),
  885. "changeRate": float64(3),
  886. }},
  887. {{
  888. "color": "yellow",
  889. "size": float64(4),
  890. }},
  891. {{
  892. "color": "red",
  893. "lastSize": float64(3),
  894. "size": float64(1),
  895. "changeRate": float64(3),
  896. }},
  897. },
  898. M: map[string]interface{}{
  899. "sink_mockSink_0_exceptions_total": int64(0),
  900. "sink_mockSink_0_records_in_total": int64(5),
  901. "sink_mockSink_0_records_out_total": int64(5),
  902. "source_demo_0_exceptions_total": int64(0),
  903. "source_demo_0_records_in_total": int64(5),
  904. "source_demo_0_records_out_total": int64(5),
  905. },
  906. },
  907. }
  908. HandleStream(true, streamList, t)
  909. options := []*api.RuleOption{
  910. {
  911. BufferLength: 100,
  912. SendError: true,
  913. }, {
  914. BufferLength: 100,
  915. SendError: true,
  916. Qos: api.AtLeastOnce,
  917. CheckpointInterval: 5000,
  918. }, {
  919. BufferLength: 100,
  920. SendError: true,
  921. Qos: api.ExactlyOnce,
  922. CheckpointInterval: 5000,
  923. },
  924. }
  925. for j, opt := range options {
  926. DoRuleTest(t, tests, j, opt, 0)
  927. }
  928. }
  929. func TestSingleSQLWithEventTime(t *testing.T) {
  930. // Reset
  931. streamList := []string{"demoE"}
  932. HandleStream(false, streamList, t)
  933. // Data setup
  934. tests := []RuleTest{
  935. {
  936. Name: `TestSingleSQLRule1`,
  937. Sql: `SELECT *, upper(color) FROM demoE`,
  938. R: [][]map[string]interface{}{
  939. {{
  940. "color": "red",
  941. "size": float64(3),
  942. "ts": float64(1541152486013),
  943. "upper": "RED",
  944. }},
  945. {{
  946. "color": "blue",
  947. "size": float64(2),
  948. "ts": float64(1541152487632),
  949. "upper": "BLUE",
  950. }},
  951. {{
  952. "color": "yellow",
  953. "size": float64(4),
  954. "ts": float64(1541152488442),
  955. "upper": "YELLOW",
  956. }},
  957. {{
  958. "color": "red",
  959. "size": float64(1),
  960. "ts": float64(1541152489252),
  961. "upper": "RED",
  962. }},
  963. },
  964. M: map[string]interface{}{
  965. "op_3_project_0_exceptions_total": int64(0),
  966. "op_3_project_0_process_latency_us": int64(0),
  967. "op_3_project_0_records_in_total": int64(4),
  968. "op_3_project_0_records_out_total": int64(4),
  969. "sink_mockSink_0_exceptions_total": int64(0),
  970. "sink_mockSink_0_records_in_total": int64(4),
  971. "sink_mockSink_0_records_out_total": int64(4),
  972. "source_demoE_0_exceptions_total": int64(0),
  973. "source_demoE_0_records_in_total": int64(6),
  974. "source_demoE_0_records_out_total": int64(6),
  975. },
  976. T: &api.PrintableTopo{
  977. Sources: []string{"source_demoE"},
  978. Edges: map[string][]interface{}{
  979. "source_demoE": {"op_2_watermark"},
  980. "op_2_watermark": {"op_3_project"},
  981. "op_3_project": {"sink_mockSink"},
  982. },
  983. },
  984. },
  985. {
  986. Name: `TestStateFunc`,
  987. Sql: `SELECT *, last_hit_time() as lt, last_hit_count() as lc, event_time() as et FROM demoE WHERE size < 3 AND lc < 2`,
  988. R: [][]map[string]interface{}{
  989. {{
  990. "color": "blue",
  991. "size": float64(2),
  992. "ts": float64(1541152487632),
  993. "lc": float64(0),
  994. "lt": float64(0),
  995. "et": float64(1541152487632),
  996. }},
  997. {{
  998. "color": "red",
  999. "size": float64(1),
  1000. "ts": float64(1541152489252),
  1001. "lc": float64(1),
  1002. "lt": float64(1541152487632),
  1003. "et": float64(1541152489252),
  1004. }},
  1005. },
  1006. M: map[string]interface{}{
  1007. "sink_mockSink_0_exceptions_total": int64(0),
  1008. "sink_mockSink_0_records_in_total": int64(2),
  1009. "sink_mockSink_0_records_out_total": int64(2),
  1010. "source_demoE_0_exceptions_total": int64(0),
  1011. "source_demoE_0_records_in_total": int64(6),
  1012. "source_demoE_0_records_out_total": int64(6),
  1013. },
  1014. },
  1015. {
  1016. Name: `TestChanged`,
  1017. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demoE",
  1018. R: [][]map[string]interface{}{
  1019. {{
  1020. "tt_color": "red",
  1021. "tt_size": float64(3),
  1022. }},
  1023. {{
  1024. "tt_color": "blue",
  1025. "tt_size": float64(2),
  1026. }},
  1027. {{
  1028. "tt_color": "yellow",
  1029. "tt_size": float64(4),
  1030. }},
  1031. {{
  1032. "tt_color": "red",
  1033. "tt_size": float64(1),
  1034. }},
  1035. },
  1036. M: map[string]interface{}{
  1037. "op_3_project_0_exceptions_total": int64(0),
  1038. "op_3_project_0_process_latency_us": int64(0),
  1039. "op_3_project_0_records_in_total": int64(4),
  1040. "op_3_project_0_records_out_total": int64(4),
  1041. "sink_mockSink_0_exceptions_total": int64(0),
  1042. "sink_mockSink_0_records_in_total": int64(4),
  1043. "sink_mockSink_0_records_out_total": int64(4),
  1044. "source_demoE_0_exceptions_total": int64(0),
  1045. "source_demoE_0_records_in_total": int64(6),
  1046. "source_demoE_0_records_out_total": int64(6),
  1047. },
  1048. },
  1049. }
  1050. HandleStream(true, streamList, t)
  1051. options := []*api.RuleOption{
  1052. {
  1053. BufferLength: 100,
  1054. SendError: true,
  1055. IsEventTime: true,
  1056. LateTol: 1000,
  1057. }, {
  1058. BufferLength: 100,
  1059. SendError: true,
  1060. Qos: api.AtLeastOnce,
  1061. CheckpointInterval: 5000,
  1062. IsEventTime: true,
  1063. LateTol: 1000,
  1064. }, {
  1065. BufferLength: 100,
  1066. SendError: true,
  1067. Qos: api.ExactlyOnce,
  1068. CheckpointInterval: 5000,
  1069. IsEventTime: true,
  1070. LateTol: 1000,
  1071. },
  1072. }
  1073. for j, opt := range options {
  1074. DoRuleTest(t, tests, j, opt, 0)
  1075. }
  1076. }
  1077. func TestSingleSQLError(t *testing.T) {
  1078. // Reset
  1079. streamList := []string{"ldemo"}
  1080. HandleStream(false, streamList, t)
  1081. // Data setup
  1082. tests := []RuleTest{
  1083. {
  1084. Name: `TestSingleSQLErrorRule1`,
  1085. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1086. R: [][]map[string]interface{}{
  1087. {{
  1088. "color": "red",
  1089. "ts": float64(1541152486013),
  1090. }},
  1091. {{
  1092. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1093. }},
  1094. {{
  1095. "ts": float64(1541152487632),
  1096. }},
  1097. },
  1098. M: map[string]interface{}{
  1099. "op_3_project_0_exceptions_total": int64(1),
  1100. "op_3_project_0_process_latency_us": int64(0),
  1101. "op_3_project_0_records_in_total": int64(2),
  1102. "op_3_project_0_records_out_total": int64(2),
  1103. "sink_mockSink_0_exceptions_total": int64(0),
  1104. "sink_mockSink_0_records_in_total": int64(3),
  1105. "sink_mockSink_0_records_out_total": int64(3),
  1106. "source_ldemo_0_exceptions_total": int64(0),
  1107. "source_ldemo_0_records_in_total": int64(5),
  1108. "source_ldemo_0_records_out_total": int64(5),
  1109. "op_2_filter_0_exceptions_total": int64(1),
  1110. "op_2_filter_0_process_latency_us": int64(0),
  1111. "op_2_filter_0_records_in_total": int64(5),
  1112. "op_2_filter_0_records_out_total": int64(2),
  1113. },
  1114. }, {
  1115. Name: `TestSingleSQLErrorRule2`,
  1116. Sql: `SELECT size * 5 FROM ldemo`,
  1117. R: [][]map[string]interface{}{
  1118. {{
  1119. "kuiper_field_0": float64(15),
  1120. }},
  1121. {{
  1122. "error": "run Select error: invalid operation string(string) * int64(5)",
  1123. }},
  1124. {{
  1125. "kuiper_field_0": float64(15),
  1126. }},
  1127. {{
  1128. "kuiper_field_0": float64(10),
  1129. }},
  1130. {{}},
  1131. },
  1132. M: map[string]interface{}{
  1133. "op_2_project_0_exceptions_total": int64(1),
  1134. "op_2_project_0_process_latency_us": int64(0),
  1135. "op_2_project_0_records_in_total": int64(5),
  1136. "op_2_project_0_records_out_total": int64(4),
  1137. "sink_mockSink_0_exceptions_total": int64(0),
  1138. "sink_mockSink_0_records_in_total": int64(5),
  1139. "sink_mockSink_0_records_out_total": int64(5),
  1140. "source_ldemo_0_exceptions_total": int64(0),
  1141. "source_ldemo_0_records_in_total": int64(5),
  1142. "source_ldemo_0_records_out_total": int64(5),
  1143. },
  1144. },
  1145. }
  1146. HandleStream(true, streamList, t)
  1147. DoRuleTest(t, tests, 0, &api.RuleOption{
  1148. BufferLength: 100,
  1149. SendError: true,
  1150. }, 0)
  1151. }
  1152. func TestSingleSQLOmitError(t *testing.T) {
  1153. // Reset
  1154. streamList := []string{"ldemo"}
  1155. HandleStream(false, streamList, t)
  1156. // Data setup
  1157. tests := []RuleTest{
  1158. {
  1159. Name: `TestSingleSQLErrorRule1`,
  1160. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1161. R: [][]map[string]interface{}{
  1162. {{
  1163. "color": "red",
  1164. "ts": float64(1541152486013),
  1165. }},
  1166. {{
  1167. "ts": float64(1541152487632),
  1168. }},
  1169. },
  1170. M: map[string]interface{}{
  1171. "op_3_project_0_exceptions_total": int64(0),
  1172. "op_3_project_0_process_latency_us": int64(0),
  1173. "op_3_project_0_records_in_total": int64(2),
  1174. "op_3_project_0_records_out_total": int64(2),
  1175. "sink_mockSink_0_exceptions_total": int64(0),
  1176. "sink_mockSink_0_records_in_total": int64(2),
  1177. "sink_mockSink_0_records_out_total": int64(2),
  1178. "source_ldemo_0_exceptions_total": int64(0),
  1179. "source_ldemo_0_records_in_total": int64(5),
  1180. "source_ldemo_0_records_out_total": int64(5),
  1181. "op_2_filter_0_exceptions_total": int64(1),
  1182. "op_2_filter_0_process_latency_us": int64(0),
  1183. "op_2_filter_0_records_in_total": int64(5),
  1184. "op_2_filter_0_records_out_total": int64(2),
  1185. },
  1186. }, {
  1187. Name: `TestSingleSQLErrorRule2`,
  1188. Sql: `SELECT size * 5 FROM ldemo`,
  1189. R: [][]map[string]interface{}{
  1190. {{
  1191. "kuiper_field_0": float64(15),
  1192. }},
  1193. {{
  1194. "kuiper_field_0": float64(15),
  1195. }},
  1196. {{
  1197. "kuiper_field_0": float64(10),
  1198. }},
  1199. {{}},
  1200. },
  1201. M: map[string]interface{}{
  1202. "op_2_project_0_exceptions_total": int64(1),
  1203. "op_2_project_0_process_latency_us": int64(0),
  1204. "op_2_project_0_records_in_total": int64(5),
  1205. "op_2_project_0_records_out_total": int64(4),
  1206. "sink_mockSink_0_exceptions_total": int64(0),
  1207. "sink_mockSink_0_records_in_total": int64(4),
  1208. "sink_mockSink_0_records_out_total": int64(4),
  1209. "source_ldemo_0_exceptions_total": int64(0),
  1210. "source_ldemo_0_records_in_total": int64(5),
  1211. "source_ldemo_0_records_out_total": int64(5),
  1212. },
  1213. },
  1214. }
  1215. HandleStream(true, streamList, t)
  1216. DoRuleTest(t, tests, 0, &api.RuleOption{
  1217. BufferLength: 100,
  1218. SendError: false,
  1219. }, 0)
  1220. }
  1221. func TestSingleSQLTemplate(t *testing.T) {
  1222. // Reset
  1223. streamList := []string{"demo"}
  1224. HandleStream(false, streamList, t)
  1225. // Data setup
  1226. tests := []RuleTest{
  1227. {
  1228. Name: `TestSingleSQLTemplateRule1`,
  1229. Sql: `SELECT * FROM demo`,
  1230. R: []map[string]interface{}{
  1231. {
  1232. "c": "red",
  1233. "wrapper": "w1",
  1234. },
  1235. {
  1236. "c": "blue",
  1237. "wrapper": "w1",
  1238. },
  1239. {
  1240. "c": "blue",
  1241. "wrapper": "w1",
  1242. },
  1243. {
  1244. "c": "yellow",
  1245. "wrapper": "w1",
  1246. },
  1247. {
  1248. "c": "red",
  1249. "wrapper": "w1",
  1250. },
  1251. },
  1252. M: map[string]interface{}{
  1253. "op_2_project_0_exceptions_total": int64(0),
  1254. "op_2_project_0_process_latency_us": int64(0),
  1255. "op_2_project_0_records_in_total": int64(5),
  1256. "op_2_project_0_records_out_total": int64(5),
  1257. "sink_mockSink_0_exceptions_total": int64(0),
  1258. "sink_mockSink_0_records_in_total": int64(5),
  1259. "sink_mockSink_0_records_out_total": int64(5),
  1260. "source_demo_0_exceptions_total": int64(0),
  1261. "source_demo_0_records_in_total": int64(5),
  1262. "source_demo_0_records_out_total": int64(5),
  1263. },
  1264. },
  1265. }
  1266. HandleStream(true, streamList, t)
  1267. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1268. BufferLength: 100,
  1269. SendError: true,
  1270. }, 0, map[string]interface{}{
  1271. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  1272. "sendSingle": true,
  1273. }, func(result [][]byte) interface{} {
  1274. var maps []map[string]interface{}
  1275. for _, v := range result {
  1276. var mapRes map[string]interface{}
  1277. err := json.Unmarshal(v, &mapRes)
  1278. if err != nil {
  1279. t.Errorf("Failed to parse the input into map")
  1280. continue
  1281. }
  1282. maps = append(maps, mapRes)
  1283. }
  1284. return maps
  1285. })
  1286. }
  1287. func TestNoneSingleSQLTemplate(t *testing.T) {
  1288. // Reset
  1289. streamList := []string{"demo"}
  1290. HandleStream(false, streamList, t)
  1291. // Data setup
  1292. tests := []RuleTest{
  1293. {
  1294. Name: `TestNoneSingleSQLTemplateRule1`,
  1295. Sql: `SELECT * FROM demo`,
  1296. R: [][]byte{
  1297. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1298. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1299. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1300. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1301. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1302. },
  1303. M: map[string]interface{}{
  1304. "op_2_project_0_exceptions_total": int64(0),
  1305. "op_2_project_0_process_latency_us": int64(0),
  1306. "op_2_project_0_records_in_total": int64(5),
  1307. "op_2_project_0_records_out_total": int64(5),
  1308. "sink_mockSink_0_exceptions_total": int64(0),
  1309. "sink_mockSink_0_records_in_total": int64(5),
  1310. "sink_mockSink_0_records_out_total": int64(5),
  1311. "source_demo_0_exceptions_total": int64(0),
  1312. "source_demo_0_records_in_total": int64(5),
  1313. "source_demo_0_records_out_total": int64(5),
  1314. },
  1315. },
  1316. }
  1317. HandleStream(true, streamList, t)
  1318. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1319. BufferLength: 100,
  1320. SendError: true,
  1321. }, 0, map[string]interface{}{
  1322. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1323. }, func(result [][]byte) interface{} {
  1324. return result
  1325. })
  1326. }
  1327. func TestSingleSQLForBinary(t *testing.T) {
  1328. // Reset
  1329. streamList := []string{"binDemo"}
  1330. HandleStream(false, streamList, t)
  1331. // Data setup
  1332. tests := []RuleTest{
  1333. {
  1334. Name: `TestSingleSQLRule1`,
  1335. Sql: `SELECT * FROM binDemo`,
  1336. R: [][]map[string]interface{}{
  1337. {{
  1338. "self": mocknode.Image,
  1339. }},
  1340. },
  1341. W: 50,
  1342. M: map[string]interface{}{
  1343. "op_2_project_0_exceptions_total": int64(0),
  1344. "op_2_project_0_process_latency_us": int64(0),
  1345. "op_2_project_0_records_in_total": int64(1),
  1346. "op_2_project_0_records_out_total": int64(1),
  1347. "sink_mockSink_0_exceptions_total": int64(0),
  1348. "sink_mockSink_0_records_in_total": int64(1),
  1349. "sink_mockSink_0_records_out_total": int64(1),
  1350. "source_binDemo_0_exceptions_total": int64(0),
  1351. "source_binDemo_0_records_in_total": int64(1),
  1352. "source_binDemo_0_records_out_total": int64(1),
  1353. },
  1354. },
  1355. }
  1356. HandleStream(true, streamList, t)
  1357. options := []*api.RuleOption{
  1358. {
  1359. BufferLength: 100,
  1360. SendError: true,
  1361. }, {
  1362. BufferLength: 100,
  1363. SendError: true,
  1364. Qos: api.AtLeastOnce,
  1365. CheckpointInterval: 5000,
  1366. }, {
  1367. BufferLength: 100,
  1368. SendError: true,
  1369. Qos: api.ExactlyOnce,
  1370. CheckpointInterval: 5000,
  1371. },
  1372. }
  1373. byteFunc := func(result [][]byte) interface{} {
  1374. var maps [][]map[string]interface{}
  1375. for _, v := range result {
  1376. var mapRes []map[string][]byte
  1377. err := json.Unmarshal(v, &mapRes)
  1378. if err != nil {
  1379. panic("Failed to parse the input into map")
  1380. }
  1381. mapInt := make([]map[string]interface{}, len(mapRes))
  1382. for i, mv := range mapRes {
  1383. mapInt[i] = make(map[string]interface{})
  1384. // assume only one key
  1385. for k, v := range mv {
  1386. mapInt[i][k] = v
  1387. }
  1388. }
  1389. maps = append(maps, mapInt)
  1390. }
  1391. return maps
  1392. }
  1393. for j, opt := range options {
  1394. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  1395. }
  1396. }