rule_test.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "testing"
  20. )
  21. func TestSingleSQL(t *testing.T) {
  22. //Reset
  23. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  24. HandleStream(false, streamList, t)
  25. //Data setup
  26. var tests = []RuleTest{
  27. {
  28. Name: `TestSingleSQLRule1`,
  29. Sql: `SELECT * FROM demo`,
  30. R: [][]map[string]interface{}{
  31. {{
  32. "color": "red",
  33. "size": float64(3),
  34. "ts": float64(1541152486013),
  35. }},
  36. {{
  37. "color": "blue",
  38. "size": float64(6),
  39. "ts": float64(1541152486822),
  40. }},
  41. {{
  42. "color": "blue",
  43. "size": float64(2),
  44. "ts": float64(1541152487632),
  45. }},
  46. {{
  47. "color": "yellow",
  48. "size": float64(4),
  49. "ts": float64(1541152488442),
  50. }},
  51. {{
  52. "color": "red",
  53. "size": float64(1),
  54. "ts": float64(1541152489252),
  55. }},
  56. },
  57. M: map[string]interface{}{
  58. "op_2_project_0_exceptions_total": int64(0),
  59. "op_2_project_0_process_latency_us": int64(0),
  60. "op_2_project_0_records_in_total": int64(5),
  61. "op_2_project_0_records_out_total": int64(5),
  62. "sink_mockSink_0_exceptions_total": int64(0),
  63. "sink_mockSink_0_records_in_total": int64(5),
  64. "sink_mockSink_0_records_out_total": int64(5),
  65. "source_demo_0_exceptions_total": int64(0),
  66. "source_demo_0_records_in_total": int64(5),
  67. "source_demo_0_records_out_total": int64(5),
  68. },
  69. T: &api.PrintableTopo{
  70. Sources: []string{"source_demo"},
  71. Edges: map[string][]string{
  72. "source_demo": {"op_2_project"},
  73. "op_2_project": {"sink_mockSink"},
  74. },
  75. },
  76. }, {
  77. Name: `TestSingleSQLRule2`,
  78. Sql: `SELECT color, ts FROM demo where size > 3`,
  79. R: [][]map[string]interface{}{
  80. {{
  81. "color": "blue",
  82. "ts": float64(1541152486822),
  83. }},
  84. {{
  85. "color": "yellow",
  86. "ts": float64(1541152488442),
  87. }},
  88. },
  89. M: map[string]interface{}{
  90. "op_3_project_0_exceptions_total": int64(0),
  91. "op_3_project_0_process_latency_us": int64(0),
  92. "op_3_project_0_records_in_total": int64(2),
  93. "op_3_project_0_records_out_total": int64(2),
  94. "sink_mockSink_0_exceptions_total": int64(0),
  95. "sink_mockSink_0_records_in_total": int64(2),
  96. "sink_mockSink_0_records_out_total": int64(2),
  97. "source_demo_0_exceptions_total": int64(0),
  98. "source_demo_0_records_in_total": int64(5),
  99. "source_demo_0_records_out_total": int64(5),
  100. "op_2_filter_0_exceptions_total": int64(0),
  101. "op_2_filter_0_process_latency_us": int64(0),
  102. "op_2_filter_0_records_in_total": int64(5),
  103. "op_2_filter_0_records_out_total": int64(2),
  104. },
  105. }, {
  106. Name: `TestSingleSQLRule3`,
  107. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  108. R: [][]map[string]interface{}{
  109. {{
  110. "Int8": float64(6),
  111. "ts": float64(1541152486822),
  112. }},
  113. {{
  114. "Int8": float64(4),
  115. "ts": float64(1541152488442),
  116. }},
  117. },
  118. M: map[string]interface{}{
  119. "op_3_project_0_exceptions_total": int64(0),
  120. "op_3_project_0_process_latency_us": int64(0),
  121. "op_3_project_0_records_in_total": int64(2),
  122. "op_3_project_0_records_out_total": int64(2),
  123. "sink_mockSink_0_exceptions_total": int64(0),
  124. "sink_mockSink_0_records_in_total": int64(2),
  125. "sink_mockSink_0_records_out_total": int64(2),
  126. "source_demo_0_exceptions_total": int64(0),
  127. "source_demo_0_records_in_total": int64(5),
  128. "source_demo_0_records_out_total": int64(5),
  129. "op_2_filter_0_exceptions_total": int64(0),
  130. "op_2_filter_0_process_latency_us": int64(0),
  131. "op_2_filter_0_records_in_total": int64(5),
  132. "op_2_filter_0_records_out_total": int64(2),
  133. },
  134. }, {
  135. Name: `TestSingleSQLRule4`,
  136. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  137. R: [][]map[string]interface{}{
  138. {{
  139. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  140. }},
  141. {{
  142. "Int8": float64(6),
  143. "ts": float64(1541152486822),
  144. }},
  145. {{
  146. "Int8": float64(4),
  147. "ts": float64(1541152488442),
  148. }},
  149. {{
  150. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  151. }},
  152. },
  153. M: map[string]interface{}{
  154. "op_3_project_0_exceptions_total": int64(2),
  155. "op_3_project_0_process_latency_us": int64(0),
  156. "op_3_project_0_records_in_total": int64(4),
  157. "op_3_project_0_records_out_total": int64(2),
  158. "sink_mockSink_0_exceptions_total": int64(0),
  159. "sink_mockSink_0_records_in_total": int64(4),
  160. "sink_mockSink_0_records_out_total": int64(4),
  161. "source_demoError_0_exceptions_total": int64(2),
  162. "source_demoError_0_records_in_total": int64(5),
  163. "source_demoError_0_records_out_total": int64(5),
  164. "op_2_filter_0_exceptions_total": int64(2),
  165. "op_2_filter_0_process_latency_us": int64(0),
  166. "op_2_filter_0_records_in_total": int64(5),
  167. "op_2_filter_0_records_out_total": int64(2),
  168. },
  169. }, {
  170. Name: `TestSingleSQLRule5`,
  171. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  172. R: [][]map[string]interface{}{
  173. {{
  174. "m": "mock",
  175. "ts": float64(1541152486013),
  176. }},
  177. {{
  178. "m": "mock",
  179. "ts": float64(1541152486822),
  180. }},
  181. {{
  182. "m": "mock",
  183. "ts": float64(1541152487632),
  184. }},
  185. {{
  186. "m": "mock",
  187. "ts": float64(1541152488442),
  188. }},
  189. {{
  190. "m": "mock",
  191. "ts": float64(1541152489252),
  192. }},
  193. },
  194. M: map[string]interface{}{
  195. "op_2_project_0_exceptions_total": int64(0),
  196. "op_2_project_0_process_latency_us": int64(0),
  197. "op_2_project_0_records_in_total": int64(5),
  198. "op_2_project_0_records_out_total": int64(5),
  199. "sink_mockSink_0_exceptions_total": int64(0),
  200. "sink_mockSink_0_records_in_total": int64(5),
  201. "sink_mockSink_0_records_out_total": int64(5),
  202. "source_demo_0_exceptions_total": int64(0),
  203. "source_demo_0_records_in_total": int64(5),
  204. "source_demo_0_records_out_total": int64(5),
  205. },
  206. }, {
  207. Name: `TestSingleSQLRule6`,
  208. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  209. R: [][]map[string]interface{}{
  210. {{
  211. "color": "blue",
  212. "ts": float64(1541152486822),
  213. }},
  214. {{
  215. "color": "yellow",
  216. "ts": float64(1541152488442),
  217. }},
  218. },
  219. M: map[string]interface{}{
  220. "op_3_project_0_exceptions_total": int64(0),
  221. "op_3_project_0_process_latency_us": int64(0),
  222. "op_3_project_0_records_in_total": int64(2),
  223. "op_3_project_0_records_out_total": int64(2),
  224. "sink_mockSink_0_exceptions_total": int64(0),
  225. "sink_mockSink_0_records_in_total": int64(2),
  226. "sink_mockSink_0_records_out_total": int64(2),
  227. "source_demo_0_exceptions_total": int64(0),
  228. "source_demo_0_records_in_total": int64(5),
  229. "source_demo_0_records_out_total": int64(5),
  230. "op_2_filter_0_exceptions_total": int64(0),
  231. "op_2_filter_0_process_latency_us": int64(0),
  232. "op_2_filter_0_records_in_total": int64(5),
  233. "op_2_filter_0_records_out_total": int64(2),
  234. },
  235. }, {
  236. Name: `TestSingleSQLRule7`,
  237. Sql: "SELECT `from` FROM demo1",
  238. R: [][]map[string]interface{}{
  239. {{
  240. "from": "device1",
  241. }},
  242. {{
  243. "from": "device2",
  244. }},
  245. {{
  246. "from": "device3",
  247. }},
  248. {{
  249. "from": "device1",
  250. }},
  251. {{
  252. "from": "device3",
  253. }},
  254. },
  255. M: map[string]interface{}{
  256. "op_2_project_0_exceptions_total": int64(0),
  257. "op_2_project_0_process_latency_us": int64(0),
  258. "op_2_project_0_records_in_total": int64(5),
  259. "op_2_project_0_records_out_total": int64(5),
  260. "sink_mockSink_0_exceptions_total": int64(0),
  261. "sink_mockSink_0_records_in_total": int64(5),
  262. "sink_mockSink_0_records_out_total": int64(5),
  263. "source_demo1_0_exceptions_total": int64(0),
  264. "source_demo1_0_records_in_total": int64(5),
  265. "source_demo1_0_records_out_total": int64(5),
  266. },
  267. }, {
  268. Name: `TestSingleSQLRule8`,
  269. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  270. R: [][]map[string]interface{}{
  271. {{
  272. "temp": float64(25.5),
  273. "hum": float64(65),
  274. "from": "device1",
  275. "ts": float64(1541152486013),
  276. }},
  277. {{
  278. "temp": float64(27.4),
  279. "hum": float64(80),
  280. "from": "device1",
  281. "ts": float64(1541152488442),
  282. }},
  283. },
  284. M: map[string]interface{}{
  285. "op_3_project_0_exceptions_total": int64(0),
  286. "op_3_project_0_process_latency_us": int64(0),
  287. "op_3_project_0_records_in_total": int64(2),
  288. "op_3_project_0_records_out_total": int64(2),
  289. "op_2_filter_0_exceptions_total": int64(0),
  290. "op_2_filter_0_process_latency_us": int64(0),
  291. "op_2_filter_0_records_in_total": int64(5),
  292. "op_2_filter_0_records_out_total": int64(2),
  293. "sink_mockSink_0_exceptions_total": int64(0),
  294. "sink_mockSink_0_records_in_total": int64(2),
  295. "sink_mockSink_0_records_out_total": int64(2),
  296. "source_demo1_0_exceptions_total": int64(0),
  297. "source_demo1_0_records_in_total": int64(5),
  298. "source_demo1_0_records_out_total": int64(5),
  299. },
  300. }, {
  301. Name: `TestSingleSQLRule9`,
  302. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  303. R: [][]map[string]interface{}{
  304. {{
  305. "color": "red",
  306. "s": "M",
  307. "ts": float64(1541152486013),
  308. }},
  309. {{
  310. "color": "blue",
  311. "s": "L",
  312. "ts": float64(1541152486822),
  313. }},
  314. {{
  315. "color": "blue",
  316. "s": "M",
  317. "ts": float64(1541152487632),
  318. }},
  319. {{
  320. "color": "yellow",
  321. "s": "L",
  322. "ts": float64(1541152488442),
  323. }},
  324. {{
  325. "color": "red",
  326. "s": "S",
  327. "ts": float64(1541152489252),
  328. }},
  329. },
  330. M: map[string]interface{}{
  331. "op_2_project_0_exceptions_total": int64(0),
  332. "op_2_project_0_process_latency_us": int64(0),
  333. "op_2_project_0_records_in_total": int64(5),
  334. "op_2_project_0_records_out_total": int64(5),
  335. "sink_mockSink_0_exceptions_total": int64(0),
  336. "sink_mockSink_0_records_in_total": int64(5),
  337. "sink_mockSink_0_records_out_total": int64(5),
  338. "source_demo_0_exceptions_total": int64(0),
  339. "source_demo_0_records_in_total": int64(5),
  340. "source_demo_0_records_out_total": int64(5),
  341. },
  342. T: &api.PrintableTopo{
  343. Sources: []string{"source_demo"},
  344. Edges: map[string][]string{
  345. "source_demo": {"op_2_project"},
  346. "op_2_project": {"sink_mockSink"},
  347. },
  348. },
  349. }, {
  350. Name: `TestSingleSQLRule10`,
  351. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  352. R: [][]map[string]interface{}{
  353. {{
  354. "id": float64(1541152486013),
  355. "name": "name1",
  356. "color": "red",
  357. "size": float64(3),
  358. "ts": float64(1541152486013),
  359. }},
  360. {{
  361. "id": float64(1541152487632),
  362. "name": "name2",
  363. "color": "blue",
  364. "size": float64(2),
  365. "ts": float64(1541152487632),
  366. }},
  367. {{
  368. "id": float64(1541152489252),
  369. "name": "name3",
  370. "color": "red",
  371. "size": float64(1),
  372. "ts": float64(1541152489252),
  373. }},
  374. },
  375. W: 15,
  376. M: map[string]interface{}{
  377. "op_3_join_aligner_0_records_in_total": int64(6),
  378. "op_3_join_aligner_0_records_out_total": int64(5),
  379. "op_4_join_0_exceptions_total": int64(0),
  380. "op_4_join_0_records_in_total": int64(5),
  381. "op_4_join_0_records_out_total": int64(3),
  382. "op_5_project_0_exceptions_total": int64(0),
  383. "op_5_project_0_records_in_total": int64(3),
  384. "op_5_project_0_records_out_total": int64(3),
  385. "sink_mockSink_0_exceptions_total": int64(0),
  386. "sink_mockSink_0_records_in_total": int64(3),
  387. "sink_mockSink_0_records_out_total": int64(3),
  388. "source_demo_0_exceptions_total": int64(0),
  389. "source_demo_0_records_in_total": int64(5),
  390. "source_demo_0_records_out_total": int64(5),
  391. "source_table1_0_exceptions_total": int64(0),
  392. "source_table1_0_records_in_total": int64(4),
  393. "source_table1_0_records_out_total": int64(1),
  394. },
  395. }, {
  396. Name: `TestSingleSQLRule11`,
  397. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  398. R: [][]map[string]interface{}{
  399. {{
  400. "device": "device2",
  401. }},
  402. {{
  403. "device": "device4",
  404. }},
  405. {{
  406. "device": "device5",
  407. }},
  408. },
  409. M: map[string]interface{}{
  410. "op_3_join_aligner_0_records_in_total": int64(10),
  411. "op_3_join_aligner_0_records_out_total": int64(5),
  412. "op_4_join_0_exceptions_total": int64(0),
  413. "op_4_join_0_records_in_total": int64(5),
  414. "op_4_join_0_records_out_total": int64(3),
  415. "op_5_project_0_exceptions_total": int64(0),
  416. "op_5_project_0_records_in_total": int64(3),
  417. "op_5_project_0_records_out_total": int64(3),
  418. "sink_mockSink_0_exceptions_total": int64(0),
  419. "sink_mockSink_0_records_in_total": int64(3),
  420. "sink_mockSink_0_records_out_total": int64(3),
  421. "source_demo_0_exceptions_total": int64(0),
  422. "source_demo_0_records_in_total": int64(5),
  423. "source_demo_0_records_out_total": int64(5),
  424. "source_demoTable_0_exceptions_total": int64(0),
  425. "source_demoTable_0_records_in_total": int64(5),
  426. "source_demoTable_0_records_out_total": int64(5),
  427. },
  428. }, {
  429. Name: `TestSingleSQLRule12`,
  430. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  431. R: [][]map[string]interface{}{
  432. {{
  433. "table1Id": float64(1541152486013),
  434. "demoTs": float64(1541152486013),
  435. }},
  436. {{
  437. "table1Id": float64(1541152487632),
  438. "demoTs": float64(1541152487632),
  439. }},
  440. {{
  441. "table1Id": float64(1541152489252),
  442. "demoTs": float64(1541152489252),
  443. }},
  444. },
  445. W: 15,
  446. M: map[string]interface{}{
  447. "op_3_join_aligner_0_records_in_total": int64(6),
  448. "op_3_join_aligner_0_records_out_total": int64(5),
  449. "op_4_join_0_exceptions_total": int64(0),
  450. "op_4_join_0_records_in_total": int64(5),
  451. "op_4_join_0_records_out_total": int64(3),
  452. "op_5_project_0_exceptions_total": int64(0),
  453. "op_5_project_0_records_in_total": int64(3),
  454. "op_5_project_0_records_out_total": int64(3),
  455. "sink_mockSink_0_exceptions_total": int64(0),
  456. "sink_mockSink_0_records_in_total": int64(3),
  457. "sink_mockSink_0_records_out_total": int64(3),
  458. "source_demo_0_exceptions_total": int64(0),
  459. "source_demo_0_records_in_total": int64(5),
  460. "source_demo_0_records_out_total": int64(5),
  461. "source_table1_0_exceptions_total": int64(0),
  462. "source_table1_0_records_in_total": int64(4),
  463. "source_table1_0_records_out_total": int64(1),
  464. },
  465. }, {
  466. Name: `TestChanged13`,
  467. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  468. R: [][]map[string]interface{}{
  469. {{
  470. "tt_color": "red",
  471. "tt_size": float64(3),
  472. }},
  473. {{
  474. "tt_color": "blue",
  475. "tt_size": float64(6),
  476. }},
  477. {{
  478. "tt_size": float64(2),
  479. }},
  480. {{
  481. "tt_color": "yellow",
  482. "tt_size": float64(4),
  483. }},
  484. {{
  485. "tt_color": "red",
  486. "tt_size": float64(1),
  487. }},
  488. },
  489. M: map[string]interface{}{
  490. "op_2_project_0_exceptions_total": int64(0),
  491. "op_2_project_0_process_latency_us": int64(0),
  492. "op_2_project_0_records_in_total": int64(5),
  493. "op_2_project_0_records_out_total": int64(5),
  494. "sink_mockSink_0_exceptions_total": int64(0),
  495. "sink_mockSink_0_records_in_total": int64(5),
  496. "sink_mockSink_0_records_out_total": int64(5),
  497. "source_demo_0_exceptions_total": int64(0),
  498. "source_demo_0_records_in_total": int64(5),
  499. "source_demo_0_records_out_total": int64(5),
  500. },
  501. }, {
  502. Name: `TestAliasOrderBy14`,
  503. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  504. R: [][]map[string]interface{}{
  505. {{
  506. "color": "blue",
  507. "c": float64(2),
  508. },
  509. {
  510. "color": "yellow",
  511. "c": float64(1),
  512. },
  513. },
  514. },
  515. M: map[string]interface{}{
  516. "op_6_project_0_exceptions_total": int64(0),
  517. "op_6_project_0_process_latency_us": int64(0),
  518. "op_6_project_0_records_in_total": int64(1),
  519. "op_6_project_0_records_out_total": int64(1),
  520. "sink_mockSink_0_exceptions_total": int64(0),
  521. "sink_mockSink_0_records_in_total": int64(1),
  522. "sink_mockSink_0_records_out_total": int64(1),
  523. "source_demo_0_exceptions_total": int64(0),
  524. "source_demo_0_records_in_total": int64(5),
  525. "source_demo_0_records_out_total": int64(5),
  526. },
  527. },
  528. }
  529. HandleStream(true, streamList, t)
  530. options := []*api.RuleOption{
  531. {
  532. BufferLength: 100,
  533. SendError: true,
  534. }, {
  535. BufferLength: 100,
  536. SendError: true,
  537. Qos: api.AtLeastOnce,
  538. CheckpointInterval: 5000,
  539. }, {
  540. BufferLength: 100,
  541. SendError: true,
  542. Qos: api.ExactlyOnce,
  543. CheckpointInterval: 5000,
  544. },
  545. }
  546. for j, opt := range options {
  547. DoRuleTest(t, tests, j, opt, 0)
  548. }
  549. }
  550. func TestSingleSQLError(t *testing.T) {
  551. //Reset
  552. streamList := []string{"ldemo"}
  553. HandleStream(false, streamList, t)
  554. //Data setup
  555. var tests = []RuleTest{
  556. {
  557. Name: `TestSingleSQLErrorRule1`,
  558. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  559. R: [][]map[string]interface{}{
  560. {{
  561. "color": "red",
  562. "ts": float64(1541152486013),
  563. }},
  564. {{
  565. "error": "run Where error: invalid operation string(string) >= int64(3)",
  566. }},
  567. {{
  568. "ts": float64(1541152487632),
  569. }},
  570. },
  571. M: map[string]interface{}{
  572. "op_3_project_0_exceptions_total": int64(1),
  573. "op_3_project_0_process_latency_us": int64(0),
  574. "op_3_project_0_records_in_total": int64(3),
  575. "op_3_project_0_records_out_total": int64(2),
  576. "sink_mockSink_0_exceptions_total": int64(0),
  577. "sink_mockSink_0_records_in_total": int64(3),
  578. "sink_mockSink_0_records_out_total": int64(3),
  579. "source_ldemo_0_exceptions_total": int64(0),
  580. "source_ldemo_0_records_in_total": int64(5),
  581. "source_ldemo_0_records_out_total": int64(5),
  582. "op_2_filter_0_exceptions_total": int64(1),
  583. "op_2_filter_0_process_latency_us": int64(0),
  584. "op_2_filter_0_records_in_total": int64(5),
  585. "op_2_filter_0_records_out_total": int64(2),
  586. },
  587. }, {
  588. Name: `TestSingleSQLErrorRule2`,
  589. Sql: `SELECT size * 5 FROM ldemo`,
  590. R: [][]map[string]interface{}{
  591. {{
  592. "kuiper_field_0": float64(15),
  593. }},
  594. {{
  595. "error": "run Select error: invalid operation string(string) * int64(5)",
  596. }},
  597. {{
  598. "kuiper_field_0": float64(15),
  599. }},
  600. {{
  601. "kuiper_field_0": float64(10),
  602. }},
  603. {{}},
  604. },
  605. M: map[string]interface{}{
  606. "op_2_project_0_exceptions_total": int64(1),
  607. "op_2_project_0_process_latency_us": int64(0),
  608. "op_2_project_0_records_in_total": int64(5),
  609. "op_2_project_0_records_out_total": int64(4),
  610. "sink_mockSink_0_exceptions_total": int64(0),
  611. "sink_mockSink_0_records_in_total": int64(5),
  612. "sink_mockSink_0_records_out_total": int64(5),
  613. "source_ldemo_0_exceptions_total": int64(0),
  614. "source_ldemo_0_records_in_total": int64(5),
  615. "source_ldemo_0_records_out_total": int64(5),
  616. },
  617. },
  618. }
  619. HandleStream(true, streamList, t)
  620. DoRuleTest(t, tests, 0, &api.RuleOption{
  621. BufferLength: 100,
  622. SendError: true,
  623. }, 0)
  624. }
  625. func TestSingleSQLOmitError(t *testing.T) {
  626. //Reset
  627. streamList := []string{"ldemo"}
  628. HandleStream(false, streamList, t)
  629. //Data setup
  630. var tests = []RuleTest{
  631. {
  632. Name: `TestSingleSQLErrorRule1`,
  633. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  634. R: [][]map[string]interface{}{
  635. {{
  636. "color": "red",
  637. "ts": float64(1541152486013),
  638. }},
  639. {{
  640. "ts": float64(1541152487632),
  641. }},
  642. },
  643. M: map[string]interface{}{
  644. "op_3_project_0_exceptions_total": int64(0),
  645. "op_3_project_0_process_latency_us": int64(0),
  646. "op_3_project_0_records_in_total": int64(2),
  647. "op_3_project_0_records_out_total": int64(2),
  648. "sink_mockSink_0_exceptions_total": int64(0),
  649. "sink_mockSink_0_records_in_total": int64(2),
  650. "sink_mockSink_0_records_out_total": int64(2),
  651. "source_ldemo_0_exceptions_total": int64(0),
  652. "source_ldemo_0_records_in_total": int64(5),
  653. "source_ldemo_0_records_out_total": int64(5),
  654. "op_2_filter_0_exceptions_total": int64(1),
  655. "op_2_filter_0_process_latency_us": int64(0),
  656. "op_2_filter_0_records_in_total": int64(5),
  657. "op_2_filter_0_records_out_total": int64(2),
  658. },
  659. }, {
  660. Name: `TestSingleSQLErrorRule2`,
  661. Sql: `SELECT size * 5 FROM ldemo`,
  662. R: [][]map[string]interface{}{
  663. {{
  664. "kuiper_field_0": float64(15),
  665. }},
  666. {{
  667. "kuiper_field_0": float64(15),
  668. }},
  669. {{
  670. "kuiper_field_0": float64(10),
  671. }},
  672. {{}},
  673. },
  674. M: map[string]interface{}{
  675. "op_2_project_0_exceptions_total": int64(1),
  676. "op_2_project_0_process_latency_us": int64(0),
  677. "op_2_project_0_records_in_total": int64(5),
  678. "op_2_project_0_records_out_total": int64(4),
  679. "sink_mockSink_0_exceptions_total": int64(0),
  680. "sink_mockSink_0_records_in_total": int64(4),
  681. "sink_mockSink_0_records_out_total": int64(4),
  682. "source_ldemo_0_exceptions_total": int64(0),
  683. "source_ldemo_0_records_in_total": int64(5),
  684. "source_ldemo_0_records_out_total": int64(5),
  685. },
  686. },
  687. }
  688. HandleStream(true, streamList, t)
  689. DoRuleTest(t, tests, 0, &api.RuleOption{
  690. BufferLength: 100,
  691. SendError: false,
  692. }, 0)
  693. }
  694. func TestSingleSQLTemplate(t *testing.T) {
  695. //Reset
  696. streamList := []string{"demo"}
  697. HandleStream(false, streamList, t)
  698. //Data setup
  699. var tests = []RuleTest{
  700. {
  701. Name: `TestSingleSQLTemplateRule1`,
  702. Sql: `SELECT * FROM demo`,
  703. R: []map[string]interface{}{
  704. {
  705. "c": "red",
  706. "wrapper": "w1",
  707. },
  708. {
  709. "c": "blue",
  710. "wrapper": "w1",
  711. },
  712. {
  713. "c": "blue",
  714. "wrapper": "w1",
  715. },
  716. {
  717. "c": "yellow",
  718. "wrapper": "w1",
  719. },
  720. {
  721. "c": "red",
  722. "wrapper": "w1",
  723. },
  724. },
  725. M: map[string]interface{}{
  726. "op_2_project_0_exceptions_total": int64(0),
  727. "op_2_project_0_process_latency_us": int64(0),
  728. "op_2_project_0_records_in_total": int64(5),
  729. "op_2_project_0_records_out_total": int64(5),
  730. "sink_mockSink_0_exceptions_total": int64(0),
  731. "sink_mockSink_0_records_in_total": int64(5),
  732. "sink_mockSink_0_records_out_total": int64(5),
  733. "source_demo_0_exceptions_total": int64(0),
  734. "source_demo_0_records_in_total": int64(5),
  735. "source_demo_0_records_out_total": int64(5),
  736. },
  737. },
  738. }
  739. HandleStream(true, streamList, t)
  740. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  741. BufferLength: 100,
  742. SendError: true,
  743. }, 0, map[string]interface{}{
  744. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  745. "sendSingle": true,
  746. }, func(result [][]byte) interface{} {
  747. var maps []map[string]interface{}
  748. for _, v := range result {
  749. var mapRes map[string]interface{}
  750. err := json.Unmarshal(v, &mapRes)
  751. if err != nil {
  752. t.Errorf("Failed to parse the input into map")
  753. continue
  754. }
  755. maps = append(maps, mapRes)
  756. }
  757. return maps
  758. })
  759. }
  760. func TestNoneSingleSQLTemplate(t *testing.T) {
  761. //Reset
  762. streamList := []string{"demo"}
  763. HandleStream(false, streamList, t)
  764. //Data setup
  765. var tests = []RuleTest{
  766. {
  767. Name: `TestNoneSingleSQLTemplateRule1`,
  768. Sql: `SELECT * FROM demo`,
  769. R: [][]byte{
  770. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  771. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  772. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  773. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  774. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  775. },
  776. M: map[string]interface{}{
  777. "op_2_project_0_exceptions_total": int64(0),
  778. "op_2_project_0_process_latency_us": int64(0),
  779. "op_2_project_0_records_in_total": int64(5),
  780. "op_2_project_0_records_out_total": int64(5),
  781. "sink_mockSink_0_exceptions_total": int64(0),
  782. "sink_mockSink_0_records_in_total": int64(5),
  783. "sink_mockSink_0_records_out_total": int64(5),
  784. "source_demo_0_exceptions_total": int64(0),
  785. "source_demo_0_records_in_total": int64(5),
  786. "source_demo_0_records_out_total": int64(5),
  787. },
  788. },
  789. }
  790. HandleStream(true, streamList, t)
  791. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  792. BufferLength: 100,
  793. SendError: true,
  794. }, 0, map[string]interface{}{
  795. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  796. }, func(result [][]byte) interface{} {
  797. return result
  798. })
  799. }
  800. func TestSingleSQLForBinary(t *testing.T) {
  801. //Reset
  802. streamList := []string{"binDemo"}
  803. HandleStream(false, streamList, t)
  804. //Data setup
  805. var tests = []RuleTest{
  806. {
  807. Name: `TestSingleSQLRule1`,
  808. Sql: `SELECT * FROM binDemo`,
  809. R: [][]map[string]interface{}{
  810. {{
  811. "self": mocknode.Image,
  812. }},
  813. },
  814. W: 50,
  815. M: map[string]interface{}{
  816. "op_2_project_0_exceptions_total": int64(0),
  817. "op_2_project_0_process_latency_us": int64(0),
  818. "op_2_project_0_records_in_total": int64(1),
  819. "op_2_project_0_records_out_total": int64(1),
  820. "sink_mockSink_0_exceptions_total": int64(0),
  821. "sink_mockSink_0_records_in_total": int64(1),
  822. "sink_mockSink_0_records_out_total": int64(1),
  823. "source_binDemo_0_exceptions_total": int64(0),
  824. "source_binDemo_0_records_in_total": int64(1),
  825. "source_binDemo_0_records_out_total": int64(1),
  826. },
  827. },
  828. }
  829. HandleStream(true, streamList, t)
  830. options := []*api.RuleOption{
  831. {
  832. BufferLength: 100,
  833. SendError: true,
  834. }, {
  835. BufferLength: 100,
  836. SendError: true,
  837. Qos: api.AtLeastOnce,
  838. CheckpointInterval: 5000,
  839. }, {
  840. BufferLength: 100,
  841. SendError: true,
  842. Qos: api.ExactlyOnce,
  843. CheckpointInterval: 5000,
  844. },
  845. }
  846. byteFunc := func(result [][]byte) interface{} {
  847. var maps [][]map[string]interface{}
  848. for _, v := range result {
  849. var mapRes []map[string][]byte
  850. err := json.Unmarshal(v, &mapRes)
  851. if err != nil {
  852. panic("Failed to parse the input into map")
  853. }
  854. mapInt := make([]map[string]interface{}, len(mapRes))
  855. for i, mv := range mapRes {
  856. mapInt[i] = make(map[string]interface{})
  857. //assume only one key
  858. for k, v := range mv {
  859. mapInt[i][k] = v
  860. }
  861. }
  862. maps = append(maps, mapInt)
  863. }
  864. return maps
  865. }
  866. for j, opt := range options {
  867. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  868. }
  869. }