rule_test.go 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "testing"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. )
  21. func TestLimitSQL(t *testing.T) {
  22. // Reset
  23. streamList := []string{"demo", "demoArr", "demoArr2"}
  24. HandleStream(false, streamList, t)
  25. var r [][]map[string]interface{}
  26. tests := []RuleTest{
  27. {
  28. Name: "TestLimitSQL01",
  29. Sql: `SELECT unnest(demoArr2.arr) as col, demo.size FROM demo inner join demoArr2 on demo.size = demoArr2.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  30. R: [][]map[string]interface{}{
  31. {
  32. {
  33. "col": float64(1),
  34. "size": float64(1),
  35. },
  36. },
  37. },
  38. },
  39. {
  40. Name: "TestLimitSQL0",
  41. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  42. R: [][]map[string]interface{}{
  43. {
  44. {
  45. "col": float64(1),
  46. "size": float64(1),
  47. },
  48. },
  49. },
  50. },
  51. {
  52. Name: "TestLimitSQL1",
  53. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  54. R: r,
  55. },
  56. {
  57. Name: "TestLimitSQL2",
  58. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 1;`,
  59. R: [][]map[string]interface{}{
  60. {
  61. {
  62. "size": float64(1),
  63. },
  64. },
  65. },
  66. },
  67. {
  68. Name: "TestLimitSQL3",
  69. Sql: `SELECT demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1) limit 0;`,
  70. R: r,
  71. },
  72. }
  73. // Data setup
  74. HandleStream(true, streamList, t)
  75. options := []*api.RuleOption{
  76. {
  77. BufferLength: 100,
  78. SendError: true,
  79. },
  80. {
  81. BufferLength: 100,
  82. SendError: true,
  83. Qos: api.AtLeastOnce,
  84. CheckpointInterval: 5000,
  85. },
  86. {
  87. BufferLength: 100,
  88. SendError: true,
  89. Qos: api.ExactlyOnce,
  90. CheckpointInterval: 5000,
  91. },
  92. }
  93. for j, opt := range options {
  94. DoRuleTest(t, tests, j, opt, 0)
  95. }
  96. }
  97. func TestSRFSQL(t *testing.T) {
  98. // Reset
  99. streamList := []string{"demo", "demoArr"}
  100. HandleStream(false, streamList, t)
  101. tests := []RuleTest{
  102. {
  103. Name: "TestSingleSQLRule25",
  104. Sql: "SELECT unnest(a) from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  105. R: [][]map[string]interface{}{
  106. {
  107. {
  108. "error": "the argument for the unnest function should be array",
  109. },
  110. },
  111. },
  112. },
  113. {
  114. Name: "TestSingleSQLRule24",
  115. Sql: "Select unnest(a) from demoArr;",
  116. R: [][]map[string]interface{}{
  117. {
  118. {
  119. "error": "the argument for the unnest function should be array",
  120. },
  121. },
  122. },
  123. },
  124. {
  125. Name: "TestSingleSQLRule21",
  126. Sql: `SELECT unnest(demoArr.arr3) as col, demo.size FROM demo inner join demoArr on demo.size = demoArr.x group by SESSIONWINDOW(ss, 2, 1);`,
  127. R: [][]map[string]interface{}{
  128. {
  129. {
  130. "col": float64(1),
  131. "size": float64(1),
  132. },
  133. {
  134. "col": float64(2),
  135. "size": float64(1),
  136. },
  137. {
  138. "col": float64(3),
  139. "size": float64(1),
  140. },
  141. },
  142. },
  143. },
  144. {
  145. Name: "TestSingleSQLRule22",
  146. Sql: `SELECT unnest(arr3) as col,y From demoArr group by y, SESSIONWINDOW(ss, 2, 1);`,
  147. R: [][]map[string]interface{}{
  148. {
  149. {
  150. "col": float64(1),
  151. "y": float64(2),
  152. },
  153. {
  154. "col": float64(2),
  155. "y": float64(2),
  156. },
  157. {
  158. "col": float64(3),
  159. "y": float64(2),
  160. },
  161. },
  162. },
  163. },
  164. {
  165. Name: "TestSingleSQLRule23",
  166. Sql: "SELECT unnest(arr3) as col,a from demoArr group by SESSIONWINDOW(ss, 2, 1);",
  167. R: [][]map[string]interface{}{
  168. {
  169. {
  170. "col": float64(1),
  171. "a": float64(6),
  172. },
  173. {
  174. "col": float64(2),
  175. "a": float64(6),
  176. },
  177. {
  178. "col": float64(3),
  179. "a": float64(6),
  180. },
  181. },
  182. },
  183. },
  184. {
  185. Name: `TestSingleSQLRule18`,
  186. Sql: `SELECT unnest(arr2) FROM demoArr where x=1`,
  187. R: [][]map[string]interface{}{
  188. {
  189. {
  190. "a": float64(1),
  191. "b": float64(2),
  192. },
  193. },
  194. {
  195. {
  196. "a": float64(3),
  197. "b": float64(4),
  198. },
  199. },
  200. },
  201. },
  202. // The mapping schema created by unnest function will cover the original column if they have the same column name
  203. {
  204. Name: `TestSingleSQLRule19`,
  205. Sql: `SELECT unnest(arr2),a FROM demoArr where x=1`,
  206. R: [][]map[string]interface{}{
  207. {
  208. {
  209. "a": float64(1),
  210. "b": float64(2),
  211. },
  212. },
  213. {
  214. {
  215. "a": float64(3),
  216. "b": float64(4),
  217. },
  218. },
  219. },
  220. },
  221. {
  222. Name: `TestSingleSQLRule20`,
  223. Sql: `SELECT unnest(arr3) as col FROM demoArr where x=1`,
  224. R: [][]map[string]interface{}{
  225. {
  226. {
  227. "col": float64(1),
  228. },
  229. },
  230. {
  231. {
  232. "col": float64(2),
  233. },
  234. },
  235. {
  236. {
  237. "col": float64(3),
  238. },
  239. },
  240. },
  241. },
  242. {
  243. Name: `TestSingleSQLRule21`,
  244. Sql: `SELECT unnest(arr2),x FROM demoArr where x=1`,
  245. R: [][]map[string]interface{}{
  246. {
  247. {
  248. "a": float64(1),
  249. "b": float64(2),
  250. "x": float64(1),
  251. },
  252. },
  253. {
  254. {
  255. "a": float64(3),
  256. "b": float64(4),
  257. "x": float64(1),
  258. },
  259. },
  260. },
  261. },
  262. }
  263. // Data setup
  264. HandleStream(true, streamList, t)
  265. options := []*api.RuleOption{
  266. {
  267. BufferLength: 100,
  268. SendError: true,
  269. }, {
  270. BufferLength: 100,
  271. SendError: true,
  272. Qos: api.AtLeastOnce,
  273. CheckpointInterval: 5000,
  274. }, {
  275. BufferLength: 100,
  276. SendError: true,
  277. Qos: api.ExactlyOnce,
  278. CheckpointInterval: 5000,
  279. },
  280. }
  281. for j, opt := range options {
  282. DoRuleTest(t, tests, j, opt, 0)
  283. }
  284. }
  285. func TestSingleSQL(t *testing.T) {
  286. // Reset
  287. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable", "demoArr"}
  288. HandleStream(false, streamList, t)
  289. // Data setup
  290. tests := []RuleTest{
  291. {
  292. Name: `TestSingleSQLRule0`,
  293. Sql: `SELECT arr[x:y+1] as col1 FROM demoArr where x=1`,
  294. R: [][]map[string]interface{}{
  295. {{
  296. "col1": []interface{}{
  297. float64(2), float64(3),
  298. },
  299. }},
  300. },
  301. },
  302. {
  303. Name: `TestSingleSQLRule1`,
  304. Sql: `SELECT *, upper(color), event_time() FROM demo`,
  305. R: [][]map[string]interface{}{
  306. {{
  307. "color": "red",
  308. "size": float64(3),
  309. "ts": float64(1541152486013),
  310. "upper": "RED",
  311. "event_time": float64(1541152486013),
  312. }},
  313. {{
  314. "color": "blue",
  315. "size": float64(6),
  316. "ts": float64(1541152486822),
  317. "upper": "BLUE",
  318. "event_time": float64(1541152486822),
  319. }},
  320. {{
  321. "color": "blue",
  322. "size": float64(2),
  323. "ts": float64(1541152487632),
  324. "upper": "BLUE",
  325. "event_time": float64(1541152487632),
  326. }},
  327. {{
  328. "color": "yellow",
  329. "size": float64(4),
  330. "ts": float64(1541152488442),
  331. "upper": "YELLOW",
  332. "event_time": float64(1541152488442),
  333. }},
  334. {{
  335. "color": "red",
  336. "size": float64(1),
  337. "ts": float64(1541152489252),
  338. "upper": "RED",
  339. "event_time": float64(1541152489252),
  340. }},
  341. },
  342. M: map[string]interface{}{
  343. "op_2_project_0_exceptions_total": int64(0),
  344. "op_2_project_0_process_latency_us": int64(0),
  345. "op_2_project_0_records_in_total": int64(5),
  346. "op_2_project_0_records_out_total": int64(5),
  347. "sink_mockSink_0_exceptions_total": int64(0),
  348. "sink_mockSink_0_records_in_total": int64(5),
  349. "sink_mockSink_0_records_out_total": int64(5),
  350. "source_demo_0_exceptions_total": int64(0),
  351. "source_demo_0_records_in_total": int64(5),
  352. "source_demo_0_records_out_total": int64(5),
  353. },
  354. T: &api.PrintableTopo{
  355. Sources: []string{"source_demo"},
  356. Edges: map[string][]interface{}{
  357. "source_demo": {"op_2_project"},
  358. "op_2_project": {"sink_mockSink"},
  359. },
  360. },
  361. },
  362. {
  363. Name: `TestSingleSQLRule2`,
  364. Sql: `SELECT color, ts, last_hit_count() + 1 as lc FROM demo where size > 3`,
  365. R: [][]map[string]interface{}{
  366. {{
  367. "color": "blue",
  368. "ts": float64(1541152486822),
  369. "lc": float64(1),
  370. }},
  371. {{
  372. "color": "yellow",
  373. "ts": float64(1541152488442),
  374. "lc": float64(2),
  375. }},
  376. },
  377. M: map[string]interface{}{
  378. "op_3_project_0_exceptions_total": int64(0),
  379. "op_3_project_0_process_latency_us": int64(0),
  380. "op_3_project_0_records_in_total": int64(2),
  381. "op_3_project_0_records_out_total": int64(2),
  382. "sink_mockSink_0_exceptions_total": int64(0),
  383. "sink_mockSink_0_records_in_total": int64(2),
  384. "sink_mockSink_0_records_out_total": int64(2),
  385. "source_demo_0_exceptions_total": int64(0),
  386. "source_demo_0_records_in_total": int64(5),
  387. "source_demo_0_records_out_total": int64(5),
  388. "op_2_filter_0_exceptions_total": int64(0),
  389. "op_2_filter_0_process_latency_us": int64(0),
  390. "op_2_filter_0_records_in_total": int64(5),
  391. "op_2_filter_0_records_out_total": int64(2),
  392. },
  393. },
  394. {
  395. Name: `TestSingleSQLRule3`,
  396. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  397. R: [][]map[string]interface{}{
  398. {{
  399. "Int8": float64(6),
  400. "ts": float64(1541152486822),
  401. }},
  402. {{
  403. "Int8": float64(4),
  404. "ts": float64(1541152488442),
  405. }},
  406. },
  407. M: map[string]interface{}{
  408. "op_3_project_0_exceptions_total": int64(0),
  409. "op_3_project_0_process_latency_us": int64(0),
  410. "op_3_project_0_records_in_total": int64(2),
  411. "op_3_project_0_records_out_total": int64(2),
  412. "sink_mockSink_0_exceptions_total": int64(0),
  413. "sink_mockSink_0_records_in_total": int64(2),
  414. "sink_mockSink_0_records_out_total": int64(2),
  415. "source_demo_0_exceptions_total": int64(0),
  416. "source_demo_0_records_in_total": int64(5),
  417. "source_demo_0_records_out_total": int64(5),
  418. "op_2_filter_0_exceptions_total": int64(0),
  419. "op_2_filter_0_process_latency_us": int64(0),
  420. "op_2_filter_0_records_in_total": int64(5),
  421. "op_2_filter_0_records_out_total": int64(2),
  422. },
  423. },
  424. {
  425. Name: `TestSingleSQLRule4`,
  426. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  427. R: [][]map[string]interface{}{
  428. {{
  429. "error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
  430. }},
  431. {{
  432. "Int8": float64(6),
  433. "ts": float64(1541152486822),
  434. }},
  435. {{
  436. "Int8": float64(4),
  437. "ts": float64(1541152488442),
  438. }},
  439. {{
  440. "error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
  441. }},
  442. },
  443. M: map[string]interface{}{
  444. "op_3_project_0_exceptions_total": int64(2),
  445. "op_3_project_0_process_latency_us": int64(0),
  446. "op_3_project_0_records_in_total": int64(2),
  447. "op_3_project_0_records_out_total": int64(2),
  448. "sink_mockSink_0_exceptions_total": int64(0),
  449. "sink_mockSink_0_records_in_total": int64(4),
  450. "sink_mockSink_0_records_out_total": int64(4),
  451. "source_demoError_0_exceptions_total": int64(2),
  452. "source_demoError_0_records_in_total": int64(5),
  453. "source_demoError_0_records_out_total": int64(3),
  454. "op_2_filter_0_exceptions_total": int64(2),
  455. "op_2_filter_0_process_latency_us": int64(0),
  456. "op_2_filter_0_records_in_total": int64(3),
  457. "op_2_filter_0_records_out_total": int64(2),
  458. },
  459. },
  460. {
  461. Name: `TestSingleSQLRule5`,
  462. Sql: `SELECT meta(topic) as m, ts FROM demo WHERE last_hit_count() < 4`,
  463. R: [][]map[string]interface{}{
  464. {{
  465. "m": "mock",
  466. "ts": float64(1541152486013),
  467. }},
  468. {{
  469. "m": "mock",
  470. "ts": float64(1541152486822),
  471. }},
  472. {{
  473. "m": "mock",
  474. "ts": float64(1541152487632),
  475. }},
  476. {{
  477. "m": "mock",
  478. "ts": float64(1541152488442),
  479. }},
  480. },
  481. M: map[string]interface{}{
  482. "sink_mockSink_0_exceptions_total": int64(0),
  483. "sink_mockSink_0_records_in_total": int64(4),
  484. "sink_mockSink_0_records_out_total": int64(4),
  485. "source_demo_0_exceptions_total": int64(0),
  486. "source_demo_0_records_in_total": int64(5),
  487. "source_demo_0_records_out_total": int64(5),
  488. },
  489. },
  490. {
  491. Name: `TestSingleSQLRule6`,
  492. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  493. R: [][]map[string]interface{}{
  494. {{
  495. "color": "blue",
  496. "ts": float64(1541152486822),
  497. }},
  498. {{
  499. "color": "yellow",
  500. "ts": float64(1541152488442),
  501. }},
  502. },
  503. M: map[string]interface{}{
  504. "op_3_project_0_exceptions_total": int64(0),
  505. "op_3_project_0_process_latency_us": int64(0),
  506. "op_3_project_0_records_in_total": int64(2),
  507. "op_3_project_0_records_out_total": int64(2),
  508. "sink_mockSink_0_exceptions_total": int64(0),
  509. "sink_mockSink_0_records_in_total": int64(2),
  510. "sink_mockSink_0_records_out_total": int64(2),
  511. "source_demo_0_exceptions_total": int64(0),
  512. "source_demo_0_records_in_total": int64(5),
  513. "source_demo_0_records_out_total": int64(5),
  514. "op_2_filter_0_exceptions_total": int64(0),
  515. "op_2_filter_0_process_latency_us": int64(0),
  516. "op_2_filter_0_records_in_total": int64(5),
  517. "op_2_filter_0_records_out_total": int64(2),
  518. },
  519. },
  520. {
  521. Name: `TestSingleSQLRule7`,
  522. Sql: "SELECT `from` FROM demo1",
  523. R: [][]map[string]interface{}{
  524. {{
  525. "from": "device1",
  526. }},
  527. {{
  528. "from": "device2",
  529. }},
  530. {{
  531. "from": "device3",
  532. }},
  533. {{
  534. "from": "device1",
  535. }},
  536. {{
  537. "from": "device3",
  538. }},
  539. },
  540. M: map[string]interface{}{
  541. "op_2_project_0_exceptions_total": int64(0),
  542. "op_2_project_0_process_latency_us": int64(0),
  543. "op_2_project_0_records_in_total": int64(5),
  544. "op_2_project_0_records_out_total": int64(5),
  545. "sink_mockSink_0_exceptions_total": int64(0),
  546. "sink_mockSink_0_records_in_total": int64(5),
  547. "sink_mockSink_0_records_out_total": int64(5),
  548. "source_demo1_0_exceptions_total": int64(0),
  549. "source_demo1_0_records_in_total": int64(5),
  550. "source_demo1_0_records_out_total": int64(5),
  551. },
  552. },
  553. {
  554. Name: `TestSingleSQLRule8`,
  555. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  556. R: [][]map[string]interface{}{
  557. {{
  558. "temp": float64(25.5),
  559. "hum": float64(65),
  560. "from": "device1",
  561. "ts": float64(1541152486013),
  562. }},
  563. {{
  564. "temp": float64(27.4),
  565. "hum": float64(80),
  566. "from": "device1",
  567. "ts": float64(1541152488442),
  568. }},
  569. },
  570. M: map[string]interface{}{
  571. "op_3_project_0_exceptions_total": int64(0),
  572. "op_3_project_0_process_latency_us": int64(0),
  573. "op_3_project_0_records_in_total": int64(2),
  574. "op_3_project_0_records_out_total": int64(2),
  575. "op_2_filter_0_exceptions_total": int64(0),
  576. "op_2_filter_0_process_latency_us": int64(0),
  577. "op_2_filter_0_records_in_total": int64(5),
  578. "op_2_filter_0_records_out_total": int64(2),
  579. "sink_mockSink_0_exceptions_total": int64(0),
  580. "sink_mockSink_0_records_in_total": int64(2),
  581. "sink_mockSink_0_records_out_total": int64(2),
  582. "source_demo1_0_exceptions_total": int64(0),
  583. "source_demo1_0_records_in_total": int64(5),
  584. "source_demo1_0_records_out_total": int64(5),
  585. },
  586. },
  587. {
  588. Name: `TestSingleSQLRule9`,
  589. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  590. R: [][]map[string]interface{}{
  591. {{
  592. "color": "red",
  593. "s": "M",
  594. "ts": float64(1541152486013),
  595. }},
  596. {{
  597. "color": "blue",
  598. "s": "L",
  599. "ts": float64(1541152486822),
  600. }},
  601. {{
  602. "color": "blue",
  603. "s": "M",
  604. "ts": float64(1541152487632),
  605. }},
  606. {{
  607. "color": "yellow",
  608. "s": "L",
  609. "ts": float64(1541152488442),
  610. }},
  611. {{
  612. "color": "red",
  613. "s": "S",
  614. "ts": float64(1541152489252),
  615. }},
  616. },
  617. M: map[string]interface{}{
  618. "op_2_project_0_exceptions_total": int64(0),
  619. "op_2_project_0_process_latency_us": int64(0),
  620. "op_2_project_0_records_in_total": int64(5),
  621. "op_2_project_0_records_out_total": int64(5),
  622. "sink_mockSink_0_exceptions_total": int64(0),
  623. "sink_mockSink_0_records_in_total": int64(5),
  624. "sink_mockSink_0_records_out_total": int64(5),
  625. "source_demo_0_exceptions_total": int64(0),
  626. "source_demo_0_records_in_total": int64(5),
  627. "source_demo_0_records_out_total": int64(5),
  628. },
  629. T: &api.PrintableTopo{
  630. Sources: []string{"source_demo"},
  631. Edges: map[string][]interface{}{
  632. "source_demo": {"op_2_project"},
  633. "op_2_project": {"sink_mockSink"},
  634. },
  635. },
  636. },
  637. {
  638. Name: `TestSingleSQLRule10`,
  639. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  640. R: [][]map[string]interface{}{
  641. {{
  642. "id": float64(1541152486013),
  643. "name": "name1",
  644. "color": "red",
  645. "size": float64(3),
  646. "ts": float64(1541152486013),
  647. }},
  648. {{
  649. "id": float64(1541152487632),
  650. "name": "name2",
  651. "color": "blue",
  652. "size": float64(2),
  653. "ts": float64(1541152487632),
  654. }},
  655. {{
  656. "id": float64(1541152489252),
  657. "name": "name3",
  658. "color": "red",
  659. "size": float64(1),
  660. "ts": float64(1541152489252),
  661. }},
  662. },
  663. W: 15,
  664. M: map[string]interface{}{
  665. "op_3_join_aligner_0_records_in_total": int64(6),
  666. "op_3_join_aligner_0_records_out_total": int64(5),
  667. "op_4_join_0_exceptions_total": int64(0),
  668. "op_4_join_0_records_in_total": int64(5),
  669. "op_4_join_0_records_out_total": int64(3),
  670. "op_5_project_0_exceptions_total": int64(0),
  671. "op_5_project_0_records_in_total": int64(3),
  672. "op_5_project_0_records_out_total": int64(3),
  673. "sink_mockSink_0_exceptions_total": int64(0),
  674. "sink_mockSink_0_records_in_total": int64(3),
  675. "sink_mockSink_0_records_out_total": int64(3),
  676. "source_demo_0_exceptions_total": int64(0),
  677. "source_demo_0_records_in_total": int64(5),
  678. "source_demo_0_records_out_total": int64(5),
  679. "source_table1_0_exceptions_total": int64(0),
  680. "source_table1_0_records_in_total": int64(4),
  681. "source_table1_0_records_out_total": int64(1),
  682. },
  683. },
  684. {
  685. Name: `TestSingleSQLRule11`,
  686. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  687. R: [][]map[string]interface{}{
  688. {{
  689. "device": "device2",
  690. }},
  691. {{
  692. "device": "device4",
  693. }},
  694. {{
  695. "device": "device5",
  696. }},
  697. },
  698. M: map[string]interface{}{
  699. "op_3_join_aligner_0_records_in_total": int64(10),
  700. "op_3_join_aligner_0_records_out_total": int64(5),
  701. "op_4_join_0_exceptions_total": int64(0),
  702. "op_4_join_0_records_in_total": int64(5),
  703. "op_4_join_0_records_out_total": int64(3),
  704. "op_5_project_0_exceptions_total": int64(0),
  705. "op_5_project_0_records_in_total": int64(3),
  706. "op_5_project_0_records_out_total": int64(3),
  707. "sink_mockSink_0_exceptions_total": int64(0),
  708. "sink_mockSink_0_records_in_total": int64(3),
  709. "sink_mockSink_0_records_out_total": int64(3),
  710. "source_demo_0_exceptions_total": int64(0),
  711. "source_demo_0_records_in_total": int64(5),
  712. "source_demo_0_records_out_total": int64(5),
  713. "source_demoTable_0_exceptions_total": int64(0),
  714. "source_demoTable_0_records_in_total": int64(5),
  715. "source_demoTable_0_records_out_total": int64(5),
  716. },
  717. },
  718. {
  719. Name: `TestSingleSQLRule12`,
  720. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  721. R: [][]map[string]interface{}{
  722. {{
  723. "table1Id": float64(1541152486013),
  724. "demoTs": float64(1541152486013),
  725. }},
  726. {{
  727. "table1Id": float64(1541152487632),
  728. "demoTs": float64(1541152487632),
  729. }},
  730. {{
  731. "table1Id": float64(1541152489252),
  732. "demoTs": float64(1541152489252),
  733. }},
  734. },
  735. W: 15,
  736. M: map[string]interface{}{
  737. "op_3_join_aligner_0_records_in_total": int64(6),
  738. "op_3_join_aligner_0_records_out_total": int64(5),
  739. "op_4_join_0_exceptions_total": int64(0),
  740. "op_4_join_0_records_in_total": int64(5),
  741. "op_4_join_0_records_out_total": int64(3),
  742. "op_5_project_0_exceptions_total": int64(0),
  743. "op_5_project_0_records_in_total": int64(3),
  744. "op_5_project_0_records_out_total": int64(3),
  745. "sink_mockSink_0_exceptions_total": int64(0),
  746. "sink_mockSink_0_records_in_total": int64(3),
  747. "sink_mockSink_0_records_out_total": int64(3),
  748. "source_demo_0_exceptions_total": int64(0),
  749. "source_demo_0_records_in_total": int64(5),
  750. "source_demo_0_records_out_total": int64(5),
  751. "source_table1_0_exceptions_total": int64(0),
  752. "source_table1_0_records_in_total": int64(4),
  753. "source_table1_0_records_out_total": int64(1),
  754. },
  755. },
  756. {
  757. Name: `TestChanged13`,
  758. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  759. R: [][]map[string]interface{}{
  760. {{
  761. "tt_color": "red",
  762. "tt_size": float64(3),
  763. }},
  764. {{
  765. "tt_color": "blue",
  766. "tt_size": float64(6),
  767. }},
  768. {{
  769. "tt_size": float64(2),
  770. }},
  771. {{
  772. "tt_color": "yellow",
  773. "tt_size": float64(4),
  774. }},
  775. {{
  776. "tt_color": "red",
  777. "tt_size": float64(1),
  778. }},
  779. },
  780. M: map[string]interface{}{
  781. "op_2_project_0_exceptions_total": int64(0),
  782. "op_2_project_0_process_latency_us": int64(0),
  783. "op_2_project_0_records_in_total": int64(5),
  784. "op_2_project_0_records_out_total": int64(5),
  785. "sink_mockSink_0_exceptions_total": int64(0),
  786. "sink_mockSink_0_records_in_total": int64(5),
  787. "sink_mockSink_0_records_out_total": int64(5),
  788. "source_demo_0_exceptions_total": int64(0),
  789. "source_demo_0_records_in_total": int64(5),
  790. "source_demo_0_records_out_total": int64(5),
  791. },
  792. },
  793. {
  794. Name: `TestAliasOrderBy14`,
  795. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  796. R: [][]map[string]interface{}{
  797. {
  798. {
  799. "color": "blue",
  800. "c": float64(2),
  801. },
  802. {
  803. "color": "yellow",
  804. "c": float64(1),
  805. },
  806. },
  807. },
  808. M: map[string]interface{}{
  809. "op_6_project_0_exceptions_total": int64(0),
  810. "op_6_project_0_process_latency_us": int64(0),
  811. "op_6_project_0_records_in_total": int64(1),
  812. "op_6_project_0_records_out_total": int64(1),
  813. "sink_mockSink_0_exceptions_total": int64(0),
  814. "sink_mockSink_0_records_in_total": int64(1),
  815. "sink_mockSink_0_records_out_total": int64(1),
  816. "source_demo_0_exceptions_total": int64(0),
  817. "source_demo_0_records_in_total": int64(5),
  818. "source_demo_0_records_out_total": int64(5),
  819. },
  820. },
  821. {
  822. Name: `TestSingleSQLRule17`,
  823. Sql: `SELECT arr[x:4] as col1 FROM demoArr where x=1`,
  824. R: [][]map[string]interface{}{
  825. {{
  826. "col1": []interface{}{
  827. float64(2), float64(3), float64(4),
  828. },
  829. }},
  830. },
  831. },
  832. {
  833. Name: `TestSingleSQLRule16`,
  834. Sql: `SELECT arr[1:y] as col1 FROM demoArr where x=1`,
  835. R: [][]map[string]interface{}{
  836. {{
  837. "col1": []interface{}{
  838. float64(2),
  839. },
  840. }},
  841. },
  842. },
  843. {
  844. Name: `TestSingleSQLRule15`,
  845. Sql: `SELECT arr[1] as col1 FROM demoArr where x=1`,
  846. R: [][]map[string]interface{}{
  847. {{
  848. "col1": float64(2),
  849. }},
  850. },
  851. },
  852. {
  853. Name: `TestLagAlias`,
  854. Sql: "SELECT lag(size) as lastSize, lag(had_changed(true,size)), size, lastSize/size as changeRate FROM demo WHERE size > 2",
  855. R: [][]map[string]interface{}{
  856. {{
  857. "size": float64(3),
  858. }},
  859. {{
  860. "lastSize": float64(3),
  861. "size": float64(6),
  862. "lag": true,
  863. "changeRate": float64(0),
  864. }},
  865. {{
  866. "lastSize": float64(2),
  867. "size": float64(4),
  868. "lag": true,
  869. "changeRate": float64(0),
  870. }},
  871. },
  872. M: map[string]interface{}{
  873. "sink_mockSink_0_exceptions_total": int64(0),
  874. "sink_mockSink_0_records_in_total": int64(3),
  875. "sink_mockSink_0_records_out_total": int64(3),
  876. "source_demo_0_exceptions_total": int64(0),
  877. "source_demo_0_records_in_total": int64(5),
  878. "source_demo_0_records_out_total": int64(5),
  879. },
  880. },
  881. {
  882. Name: `TestLagPartition`,
  883. Sql: "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
  884. R: [][]map[string]interface{}{
  885. {{
  886. "color": "red",
  887. "size": float64(3),
  888. }},
  889. {{
  890. "color": "blue",
  891. "size": float64(6),
  892. }},
  893. {{
  894. "color": "blue",
  895. "lastSize": float64(6),
  896. "size": float64(2),
  897. "changeRate": float64(3),
  898. }},
  899. {{
  900. "color": "yellow",
  901. "size": float64(4),
  902. }},
  903. {{
  904. "color": "red",
  905. "lastSize": float64(3),
  906. "size": float64(1),
  907. "changeRate": float64(3),
  908. }},
  909. },
  910. M: map[string]interface{}{
  911. "sink_mockSink_0_exceptions_total": int64(0),
  912. "sink_mockSink_0_records_in_total": int64(5),
  913. "sink_mockSink_0_records_out_total": int64(5),
  914. "source_demo_0_exceptions_total": int64(0),
  915. "source_demo_0_records_in_total": int64(5),
  916. "source_demo_0_records_out_total": int64(5),
  917. },
  918. },
  919. }
  920. HandleStream(true, streamList, t)
  921. options := []*api.RuleOption{
  922. {
  923. BufferLength: 100,
  924. SendError: true,
  925. }, {
  926. BufferLength: 100,
  927. SendError: true,
  928. Qos: api.AtLeastOnce,
  929. CheckpointInterval: 5000,
  930. }, {
  931. BufferLength: 100,
  932. SendError: true,
  933. Qos: api.ExactlyOnce,
  934. CheckpointInterval: 5000,
  935. },
  936. }
  937. for j, opt := range options {
  938. DoRuleTest(t, tests, j, opt, 0)
  939. }
  940. }
  941. func TestSingleSQLWithEventTime(t *testing.T) {
  942. // Reset
  943. streamList := []string{"demoE"}
  944. HandleStream(false, streamList, t)
  945. // Data setup
  946. tests := []RuleTest{
  947. {
  948. Name: `TestSingleSQLRule1`,
  949. Sql: `SELECT *, upper(color) FROM demoE`,
  950. R: [][]map[string]interface{}{
  951. {{
  952. "color": "red",
  953. "size": float64(3),
  954. "ts": float64(1541152486013),
  955. "upper": "RED",
  956. }},
  957. {{
  958. "color": "blue",
  959. "size": float64(2),
  960. "ts": float64(1541152487632),
  961. "upper": "BLUE",
  962. }},
  963. {{
  964. "color": "yellow",
  965. "size": float64(4),
  966. "ts": float64(1541152488442),
  967. "upper": "YELLOW",
  968. }},
  969. {{
  970. "color": "red",
  971. "size": float64(1),
  972. "ts": float64(1541152489252),
  973. "upper": "RED",
  974. }},
  975. },
  976. M: map[string]interface{}{
  977. "op_3_project_0_exceptions_total": int64(0),
  978. "op_3_project_0_process_latency_us": int64(0),
  979. "op_3_project_0_records_in_total": int64(4),
  980. "op_3_project_0_records_out_total": int64(4),
  981. "sink_mockSink_0_exceptions_total": int64(0),
  982. "sink_mockSink_0_records_in_total": int64(4),
  983. "sink_mockSink_0_records_out_total": int64(4),
  984. "source_demoE_0_exceptions_total": int64(0),
  985. "source_demoE_0_records_in_total": int64(6),
  986. "source_demoE_0_records_out_total": int64(6),
  987. },
  988. T: &api.PrintableTopo{
  989. Sources: []string{"source_demoE"},
  990. Edges: map[string][]interface{}{
  991. "source_demoE": {"op_2_watermark"},
  992. "op_2_watermark": {"op_3_project"},
  993. "op_3_project": {"sink_mockSink"},
  994. },
  995. },
  996. },
  997. {
  998. Name: `TestStateFunc`,
  999. Sql: `SELECT *, last_hit_time() as lt, last_hit_count() as lc, event_time() as et FROM demoE WHERE size < 3 AND lc < 2`,
  1000. R: [][]map[string]interface{}{
  1001. {{
  1002. "color": "blue",
  1003. "size": float64(2),
  1004. "ts": float64(1541152487632),
  1005. "lc": float64(0),
  1006. "lt": float64(0),
  1007. "et": float64(1541152487632),
  1008. }},
  1009. {{
  1010. "color": "red",
  1011. "size": float64(1),
  1012. "ts": float64(1541152489252),
  1013. "lc": float64(1),
  1014. "lt": float64(1541152487632),
  1015. "et": float64(1541152489252),
  1016. }},
  1017. },
  1018. M: map[string]interface{}{
  1019. "sink_mockSink_0_exceptions_total": int64(0),
  1020. "sink_mockSink_0_records_in_total": int64(2),
  1021. "sink_mockSink_0_records_out_total": int64(2),
  1022. "source_demoE_0_exceptions_total": int64(0),
  1023. "source_demoE_0_records_in_total": int64(6),
  1024. "source_demoE_0_records_out_total": int64(6),
  1025. },
  1026. },
  1027. {
  1028. Name: `TestChanged`,
  1029. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demoE",
  1030. R: [][]map[string]interface{}{
  1031. {{
  1032. "tt_color": "red",
  1033. "tt_size": float64(3),
  1034. }},
  1035. {{
  1036. "tt_color": "blue",
  1037. "tt_size": float64(2),
  1038. }},
  1039. {{
  1040. "tt_color": "yellow",
  1041. "tt_size": float64(4),
  1042. }},
  1043. {{
  1044. "tt_color": "red",
  1045. "tt_size": float64(1),
  1046. }},
  1047. },
  1048. M: map[string]interface{}{
  1049. "op_3_project_0_exceptions_total": int64(0),
  1050. "op_3_project_0_process_latency_us": int64(0),
  1051. "op_3_project_0_records_in_total": int64(4),
  1052. "op_3_project_0_records_out_total": int64(4),
  1053. "sink_mockSink_0_exceptions_total": int64(0),
  1054. "sink_mockSink_0_records_in_total": int64(4),
  1055. "sink_mockSink_0_records_out_total": int64(4),
  1056. "source_demoE_0_exceptions_total": int64(0),
  1057. "source_demoE_0_records_in_total": int64(6),
  1058. "source_demoE_0_records_out_total": int64(6),
  1059. },
  1060. },
  1061. }
  1062. HandleStream(true, streamList, t)
  1063. options := []*api.RuleOption{
  1064. {
  1065. BufferLength: 100,
  1066. SendError: true,
  1067. IsEventTime: true,
  1068. LateTol: 1000,
  1069. }, {
  1070. BufferLength: 100,
  1071. SendError: true,
  1072. Qos: api.AtLeastOnce,
  1073. CheckpointInterval: 5000,
  1074. IsEventTime: true,
  1075. LateTol: 1000,
  1076. }, {
  1077. BufferLength: 100,
  1078. SendError: true,
  1079. Qos: api.ExactlyOnce,
  1080. CheckpointInterval: 5000,
  1081. IsEventTime: true,
  1082. LateTol: 1000,
  1083. },
  1084. }
  1085. for j, opt := range options {
  1086. DoRuleTest(t, tests, j, opt, 0)
  1087. }
  1088. }
  1089. func TestSingleSQLError(t *testing.T) {
  1090. // Reset
  1091. streamList := []string{"ldemo"}
  1092. HandleStream(false, streamList, t)
  1093. // Data setup
  1094. tests := []RuleTest{
  1095. {
  1096. Name: `TestSingleSQLErrorRule1`,
  1097. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1098. R: [][]map[string]interface{}{
  1099. {{
  1100. "color": "red",
  1101. "ts": float64(1541152486013),
  1102. }},
  1103. {{
  1104. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1105. }},
  1106. {{
  1107. "ts": float64(1541152487632),
  1108. }},
  1109. },
  1110. M: map[string]interface{}{
  1111. "op_3_project_0_exceptions_total": int64(1),
  1112. "op_3_project_0_process_latency_us": int64(0),
  1113. "op_3_project_0_records_in_total": int64(2),
  1114. "op_3_project_0_records_out_total": int64(2),
  1115. "sink_mockSink_0_exceptions_total": int64(0),
  1116. "sink_mockSink_0_records_in_total": int64(3),
  1117. "sink_mockSink_0_records_out_total": int64(3),
  1118. "source_ldemo_0_exceptions_total": int64(0),
  1119. "source_ldemo_0_records_in_total": int64(5),
  1120. "source_ldemo_0_records_out_total": int64(5),
  1121. "op_2_filter_0_exceptions_total": int64(1),
  1122. "op_2_filter_0_process_latency_us": int64(0),
  1123. "op_2_filter_0_records_in_total": int64(5),
  1124. "op_2_filter_0_records_out_total": int64(2),
  1125. },
  1126. }, {
  1127. Name: `TestSingleSQLErrorRule2`,
  1128. Sql: `SELECT size * 5 FROM ldemo`,
  1129. R: [][]map[string]interface{}{
  1130. {{
  1131. "kuiper_field_0": float64(15),
  1132. }},
  1133. {{
  1134. "error": "run Select error: invalid operation string(string) * int64(5)",
  1135. }},
  1136. {{
  1137. "kuiper_field_0": float64(15),
  1138. }},
  1139. {{
  1140. "kuiper_field_0": float64(10),
  1141. }},
  1142. {{}},
  1143. },
  1144. M: map[string]interface{}{
  1145. "op_2_project_0_exceptions_total": int64(1),
  1146. "op_2_project_0_process_latency_us": int64(0),
  1147. "op_2_project_0_records_in_total": int64(5),
  1148. "op_2_project_0_records_out_total": int64(4),
  1149. "sink_mockSink_0_exceptions_total": int64(0),
  1150. "sink_mockSink_0_records_in_total": int64(5),
  1151. "sink_mockSink_0_records_out_total": int64(5),
  1152. "source_ldemo_0_exceptions_total": int64(0),
  1153. "source_ldemo_0_records_in_total": int64(5),
  1154. "source_ldemo_0_records_out_total": int64(5),
  1155. },
  1156. },
  1157. }
  1158. HandleStream(true, streamList, t)
  1159. DoRuleTest(t, tests, 0, &api.RuleOption{
  1160. BufferLength: 100,
  1161. SendError: true,
  1162. }, 0)
  1163. }
  1164. func TestSingleSQLOmitError(t *testing.T) {
  1165. // Reset
  1166. streamList := []string{"ldemo"}
  1167. HandleStream(false, streamList, t)
  1168. // Data setup
  1169. tests := []RuleTest{
  1170. {
  1171. Name: `TestSingleSQLErrorRule1`,
  1172. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1173. R: [][]map[string]interface{}{
  1174. {{
  1175. "color": "red",
  1176. "ts": float64(1541152486013),
  1177. }},
  1178. {{
  1179. "ts": float64(1541152487632),
  1180. }},
  1181. },
  1182. M: map[string]interface{}{
  1183. "op_3_project_0_exceptions_total": int64(0),
  1184. "op_3_project_0_process_latency_us": int64(0),
  1185. "op_3_project_0_records_in_total": int64(2),
  1186. "op_3_project_0_records_out_total": int64(2),
  1187. "sink_mockSink_0_exceptions_total": int64(0),
  1188. "sink_mockSink_0_records_in_total": int64(2),
  1189. "sink_mockSink_0_records_out_total": int64(2),
  1190. "source_ldemo_0_exceptions_total": int64(0),
  1191. "source_ldemo_0_records_in_total": int64(5),
  1192. "source_ldemo_0_records_out_total": int64(5),
  1193. "op_2_filter_0_exceptions_total": int64(1),
  1194. "op_2_filter_0_process_latency_us": int64(0),
  1195. "op_2_filter_0_records_in_total": int64(5),
  1196. "op_2_filter_0_records_out_total": int64(2),
  1197. },
  1198. }, {
  1199. Name: `TestSingleSQLErrorRule2`,
  1200. Sql: `SELECT size * 5 FROM ldemo`,
  1201. R: [][]map[string]interface{}{
  1202. {{
  1203. "kuiper_field_0": float64(15),
  1204. }},
  1205. {{
  1206. "kuiper_field_0": float64(15),
  1207. }},
  1208. {{
  1209. "kuiper_field_0": float64(10),
  1210. }},
  1211. {{}},
  1212. },
  1213. M: map[string]interface{}{
  1214. "op_2_project_0_exceptions_total": int64(1),
  1215. "op_2_project_0_process_latency_us": int64(0),
  1216. "op_2_project_0_records_in_total": int64(5),
  1217. "op_2_project_0_records_out_total": int64(4),
  1218. "sink_mockSink_0_exceptions_total": int64(0),
  1219. "sink_mockSink_0_records_in_total": int64(4),
  1220. "sink_mockSink_0_records_out_total": int64(4),
  1221. "source_ldemo_0_exceptions_total": int64(0),
  1222. "source_ldemo_0_records_in_total": int64(5),
  1223. "source_ldemo_0_records_out_total": int64(5),
  1224. },
  1225. },
  1226. }
  1227. HandleStream(true, streamList, t)
  1228. DoRuleTest(t, tests, 0, &api.RuleOption{
  1229. BufferLength: 100,
  1230. SendError: false,
  1231. }, 0)
  1232. }
  1233. func TestSingleSQLTemplate(t *testing.T) {
  1234. // Reset
  1235. streamList := []string{"demo"}
  1236. HandleStream(false, streamList, t)
  1237. // Data setup
  1238. tests := []RuleTest{
  1239. {
  1240. Name: `TestSingleSQLTemplateRule1`,
  1241. Sql: `SELECT * FROM demo`,
  1242. R: []map[string]interface{}{
  1243. {
  1244. "c": "red",
  1245. "wrapper": "w1",
  1246. },
  1247. {
  1248. "c": "blue",
  1249. "wrapper": "w1",
  1250. },
  1251. {
  1252. "c": "blue",
  1253. "wrapper": "w1",
  1254. },
  1255. {
  1256. "c": "yellow",
  1257. "wrapper": "w1",
  1258. },
  1259. {
  1260. "c": "red",
  1261. "wrapper": "w1",
  1262. },
  1263. },
  1264. M: map[string]interface{}{
  1265. "op_2_project_0_exceptions_total": int64(0),
  1266. "op_2_project_0_process_latency_us": int64(0),
  1267. "op_2_project_0_records_in_total": int64(5),
  1268. "op_2_project_0_records_out_total": int64(5),
  1269. "sink_mockSink_0_exceptions_total": int64(0),
  1270. "sink_mockSink_0_records_in_total": int64(5),
  1271. "sink_mockSink_0_records_out_total": int64(5),
  1272. "source_demo_0_exceptions_total": int64(0),
  1273. "source_demo_0_records_in_total": int64(5),
  1274. "source_demo_0_records_out_total": int64(5),
  1275. },
  1276. },
  1277. }
  1278. HandleStream(true, streamList, t)
  1279. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1280. BufferLength: 100,
  1281. SendError: true,
  1282. }, 0, map[string]interface{}{
  1283. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  1284. "sendSingle": true,
  1285. }, func(result [][]byte) interface{} {
  1286. var maps []map[string]interface{}
  1287. for _, v := range result {
  1288. var mapRes map[string]interface{}
  1289. err := json.Unmarshal(v, &mapRes)
  1290. if err != nil {
  1291. t.Errorf("Failed to parse the input into map")
  1292. continue
  1293. }
  1294. maps = append(maps, mapRes)
  1295. }
  1296. return maps
  1297. })
  1298. }
  1299. func TestNoneSingleSQLTemplate(t *testing.T) {
  1300. // Reset
  1301. streamList := []string{"demo"}
  1302. HandleStream(false, streamList, t)
  1303. // Data setup
  1304. tests := []RuleTest{
  1305. {
  1306. Name: `TestNoneSingleSQLTemplateRule1`,
  1307. Sql: `SELECT * FROM demo`,
  1308. R: [][]byte{
  1309. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1310. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1311. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1312. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1313. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1314. },
  1315. M: map[string]interface{}{
  1316. "op_2_project_0_exceptions_total": int64(0),
  1317. "op_2_project_0_process_latency_us": int64(0),
  1318. "op_2_project_0_records_in_total": int64(5),
  1319. "op_2_project_0_records_out_total": int64(5),
  1320. "sink_mockSink_0_exceptions_total": int64(0),
  1321. "sink_mockSink_0_records_in_total": int64(5),
  1322. "sink_mockSink_0_records_out_total": int64(5),
  1323. "source_demo_0_exceptions_total": int64(0),
  1324. "source_demo_0_records_in_total": int64(5),
  1325. "source_demo_0_records_out_total": int64(5),
  1326. },
  1327. },
  1328. }
  1329. HandleStream(true, streamList, t)
  1330. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  1331. BufferLength: 100,
  1332. SendError: true,
  1333. }, 0, map[string]interface{}{
  1334. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1335. }, func(result [][]byte) interface{} {
  1336. return result
  1337. })
  1338. }
  1339. func TestSingleSQLForBinary(t *testing.T) {
  1340. // Reset
  1341. streamList := []string{"binDemo"}
  1342. HandleStream(false, streamList, t)
  1343. // Data setup
  1344. tests := []RuleTest{
  1345. {
  1346. Name: `TestSingleSQLRule1`,
  1347. Sql: `SELECT * FROM binDemo`,
  1348. R: [][]map[string]interface{}{
  1349. {{
  1350. "self": mocknode.Image,
  1351. }},
  1352. },
  1353. W: 50,
  1354. M: map[string]interface{}{
  1355. "op_2_project_0_exceptions_total": int64(0),
  1356. "op_2_project_0_process_latency_us": int64(0),
  1357. "op_2_project_0_records_in_total": int64(1),
  1358. "op_2_project_0_records_out_total": int64(1),
  1359. "sink_mockSink_0_exceptions_total": int64(0),
  1360. "sink_mockSink_0_records_in_total": int64(1),
  1361. "sink_mockSink_0_records_out_total": int64(1),
  1362. "source_binDemo_0_exceptions_total": int64(0),
  1363. "source_binDemo_0_records_in_total": int64(1),
  1364. "source_binDemo_0_records_out_total": int64(1),
  1365. },
  1366. },
  1367. }
  1368. HandleStream(true, streamList, t)
  1369. options := []*api.RuleOption{
  1370. {
  1371. BufferLength: 100,
  1372. SendError: true,
  1373. }, {
  1374. BufferLength: 100,
  1375. SendError: true,
  1376. Qos: api.AtLeastOnce,
  1377. CheckpointInterval: 5000,
  1378. }, {
  1379. BufferLength: 100,
  1380. SendError: true,
  1381. Qos: api.ExactlyOnce,
  1382. CheckpointInterval: 5000,
  1383. },
  1384. }
  1385. byteFunc := func(result [][]byte) interface{} {
  1386. var maps [][]map[string]interface{}
  1387. for _, v := range result {
  1388. var mapRes []map[string][]byte
  1389. err := json.Unmarshal(v, &mapRes)
  1390. if err != nil {
  1391. panic("Failed to parse the input into map")
  1392. }
  1393. mapInt := make([]map[string]interface{}, len(mapRes))
  1394. for i, mv := range mapRes {
  1395. mapInt[i] = make(map[string]interface{})
  1396. // assume only one key
  1397. for k, v := range mv {
  1398. mapInt[i][k] = v
  1399. }
  1400. }
  1401. maps = append(maps, mapInt)
  1402. }
  1403. return maps
  1404. }
  1405. for j, opt := range options {
  1406. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  1407. }
  1408. }
  1409. func TestWindowSQL(t *testing.T) {
  1410. // Reset
  1411. streamList := []string{"demoE"}
  1412. HandleStream(false, streamList, t)
  1413. tests := []RuleTest{
  1414. {
  1415. Name: "TestHoppingWindowSQL1",
  1416. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 3, 5)`,
  1417. R: [][]map[string]interface{}{
  1418. {
  1419. {
  1420. "color": "blue",
  1421. "size": float64(2),
  1422. },
  1423. {
  1424. "color": "red",
  1425. "size": float64(1),
  1426. },
  1427. },
  1428. },
  1429. },
  1430. {
  1431. Name: "TestHoppingWindowSQL2",
  1432. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 1, 2)`,
  1433. R: [][]map[string]interface{}{
  1434. {
  1435. {
  1436. "color": "blue",
  1437. "size": float64(2),
  1438. },
  1439. },
  1440. {
  1441. {
  1442. "color": "red",
  1443. "size": float64(1),
  1444. },
  1445. },
  1446. {},
  1447. },
  1448. },
  1449. {
  1450. Name: "TestHoppingWindowSQL3",
  1451. Sql: `select size,color from demoE GROUP BY HOPPINGWINDOW(ss, 2, 5)`,
  1452. R: [][]map[string]interface{}{
  1453. {
  1454. {
  1455. "color": "red",
  1456. "size": float64(1),
  1457. },
  1458. },
  1459. },
  1460. },
  1461. }
  1462. // Data setup
  1463. HandleStream(true, streamList, t)
  1464. options := []*api.RuleOption{
  1465. {
  1466. BufferLength: 100,
  1467. SendError: true,
  1468. Qos: api.AtLeastOnce,
  1469. CheckpointInterval: 5000,
  1470. IsEventTime: true,
  1471. },
  1472. {
  1473. BufferLength: 100,
  1474. SendError: true,
  1475. Qos: api.ExactlyOnce,
  1476. CheckpointInterval: 5000,
  1477. IsEventTime: true,
  1478. },
  1479. }
  1480. for j, opt := range options {
  1481. DoRuleTest(t, tests, j, opt, 0)
  1482. }
  1483. }