rule_test.go 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "testing"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. )
  21. func TestLimitSQL(t *testing.T) {
  22. // Reset
  23. streamList := []string{"demo", "demoArr", "demoArr2"}
  24. HandleStream(false, streamList, t)
  25. var r [][]map[string]interface{}
  26. tests := []RuleTest{
  27. {
  28. Name: "TestLimitSQL01",
  29. Sql: `SELECT unnest(demoArr2.arr) as col, demo.size FROM demo inner join demoArr2 on demo.size = demoArr2.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  30. R: [][]map[string]interface{}{
  31. {
  32. {
  33. "col": float64(1),
  34. "size": float64(1),
  35. },
  36. },
  37. },
  38. },
  39. {
  40. Name: "TestLimitSQL0",
  41. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  42. R: [][]map[string]interface{}{
  43. {
  44. {
  45. "col": float64(1),
  46. "size": float64(1),
  47. },
  48. },
  49. },
  50. },
  51. {
  52. Name: "TestLimitSQL1",
  53. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  54. R: r,
  55. },
  56. {
  57. Name: "TestLimitSQL2",
  58. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  59. R: [][]map[string]interface{}{
  60. {
  61. {
  62. "size": float64(1),
  63. },
  64. },
  65. },
  66. },
  67. {
  68. Name: "TestLimitSQL3",
  69. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  70. R: r,
  71. },
  72. }
  73. // Data setup
  74. HandleStream(true, streamList, t)
  75. options := []*api.RuleOption{
  76. {
  77. BufferLength: 100,
  78. SendError: true,
  79. },
  80. {
  81. BufferLength: 100,
  82. SendError: true,
  83. Qos: api.AtLeastOnce,
  84. CheckpointInterval: 5000,
  85. },
  86. {
  87. BufferLength: 100,
  88. SendError: true,
  89. Qos: api.ExactlyOnce,
  90. CheckpointInterval: 5000,
  91. },
  92. }
  93. for j, opt := range options {
  94. DoRuleTest(t, tests, j, opt, 0)
  95. }
  96. }
  97. func TestSRFSQL(t *testing.T) {
  98. // Reset
  99. streamList := []string{"demo", "demoArr"}
  100. HandleStream(false, streamList, t)
  101. tests := []RuleTest{
  102. {
  103. Name: "TestSingleSQLRule25",
  104. Sql: "SELECT unnest(a) from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  105. R: [][]map[string]interface{}{
  106. {
  107. {
  108. "error": "the argument for the unnest function should be array",
  109. },
  110. },
  111. },
  112. },
  113. {
  114. Name: "TestSingleSQLRule24",
  115. Sql: "Select unnest(a) from demoArr;",
  116. R: [][]map[string]interface{}{
  117. {
  118. {
  119. "error": "the argument for the unnest function should be array",
  120. },
  121. },
  122. },
  123. },
  124. {
  125. Name: "TestSingleSQLRule21",
  126. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1);`,
  127. R: [][]map[string]interface{}{
  128. {
  129. {
  130. "col": float64(1),
  131. "size": float64(1),
  132. },
  133. {
  134. "col": float64(2),
  135. "size": float64(1),
  136. },
  137. {
  138. "col": float64(3),
  139. "size": float64(1),
  140. },
  141. },
  142. },
  143. },
  144. {
  145. Name: "TestSingleSQLRule22",
  146. Sql: `SELECT unnest(arr3) as col,y From demoArr group by y, SESSIONWINDOW(ss, 2, 1);`,
  147. R: [][]map[string]interface{}{
  148. {
  149. {
  150. "col": float64(1),
  151. "y": float64(2),
  152. },
  153. {
  154. "col": float64(2),
  155. "y": float64(2),
  156. },
  157. {
  158. "col": float64(3),
  159. "y": float64(2),
  160. },
  161. },
  162. },
  163. },
  164. {
  165. Name: "TestSingleSQLRule23",
  166. Sql: "SELECT unnest(arr3) as col,a from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  167. R: [][]map[string]interface{}{
  168. {
  169. {
  170. "col": float64(1),
  171. "a": float64(6),
  172. },
  173. {
  174. "col": float64(2),
  175. "a": float64(6),
  176. },
  177. {
  178. "col": float64(3),
  179. "a": float64(6),
  180. },
  181. },
  182. },
  183. },
  184. {
  185. Name: `TestSingleSQLRule18`,
  186. Sql: `SELECT unnest(arr2) FROM demoArr where x=1`,
  187. R: [][]map[string]interface{}{
  188. {
  189. {
  190. "a": float64(1),
  191. "b": float64(2),
  192. },
  193. },
  194. {
  195. {
  196. "a": float64(3),
  197. "b": float64(4),
  198. },
  199. },
  200. },
  201. },
  202. // The mapping schema created by unnest function will cover the original column if they have the same column name
  203. {
  204. Name: `TestSingleSQLRule19`,
  205. Sql: `SELECT unnest(arr2),a FROM demoArr where x=1`,
  206. R: [][]map[string]interface{}{
  207. {
  208. {
  209. "a": float64(1),
  210. "b": float64(2),
  211. },
  212. },
  213. {
  214. {
  215. "a": float64(3),
  216. "b": float64(4),
  217. },
  218. },
  219. },
  220. },
  221. {
  222. Name: `TestSingleSQLRule20`,
  223. Sql: `SELECT unnest(arr3) as col FROM demoArr where x=1`,
  224. R: [][]map[string]interface{}{
  225. {
  226. {
  227. "col": float64(1),
  228. },
  229. },
  230. {
  231. {
  232. "col": float64(2),
  233. },
  234. },
  235. {
  236. {
  237. "col": float64(3),
  238. },
  239. },
  240. },
  241. },
  242. {
  243. Name: `TestSingleSQLRule21`,
  244. Sql: `SELECT unnest(arr2),x FROM demoArr where x=1`,
  245. R: [][]map[string]interface{}{
  246. {
  247. {
  248. "a": float64(1),
  249. "b": float64(2),
  250. "x": float64(1),
  251. },
  252. },
  253. {
  254. {
  255. "a": float64(3),
  256. "b": float64(4),
  257. "x": float64(1),
  258. },
  259. },
  260. },
  261. },
  262. }
  263. // Data setup
  264. HandleStream(true, streamList, t)
  265. options := []*api.RuleOption{
  266. {
  267. BufferLength: 100,
  268. SendError: true,
  269. }, {
  270. BufferLength: 100,
  271. SendError: true,
  272. Qos: api.AtLeastOnce,
  273. CheckpointInterval: 5000,
  274. }, {
  275. BufferLength: 100,
  276. SendError: true,
  277. Qos: api.ExactlyOnce,
  278. CheckpointInterval: 5000,
  279. },
  280. }
  281. for j, opt := range options {
  282. DoRuleTest(t, tests, j, opt, 0)
  283. }
  284. }
  285. func TestSingleSQL(t *testing.T) {
  286. // Reset
  287. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable", "demoArr"}
  288. HandleStream(false, streamList, t)
  289. // Data setup
  290. tests := []RuleTest{
  291. {
  292. Name: `TestAnalyzeFuncAlias1`,
  293. Sql: `SELECT lag(size,1,0) + 1 as b, lag(b,1,0),size FROM demo Group by COUNTWINDOW(5)`,
  294. R: [][]map[string]interface{}{
  295. {
  296. {
  297. "b": float64(1),
  298. "lag": float64(0),
  299. "size": float64(3),
  300. },
  301. {
  302. "b": float64(4),
  303. "lag": float64(1),
  304. "size": float64(6),
  305. },
  306. {
  307. "b": float64(7),
  308. "lag": float64(4),
  309. "size": float64(2),
  310. },
  311. {
  312. "b": float64(3),
  313. "lag": float64(7),
  314. "size": float64(4),
  315. },
  316. {
  317. "b": float64(5),
  318. "lag": float64(3),
  319. "size": float64(1),
  320. },
  321. },
  322. },
  323. },
  324. {
  325. Name: `TestAnalyzeFuncAlias2`,
  326. Sql: `SELECT lag(size,1,0) + 1 as b, lag(b,1,0),size FROM demo`,
  327. R: [][]map[string]interface{}{
  328. {
  329. {
  330. "b": float64(1),
  331. "lag": float64(0),
  332. "size": float64(3),
  333. },
  334. },
  335. {
  336. {
  337. "b": float64(4),
  338. "lag": float64(1),
  339. "size": float64(6),
  340. },
  341. },
  342. {
  343. {
  344. "b": float64(7),
  345. "lag": float64(4),
  346. "size": float64(2),
  347. },
  348. },
  349. {
  350. {
  351. "b": float64(3),
  352. "lag": float64(7),
  353. "size": float64(4),
  354. },
  355. },
  356. {
  357. {
  358. "b": float64(5),
  359. "lag": float64(3),
  360. "size": float64(1),
  361. },
  362. },
  363. },
  364. },
  365. {
  366. Name: `TestSingleSQLRule0`,
  367. Sql: `SELECT arr[x:y+1] as col1 FROM demoArr where x=1`,
  368. R: [][]map[string]interface{}{
  369. {{
  370. "col1": []interface{}{
  371. float64(2), float64(3),
  372. },
  373. }},
  374. },
  375. },
  376. {
  377. Name: `TestSingleSQLRule1`,
  378. Sql: `SELECT *, upper(color), event_time() FROM demo`,
  379. R: [][]map[string]interface{}{
  380. {{
  381. "color": "red",
  382. "size": float64(3),
  383. "ts": float64(1541152486013),
  384. "upper": "RED",
  385. "event_time": float64(1541152486013),
  386. }},
  387. {{
  388. "color": "blue",
  389. "size": float64(6),
  390. "ts": float64(1541152486822),
  391. "upper": "BLUE",
  392. "event_time": float64(1541152486822),
  393. }},
  394. {{
  395. "color": "blue",
  396. "size": float64(2),
  397. "ts": float64(1541152487632),
  398. "upper": "BLUE",
  399. "event_time": float64(1541152487632),
  400. }},
  401. {{
  402. "color": "yellow",
  403. "size": float64(4),
  404. "ts": float64(1541152488442),
  405. "upper": "YELLOW",
  406. "event_time": float64(1541152488442),
  407. }},
  408. {{
  409. "color": "red",
  410. "size": float64(1),
  411. "ts": float64(1541152489252),
  412. "upper": "RED",
  413. "event_time": float64(1541152489252),
  414. }},
  415. },
  416. M: map[string]interface{}{
  417. "op_2_project_0_exceptions_total": int64(0),
  418. "op_2_project_0_process_latency_us": int64(0),
  419. "op_2_project_0_records_in_total": int64(5),
  420. "op_2_project_0_records_out_total": int64(5),
  421. "sink_mockSink_0_exceptions_total": int64(0),
  422. "sink_mockSink_0_records_in_total": int64(5),
  423. "sink_mockSink_0_records_out_total": int64(5),
  424. "source_demo_0_exceptions_total": int64(0),
  425. "source_demo_0_records_in_total": int64(5),
  426. "source_demo_0_records_out_total": int64(5),
  427. },
  428. T: &api.PrintableTopo{
  429. Sources: []string{"source_demo"},
  430. Edges: map[string][]interface{}{
  431. "source_demo": {"op_2_project"},
  432. "op_2_project": {"sink_mockSink"},
  433. },
  434. },
  435. },
  436. {
  437. Name: `TestSingleSQLRule2`,
  438. Sql: `SELECT color, ts, last_hit_count() + 1 as lc FROM demo where size > 3`,
  439. R: [][]map[string]interface{}{
  440. {{
  441. "color": "blue",
  442. "ts": float64(1541152486822),
  443. "lc": float64(1),
  444. }},
  445. {{
  446. "color": "yellow",
  447. "ts": float64(1541152488442),
  448. "lc": float64(2),
  449. }},
  450. },
  451. M: map[string]interface{}{
  452. "op_3_project_0_exceptions_total": int64(0),
  453. "op_3_project_0_process_latency_us": int64(0),
  454. "op_3_project_0_records_in_total": int64(2),
  455. "op_3_project_0_records_out_total": int64(2),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(2),
  458. "sink_mockSink_0_records_out_total": int64(2),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "op_2_filter_0_exceptions_total": int64(0),
  463. "op_2_filter_0_process_latency_us": int64(0),
  464. "op_2_filter_0_records_in_total": int64(5),
  465. "op_2_filter_0_records_out_total": int64(2),
  466. },
  467. },
  468. {
  469. Name: `TestSingleSQLRule3`,
  470. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  471. R: [][]map[string]interface{}{
  472. {{
  473. "Int8": float64(6),
  474. "ts": float64(1541152486822),
  475. }},
  476. {{
  477. "Int8": float64(4),
  478. "ts": float64(1541152488442),
  479. }},
  480. },
  481. M: map[string]interface{}{
  482. "op_3_project_0_exceptions_total": int64(0),
  483. "op_3_project_0_process_latency_us": int64(0),
  484. "op_3_project_0_records_in_total": int64(2),
  485. "op_3_project_0_records_out_total": int64(2),
  486. "sink_mockSink_0_exceptions_total": int64(0),
  487. "sink_mockSink_0_records_in_total": int64(2),
  488. "sink_mockSink_0_records_out_total": int64(2),
  489. "source_demo_0_exceptions_total": int64(0),
  490. "source_demo_0_records_in_total": int64(5),
  491. "source_demo_0_records_out_total": int64(5),
  492. "op_2_filter_0_exceptions_total": int64(0),
  493. "op_2_filter_0_process_latency_us": int64(0),
  494. "op_2_filter_0_records_in_total": int64(5),
  495. "op_2_filter_0_records_out_total": int64(2),
  496. },
  497. },
  498. {
  499. Name: `TestSingleSQLRule4`,
  500. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  501. R: [][]map[string]interface{}{
  502. {{
  503. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  504. }},
  505. {{
  506. "Int8": float64(6),
  507. "ts": float64(1541152486822),
  508. }},
  509. {{
  510. "Int8": float64(4),
  511. "ts": float64(1541152488442),
  512. }},
  513. {{
  514. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  515. }},
  516. },
  517. M: map[string]interface{}{
  518. "op_3_project_0_exceptions_total": int64(2),
  519. "op_3_project_0_process_latency_us": int64(0),
  520. "op_3_project_0_records_in_total": int64(2),
  521. "op_3_project_0_records_out_total": int64(2),
  522. "sink_mockSink_0_exceptions_total": int64(0),
  523. "sink_mockSink_0_records_in_total": int64(4),
  524. "sink_mockSink_0_records_out_total": int64(4),
  525. "source_demoError_0_exceptions_total": int64(2),
  526. "source_demoError_0_records_in_total": int64(5),
  527. "source_demoError_0_records_out_total": int64(3),
  528. "op_2_filter_0_exceptions_total": int64(2),
  529. "op_2_filter_0_process_latency_us": int64(0),
  530. "op_2_filter_0_records_in_total": int64(3),
  531. "op_2_filter_0_records_out_total": int64(2),
  532. },
  533. },
  534. {
  535. Name: `TestSingleSQLRule5`,
  536. Sql: `SELECT meta(topic) as m, ts FROM demo WHERE last_hit_count() < 4`,
  537. R: [][]map[string]interface{}{
  538. {{
  539. "m": "mock",
  540. "ts": float64(1541152486013),
  541. }},
  542. {{
  543. "m": "mock",
  544. "ts": float64(1541152486822),
  545. }},
  546. {{
  547. "m": "mock",
  548. "ts": float64(1541152487632),
  549. }},
  550. {{
  551. "m": "mock",
  552. "ts": float64(1541152488442),
  553. }},
  554. },
  555. M: map[string]interface{}{
  556. "sink_mockSink_0_exceptions_total": int64(0),
  557. "sink_mockSink_0_records_in_total": int64(4),
  558. "sink_mockSink_0_records_out_total": int64(4),
  559. "source_demo_0_exceptions_total": int64(0),
  560. "source_demo_0_records_in_total": int64(5),
  561. "source_demo_0_records_out_total": int64(5),
  562. },
  563. },
  564. {
  565. Name: `TestSingleSQLRule6`,
  566. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  567. R: [][]map[string]interface{}{
  568. {{
  569. "color": "blue",
  570. "ts": float64(1541152486822),
  571. }},
  572. {{
  573. "color": "yellow",
  574. "ts": float64(1541152488442),
  575. }},
  576. },
  577. M: map[string]interface{}{
  578. "op_3_project_0_exceptions_total": int64(0),
  579. "op_3_project_0_process_latency_us": int64(0),
  580. "op_3_project_0_records_in_total": int64(2),
  581. "op_3_project_0_records_out_total": int64(2),
  582. "sink_mockSink_0_exceptions_total": int64(0),
  583. "sink_mockSink_0_records_in_total": int64(2),
  584. "sink_mockSink_0_records_out_total": int64(2),
  585. "source_demo_0_exceptions_total": int64(0),
  586. "source_demo_0_records_in_total": int64(5),
  587. "source_demo_0_records_out_total": int64(5),
  588. "op_2_filter_0_exceptions_total": int64(0),
  589. "op_2_filter_0_process_latency_us": int64(0),
  590. "op_2_filter_0_records_in_total": int64(5),
  591. "op_2_filter_0_records_out_total": int64(2),
  592. },
  593. },
  594. {
  595. Name: `TestSingleSQLRule7`,
  596. Sql: "SELECT `from` FROM demo1",
  597. R: [][]map[string]interface{}{
  598. {{
  599. "from": "device1",
  600. }},
  601. {{
  602. "from": "device2",
  603. }},
  604. {{
  605. "from": "device3",
  606. }},
  607. {{
  608. "from": "device1",
  609. }},
  610. {{
  611. "from": "device3",
  612. }},
  613. },
  614. M: map[string]interface{}{
  615. "op_2_project_0_exceptions_total": int64(0),
  616. "op_2_project_0_process_latency_us": int64(0),
  617. "op_2_project_0_records_in_total": int64(5),
  618. "op_2_project_0_records_out_total": int64(5),
  619. "sink_mockSink_0_exceptions_total": int64(0),
  620. "sink_mockSink_0_records_in_total": int64(5),
  621. "sink_mockSink_0_records_out_total": int64(5),
  622. "source_demo1_0_exceptions_total": int64(0),
  623. "source_demo1_0_records_in_total": int64(5),
  624. "source_demo1_0_records_out_total": int64(5),
  625. },
  626. },
  627. {
  628. Name: `TestSingleSQLRule8`,
  629. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  630. R: [][]map[string]interface{}{
  631. {{
  632. "temp": float64(25.5),
  633. "hum": float64(65),
  634. "from": "device1",
  635. "ts": float64(1541152486013),
  636. }},
  637. {{
  638. "temp": float64(27.4),
  639. "hum": float64(80),
  640. "from": "device1",
  641. "ts": float64(1541152488442),
  642. }},
  643. },
  644. M: map[string]interface{}{
  645. "op_3_project_0_exceptions_total": int64(0),
  646. "op_3_project_0_process_latency_us": int64(0),
  647. "op_3_project_0_records_in_total": int64(2),
  648. "op_3_project_0_records_out_total": int64(2),
  649. "op_2_filter_0_exceptions_total": int64(0),
  650. "op_2_filter_0_process_latency_us": int64(0),
  651. "op_2_filter_0_records_in_total": int64(5),
  652. "op_2_filter_0_records_out_total": int64(2),
  653. "sink_mockSink_0_exceptions_total": int64(0),
  654. "sink_mockSink_0_records_in_total": int64(2),
  655. "sink_mockSink_0_records_out_total": int64(2),
  656. "source_demo1_0_exceptions_total": int64(0),
  657. "source_demo1_0_records_in_total": int64(5),
  658. "source_demo1_0_records_out_total": int64(5),
  659. },
  660. },
  661. {
  662. Name: `TestSingleSQLRule9`,
  663. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  664. R: [][]map[string]interface{}{
  665. {{
  666. "color": "red",
  667. "s": "M",
  668. "ts": float64(1541152486013),
  669. }},
  670. {{
  671. "color": "blue",
  672. "s": "L",
  673. "ts": float64(1541152486822),
  674. }},
  675. {{
  676. "color": "blue",
  677. "s": "M",
  678. "ts": float64(1541152487632),
  679. }},
  680. {{
  681. "color": "yellow",
  682. "s": "L",
  683. "ts": float64(1541152488442),
  684. }},
  685. {{
  686. "color": "red",
  687. "s": "S",
  688. "ts": float64(1541152489252),
  689. }},
  690. },
  691. M: map[string]interface{}{
  692. "op_2_project_0_exceptions_total": int64(0),
  693. "op_2_project_0_process_latency_us": int64(0),
  694. "op_2_project_0_records_in_total": int64(5),
  695. "op_2_project_0_records_out_total": int64(5),
  696. "sink_mockSink_0_exceptions_total": int64(0),
  697. "sink_mockSink_0_records_in_total": int64(5),
  698. "sink_mockSink_0_records_out_total": int64(5),
  699. "source_demo_0_exceptions_total": int64(0),
  700. "source_demo_0_records_in_total": int64(5),
  701. "source_demo_0_records_out_total": int64(5),
  702. },
  703. T: &api.PrintableTopo{
  704. Sources: []string{"source_demo"},
  705. Edges: map[string][]interface{}{
  706. "source_demo": {"op_2_project"},
  707. "op_2_project": {"sink_mockSink"},
  708. },
  709. },
  710. },
  711. {
  712. Name: `TestSingleSQLRule10`,
  713. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  714. R: [][]map[string]interface{}{
  715. {{
  716. "id": float64(1541152486013),
  717. "name": "name1",
  718. "color": "red",
  719. "size": float64(3),
  720. "ts": float64(1541152486013),
  721. }},
  722. {{
  723. "id": float64(1541152487632),
  724. "name": "name2",
  725. "color": "blue",
  726. "size": float64(2),
  727. "ts": float64(1541152487632),
  728. }},
  729. {{
  730. "id": float64(1541152489252),
  731. "name": "name3",
  732. "color": "red",
  733. "size": float64(1),
  734. "ts": float64(1541152489252),
  735. }},
  736. },
  737. W: 15,
  738. M: map[string]interface{}{
  739. "op_3_join_aligner_0_records_in_total": int64(6),
  740. "op_3_join_aligner_0_records_out_total": int64(5),
  741. "op_4_join_0_exceptions_total": int64(0),
  742. "op_4_join_0_records_in_total": int64(5),
  743. "op_4_join_0_records_out_total": int64(3),
  744. "op_5_project_0_exceptions_total": int64(0),
  745. "op_5_project_0_records_in_total": int64(3),
  746. "op_5_project_0_records_out_total": int64(3),
  747. "sink_mockSink_0_exceptions_total": int64(0),
  748. "sink_mockSink_0_records_in_total": int64(3),
  749. "sink_mockSink_0_records_out_total": int64(3),
  750. "source_demo_0_exceptions_total": int64(0),
  751. "source_demo_0_records_in_total": int64(5),
  752. "source_demo_0_records_out_total": int64(5),
  753. "source_table1_0_exceptions_total": int64(0),
  754. "source_table1_0_records_in_total": int64(4),
  755. "source_table1_0_records_out_total": int64(1),
  756. },
  757. },
  758. {
  759. Name: `TestSingleSQLRule11`,
  760. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  761. R: [][]map[string]interface{}{
  762. {{
  763. "device": "device2",
  764. }},
  765. {{
  766. "device": "device4",
  767. }},
  768. {{
  769. "device": "device5",
  770. }},
  771. },
  772. M: map[string]interface{}{
  773. "op_3_join_aligner_0_records_in_total": int64(10),
  774. "op_3_join_aligner_0_records_out_total": int64(5),
  775. "op_4_join_0_exceptions_total": int64(0),
  776. "op_4_join_0_records_in_total": int64(5),
  777. "op_4_join_0_records_out_total": int64(3),
  778. "op_5_project_0_exceptions_total": int64(0),
  779. "op_5_project_0_records_in_total": int64(3),
  780. "op_5_project_0_records_out_total": int64(3),
  781. "sink_mockSink_0_exceptions_total": int64(0),
  782. "sink_mockSink_0_records_in_total": int64(3),
  783. "sink_mockSink_0_records_out_total": int64(3),
  784. "source_demo_0_exceptions_total": int64(0),
  785. "source_demo_0_records_in_total": int64(5),
  786. "source_demo_0_records_out_total": int64(5),
  787. "source_demoTable_0_exceptions_total": int64(0),
  788. "source_demoTable_0_records_in_total": int64(5),
  789. "source_demoTable_0_records_out_total": int64(5),
  790. },
  791. },
  792. {
  793. Name: `TestSingleSQLRule12`,
  794. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  795. R: [][]map[string]interface{}{
  796. {{
  797. "table1Id": float64(1541152486013),
  798. "demoTs": float64(1541152486013),
  799. }},
  800. {{
  801. "table1Id": float64(1541152487632),
  802. "demoTs": float64(1541152487632),
  803. }},
  804. {{
  805. "table1Id": float64(1541152489252),
  806. "demoTs": float64(1541152489252),
  807. }},
  808. },
  809. W: 15,
  810. M: map[string]interface{}{
  811. "op_3_join_aligner_0_records_in_total": int64(6),
  812. "op_3_join_aligner_0_records_out_total": int64(5),
  813. "op_4_join_0_exceptions_total": int64(0),
  814. "op_4_join_0_records_in_total": int64(5),
  815. "op_4_join_0_records_out_total": int64(3),
  816. "op_5_project_0_exceptions_total": int64(0),
  817. "op_5_project_0_records_in_total": int64(3),
  818. "op_5_project_0_records_out_total": int64(3),
  819. "sink_mockSink_0_exceptions_total": int64(0),
  820. "sink_mockSink_0_records_in_total": int64(3),
  821. "sink_mockSink_0_records_out_total": int64(3),
  822. "source_demo_0_exceptions_total": int64(0),
  823. "source_demo_0_records_in_total": int64(5),
  824. "source_demo_0_records_out_total": int64(5),
  825. "source_table1_0_exceptions_total": int64(0),
  826. "source_table1_0_records_in_total": int64(4),
  827. "source_table1_0_records_out_total": int64(1),
  828. },
  829. },
  830. {
  831. Name: `TestChanged13`,
  832. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  833. R: [][]map[string]interface{}{
  834. {{
  835. "tt_color": "red",
  836. "tt_size": float64(3),
  837. }},
  838. {{
  839. "tt_color": "blue",
  840. "tt_size": float64(6),
  841. }},
  842. {{
  843. "tt_size": float64(2),
  844. }},
  845. {{
  846. "tt_color": "yellow",
  847. "tt_size": float64(4),
  848. }},
  849. {{
  850. "tt_color": "red",
  851. "tt_size": float64(1),
  852. }},
  853. },
  854. M: map[string]interface{}{
  855. "op_2_project_0_exceptions_total": int64(0),
  856. "op_2_project_0_process_latency_us": int64(0),
  857. "op_2_project_0_records_in_total": int64(5),
  858. "op_2_project_0_records_out_total": int64(5),
  859. "sink_mockSink_0_exceptions_total": int64(0),
  860. "sink_mockSink_0_records_in_total": int64(5),
  861. "sink_mockSink_0_records_out_total": int64(5),
  862. "source_demo_0_exceptions_total": int64(0),
  863. "source_demo_0_records_in_total": int64(5),
  864. "source_demo_0_records_out_total": int64(5),
  865. },
  866. },
  867. {
  868. Name: `TestAliasOrderBy14`,
  869. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  870. R: [][]map[string]interface{}{
  871. {
  872. {
  873. "color": "blue",
  874. "c": float64(2),
  875. },
  876. {
  877. "color": "yellow",
  878. "c": float64(1),
  879. },
  880. },
  881. },
  882. M: map[string]interface{}{
  883. "op_6_project_0_exceptions_total": int64(0),
  884. "op_6_project_0_process_latency_us": int64(0),
  885. "op_6_project_0_records_in_total": int64(1),
  886. "op_6_project_0_records_out_total": int64(1),
  887. "sink_mockSink_0_exceptions_total": int64(0),
  888. "sink_mockSink_0_records_in_total": int64(1),
  889. "sink_mockSink_0_records_out_total": int64(1),
  890. "source_demo_0_exceptions_total": int64(0),
  891. "source_demo_0_records_in_total": int64(5),
  892. "source_demo_0_records_out_total": int64(5),
  893. },
  894. },
  895. {
  896. Name: `TestSingleSQLRule17`,
  897. Sql: `SELECT arr[x:4] as col1 FROM demoArr where x=1`,
  898. R: [][]map[string]interface{}{
  899. {{
  900. "col1": []interface{}{
  901. float64(2), float64(3), float64(4),
  902. },
  903. }},
  904. },
  905. },
  906. {
  907. Name: `TestSingleSQLRule16`,
  908. Sql: `SELECT arr[1:y] as col1 FROM demoArr where x=1`,
  909. R: [][]map[string]interface{}{
  910. {{
  911. "col1": []interface{}{
  912. float64(2),
  913. },
  914. }},
  915. },
  916. },
  917. {
  918. Name: `TestSingleSQLRule15`,
  919. Sql: `SELECT arr[1] as col1 FROM demoArr where x=1`,
  920. R: [][]map[string]interface{}{
  921. {{
  922. "col1": float64(2),
  923. }},
  924. },
  925. },
  926. {
  927. Name: `TestLagAlias`,
  928. Sql: "SELECT lag(size) as lastSize, lag(had_changed(true,size)), size, lastSize/size as changeRate FROM demo WHERE size > 2",
  929. R: [][]map[string]interface{}{
  930. {{
  931. "size": float64(3),
  932. }},
  933. {{
  934. "lastSize": float64(3),
  935. "size": float64(6),
  936. "lag": true,
  937. "changeRate": float64(0),
  938. }},
  939. {{
  940. "lastSize": float64(2),
  941. "size": float64(4),
  942. "lag": true,
  943. "changeRate": float64(0),
  944. }},
  945. },
  946. M: map[string]interface{}{
  947. "sink_mockSink_0_exceptions_total": int64(0),
  948. "sink_mockSink_0_records_in_total": int64(3),
  949. "sink_mockSink_0_records_out_total": int64(3),
  950. "source_demo_0_exceptions_total": int64(0),
  951. "source_demo_0_records_in_total": int64(5),
  952. "source_demo_0_records_out_total": int64(5),
  953. },
  954. },
  955. {
  956. Name: `TestLagPartition`,
  957. Sql: "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
  958. R: [][]map[string]interface{}{
  959. {{
  960. "color": "red",
  961. "size": float64(3),
  962. }},
  963. {{
  964. "color": "blue",
  965. "size": float64(6),
  966. }},
  967. {{
  968. "color": "blue",
  969. "lastSize": float64(6),
  970. "size": float64(2),
  971. "changeRate": float64(3),
  972. }},
  973. {{
  974. "color": "yellow",
  975. "size": float64(4),
  976. }},
  977. {{
  978. "color": "red",
  979. "lastSize": float64(3),
  980. "size": float64(1),
  981. "changeRate": float64(3),
  982. }},
  983. },
  984. M: map[string]interface{}{
  985. "sink_mockSink_0_exceptions_total": int64(0),
  986. "sink_mockSink_0_records_in_total": int64(5),
  987. "sink_mockSink_0_records_out_total": int64(5),
  988. "source_demo_0_exceptions_total": int64(0),
  989. "source_demo_0_records_in_total": int64(5),
  990. "source_demo_0_records_out_total": int64(5),
  991. },
  992. },
  993. }
  994. HandleStream(true, streamList, t)
  995. options := []*api.RuleOption{
  996. {
  997. BufferLength: 100,
  998. SendError: true,
  999. },
  1000. {
  1001. BufferLength: 100,
  1002. SendError: true,
  1003. Qos: api.AtLeastOnce,
  1004. CheckpointInterval: 5000,
  1005. },
  1006. {
  1007. BufferLength: 100,
  1008. SendError: true,
  1009. Qos: api.ExactlyOnce,
  1010. CheckpointInterval: 5000,
  1011. },
  1012. }
  1013. for j, opt := range options {
  1014. DoRuleTest(t, tests, j, opt, 0)
  1015. }
  1016. }
  1017. func TestSingleSQLWithEventTime(t *testing.T) {
  1018. // Reset
  1019. streamList := []string{"demoE"}
  1020. HandleStream(false, streamList, t)
  1021. // Data setup
  1022. tests := []RuleTest{
  1023. {
  1024. Name: `TestSingleSQLRule1`,
  1025. Sql: `SELECT *, upper(color) FROM demoE`,
  1026. R: [][]map[string]interface{}{
  1027. {{
  1028. "color": "red",
  1029. "size": float64(3),
  1030. "ts": float64(1541152486013),
  1031. "upper": "RED",
  1032. }},
  1033. {{
  1034. "color": "blue",
  1035. "size": float64(2),
  1036. "ts": float64(1541152487632),
  1037. "upper": "BLUE",
  1038. }},
  1039. {{
  1040. "color": "yellow",
  1041. "size": float64(4),
  1042. "ts": float64(1541152488442),
  1043. "upper": "YELLOW",
  1044. }},
  1045. {{
  1046. "color": "red",
  1047. "size": float64(1),
  1048. "ts": float64(1541152489252),
  1049. "upper": "RED",
  1050. }},
  1051. },
  1052. M: map[string]interface{}{
  1053. "op_3_project_0_exceptions_total": int64(0),
  1054. "op_3_project_0_process_latency_us": int64(0),
  1055. "op_3_project_0_records_in_total": int64(4),
  1056. "op_3_project_0_records_out_total": int64(4),
  1057. "sink_mockSink_0_exceptions_total": int64(0),
  1058. "sink_mockSink_0_records_in_total": int64(4),
  1059. "sink_mockSink_0_records_out_total": int64(4),
  1060. "source_demoE_0_exceptions_total": int64(0),
  1061. "source_demoE_0_records_in_total": int64(6),
  1062. "source_demoE_0_records_out_total": int64(6),
  1063. },
  1064. T: &api.PrintableTopo{
  1065. Sources: []string{"source_demoE"},
  1066. Edges: map[string][]interface{}{
  1067. "source_demoE": {"op_2_watermark"},
  1068. "op_2_watermark": {"op_3_project"},
  1069. "op_3_project": {"sink_mockSink"},
  1070. },
  1071. },
  1072. },
  1073. {
  1074. Name: `TestStateFunc`,
  1075. Sql: `SELECT *, last_hit_time() as lt, last_hit_count() as lc, event_time() as et FROM demoE WHERE size < 3 AND lc < 2`,
  1076. R: [][]map[string]interface{}{
  1077. {{
  1078. "color": "blue",
  1079. "size": float64(2),
  1080. "ts": float64(1541152487632),
  1081. "lc": float64(0),
  1082. "lt": float64(0),
  1083. "et": float64(1541152487632),
  1084. }},
  1085. {{
  1086. "color": "red",
  1087. "size": float64(1),
  1088. "ts": float64(1541152489252),
  1089. "lc": float64(1),
  1090. "lt": float64(1541152487632),
  1091. "et": float64(1541152489252),
  1092. }},
  1093. },
  1094. M: map[string]interface{}{
  1095. "sink_mockSink_0_exceptions_total": int64(0),
  1096. "sink_mockSink_0_records_in_total": int64(2),
  1097. "sink_mockSink_0_records_out_total": int64(2),
  1098. "source_demoE_0_exceptions_total": int64(0),
  1099. "source_demoE_0_records_in_total": int64(6),
  1100. "source_demoE_0_records_out_total": int64(6),
  1101. },
  1102. },
  1103. {
  1104. Name: `TestChanged`,
  1105. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demoE",
  1106. R: [][]map[string]interface{}{
  1107. {{
  1108. "tt_color": "red",
  1109. "tt_size": float64(3),
  1110. }},
  1111. {{
  1112. "tt_color": "blue",
  1113. "tt_size": float64(2),
  1114. }},
  1115. {{
  1116. "tt_color": "yellow",
  1117. "tt_size": float64(4),
  1118. }},
  1119. {{
  1120. "tt_color": "red",
  1121. "tt_size": float64(1),
  1122. }},
  1123. },
  1124. M: map[string]interface{}{
  1125. "op_3_project_0_exceptions_total": int64(0),
  1126. "op_3_project_0_process_latency_us": int64(0),
  1127. "op_3_project_0_records_in_total": int64(4),
  1128. "op_3_project_0_records_out_total": int64(4),
  1129. "sink_mockSink_0_exceptions_total": int64(0),
  1130. "sink_mockSink_0_records_in_total": int64(4),
  1131. "sink_mockSink_0_records_out_total": int64(4),
  1132. "source_demoE_0_exceptions_total": int64(0),
  1133. "source_demoE_0_records_in_total": int64(6),
  1134. "source_demoE_0_records_out_total": int64(6),
  1135. },
  1136. },
  1137. }
  1138. HandleStream(true, streamList, t)
  1139. options := []*api.RuleOption{
  1140. {
  1141. BufferLength: 100,
  1142. SendError: true,
  1143. IsEventTime: true,
  1144. LateTol: 1000,
  1145. }, {
  1146. BufferLength: 100,
  1147. SendError: true,
  1148. Qos: api.AtLeastOnce,
  1149. CheckpointInterval: 5000,
  1150. IsEventTime: true,
  1151. LateTol: 1000,
  1152. }, {
  1153. BufferLength: 100,
  1154. SendError: true,
  1155. Qos: api.ExactlyOnce,
  1156. CheckpointInterval: 5000,
  1157. IsEventTime: true,
  1158. LateTol: 1000,
  1159. },
  1160. }
  1161. for j, opt := range options {
  1162. DoRuleTest(t, tests, j, opt, 0)
  1163. }
  1164. }
  1165. func TestSingleSQLError(t *testing.T) {
  1166. // Reset
  1167. streamList := []string{"ldemo"}
  1168. HandleStream(false, streamList, t)
  1169. // Data setup
  1170. tests := []RuleTest{
  1171. {
  1172. Name: `TestSingleSQLErrorRule1`,
  1173. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1174. R: [][]map[string]interface{}{
  1175. {{
  1176. "color": "red",
  1177. "ts": float64(1541152486013),
  1178. }},
  1179. {{
  1180. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1181. }},
  1182. {{
  1183. "ts": float64(1541152487632),
  1184. }},
  1185. },
  1186. M: map[string]interface{}{
  1187. "op_3_project_0_exceptions_total": int64(1),
  1188. "op_3_project_0_process_latency_us": int64(0),
  1189. "op_3_project_0_records_in_total": int64(2),
  1190. "op_3_project_0_records_out_total": int64(2),
  1191. "sink_mockSink_0_exceptions_total": int64(0),
  1192. "sink_mockSink_0_records_in_total": int64(3),
  1193. "sink_mockSink_0_records_out_total": int64(3),
  1194. "source_ldemo_0_exceptions_total": int64(0),
  1195. "source_ldemo_0_records_in_total": int64(5),
  1196. "source_ldemo_0_records_out_total": int64(5),
  1197. "op_2_filter_0_exceptions_total": int64(1),
  1198. "op_2_filter_0_process_latency_us": int64(0),
  1199. "op_2_filter_0_records_in_total": int64(5),
  1200. "op_2_filter_0_records_out_total": int64(2),
  1201. },
  1202. }, {
  1203. Name: `TestSingleSQLErrorRule2`,
  1204. Sql: `SELECT size * 5 FROM ldemo`,
  1205. R: [][]map[string]interface{}{
  1206. {{
  1207. "kuiper_field_0": float64(15),
  1208. }},
  1209. {{
  1210. "error": "run Select error: invalid operation string(string) * int64(5)",
  1211. }},
  1212. {{
  1213. "kuiper_field_0": float64(15),
  1214. }},
  1215. {{
  1216. "kuiper_field_0": float64(10),
  1217. }},
  1218. {{}},
  1219. },
  1220. M: map[string]interface{}{
  1221. "op_2_project_0_exceptions_total": int64(1),
  1222. "op_2_project_0_process_latency_us": int64(0),
  1223. "op_2_project_0_records_in_total": int64(5),
  1224. "op_2_project_0_records_out_total": int64(4),
  1225. "sink_mockSink_0_exceptions_total": int64(0),
  1226. "sink_mockSink_0_records_in_total": int64(5),
  1227. "sink_mockSink_0_records_out_total": int64(5),
  1228. "source_ldemo_0_exceptions_total": int64(0),
  1229. "source_ldemo_0_records_in_total": int64(5),
  1230. "source_ldemo_0_records_out_total": int64(5),
  1231. },
  1232. },
  1233. }
  1234. HandleStream(true, streamList, t)
  1235. DoRuleTest(t, tests, 0, &api.RuleOption{
  1236. BufferLength: 100,
  1237. SendError: true,
  1238. }, 0)
  1239. }
  1240. func TestSingleSQLOmitError(t *testing.T) {
  1241. // Reset
  1242. streamList := []string{"ldemo"}
  1243. HandleStream(false, streamList, t)
  1244. // Data setup
  1245. tests := []RuleTest{
  1246. {
  1247. Name: `TestSingleSQLErrorRule1`,
  1248. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1249. R: [][]map[string]interface{}{
  1250. {{
  1251. "color": "red",
  1252. "ts": float64(1541152486013),
  1253. }},
  1254. {{
  1255. "ts": float64(1541152487632),
  1256. }},
  1257. },
  1258. M: map[string]interface{}{
  1259. "op_3_project_0_exceptions_total": int64(0),
  1260. "op_3_project_0_process_latency_us": int64(0),
  1261. "op_3_project_0_records_in_total": int64(2),
  1262. "op_3_project_0_records_out_total": int64(2),
  1263. "sink_mockSink_0_exceptions_total": int64(0),
  1264. "sink_mockSink_0_records_in_total": int64(2),
  1265. "sink_mockSink_0_records_out_total": int64(2),
  1266. "source_ldemo_0_exceptions_total": int64(0),
  1267. "source_ldemo_0_records_in_total": int64(5),
  1268. "source_ldemo_0_records_out_total": int64(5),
  1269. "op_2_filter_0_exceptions_total": int64(1),
  1270. "op_2_filter_0_process_latency_us": int64(0),
  1271. "op_2_filter_0_records_in_total": int64(5),
  1272. "op_2_filter_0_records_out_total": int64(2),
  1273. },
  1274. }, {
  1275. Name: `TestSingleSQLErrorRule2`,
  1276. Sql: `SELECT size * 5 FROM ldemo`,
  1277. R: [][]map[string]interface{}{
  1278. {{
  1279. "kuiper_field_0": float64(15),
  1280. }},
  1281. {{
  1282. "kuiper_field_0": float64(15),
  1283. }},
  1284. {{
  1285. "kuiper_field_0": float64(10),
  1286. }},
  1287. {{}},
  1288. },
  1289. M: map[string]interface{}{
  1290. "op_2_project_0_exceptions_total": int64(1),
  1291. "op_2_project_0_process_latency_us": int64(0),
  1292. "op_2_project_0_records_in_total": int64(5),
  1293. "op_2_project_0_records_out_total": int64(4),
  1294. "sink_mockSink_0_exceptions_total": int64(0),
  1295. "sink_mockSink_0_records_in_total": int64(4),
  1296. "sink_mockSink_0_records_out_total": int64(4),
  1297. "source_ldemo_0_exceptions_total": int64(0),
  1298. "source_ldemo_0_records_in_total": int64(5),
  1299. "source_ldemo_0_records_out_total": int64(5),
  1300. },
  1301. },
  1302. }
  1303. HandleStream(true, streamList, t)
  1304. DoRuleTest(t, tests, 0, &api.RuleOption{
  1305. BufferLength: 100,
  1306. SendError: false,
  1307. }, 0)
  1308. }
  1309. func TestSingleSQLTemplate(t *testing.T) {
  1310. // Reset
  1311. streamList := []string{"demo"}
  1312. HandleStream(false, streamList, t)
  1313. // Data setup
  1314. tests := []RuleTest{
  1315. {
  1316. Name: `TestSingleSQLTemplateRule1`,
  1317. Sql: `SELECT * FROM demo`,
  1318. R: []map[string]interface{}{
  1319. {
  1320. "c": "red",
  1321. "wrapper": "w1",
  1322. },
  1323. {
  1324. "c": "blue",
  1325. "wrapper": "w1",
  1326. },
  1327. {
  1328. "c": "blue",
  1329. "wrapper": "w1",
  1330. },
  1331. {
  1332. "c": "yellow",
  1333. "wrapper": "w1",
  1334. },
  1335. {
  1336. "c": "red",
  1337. "wrapper": "w1",
  1338. },
  1339. },
  1340. M: map[string]interface{}{
  1341. "op_2_project_0_exceptions_total": int64(0),
  1342. "op_2_project_0_process_latency_us": int64(0),
  1343. "op_2_project_0_records_in_total": int64(5),
  1344. "op_2_project_0_records_out_total": int64(5),
  1345. "sink_mockSink_0_exceptions_total": int64(0),
  1346. "sink_mockSink_0_records_in_total": int64(5),
  1347. "sink_mockSink_0_records_out_total": int64(5),
  1348. "source_demo_0_exceptions_total": int64(0),
  1349. "source_demo_0_records_in_total": int64(5),
  1350. "source_demo_0_records_out_total": int64(5),
  1351. },
  1352. },
  1353. }
  1354. HandleStream(true, streamList, t)
  1355. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1356. BufferLength: 100,
  1357. SendError: true,
  1358. }, 0, map[string]interface{}{
  1359. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  1360. "sendSingle": true,
  1361. }, func(result [][]byte) interface{} {
  1362. var maps []map[string]interface{}
  1363. for _, v := range result {
  1364. var mapRes map[string]interface{}
  1365. err := json.Unmarshal(v, &mapRes)
  1366. if err != nil {
  1367. t.Errorf("Failed to parse the input into map")
  1368. continue
  1369. }
  1370. maps = append(maps, mapRes)
  1371. }
  1372. return maps
  1373. })
  1374. }
  1375. func TestNoneSingleSQLTemplate(t *testing.T) {
  1376. // Reset
  1377. streamList := []string{"demo"}
  1378. HandleStream(false, streamList, t)
  1379. // Data setup
  1380. tests := []RuleTest{
  1381. {
  1382. Name: `TestNoneSingleSQLTemplateRule1`,
  1383. Sql: `SELECT * FROM demo`,
  1384. R: [][]byte{
  1385. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1386. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1387. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1388. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1389. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1390. },
  1391. M: map[string]interface{}{
  1392. "op_2_project_0_exceptions_total": int64(0),
  1393. "op_2_project_0_process_latency_us": int64(0),
  1394. "op_2_project_0_records_in_total": int64(5),
  1395. "op_2_project_0_records_out_total": int64(5),
  1396. "sink_mockSink_0_exceptions_total": int64(0),
  1397. "sink_mockSink_0_records_in_total": int64(5),
  1398. "sink_mockSink_0_records_out_total": int64(5),
  1399. "source_demo_0_exceptions_total": int64(0),
  1400. "source_demo_0_records_in_total": int64(5),
  1401. "source_demo_0_records_out_total": int64(5),
  1402. },
  1403. },
  1404. }
  1405. HandleStream(true, streamList, t)
  1406. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1407. BufferLength: 100,
  1408. SendError: true,
  1409. }, 0, map[string]interface{}{
  1410. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1411. }, func(result [][]byte) interface{} {
  1412. return result
  1413. })
  1414. }
  1415. func TestSingleSQLForBinary(t *testing.T) {
  1416. // Reset
  1417. streamList := []string{"binDemo"}
  1418. HandleStream(false, streamList, t)
  1419. // Data setup
  1420. tests := []RuleTest{
  1421. {
  1422. Name: `TestSingleSQLRule1`,
  1423. Sql: `SELECT * FROM binDemo`,
  1424. R: [][]map[string]interface{}{
  1425. {{
  1426. "self": mocknode.Image,
  1427. }},
  1428. },
  1429. W: 50,
  1430. M: map[string]interface{}{
  1431. "op_2_project_0_exceptions_total": int64(0),
  1432. "op_2_project_0_process_latency_us": int64(0),
  1433. "op_2_project_0_records_in_total": int64(1),
  1434. "op_2_project_0_records_out_total": int64(1),
  1435. "sink_mockSink_0_exceptions_total": int64(0),
  1436. "sink_mockSink_0_records_in_total": int64(1),
  1437. "sink_mockSink_0_records_out_total": int64(1),
  1438. "source_binDemo_0_exceptions_total": int64(0),
  1439. "source_binDemo_0_records_in_total": int64(1),
  1440. "source_binDemo_0_records_out_total": int64(1),
  1441. },
  1442. },
  1443. }
  1444. HandleStream(true, streamList, t)
  1445. options := []*api.RuleOption{
  1446. {
  1447. BufferLength: 100,
  1448. SendError: true,
  1449. }, {
  1450. BufferLength: 100,
  1451. SendError: true,
  1452. Qos: api.AtLeastOnce,
  1453. CheckpointInterval: 5000,
  1454. }, {
  1455. BufferLength: 100,
  1456. SendError: true,
  1457. Qos: api.ExactlyOnce,
  1458. CheckpointInterval: 5000,
  1459. },
  1460. }
  1461. byteFunc := func(result [][]byte) interface{} {
  1462. var maps [][]map[string]interface{}
  1463. for _, v := range result {
  1464. var mapRes []map[string][]byte
  1465. err := json.Unmarshal(v, &mapRes)
  1466. if err != nil {
  1467. panic("Failed to parse the input into map")
  1468. }
  1469. mapInt := make([]map[string]interface{}, len(mapRes))
  1470. for i, mv := range mapRes {
  1471. mapInt[i] = make(map[string]interface{})
  1472. // assume only one key
  1473. for k, v := range mv {
  1474. mapInt[i][k] = v
  1475. }
  1476. }
  1477. maps = append(maps, mapInt)
  1478. }
  1479. return maps
  1480. }
  1481. for j, opt := range options {
  1482. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  1483. }
  1484. }
  1485. func TestWindowSQL(t *testing.T) {
  1486. // Reset
  1487. streamList := []string{"demoE"}
  1488. HandleStream(false, streamList, t)
  1489. tests := []RuleTest{
  1490. {
  1491. Name: "TestHoppingWindowSQL1",
  1492. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 3, 5)`,
  1493. R: [][]map[string]interface{}{
  1494. {
  1495. {
  1496. "color": "blue",
  1497. "size": float64(2),
  1498. },
  1499. {
  1500. "color": "red",
  1501. "size": float64(1),
  1502. },
  1503. },
  1504. },
  1505. },
  1506. {
  1507. Name: "TestHoppingWindowSQL2",
  1508. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 1, 2)`,
  1509. R: [][]map[string]interface{}{
  1510. {
  1511. {
  1512. "color": "blue",
  1513. "size": float64(2),
  1514. },
  1515. },
  1516. {
  1517. {
  1518. "color": "red",
  1519. "size": float64(1),
  1520. },
  1521. },
  1522. {},
  1523. },
  1524. },
  1525. {
  1526. Name: "TestHoppingWindowSQL3",
  1527. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 2, 5)`,
  1528. R: [][]map[string]interface{}{
  1529. {
  1530. {
  1531. "color": "red",
  1532. "size": float64(1),
  1533. },
  1534. },
  1535. },
  1536. },
  1537. }
  1538. // Data setup
  1539. HandleStream(true, streamList, t)
  1540. options := []*api.RuleOption{
  1541. {
  1542. BufferLength: 100,
  1543. SendError: true,
  1544. Qos: api.AtLeastOnce,
  1545. CheckpointInterval: 5000,
  1546. IsEventTime: true,
  1547. },
  1548. {
  1549. BufferLength: 100,
  1550. SendError: true,
  1551. Qos: api.ExactlyOnce,
  1552. CheckpointInterval: 5000,
  1553. IsEventTime: true,
  1554. },
  1555. }
  1556. for j, opt := range options {
  1557. DoRuleTest(t, tests, j, opt, 0)
  1558. }
  1559. }
  1560. func TestAliasSQL(t *testing.T) {
  1561. streamList := []string{"demo"}
  1562. HandleStream(false, streamList, t)
  1563. tests := []RuleTest{
  1564. {
  1565. Name: "TestAliasSQL1",
  1566. Sql: `select size as a, a + 1 as b from demo`,
  1567. R: [][]map[string]interface{}{
  1568. {
  1569. {
  1570. "a": float64(3),
  1571. "b": float64(4),
  1572. },
  1573. },
  1574. {
  1575. {
  1576. "a": float64(6),
  1577. "b": float64(7),
  1578. },
  1579. },
  1580. {
  1581. {
  1582. "a": float64(2),
  1583. "b": float64(3),
  1584. },
  1585. },
  1586. {
  1587. {
  1588. "a": float64(4),
  1589. "b": float64(5),
  1590. },
  1591. },
  1592. {
  1593. {
  1594. "a": float64(1),
  1595. "b": float64(2),
  1596. },
  1597. },
  1598. },
  1599. },
  1600. {
  1601. Name: "TestAliasSQL2",
  1602. Sql: `select a + 1 as b, size as a from demo`,
  1603. R: [][]map[string]interface{}{
  1604. {
  1605. {
  1606. "a": float64(3),
  1607. "b": float64(4),
  1608. },
  1609. },
  1610. {
  1611. {
  1612. "a": float64(6),
  1613. "b": float64(7),
  1614. },
  1615. },
  1616. {
  1617. {
  1618. "a": float64(2),
  1619. "b": float64(3),
  1620. },
  1621. },
  1622. {
  1623. {
  1624. "a": float64(4),
  1625. "b": float64(5),
  1626. },
  1627. },
  1628. {
  1629. {
  1630. "a": float64(1),
  1631. "b": float64(2),
  1632. },
  1633. },
  1634. },
  1635. },
  1636. }
  1637. // Data setup
  1638. HandleStream(true, streamList, t)
  1639. options := []*api.RuleOption{
  1640. {
  1641. BufferLength: 100,
  1642. SendError: true,
  1643. Qos: api.AtLeastOnce,
  1644. CheckpointInterval: 5000,
  1645. },
  1646. {
  1647. BufferLength: 100,
  1648. SendError: true,
  1649. Qos: api.ExactlyOnce,
  1650. CheckpointInterval: 5000,
  1651. },
  1652. }
  1653. for j, opt := range options {
  1654. DoRuleTest(t, tests, j, opt, 0)
  1655. }
  1656. }