rule_test.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "testing"
  20. )
  21. func TestSingleSQL(t *testing.T) {
  22. //Reset
  23. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  24. HandleStream(false, streamList, t)
  25. //Data setup
  26. var tests = []RuleTest{
  27. {
  28. Name: `TestSingleSQLRule1`,
  29. Sql: `SELECT * FROM demo`,
  30. R: [][]map[string]interface{}{
  31. {{
  32. "color": "red",
  33. "size": float64(3),
  34. "ts": float64(1541152486013),
  35. }},
  36. {{
  37. "color": "blue",
  38. "size": float64(6),
  39. "ts": float64(1541152486822),
  40. }},
  41. {{
  42. "color": "blue",
  43. "size": float64(2),
  44. "ts": float64(1541152487632),
  45. }},
  46. {{
  47. "color": "yellow",
  48. "size": float64(4),
  49. "ts": float64(1541152488442),
  50. }},
  51. {{
  52. "color": "red",
  53. "size": float64(1),
  54. "ts": float64(1541152489252),
  55. }},
  56. },
  57. M: map[string]interface{}{
  58. "op_2_project_0_exceptions_total": int64(0),
  59. "op_2_project_0_process_latency_us": int64(0),
  60. "op_2_project_0_records_in_total": int64(5),
  61. "op_2_project_0_records_out_total": int64(5),
  62. "sink_mockSink_0_exceptions_total": int64(0),
  63. "sink_mockSink_0_records_in_total": int64(5),
  64. "sink_mockSink_0_records_out_total": int64(5),
  65. "source_demo_0_exceptions_total": int64(0),
  66. "source_demo_0_records_in_total": int64(5),
  67. "source_demo_0_records_out_total": int64(5),
  68. },
  69. T: &api.PrintableTopo{
  70. Sources: []string{"source_demo"},
  71. Edges: map[string][]string{
  72. "source_demo": {"op_2_project"},
  73. "op_2_project": {"sink_mockSink"},
  74. },
  75. },
  76. },
  77. {
  78. Name: `TestSingleSQLRule2`,
  79. Sql: `SELECT color, ts FROM demo where size > 3`,
  80. R: [][]map[string]interface{}{
  81. {{
  82. "color": "blue",
  83. "ts": float64(1541152486822),
  84. }},
  85. {{
  86. "color": "yellow",
  87. "ts": float64(1541152488442),
  88. }},
  89. },
  90. M: map[string]interface{}{
  91. "op_3_project_0_exceptions_total": int64(0),
  92. "op_3_project_0_process_latency_us": int64(0),
  93. "op_3_project_0_records_in_total": int64(2),
  94. "op_3_project_0_records_out_total": int64(2),
  95. "sink_mockSink_0_exceptions_total": int64(0),
  96. "sink_mockSink_0_records_in_total": int64(2),
  97. "sink_mockSink_0_records_out_total": int64(2),
  98. "source_demo_0_exceptions_total": int64(0),
  99. "source_demo_0_records_in_total": int64(5),
  100. "source_demo_0_records_out_total": int64(5),
  101. "op_2_filter_0_exceptions_total": int64(0),
  102. "op_2_filter_0_process_latency_us": int64(0),
  103. "op_2_filter_0_records_in_total": int64(5),
  104. "op_2_filter_0_records_out_total": int64(2),
  105. },
  106. }, {
  107. Name: `TestSingleSQLRule3`,
  108. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  109. R: [][]map[string]interface{}{
  110. {{
  111. "Int8": float64(6),
  112. "ts": float64(1541152486822),
  113. }},
  114. {{
  115. "Int8": float64(4),
  116. "ts": float64(1541152488442),
  117. }},
  118. },
  119. M: map[string]interface{}{
  120. "op_3_project_0_exceptions_total": int64(0),
  121. "op_3_project_0_process_latency_us": int64(0),
  122. "op_3_project_0_records_in_total": int64(2),
  123. "op_3_project_0_records_out_total": int64(2),
  124. "sink_mockSink_0_exceptions_total": int64(0),
  125. "sink_mockSink_0_records_in_total": int64(2),
  126. "sink_mockSink_0_records_out_total": int64(2),
  127. "source_demo_0_exceptions_total": int64(0),
  128. "source_demo_0_records_in_total": int64(5),
  129. "source_demo_0_records_out_total": int64(5),
  130. "op_2_filter_0_exceptions_total": int64(0),
  131. "op_2_filter_0_process_latency_us": int64(0),
  132. "op_2_filter_0_records_in_total": int64(5),
  133. "op_2_filter_0_records_out_total": int64(2),
  134. },
  135. }, {
  136. Name: `TestSingleSQLRule4`,
  137. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  138. R: [][]map[string]interface{}{
  139. {{
  140. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  141. }},
  142. {{
  143. "Int8": float64(6),
  144. "ts": float64(1541152486822),
  145. }},
  146. {{
  147. "Int8": float64(4),
  148. "ts": float64(1541152488442),
  149. }},
  150. {{
  151. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  152. }},
  153. },
  154. M: map[string]interface{}{
  155. "op_3_project_0_exceptions_total": int64(2),
  156. "op_3_project_0_process_latency_us": int64(0),
  157. "op_3_project_0_records_in_total": int64(4),
  158. "op_3_project_0_records_out_total": int64(2),
  159. "sink_mockSink_0_exceptions_total": int64(0),
  160. "sink_mockSink_0_records_in_total": int64(4),
  161. "sink_mockSink_0_records_out_total": int64(4),
  162. "source_demoError_0_exceptions_total": int64(2),
  163. "source_demoError_0_records_in_total": int64(5),
  164. "source_demoError_0_records_out_total": int64(5),
  165. "op_2_filter_0_exceptions_total": int64(2),
  166. "op_2_filter_0_process_latency_us": int64(0),
  167. "op_2_filter_0_records_in_total": int64(5),
  168. "op_2_filter_0_records_out_total": int64(2),
  169. },
  170. }, {
  171. Name: `TestSingleSQLRule5`,
  172. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  173. R: [][]map[string]interface{}{
  174. {{
  175. "m": "mock",
  176. "ts": float64(1541152486013),
  177. }},
  178. {{
  179. "m": "mock",
  180. "ts": float64(1541152486822),
  181. }},
  182. {{
  183. "m": "mock",
  184. "ts": float64(1541152487632),
  185. }},
  186. {{
  187. "m": "mock",
  188. "ts": float64(1541152488442),
  189. }},
  190. {{
  191. "m": "mock",
  192. "ts": float64(1541152489252),
  193. }},
  194. },
  195. M: map[string]interface{}{
  196. "op_2_project_0_exceptions_total": int64(0),
  197. "op_2_project_0_process_latency_us": int64(0),
  198. "op_2_project_0_records_in_total": int64(5),
  199. "op_2_project_0_records_out_total": int64(5),
  200. "sink_mockSink_0_exceptions_total": int64(0),
  201. "sink_mockSink_0_records_in_total": int64(5),
  202. "sink_mockSink_0_records_out_total": int64(5),
  203. "source_demo_0_exceptions_total": int64(0),
  204. "source_demo_0_records_in_total": int64(5),
  205. "source_demo_0_records_out_total": int64(5),
  206. },
  207. }, {
  208. Name: `TestSingleSQLRule6`,
  209. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  210. R: [][]map[string]interface{}{
  211. {{
  212. "color": "blue",
  213. "ts": float64(1541152486822),
  214. }},
  215. {{
  216. "color": "yellow",
  217. "ts": float64(1541152488442),
  218. }},
  219. },
  220. M: map[string]interface{}{
  221. "op_3_project_0_exceptions_total": int64(0),
  222. "op_3_project_0_process_latency_us": int64(0),
  223. "op_3_project_0_records_in_total": int64(2),
  224. "op_3_project_0_records_out_total": int64(2),
  225. "sink_mockSink_0_exceptions_total": int64(0),
  226. "sink_mockSink_0_records_in_total": int64(2),
  227. "sink_mockSink_0_records_out_total": int64(2),
  228. "source_demo_0_exceptions_total": int64(0),
  229. "source_demo_0_records_in_total": int64(5),
  230. "source_demo_0_records_out_total": int64(5),
  231. "op_2_filter_0_exceptions_total": int64(0),
  232. "op_2_filter_0_process_latency_us": int64(0),
  233. "op_2_filter_0_records_in_total": int64(5),
  234. "op_2_filter_0_records_out_total": int64(2),
  235. },
  236. }, {
  237. Name: `TestSingleSQLRule7`,
  238. Sql: "SELECT `from` FROM demo1",
  239. R: [][]map[string]interface{}{
  240. {{
  241. "from": "device1",
  242. }},
  243. {{
  244. "from": "device2",
  245. }},
  246. {{
  247. "from": "device3",
  248. }},
  249. {{
  250. "from": "device1",
  251. }},
  252. {{
  253. "from": "device3",
  254. }},
  255. },
  256. M: map[string]interface{}{
  257. "op_2_project_0_exceptions_total": int64(0),
  258. "op_2_project_0_process_latency_us": int64(0),
  259. "op_2_project_0_records_in_total": int64(5),
  260. "op_2_project_0_records_out_total": int64(5),
  261. "sink_mockSink_0_exceptions_total": int64(0),
  262. "sink_mockSink_0_records_in_total": int64(5),
  263. "sink_mockSink_0_records_out_total": int64(5),
  264. "source_demo1_0_exceptions_total": int64(0),
  265. "source_demo1_0_records_in_total": int64(5),
  266. "source_demo1_0_records_out_total": int64(5),
  267. },
  268. }, {
  269. Name: `TestSingleSQLRule8`,
  270. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  271. R: [][]map[string]interface{}{
  272. {{
  273. "temp": float64(25.5),
  274. "hum": float64(65),
  275. "from": "device1",
  276. "ts": float64(1541152486013),
  277. }},
  278. {{
  279. "temp": float64(27.4),
  280. "hum": float64(80),
  281. "from": "device1",
  282. "ts": float64(1541152488442),
  283. }},
  284. },
  285. M: map[string]interface{}{
  286. "op_3_project_0_exceptions_total": int64(0),
  287. "op_3_project_0_process_latency_us": int64(0),
  288. "op_3_project_0_records_in_total": int64(2),
  289. "op_3_project_0_records_out_total": int64(2),
  290. "op_2_filter_0_exceptions_total": int64(0),
  291. "op_2_filter_0_process_latency_us": int64(0),
  292. "op_2_filter_0_records_in_total": int64(5),
  293. "op_2_filter_0_records_out_total": int64(2),
  294. "sink_mockSink_0_exceptions_total": int64(0),
  295. "sink_mockSink_0_records_in_total": int64(2),
  296. "sink_mockSink_0_records_out_total": int64(2),
  297. "source_demo1_0_exceptions_total": int64(0),
  298. "source_demo1_0_records_in_total": int64(5),
  299. "source_demo1_0_records_out_total": int64(5),
  300. },
  301. }, {
  302. Name: `TestSingleSQLRule9`,
  303. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  304. R: [][]map[string]interface{}{
  305. {{
  306. "color": "red",
  307. "s": "M",
  308. "ts": float64(1541152486013),
  309. }},
  310. {{
  311. "color": "blue",
  312. "s": "L",
  313. "ts": float64(1541152486822),
  314. }},
  315. {{
  316. "color": "blue",
  317. "s": "M",
  318. "ts": float64(1541152487632),
  319. }},
  320. {{
  321. "color": "yellow",
  322. "s": "L",
  323. "ts": float64(1541152488442),
  324. }},
  325. {{
  326. "color": "red",
  327. "s": "S",
  328. "ts": float64(1541152489252),
  329. }},
  330. },
  331. M: map[string]interface{}{
  332. "op_2_project_0_exceptions_total": int64(0),
  333. "op_2_project_0_process_latency_us": int64(0),
  334. "op_2_project_0_records_in_total": int64(5),
  335. "op_2_project_0_records_out_total": int64(5),
  336. "sink_mockSink_0_exceptions_total": int64(0),
  337. "sink_mockSink_0_records_in_total": int64(5),
  338. "sink_mockSink_0_records_out_total": int64(5),
  339. "source_demo_0_exceptions_total": int64(0),
  340. "source_demo_0_records_in_total": int64(5),
  341. "source_demo_0_records_out_total": int64(5),
  342. },
  343. T: &api.PrintableTopo{
  344. Sources: []string{"source_demo"},
  345. Edges: map[string][]string{
  346. "source_demo": {"op_2_project"},
  347. "op_2_project": {"sink_mockSink"},
  348. },
  349. },
  350. }, {
  351. Name: `TestSingleSQLRule10`,
  352. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  353. R: [][]map[string]interface{}{
  354. {{
  355. "id": float64(1541152486013),
  356. "name": "name1",
  357. "color": "red",
  358. "size": float64(3),
  359. "ts": float64(1541152486013),
  360. }},
  361. {{
  362. "id": float64(1541152487632),
  363. "name": "name2",
  364. "color": "blue",
  365. "size": float64(2),
  366. "ts": float64(1541152487632),
  367. }},
  368. {{
  369. "id": float64(1541152489252),
  370. "name": "name3",
  371. "color": "red",
  372. "size": float64(1),
  373. "ts": float64(1541152489252),
  374. }},
  375. },
  376. W: 15,
  377. M: map[string]interface{}{
  378. "op_3_join_aligner_0_records_in_total": int64(6),
  379. "op_3_join_aligner_0_records_out_total": int64(5),
  380. "op_4_join_0_exceptions_total": int64(0),
  381. "op_4_join_0_records_in_total": int64(5),
  382. "op_4_join_0_records_out_total": int64(3),
  383. "op_5_project_0_exceptions_total": int64(0),
  384. "op_5_project_0_records_in_total": int64(3),
  385. "op_5_project_0_records_out_total": int64(3),
  386. "sink_mockSink_0_exceptions_total": int64(0),
  387. "sink_mockSink_0_records_in_total": int64(3),
  388. "sink_mockSink_0_records_out_total": int64(3),
  389. "source_demo_0_exceptions_total": int64(0),
  390. "source_demo_0_records_in_total": int64(5),
  391. "source_demo_0_records_out_total": int64(5),
  392. "source_table1_0_exceptions_total": int64(0),
  393. "source_table1_0_records_in_total": int64(4),
  394. "source_table1_0_records_out_total": int64(1),
  395. },
  396. }, {
  397. Name: `TestSingleSQLRule11`,
  398. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  399. R: [][]map[string]interface{}{
  400. {{
  401. "device": "device2",
  402. }},
  403. {{
  404. "device": "device4",
  405. }},
  406. {{
  407. "device": "device5",
  408. }},
  409. },
  410. M: map[string]interface{}{
  411. "op_3_join_aligner_0_records_in_total": int64(10),
  412. "op_3_join_aligner_0_records_out_total": int64(5),
  413. "op_4_join_0_exceptions_total": int64(0),
  414. "op_4_join_0_records_in_total": int64(5),
  415. "op_4_join_0_records_out_total": int64(3),
  416. "op_5_project_0_exceptions_total": int64(0),
  417. "op_5_project_0_records_in_total": int64(3),
  418. "op_5_project_0_records_out_total": int64(3),
  419. "sink_mockSink_0_exceptions_total": int64(0),
  420. "sink_mockSink_0_records_in_total": int64(3),
  421. "sink_mockSink_0_records_out_total": int64(3),
  422. "source_demo_0_exceptions_total": int64(0),
  423. "source_demo_0_records_in_total": int64(5),
  424. "source_demo_0_records_out_total": int64(5),
  425. "source_demoTable_0_exceptions_total": int64(0),
  426. "source_demoTable_0_records_in_total": int64(5),
  427. "source_demoTable_0_records_out_total": int64(5),
  428. },
  429. }, {
  430. Name: `TestSingleSQLRule12`,
  431. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  432. R: [][]map[string]interface{}{
  433. {{
  434. "table1Id": float64(1541152486013),
  435. "demoTs": float64(1541152486013),
  436. }},
  437. {{
  438. "table1Id": float64(1541152487632),
  439. "demoTs": float64(1541152487632),
  440. }},
  441. {{
  442. "table1Id": float64(1541152489252),
  443. "demoTs": float64(1541152489252),
  444. }},
  445. },
  446. W: 15,
  447. M: map[string]interface{}{
  448. "op_3_join_aligner_0_records_in_total": int64(6),
  449. "op_3_join_aligner_0_records_out_total": int64(5),
  450. "op_4_join_0_exceptions_total": int64(0),
  451. "op_4_join_0_records_in_total": int64(5),
  452. "op_4_join_0_records_out_total": int64(3),
  453. "op_5_project_0_exceptions_total": int64(0),
  454. "op_5_project_0_records_in_total": int64(3),
  455. "op_5_project_0_records_out_total": int64(3),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(3),
  458. "sink_mockSink_0_records_out_total": int64(3),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "source_table1_0_exceptions_total": int64(0),
  463. "source_table1_0_records_in_total": int64(4),
  464. "source_table1_0_records_out_total": int64(1),
  465. },
  466. }, {
  467. Name: `TestChanged13`,
  468. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  469. R: [][]map[string]interface{}{
  470. {{
  471. "tt_color": "red",
  472. "tt_size": float64(3),
  473. }},
  474. {{
  475. "tt_color": "blue",
  476. "tt_size": float64(6),
  477. }},
  478. {{
  479. "tt_size": float64(2),
  480. }},
  481. {{
  482. "tt_color": "yellow",
  483. "tt_size": float64(4),
  484. }},
  485. {{
  486. "tt_color": "red",
  487. "tt_size": float64(1),
  488. }},
  489. },
  490. M: map[string]interface{}{
  491. "op_2_project_0_exceptions_total": int64(0),
  492. "op_2_project_0_process_latency_us": int64(0),
  493. "op_2_project_0_records_in_total": int64(5),
  494. "op_2_project_0_records_out_total": int64(5),
  495. "sink_mockSink_0_exceptions_total": int64(0),
  496. "sink_mockSink_0_records_in_total": int64(5),
  497. "sink_mockSink_0_records_out_total": int64(5),
  498. "source_demo_0_exceptions_total": int64(0),
  499. "source_demo_0_records_in_total": int64(5),
  500. "source_demo_0_records_out_total": int64(5),
  501. },
  502. }, {
  503. Name: `TestAliasOrderBy14`,
  504. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  505. R: [][]map[string]interface{}{
  506. {{
  507. "color": "blue",
  508. "c": float64(2),
  509. },
  510. {
  511. "color": "yellow",
  512. "c": float64(1),
  513. },
  514. },
  515. },
  516. M: map[string]interface{}{
  517. "op_6_project_0_exceptions_total": int64(0),
  518. "op_6_project_0_process_latency_us": int64(0),
  519. "op_6_project_0_records_in_total": int64(1),
  520. "op_6_project_0_records_out_total": int64(1),
  521. "sink_mockSink_0_exceptions_total": int64(0),
  522. "sink_mockSink_0_records_in_total": int64(1),
  523. "sink_mockSink_0_records_out_total": int64(1),
  524. "source_demo_0_exceptions_total": int64(0),
  525. "source_demo_0_records_in_total": int64(5),
  526. "source_demo_0_records_out_total": int64(5),
  527. },
  528. },
  529. {
  530. Name: `TestLagAlias`,
  531. Sql: "SELECT lag(size) as lastSize, size, lastSize/size as changeRate FROM demo WHERE size > 2",
  532. R: [][]map[string]interface{}{
  533. {{
  534. "size": float64(3),
  535. }},
  536. {{
  537. "lastSize": float64(3),
  538. "size": float64(6),
  539. "changeRate": float64(0),
  540. }},
  541. {{
  542. "lastSize": float64(2),
  543. "size": float64(4),
  544. "changeRate": float64(0),
  545. }},
  546. },
  547. M: map[string]interface{}{
  548. "sink_mockSink_0_exceptions_total": int64(0),
  549. "sink_mockSink_0_records_in_total": int64(3),
  550. "sink_mockSink_0_records_out_total": int64(3),
  551. "source_demo_0_exceptions_total": int64(0),
  552. "source_demo_0_records_in_total": int64(5),
  553. "source_demo_0_records_out_total": int64(5),
  554. },
  555. },
  556. {
  557. Name: `TestLagPartition`,
  558. Sql: "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
  559. R: [][]map[string]interface{}{
  560. {{
  561. "color": "red",
  562. "size": float64(3),
  563. }},
  564. {{
  565. "color": "blue",
  566. "size": float64(6),
  567. }},
  568. {{
  569. "color": "blue",
  570. "lastSize": float64(6),
  571. "size": float64(2),
  572. "changeRate": float64(3),
  573. }},
  574. {{
  575. "color": "yellow",
  576. "size": float64(4),
  577. }},
  578. {{
  579. "color": "red",
  580. "lastSize": float64(3),
  581. "size": float64(1),
  582. "changeRate": float64(3),
  583. }},
  584. },
  585. M: map[string]interface{}{
  586. "sink_mockSink_0_exceptions_total": int64(0),
  587. "sink_mockSink_0_records_in_total": int64(5),
  588. "sink_mockSink_0_records_out_total": int64(5),
  589. "source_demo_0_exceptions_total": int64(0),
  590. "source_demo_0_records_in_total": int64(5),
  591. "source_demo_0_records_out_total": int64(5),
  592. },
  593. },
  594. }
  595. HandleStream(true, streamList, t)
  596. options := []*api.RuleOption{
  597. {
  598. BufferLength: 100,
  599. SendError: true,
  600. }, {
  601. BufferLength: 100,
  602. SendError: true,
  603. Qos: api.AtLeastOnce,
  604. CheckpointInterval: 5000,
  605. }, {
  606. BufferLength: 100,
  607. SendError: true,
  608. Qos: api.ExactlyOnce,
  609. CheckpointInterval: 5000,
  610. },
  611. }
  612. for j, opt := range options {
  613. DoRuleTest(t, tests, j, opt, 0)
  614. }
  615. }
  616. func TestSingleSQLError(t *testing.T) {
  617. //Reset
  618. streamList := []string{"ldemo"}
  619. HandleStream(false, streamList, t)
  620. //Data setup
  621. var tests = []RuleTest{
  622. {
  623. Name: `TestSingleSQLErrorRule1`,
  624. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  625. R: [][]map[string]interface{}{
  626. {{
  627. "color": "red",
  628. "ts": float64(1541152486013),
  629. }},
  630. {{
  631. "error": "run Where error: invalid operation string(string) >= int64(3)",
  632. }},
  633. {{
  634. "ts": float64(1541152487632),
  635. }},
  636. },
  637. M: map[string]interface{}{
  638. "op_3_project_0_exceptions_total": int64(1),
  639. "op_3_project_0_process_latency_us": int64(0),
  640. "op_3_project_0_records_in_total": int64(3),
  641. "op_3_project_0_records_out_total": int64(2),
  642. "sink_mockSink_0_exceptions_total": int64(0),
  643. "sink_mockSink_0_records_in_total": int64(3),
  644. "sink_mockSink_0_records_out_total": int64(3),
  645. "source_ldemo_0_exceptions_total": int64(0),
  646. "source_ldemo_0_records_in_total": int64(5),
  647. "source_ldemo_0_records_out_total": int64(5),
  648. "op_2_filter_0_exceptions_total": int64(1),
  649. "op_2_filter_0_process_latency_us": int64(0),
  650. "op_2_filter_0_records_in_total": int64(5),
  651. "op_2_filter_0_records_out_total": int64(2),
  652. },
  653. }, {
  654. Name: `TestSingleSQLErrorRule2`,
  655. Sql: `SELECT size * 5 FROM ldemo`,
  656. R: [][]map[string]interface{}{
  657. {{
  658. "kuiper_field_0": float64(15),
  659. }},
  660. {{
  661. "error": "run Select error: invalid operation string(string) * int64(5)",
  662. }},
  663. {{
  664. "kuiper_field_0": float64(15),
  665. }},
  666. {{
  667. "kuiper_field_0": float64(10),
  668. }},
  669. {{}},
  670. },
  671. M: map[string]interface{}{
  672. "op_2_project_0_exceptions_total": int64(1),
  673. "op_2_project_0_process_latency_us": int64(0),
  674. "op_2_project_0_records_in_total": int64(5),
  675. "op_2_project_0_records_out_total": int64(4),
  676. "sink_mockSink_0_exceptions_total": int64(0),
  677. "sink_mockSink_0_records_in_total": int64(5),
  678. "sink_mockSink_0_records_out_total": int64(5),
  679. "source_ldemo_0_exceptions_total": int64(0),
  680. "source_ldemo_0_records_in_total": int64(5),
  681. "source_ldemo_0_records_out_total": int64(5),
  682. },
  683. },
  684. }
  685. HandleStream(true, streamList, t)
  686. DoRuleTest(t, tests, 0, &api.RuleOption{
  687. BufferLength: 100,
  688. SendError: true,
  689. }, 0)
  690. }
  691. func TestSingleSQLOmitError(t *testing.T) {
  692. //Reset
  693. streamList := []string{"ldemo"}
  694. HandleStream(false, streamList, t)
  695. //Data setup
  696. var tests = []RuleTest{
  697. {
  698. Name: `TestSingleSQLErrorRule1`,
  699. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  700. R: [][]map[string]interface{}{
  701. {{
  702. "color": "red",
  703. "ts": float64(1541152486013),
  704. }},
  705. {{
  706. "ts": float64(1541152487632),
  707. }},
  708. },
  709. M: map[string]interface{}{
  710. "op_3_project_0_exceptions_total": int64(0),
  711. "op_3_project_0_process_latency_us": int64(0),
  712. "op_3_project_0_records_in_total": int64(2),
  713. "op_3_project_0_records_out_total": int64(2),
  714. "sink_mockSink_0_exceptions_total": int64(0),
  715. "sink_mockSink_0_records_in_total": int64(2),
  716. "sink_mockSink_0_records_out_total": int64(2),
  717. "source_ldemo_0_exceptions_total": int64(0),
  718. "source_ldemo_0_records_in_total": int64(5),
  719. "source_ldemo_0_records_out_total": int64(5),
  720. "op_2_filter_0_exceptions_total": int64(1),
  721. "op_2_filter_0_process_latency_us": int64(0),
  722. "op_2_filter_0_records_in_total": int64(5),
  723. "op_2_filter_0_records_out_total": int64(2),
  724. },
  725. }, {
  726. Name: `TestSingleSQLErrorRule2`,
  727. Sql: `SELECT size * 5 FROM ldemo`,
  728. R: [][]map[string]interface{}{
  729. {{
  730. "kuiper_field_0": float64(15),
  731. }},
  732. {{
  733. "kuiper_field_0": float64(15),
  734. }},
  735. {{
  736. "kuiper_field_0": float64(10),
  737. }},
  738. {{}},
  739. },
  740. M: map[string]interface{}{
  741. "op_2_project_0_exceptions_total": int64(1),
  742. "op_2_project_0_process_latency_us": int64(0),
  743. "op_2_project_0_records_in_total": int64(5),
  744. "op_2_project_0_records_out_total": int64(4),
  745. "sink_mockSink_0_exceptions_total": int64(0),
  746. "sink_mockSink_0_records_in_total": int64(4),
  747. "sink_mockSink_0_records_out_total": int64(4),
  748. "source_ldemo_0_exceptions_total": int64(0),
  749. "source_ldemo_0_records_in_total": int64(5),
  750. "source_ldemo_0_records_out_total": int64(5),
  751. },
  752. },
  753. }
  754. HandleStream(true, streamList, t)
  755. DoRuleTest(t, tests, 0, &api.RuleOption{
  756. BufferLength: 100,
  757. SendError: false,
  758. }, 0)
  759. }
  760. func TestSingleSQLTemplate(t *testing.T) {
  761. //Reset
  762. streamList := []string{"demo"}
  763. HandleStream(false, streamList, t)
  764. //Data setup
  765. var tests = []RuleTest{
  766. {
  767. Name: `TestSingleSQLTemplateRule1`,
  768. Sql: `SELECT * FROM demo`,
  769. R: []map[string]interface{}{
  770. {
  771. "c": "red",
  772. "wrapper": "w1",
  773. },
  774. {
  775. "c": "blue",
  776. "wrapper": "w1",
  777. },
  778. {
  779. "c": "blue",
  780. "wrapper": "w1",
  781. },
  782. {
  783. "c": "yellow",
  784. "wrapper": "w1",
  785. },
  786. {
  787. "c": "red",
  788. "wrapper": "w1",
  789. },
  790. },
  791. M: map[string]interface{}{
  792. "op_2_project_0_exceptions_total": int64(0),
  793. "op_2_project_0_process_latency_us": int64(0),
  794. "op_2_project_0_records_in_total": int64(5),
  795. "op_2_project_0_records_out_total": int64(5),
  796. "sink_mockSink_0_exceptions_total": int64(0),
  797. "sink_mockSink_0_records_in_total": int64(5),
  798. "sink_mockSink_0_records_out_total": int64(5),
  799. "source_demo_0_exceptions_total": int64(0),
  800. "source_demo_0_records_in_total": int64(5),
  801. "source_demo_0_records_out_total": int64(5),
  802. },
  803. },
  804. }
  805. HandleStream(true, streamList, t)
  806. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  807. BufferLength: 100,
  808. SendError: true,
  809. }, 0, map[string]interface{}{
  810. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  811. "sendSingle": true,
  812. }, func(result [][]byte) interface{} {
  813. var maps []map[string]interface{}
  814. for _, v := range result {
  815. var mapRes map[string]interface{}
  816. err := json.Unmarshal(v, &mapRes)
  817. if err != nil {
  818. t.Errorf("Failed to parse the input into map")
  819. continue
  820. }
  821. maps = append(maps, mapRes)
  822. }
  823. return maps
  824. })
  825. }
  826. func TestNoneSingleSQLTemplate(t *testing.T) {
  827. //Reset
  828. streamList := []string{"demo"}
  829. HandleStream(false, streamList, t)
  830. //Data setup
  831. var tests = []RuleTest{
  832. {
  833. Name: `TestNoneSingleSQLTemplateRule1`,
  834. Sql: `SELECT * FROM demo`,
  835. R: [][]byte{
  836. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  837. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  838. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  839. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  840. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  841. },
  842. M: map[string]interface{}{
  843. "op_2_project_0_exceptions_total": int64(0),
  844. "op_2_project_0_process_latency_us": int64(0),
  845. "op_2_project_0_records_in_total": int64(5),
  846. "op_2_project_0_records_out_total": int64(5),
  847. "sink_mockSink_0_exceptions_total": int64(0),
  848. "sink_mockSink_0_records_in_total": int64(5),
  849. "sink_mockSink_0_records_out_total": int64(5),
  850. "source_demo_0_exceptions_total": int64(0),
  851. "source_demo_0_records_in_total": int64(5),
  852. "source_demo_0_records_out_total": int64(5),
  853. },
  854. },
  855. }
  856. HandleStream(true, streamList, t)
  857. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  858. BufferLength: 100,
  859. SendError: true,
  860. }, 0, map[string]interface{}{
  861. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  862. }, func(result [][]byte) interface{} {
  863. return result
  864. })
  865. }
  866. func TestSingleSQLForBinary(t *testing.T) {
  867. //Reset
  868. streamList := []string{"binDemo"}
  869. HandleStream(false, streamList, t)
  870. //Data setup
  871. var tests = []RuleTest{
  872. {
  873. Name: `TestSingleSQLRule1`,
  874. Sql: `SELECT * FROM binDemo`,
  875. R: [][]map[string]interface{}{
  876. {{
  877. "self": mocknode.Image,
  878. }},
  879. },
  880. W: 50,
  881. M: map[string]interface{}{
  882. "op_2_project_0_exceptions_total": int64(0),
  883. "op_2_project_0_process_latency_us": int64(0),
  884. "op_2_project_0_records_in_total": int64(1),
  885. "op_2_project_0_records_out_total": int64(1),
  886. "sink_mockSink_0_exceptions_total": int64(0),
  887. "sink_mockSink_0_records_in_total": int64(1),
  888. "sink_mockSink_0_records_out_total": int64(1),
  889. "source_binDemo_0_exceptions_total": int64(0),
  890. "source_binDemo_0_records_in_total": int64(1),
  891. "source_binDemo_0_records_out_total": int64(1),
  892. },
  893. },
  894. }
  895. HandleStream(true, streamList, t)
  896. options := []*api.RuleOption{
  897. {
  898. BufferLength: 100,
  899. SendError: true,
  900. }, {
  901. BufferLength: 100,
  902. SendError: true,
  903. Qos: api.AtLeastOnce,
  904. CheckpointInterval: 5000,
  905. }, {
  906. BufferLength: 100,
  907. SendError: true,
  908. Qos: api.ExactlyOnce,
  909. CheckpointInterval: 5000,
  910. },
  911. }
  912. byteFunc := func(result [][]byte) interface{} {
  913. var maps [][]map[string]interface{}
  914. for _, v := range result {
  915. var mapRes []map[string][]byte
  916. err := json.Unmarshal(v, &mapRes)
  917. if err != nil {
  918. panic("Failed to parse the input into map")
  919. }
  920. mapInt := make([]map[string]interface{}, len(mapRes))
  921. for i, mv := range mapRes {
  922. mapInt[i] = make(map[string]interface{})
  923. //assume only one key
  924. for k, v := range mv {
  925. mapInt[i][k] = v
  926. }
  927. }
  928. maps = append(maps, mapInt)
  929. }
  930. return maps
  931. }
  932. for j, opt := range options {
  933. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  934. }
  935. }