rule_test.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/topo"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "testing"
  21. )
  22. func TestSingleSQL(t *testing.T) {
  23. //Reset
  24. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  25. HandleStream(false, streamList, t)
  26. //Data setup
  27. var tests = []RuleTest{
  28. {
  29. Name: `TestSingleSQLRule1`,
  30. Sql: `SELECT * FROM demo`,
  31. R: [][]map[string]interface{}{
  32. {{
  33. "color": "red",
  34. "size": float64(3),
  35. "ts": float64(1541152486013),
  36. }},
  37. {{
  38. "color": "blue",
  39. "size": float64(6),
  40. "ts": float64(1541152486822),
  41. }},
  42. {{
  43. "color": "blue",
  44. "size": float64(2),
  45. "ts": float64(1541152487632),
  46. }},
  47. {{
  48. "color": "yellow",
  49. "size": float64(4),
  50. "ts": float64(1541152488442),
  51. }},
  52. {{
  53. "color": "red",
  54. "size": float64(1),
  55. "ts": float64(1541152489252),
  56. }},
  57. },
  58. M: map[string]interface{}{
  59. "op_2_project_0_exceptions_total": int64(0),
  60. "op_2_project_0_process_latency_us": int64(0),
  61. "op_2_project_0_records_in_total": int64(5),
  62. "op_2_project_0_records_out_total": int64(5),
  63. "sink_mockSink_0_exceptions_total": int64(0),
  64. "sink_mockSink_0_records_in_total": int64(5),
  65. "sink_mockSink_0_records_out_total": int64(5),
  66. "source_demo_0_exceptions_total": int64(0),
  67. "source_demo_0_records_in_total": int64(5),
  68. "source_demo_0_records_out_total": int64(5),
  69. },
  70. T: &topo.PrintableTopo{
  71. Sources: []string{"source_demo"},
  72. Edges: map[string][]string{
  73. "source_demo": {"op_2_project"},
  74. "op_2_project": {"sink_mockSink"},
  75. },
  76. },
  77. }, {
  78. Name: `TestSingleSQLRule2`,
  79. Sql: `SELECT color, ts FROM demo where size > 3`,
  80. R: [][]map[string]interface{}{
  81. {{
  82. "color": "blue",
  83. "ts": float64(1541152486822),
  84. }},
  85. {{
  86. "color": "yellow",
  87. "ts": float64(1541152488442),
  88. }},
  89. },
  90. M: map[string]interface{}{
  91. "op_3_project_0_exceptions_total": int64(0),
  92. "op_3_project_0_process_latency_us": int64(0),
  93. "op_3_project_0_records_in_total": int64(2),
  94. "op_3_project_0_records_out_total": int64(2),
  95. "sink_mockSink_0_exceptions_total": int64(0),
  96. "sink_mockSink_0_records_in_total": int64(2),
  97. "sink_mockSink_0_records_out_total": int64(2),
  98. "source_demo_0_exceptions_total": int64(0),
  99. "source_demo_0_records_in_total": int64(5),
  100. "source_demo_0_records_out_total": int64(5),
  101. "op_2_filter_0_exceptions_total": int64(0),
  102. "op_2_filter_0_process_latency_us": int64(0),
  103. "op_2_filter_0_records_in_total": int64(5),
  104. "op_2_filter_0_records_out_total": int64(2),
  105. },
  106. }, {
  107. Name: `TestSingleSQLRule3`,
  108. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  109. R: [][]map[string]interface{}{
  110. {{
  111. "Int8": float64(6),
  112. "ts": float64(1541152486822),
  113. }},
  114. {{
  115. "Int8": float64(4),
  116. "ts": float64(1541152488442),
  117. }},
  118. },
  119. M: map[string]interface{}{
  120. "op_3_project_0_exceptions_total": int64(0),
  121. "op_3_project_0_process_latency_us": int64(0),
  122. "op_3_project_0_records_in_total": int64(2),
  123. "op_3_project_0_records_out_total": int64(2),
  124. "sink_mockSink_0_exceptions_total": int64(0),
  125. "sink_mockSink_0_records_in_total": int64(2),
  126. "sink_mockSink_0_records_out_total": int64(2),
  127. "source_demo_0_exceptions_total": int64(0),
  128. "source_demo_0_records_in_total": int64(5),
  129. "source_demo_0_records_out_total": int64(5),
  130. "op_2_filter_0_exceptions_total": int64(0),
  131. "op_2_filter_0_process_latency_us": int64(0),
  132. "op_2_filter_0_records_in_total": int64(5),
  133. "op_2_filter_0_records_out_total": int64(2),
  134. },
  135. }, {
  136. Name: `TestSingleSQLRule4`,
  137. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  138. R: [][]map[string]interface{}{
  139. {{
  140. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  141. }},
  142. {{
  143. "Int8": float64(6),
  144. "ts": float64(1541152486822),
  145. }},
  146. {{
  147. "Int8": float64(4),
  148. "ts": float64(1541152488442),
  149. }},
  150. {{
  151. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  152. }},
  153. },
  154. M: map[string]interface{}{
  155. "op_3_project_0_exceptions_total": int64(2),
  156. "op_3_project_0_process_latency_us": int64(0),
  157. "op_3_project_0_records_in_total": int64(4),
  158. "op_3_project_0_records_out_total": int64(2),
  159. "sink_mockSink_0_exceptions_total": int64(0),
  160. "sink_mockSink_0_records_in_total": int64(4),
  161. "sink_mockSink_0_records_out_total": int64(4),
  162. "source_demoError_0_exceptions_total": int64(2),
  163. "source_demoError_0_records_in_total": int64(5),
  164. "source_demoError_0_records_out_total": int64(5),
  165. "op_2_filter_0_exceptions_total": int64(2),
  166. "op_2_filter_0_process_latency_us": int64(0),
  167. "op_2_filter_0_records_in_total": int64(5),
  168. "op_2_filter_0_records_out_total": int64(2),
  169. },
  170. }, {
  171. Name: `TestSingleSQLRule5`,
  172. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  173. R: [][]map[string]interface{}{
  174. {{
  175. "m": "mock",
  176. "ts": float64(1541152486013),
  177. }},
  178. {{
  179. "m": "mock",
  180. "ts": float64(1541152486822),
  181. }},
  182. {{
  183. "m": "mock",
  184. "ts": float64(1541152487632),
  185. }},
  186. {{
  187. "m": "mock",
  188. "ts": float64(1541152488442),
  189. }},
  190. {{
  191. "m": "mock",
  192. "ts": float64(1541152489252),
  193. }},
  194. },
  195. M: map[string]interface{}{
  196. "op_2_project_0_exceptions_total": int64(0),
  197. "op_2_project_0_process_latency_us": int64(0),
  198. "op_2_project_0_records_in_total": int64(5),
  199. "op_2_project_0_records_out_total": int64(5),
  200. "sink_mockSink_0_exceptions_total": int64(0),
  201. "sink_mockSink_0_records_in_total": int64(5),
  202. "sink_mockSink_0_records_out_total": int64(5),
  203. "source_demo_0_exceptions_total": int64(0),
  204. "source_demo_0_records_in_total": int64(5),
  205. "source_demo_0_records_out_total": int64(5),
  206. },
  207. }, {
  208. Name: `TestSingleSQLRule6`,
  209. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  210. R: [][]map[string]interface{}{
  211. {{
  212. "color": "blue",
  213. "ts": float64(1541152486822),
  214. }},
  215. {{
  216. "color": "yellow",
  217. "ts": float64(1541152488442),
  218. }},
  219. },
  220. M: map[string]interface{}{
  221. "op_3_project_0_exceptions_total": int64(0),
  222. "op_3_project_0_process_latency_us": int64(0),
  223. "op_3_project_0_records_in_total": int64(2),
  224. "op_3_project_0_records_out_total": int64(2),
  225. "sink_mockSink_0_exceptions_total": int64(0),
  226. "sink_mockSink_0_records_in_total": int64(2),
  227. "sink_mockSink_0_records_out_total": int64(2),
  228. "source_demo_0_exceptions_total": int64(0),
  229. "source_demo_0_records_in_total": int64(5),
  230. "source_demo_0_records_out_total": int64(5),
  231. "op_2_filter_0_exceptions_total": int64(0),
  232. "op_2_filter_0_process_latency_us": int64(0),
  233. "op_2_filter_0_records_in_total": int64(5),
  234. "op_2_filter_0_records_out_total": int64(2),
  235. },
  236. }, {
  237. Name: `TestSingleSQLRule7`,
  238. Sql: "SELECT `from` FROM demo1",
  239. R: [][]map[string]interface{}{
  240. {{
  241. "from": "device1",
  242. }},
  243. {{
  244. "from": "device2",
  245. }},
  246. {{
  247. "from": "device3",
  248. }},
  249. {{
  250. "from": "device1",
  251. }},
  252. {{
  253. "from": "device3",
  254. }},
  255. },
  256. M: map[string]interface{}{
  257. "op_2_project_0_exceptions_total": int64(0),
  258. "op_2_project_0_process_latency_us": int64(0),
  259. "op_2_project_0_records_in_total": int64(5),
  260. "op_2_project_0_records_out_total": int64(5),
  261. "sink_mockSink_0_exceptions_total": int64(0),
  262. "sink_mockSink_0_records_in_total": int64(5),
  263. "sink_mockSink_0_records_out_total": int64(5),
  264. "source_demo1_0_exceptions_total": int64(0),
  265. "source_demo1_0_records_in_total": int64(5),
  266. "source_demo1_0_records_out_total": int64(5),
  267. },
  268. }, {
  269. Name: `TestSingleSQLRule8`,
  270. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  271. R: [][]map[string]interface{}{
  272. {{
  273. "temp": float64(25.5),
  274. "hum": float64(65),
  275. "from": "device1",
  276. "ts": float64(1541152486013),
  277. }},
  278. {{
  279. "temp": float64(27.4),
  280. "hum": float64(80),
  281. "from": "device1",
  282. "ts": float64(1541152488442),
  283. }},
  284. },
  285. M: map[string]interface{}{
  286. "op_3_project_0_exceptions_total": int64(0),
  287. "op_3_project_0_process_latency_us": int64(0),
  288. "op_3_project_0_records_in_total": int64(2),
  289. "op_3_project_0_records_out_total": int64(2),
  290. "op_2_filter_0_exceptions_total": int64(0),
  291. "op_2_filter_0_process_latency_us": int64(0),
  292. "op_2_filter_0_records_in_total": int64(5),
  293. "op_2_filter_0_records_out_total": int64(2),
  294. "sink_mockSink_0_exceptions_total": int64(0),
  295. "sink_mockSink_0_records_in_total": int64(2),
  296. "sink_mockSink_0_records_out_total": int64(2),
  297. "source_demo1_0_exceptions_total": int64(0),
  298. "source_demo1_0_records_in_total": int64(5),
  299. "source_demo1_0_records_out_total": int64(5),
  300. },
  301. }, {
  302. Name: `TestSingleSQLRule9`,
  303. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  304. R: [][]map[string]interface{}{
  305. {{
  306. "color": "red",
  307. "s": "M",
  308. "ts": float64(1541152486013),
  309. }},
  310. {{
  311. "color": "blue",
  312. "s": "L",
  313. "ts": float64(1541152486822),
  314. }},
  315. {{
  316. "color": "blue",
  317. "s": "M",
  318. "ts": float64(1541152487632),
  319. }},
  320. {{
  321. "color": "yellow",
  322. "s": "L",
  323. "ts": float64(1541152488442),
  324. }},
  325. {{
  326. "color": "red",
  327. "s": "S",
  328. "ts": float64(1541152489252),
  329. }},
  330. },
  331. M: map[string]interface{}{
  332. "op_2_project_0_exceptions_total": int64(0),
  333. "op_2_project_0_process_latency_us": int64(0),
  334. "op_2_project_0_records_in_total": int64(5),
  335. "op_2_project_0_records_out_total": int64(5),
  336. "sink_mockSink_0_exceptions_total": int64(0),
  337. "sink_mockSink_0_records_in_total": int64(5),
  338. "sink_mockSink_0_records_out_total": int64(5),
  339. "source_demo_0_exceptions_total": int64(0),
  340. "source_demo_0_records_in_total": int64(5),
  341. "source_demo_0_records_out_total": int64(5),
  342. },
  343. T: &topo.PrintableTopo{
  344. Sources: []string{"source_demo"},
  345. Edges: map[string][]string{
  346. "source_demo": {"op_2_project"},
  347. "op_2_project": {"sink_mockSink"},
  348. },
  349. },
  350. }, {
  351. Name: `TestSingleSQLRule10`,
  352. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  353. R: [][]map[string]interface{}{
  354. {{
  355. "id": float64(1541152486013),
  356. "name": "name1",
  357. "color": "red",
  358. "size": float64(3),
  359. "ts": float64(1541152486013),
  360. }},
  361. {{
  362. "id": float64(1541152487632),
  363. "name": "name2",
  364. "color": "blue",
  365. "size": float64(2),
  366. "ts": float64(1541152487632),
  367. }},
  368. {{
  369. "id": float64(1541152489252),
  370. "name": "name3",
  371. "color": "red",
  372. "size": float64(1),
  373. "ts": float64(1541152489252),
  374. }},
  375. },
  376. W: 15,
  377. M: map[string]interface{}{
  378. "op_3_join_aligner_0_records_in_total": int64(6),
  379. "op_3_join_aligner_0_records_out_total": int64(5),
  380. "op_4_join_0_exceptions_total": int64(0),
  381. "op_4_join_0_records_in_total": int64(5),
  382. "op_4_join_0_records_out_total": int64(3),
  383. "op_5_project_0_exceptions_total": int64(0),
  384. "op_5_project_0_records_in_total": int64(3),
  385. "op_5_project_0_records_out_total": int64(3),
  386. "sink_mockSink_0_exceptions_total": int64(0),
  387. "sink_mockSink_0_records_in_total": int64(3),
  388. "sink_mockSink_0_records_out_total": int64(3),
  389. "source_demo_0_exceptions_total": int64(0),
  390. "source_demo_0_records_in_total": int64(5),
  391. "source_demo_0_records_out_total": int64(5),
  392. "source_table1_0_exceptions_total": int64(0),
  393. "source_table1_0_records_in_total": int64(4),
  394. "source_table1_0_records_out_total": int64(1),
  395. },
  396. }, {
  397. Name: `TestSingleSQLRule11`,
  398. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  399. R: [][]map[string]interface{}{
  400. {{
  401. "device": "device2",
  402. }},
  403. {{
  404. "device": "device4",
  405. }},
  406. {{
  407. "device": "device5",
  408. }},
  409. },
  410. M: map[string]interface{}{
  411. "op_3_join_aligner_0_records_in_total": int64(10),
  412. "op_3_join_aligner_0_records_out_total": int64(5),
  413. "op_4_join_0_exceptions_total": int64(0),
  414. "op_4_join_0_records_in_total": int64(5),
  415. "op_4_join_0_records_out_total": int64(3),
  416. "op_5_project_0_exceptions_total": int64(0),
  417. "op_5_project_0_records_in_total": int64(3),
  418. "op_5_project_0_records_out_total": int64(3),
  419. "sink_mockSink_0_exceptions_total": int64(0),
  420. "sink_mockSink_0_records_in_total": int64(3),
  421. "sink_mockSink_0_records_out_total": int64(3),
  422. "source_demo_0_exceptions_total": int64(0),
  423. "source_demo_0_records_in_total": int64(5),
  424. "source_demo_0_records_out_total": int64(5),
  425. "source_demoTable_0_exceptions_total": int64(0),
  426. "source_demoTable_0_records_in_total": int64(5),
  427. "source_demoTable_0_records_out_total": int64(5),
  428. },
  429. }, {
  430. Name: `TestSingleSQLRule12`,
  431. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  432. R: [][]map[string]interface{}{
  433. {{
  434. "table1Id": float64(1541152486013),
  435. "demoTs": float64(1541152486013),
  436. }},
  437. {{
  438. "table1Id": float64(1541152487632),
  439. "demoTs": float64(1541152487632),
  440. }},
  441. {{
  442. "table1Id": float64(1541152489252),
  443. "demoTs": float64(1541152489252),
  444. }},
  445. },
  446. W: 15,
  447. M: map[string]interface{}{
  448. "op_3_join_aligner_0_records_in_total": int64(6),
  449. "op_3_join_aligner_0_records_out_total": int64(5),
  450. "op_4_join_0_exceptions_total": int64(0),
  451. "op_4_join_0_records_in_total": int64(5),
  452. "op_4_join_0_records_out_total": int64(3),
  453. "op_5_project_0_exceptions_total": int64(0),
  454. "op_5_project_0_records_in_total": int64(3),
  455. "op_5_project_0_records_out_total": int64(3),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(3),
  458. "sink_mockSink_0_records_out_total": int64(3),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "source_table1_0_exceptions_total": int64(0),
  463. "source_table1_0_records_in_total": int64(4),
  464. "source_table1_0_records_out_total": int64(1),
  465. },
  466. }, {
  467. Name: `TestChanged13`,
  468. Sql: "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
  469. R: [][]map[string]interface{}{
  470. {{
  471. "tt_color": "red",
  472. "tt_size": float64(3),
  473. }},
  474. {{
  475. "tt_color": "blue",
  476. "tt_size": float64(6),
  477. }},
  478. {{
  479. "tt_size": float64(2),
  480. }},
  481. {{
  482. "tt_color": "yellow",
  483. "tt_size": float64(4),
  484. }},
  485. {{
  486. "tt_color": "red",
  487. "tt_size": float64(1),
  488. }},
  489. },
  490. M: map[string]interface{}{
  491. "op_2_project_0_exceptions_total": int64(0),
  492. "op_2_project_0_process_latency_us": int64(0),
  493. "op_2_project_0_records_in_total": int64(5),
  494. "op_2_project_0_records_out_total": int64(5),
  495. "sink_mockSink_0_exceptions_total": int64(0),
  496. "sink_mockSink_0_records_in_total": int64(5),
  497. "sink_mockSink_0_records_out_total": int64(5),
  498. "source_demo_0_exceptions_total": int64(0),
  499. "source_demo_0_records_in_total": int64(5),
  500. "source_demo_0_records_out_total": int64(5),
  501. },
  502. }, {
  503. Name: `TestAliasOrderBy14`,
  504. Sql: "SELECT color, count(*) as c FROM demo where color != \"red\" GROUP BY COUNTWINDOW(5), color Order by c DESC",
  505. R: [][]map[string]interface{}{
  506. {{
  507. "color": "blue",
  508. "c": float64(2),
  509. },
  510. {
  511. "color": "yellow",
  512. "c": float64(1),
  513. },
  514. },
  515. },
  516. M: map[string]interface{}{
  517. "op_6_project_0_exceptions_total": int64(0),
  518. "op_6_project_0_process_latency_us": int64(0),
  519. "op_6_project_0_records_in_total": int64(1),
  520. "op_6_project_0_records_out_total": int64(1),
  521. "sink_mockSink_0_exceptions_total": int64(0),
  522. "sink_mockSink_0_records_in_total": int64(1),
  523. "sink_mockSink_0_records_out_total": int64(1),
  524. "source_demo_0_exceptions_total": int64(0),
  525. "source_demo_0_records_in_total": int64(5),
  526. "source_demo_0_records_out_total": int64(5),
  527. },
  528. },
  529. }
  530. HandleStream(true, streamList, t)
  531. options := []*api.RuleOption{
  532. {
  533. BufferLength: 100,
  534. SendError: true,
  535. }, {
  536. BufferLength: 100,
  537. SendError: true,
  538. Qos: api.AtLeastOnce,
  539. CheckpointInterval: 5000,
  540. }, {
  541. BufferLength: 100,
  542. SendError: true,
  543. Qos: api.ExactlyOnce,
  544. CheckpointInterval: 5000,
  545. },
  546. }
  547. for j, opt := range options {
  548. DoRuleTest(t, tests, j, opt, 0)
  549. }
  550. }
  551. func TestSingleSQLError(t *testing.T) {
  552. //Reset
  553. streamList := []string{"ldemo"}
  554. HandleStream(false, streamList, t)
  555. //Data setup
  556. var tests = []RuleTest{
  557. {
  558. Name: `TestSingleSQLErrorRule1`,
  559. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  560. R: [][]map[string]interface{}{
  561. {{
  562. "color": "red",
  563. "ts": float64(1541152486013),
  564. }},
  565. {{
  566. "error": "run Where error: invalid operation string(string) >= int64(3)",
  567. }},
  568. {{
  569. "ts": float64(1541152487632),
  570. }},
  571. },
  572. M: map[string]interface{}{
  573. "op_3_project_0_exceptions_total": int64(1),
  574. "op_3_project_0_process_latency_us": int64(0),
  575. "op_3_project_0_records_in_total": int64(3),
  576. "op_3_project_0_records_out_total": int64(2),
  577. "sink_mockSink_0_exceptions_total": int64(0),
  578. "sink_mockSink_0_records_in_total": int64(3),
  579. "sink_mockSink_0_records_out_total": int64(3),
  580. "source_ldemo_0_exceptions_total": int64(0),
  581. "source_ldemo_0_records_in_total": int64(5),
  582. "source_ldemo_0_records_out_total": int64(5),
  583. "op_2_filter_0_exceptions_total": int64(1),
  584. "op_2_filter_0_process_latency_us": int64(0),
  585. "op_2_filter_0_records_in_total": int64(5),
  586. "op_2_filter_0_records_out_total": int64(2),
  587. },
  588. }, {
  589. Name: `TestSingleSQLErrorRule2`,
  590. Sql: `SELECT size * 5 FROM ldemo`,
  591. R: [][]map[string]interface{}{
  592. {{
  593. "kuiper_field_0": float64(15),
  594. }},
  595. {{
  596. "error": "run Select error: invalid operation string(string) * int64(5)",
  597. }},
  598. {{
  599. "kuiper_field_0": float64(15),
  600. }},
  601. {{
  602. "kuiper_field_0": float64(10),
  603. }},
  604. {{}},
  605. },
  606. M: map[string]interface{}{
  607. "op_2_project_0_exceptions_total": int64(1),
  608. "op_2_project_0_process_latency_us": int64(0),
  609. "op_2_project_0_records_in_total": int64(5),
  610. "op_2_project_0_records_out_total": int64(4),
  611. "sink_mockSink_0_exceptions_total": int64(0),
  612. "sink_mockSink_0_records_in_total": int64(5),
  613. "sink_mockSink_0_records_out_total": int64(5),
  614. "source_ldemo_0_exceptions_total": int64(0),
  615. "source_ldemo_0_records_in_total": int64(5),
  616. "source_ldemo_0_records_out_total": int64(5),
  617. },
  618. },
  619. }
  620. HandleStream(true, streamList, t)
  621. DoRuleTest(t, tests, 0, &api.RuleOption{
  622. BufferLength: 100,
  623. SendError: true,
  624. }, 0)
  625. }
  626. func TestSingleSQLOmitError(t *testing.T) {
  627. //Reset
  628. streamList := []string{"ldemo"}
  629. HandleStream(false, streamList, t)
  630. //Data setup
  631. var tests = []RuleTest{
  632. {
  633. Name: `TestSingleSQLErrorRule1`,
  634. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  635. R: [][]map[string]interface{}{
  636. {{
  637. "color": "red",
  638. "ts": float64(1541152486013),
  639. }},
  640. {{
  641. "ts": float64(1541152487632),
  642. }},
  643. },
  644. M: map[string]interface{}{
  645. "op_3_project_0_exceptions_total": int64(0),
  646. "op_3_project_0_process_latency_us": int64(0),
  647. "op_3_project_0_records_in_total": int64(2),
  648. "op_3_project_0_records_out_total": int64(2),
  649. "sink_mockSink_0_exceptions_total": int64(0),
  650. "sink_mockSink_0_records_in_total": int64(2),
  651. "sink_mockSink_0_records_out_total": int64(2),
  652. "source_ldemo_0_exceptions_total": int64(0),
  653. "source_ldemo_0_records_in_total": int64(5),
  654. "source_ldemo_0_records_out_total": int64(5),
  655. "op_2_filter_0_exceptions_total": int64(1),
  656. "op_2_filter_0_process_latency_us": int64(0),
  657. "op_2_filter_0_records_in_total": int64(5),
  658. "op_2_filter_0_records_out_total": int64(2),
  659. },
  660. }, {
  661. Name: `TestSingleSQLErrorRule2`,
  662. Sql: `SELECT size * 5 FROM ldemo`,
  663. R: [][]map[string]interface{}{
  664. {{
  665. "kuiper_field_0": float64(15),
  666. }},
  667. {{
  668. "kuiper_field_0": float64(15),
  669. }},
  670. {{
  671. "kuiper_field_0": float64(10),
  672. }},
  673. {{}},
  674. },
  675. M: map[string]interface{}{
  676. "op_2_project_0_exceptions_total": int64(1),
  677. "op_2_project_0_process_latency_us": int64(0),
  678. "op_2_project_0_records_in_total": int64(5),
  679. "op_2_project_0_records_out_total": int64(4),
  680. "sink_mockSink_0_exceptions_total": int64(0),
  681. "sink_mockSink_0_records_in_total": int64(4),
  682. "sink_mockSink_0_records_out_total": int64(4),
  683. "source_ldemo_0_exceptions_total": int64(0),
  684. "source_ldemo_0_records_in_total": int64(5),
  685. "source_ldemo_0_records_out_total": int64(5),
  686. },
  687. },
  688. }
  689. HandleStream(true, streamList, t)
  690. DoRuleTest(t, tests, 0, &api.RuleOption{
  691. BufferLength: 100,
  692. SendError: false,
  693. }, 0)
  694. }
  695. func TestSingleSQLTemplate(t *testing.T) {
  696. //Reset
  697. streamList := []string{"demo"}
  698. HandleStream(false, streamList, t)
  699. //Data setup
  700. var tests = []RuleTest{
  701. {
  702. Name: `TestSingleSQLTemplateRule1`,
  703. Sql: `SELECT * FROM demo`,
  704. R: []map[string]interface{}{
  705. {
  706. "c": "red",
  707. "wrapper": "w1",
  708. },
  709. {
  710. "c": "blue",
  711. "wrapper": "w1",
  712. },
  713. {
  714. "c": "blue",
  715. "wrapper": "w1",
  716. },
  717. {
  718. "c": "yellow",
  719. "wrapper": "w1",
  720. },
  721. {
  722. "c": "red",
  723. "wrapper": "w1",
  724. },
  725. },
  726. M: map[string]interface{}{
  727. "op_2_project_0_exceptions_total": int64(0),
  728. "op_2_project_0_process_latency_us": int64(0),
  729. "op_2_project_0_records_in_total": int64(5),
  730. "op_2_project_0_records_out_total": int64(5),
  731. "sink_mockSink_0_exceptions_total": int64(0),
  732. "sink_mockSink_0_records_in_total": int64(5),
  733. "sink_mockSink_0_records_out_total": int64(5),
  734. "source_demo_0_exceptions_total": int64(0),
  735. "source_demo_0_records_in_total": int64(5),
  736. "source_demo_0_records_out_total": int64(5),
  737. },
  738. },
  739. }
  740. HandleStream(true, streamList, t)
  741. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  742. BufferLength: 100,
  743. SendError: true,
  744. }, 0, map[string]interface{}{
  745. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  746. "sendSingle": true,
  747. }, func(result [][]byte) interface{} {
  748. var maps []map[string]interface{}
  749. for _, v := range result {
  750. var mapRes map[string]interface{}
  751. err := json.Unmarshal(v, &mapRes)
  752. if err != nil {
  753. t.Errorf("Failed to parse the input into map")
  754. continue
  755. }
  756. maps = append(maps, mapRes)
  757. }
  758. return maps
  759. })
  760. }
  761. func TestNoneSingleSQLTemplate(t *testing.T) {
  762. //Reset
  763. streamList := []string{"demo"}
  764. HandleStream(false, streamList, t)
  765. //Data setup
  766. var tests = []RuleTest{
  767. {
  768. Name: `TestNoneSingleSQLTemplateRule1`,
  769. Sql: `SELECT * FROM demo`,
  770. R: [][]byte{
  771. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  772. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  773. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  774. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  775. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  776. },
  777. M: map[string]interface{}{
  778. "op_2_project_0_exceptions_total": int64(0),
  779. "op_2_project_0_process_latency_us": int64(0),
  780. "op_2_project_0_records_in_total": int64(5),
  781. "op_2_project_0_records_out_total": int64(5),
  782. "sink_mockSink_0_exceptions_total": int64(0),
  783. "sink_mockSink_0_records_in_total": int64(5),
  784. "sink_mockSink_0_records_out_total": int64(5),
  785. "source_demo_0_exceptions_total": int64(0),
  786. "source_demo_0_records_in_total": int64(5),
  787. "source_demo_0_records_out_total": int64(5),
  788. },
  789. },
  790. }
  791. HandleStream(true, streamList, t)
  792. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  793. BufferLength: 100,
  794. SendError: true,
  795. }, 0, map[string]interface{}{
  796. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  797. }, func(result [][]byte) interface{} {
  798. return result
  799. })
  800. }
  801. func TestSingleSQLForBinary(t *testing.T) {
  802. //Reset
  803. streamList := []string{"binDemo"}
  804. HandleStream(false, streamList, t)
  805. //Data setup
  806. var tests = []RuleTest{
  807. {
  808. Name: `TestSingleSQLRule1`,
  809. Sql: `SELECT * FROM binDemo`,
  810. R: [][]map[string]interface{}{
  811. {{
  812. "self": mocknode.Image,
  813. }},
  814. },
  815. W: 50,
  816. M: map[string]interface{}{
  817. "op_2_project_0_exceptions_total": int64(0),
  818. "op_2_project_0_process_latency_us": int64(0),
  819. "op_2_project_0_records_in_total": int64(1),
  820. "op_2_project_0_records_out_total": int64(1),
  821. "sink_mockSink_0_exceptions_total": int64(0),
  822. "sink_mockSink_0_records_in_total": int64(1),
  823. "sink_mockSink_0_records_out_total": int64(1),
  824. "source_binDemo_0_exceptions_total": int64(0),
  825. "source_binDemo_0_records_in_total": int64(1),
  826. "source_binDemo_0_records_out_total": int64(1),
  827. },
  828. },
  829. }
  830. HandleStream(true, streamList, t)
  831. options := []*api.RuleOption{
  832. {
  833. BufferLength: 100,
  834. SendError: true,
  835. }, {
  836. BufferLength: 100,
  837. SendError: true,
  838. Qos: api.AtLeastOnce,
  839. CheckpointInterval: 5000,
  840. }, {
  841. BufferLength: 100,
  842. SendError: true,
  843. Qos: api.ExactlyOnce,
  844. CheckpointInterval: 5000,
  845. },
  846. }
  847. byteFunc := func(result [][]byte) interface{} {
  848. var maps [][]map[string]interface{}
  849. for _, v := range result {
  850. var mapRes []map[string][]byte
  851. err := json.Unmarshal(v, &mapRes)
  852. if err != nil {
  853. panic("Failed to parse the input into map")
  854. }
  855. mapInt := make([]map[string]interface{}, len(mapRes))
  856. for i, mv := range mapRes {
  857. mapInt[i] = make(map[string]interface{})
  858. //assume only one key
  859. for k, v := range mv {
  860. mapInt[i][k] = v
  861. }
  862. }
  863. maps = append(maps, mapInt)
  864. }
  865. return maps
  866. }
  867. for j, opt := range options {
  868. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  869. }
  870. }