// rule_test.go (27 KB)

  1. package topotest
  2. import (
  3. "encoding/json"
  4. "github.com/lf-edge/ekuiper/internal/topo"
  5. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  6. "github.com/lf-edge/ekuiper/pkg/api"
  7. "testing"
  8. )
  9. func TestSingleSQL(t *testing.T) {
  10. //Reset
  11. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  12. HandleStream(false, streamList, t)
  13. //Data setup
  14. var tests = []RuleTest{
  15. {
  16. Name: `TestSingleSQLRule1`,
  17. Sql: `SELECT * FROM demo`,
  18. R: [][]map[string]interface{}{
  19. {{
  20. "color": "red",
  21. "size": float64(3),
  22. "ts": float64(1541152486013),
  23. }},
  24. {{
  25. "color": "blue",
  26. "size": float64(6),
  27. "ts": float64(1541152486822),
  28. }},
  29. {{
  30. "color": "blue",
  31. "size": float64(2),
  32. "ts": float64(1541152487632),
  33. }},
  34. {{
  35. "color": "yellow",
  36. "size": float64(4),
  37. "ts": float64(1541152488442),
  38. }},
  39. {{
  40. "color": "red",
  41. "size": float64(1),
  42. "ts": float64(1541152489252),
  43. }},
  44. },
  45. M: map[string]interface{}{
  46. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  47. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  48. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  49. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  50. "op_2_project_0_exceptions_total": int64(0),
  51. "op_2_project_0_process_latency_us": int64(0),
  52. "op_2_project_0_records_in_total": int64(5),
  53. "op_2_project_0_records_out_total": int64(5),
  54. "sink_mockSink_0_exceptions_total": int64(0),
  55. "sink_mockSink_0_records_in_total": int64(5),
  56. "sink_mockSink_0_records_out_total": int64(5),
  57. "source_demo_0_exceptions_total": int64(0),
  58. "source_demo_0_records_in_total": int64(5),
  59. "source_demo_0_records_out_total": int64(5),
  60. },
  61. T: &topo.PrintableTopo{
  62. Sources: []string{"source_demo"},
  63. Edges: map[string][]string{
  64. "source_demo": {"op_1_preprocessor_demo"},
  65. "op_1_preprocessor_demo": {"op_2_project"},
  66. "op_2_project": {"sink_mockSink"},
  67. },
  68. },
  69. }, {
  70. Name: `TestSingleSQLRule2`,
  71. Sql: `SELECT color, ts FROM demo where size > 3`,
  72. R: [][]map[string]interface{}{
  73. {{
  74. "color": "blue",
  75. "ts": float64(1541152486822),
  76. }},
  77. {{
  78. "color": "yellow",
  79. "ts": float64(1541152488442),
  80. }},
  81. },
  82. M: map[string]interface{}{
  83. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  84. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  85. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  86. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  87. "op_3_project_0_exceptions_total": int64(0),
  88. "op_3_project_0_process_latency_us": int64(0),
  89. "op_3_project_0_records_in_total": int64(2),
  90. "op_3_project_0_records_out_total": int64(2),
  91. "sink_mockSink_0_exceptions_total": int64(0),
  92. "sink_mockSink_0_records_in_total": int64(2),
  93. "sink_mockSink_0_records_out_total": int64(2),
  94. "source_demo_0_exceptions_total": int64(0),
  95. "source_demo_0_records_in_total": int64(5),
  96. "source_demo_0_records_out_total": int64(5),
  97. "op_2_filter_0_exceptions_total": int64(0),
  98. "op_2_filter_0_process_latency_us": int64(0),
  99. "op_2_filter_0_records_in_total": int64(5),
  100. "op_2_filter_0_records_out_total": int64(2),
  101. },
  102. }, {
  103. Name: `TestSingleSQLRule3`,
  104. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  105. R: [][]map[string]interface{}{
  106. {{
  107. "Int8": float64(6),
  108. "ts": float64(1541152486822),
  109. }},
  110. {{
  111. "Int8": float64(4),
  112. "ts": float64(1541152488442),
  113. }},
  114. },
  115. M: map[string]interface{}{
  116. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  117. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  118. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  119. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  120. "op_3_project_0_exceptions_total": int64(0),
  121. "op_3_project_0_process_latency_us": int64(0),
  122. "op_3_project_0_records_in_total": int64(2),
  123. "op_3_project_0_records_out_total": int64(2),
  124. "sink_mockSink_0_exceptions_total": int64(0),
  125. "sink_mockSink_0_records_in_total": int64(2),
  126. "sink_mockSink_0_records_out_total": int64(2),
  127. "source_demo_0_exceptions_total": int64(0),
  128. "source_demo_0_records_in_total": int64(5),
  129. "source_demo_0_records_out_total": int64(5),
  130. "op_2_filter_0_exceptions_total": int64(0),
  131. "op_2_filter_0_process_latency_us": int64(0),
  132. "op_2_filter_0_records_in_total": int64(5),
  133. "op_2_filter_0_records_out_total": int64(2),
  134. },
  135. }, {
  136. Name: `TestSingleSQLRule4`,
  137. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  138. R: [][]map[string]interface{}{
  139. {{
  140. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  141. }},
  142. {{
  143. "Int8": float64(6),
  144. "ts": float64(1541152486822),
  145. }},
  146. {{
  147. "Int8": float64(4),
  148. "ts": float64(1541152488442),
  149. }},
  150. {{
  151. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  152. }},
  153. },
  154. M: map[string]interface{}{
  155. "op_1_preprocessor_demoError_0_exceptions_total": int64(2),
  156. "op_1_preprocessor_demoError_0_process_latency_us": int64(0),
  157. "op_1_preprocessor_demoError_0_records_in_total": int64(5),
  158. "op_1_preprocessor_demoError_0_records_out_total": int64(3),
  159. "op_3_project_0_exceptions_total": int64(2),
  160. "op_3_project_0_process_latency_us": int64(0),
  161. "op_3_project_0_records_in_total": int64(4),
  162. "op_3_project_0_records_out_total": int64(2),
  163. "sink_mockSink_0_exceptions_total": int64(0),
  164. "sink_mockSink_0_records_in_total": int64(4),
  165. "sink_mockSink_0_records_out_total": int64(4),
  166. "source_demoError_0_exceptions_total": int64(0),
  167. "source_demoError_0_records_in_total": int64(5),
  168. "source_demoError_0_records_out_total": int64(5),
  169. "op_2_filter_0_exceptions_total": int64(2),
  170. "op_2_filter_0_process_latency_us": int64(0),
  171. "op_2_filter_0_records_in_total": int64(5),
  172. "op_2_filter_0_records_out_total": int64(2),
  173. },
  174. }, {
  175. Name: `TestSingleSQLRule5`,
  176. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  177. R: [][]map[string]interface{}{
  178. {{
  179. "m": "mock",
  180. "ts": float64(1541152486013),
  181. }},
  182. {{
  183. "m": "mock",
  184. "ts": float64(1541152486822),
  185. }},
  186. {{
  187. "m": "mock",
  188. "ts": float64(1541152487632),
  189. }},
  190. {{
  191. "m": "mock",
  192. "ts": float64(1541152488442),
  193. }},
  194. {{
  195. "m": "mock",
  196. "ts": float64(1541152489252),
  197. }},
  198. },
  199. M: map[string]interface{}{
  200. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  201. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  202. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  203. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  204. "op_2_project_0_exceptions_total": int64(0),
  205. "op_2_project_0_process_latency_us": int64(0),
  206. "op_2_project_0_records_in_total": int64(5),
  207. "op_2_project_0_records_out_total": int64(5),
  208. "sink_mockSink_0_exceptions_total": int64(0),
  209. "sink_mockSink_0_records_in_total": int64(5),
  210. "sink_mockSink_0_records_out_total": int64(5),
  211. "source_demo_0_exceptions_total": int64(0),
  212. "source_demo_0_records_in_total": int64(5),
  213. "source_demo_0_records_out_total": int64(5),
  214. },
  215. }, {
  216. Name: `TestSingleSQLRule6`,
  217. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  218. R: [][]map[string]interface{}{
  219. {{
  220. "color": "blue",
  221. "ts": float64(1541152486822),
  222. }},
  223. {{
  224. "color": "yellow",
  225. "ts": float64(1541152488442),
  226. }},
  227. },
  228. M: map[string]interface{}{
  229. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  230. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  231. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  232. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  233. "op_3_project_0_exceptions_total": int64(0),
  234. "op_3_project_0_process_latency_us": int64(0),
  235. "op_3_project_0_records_in_total": int64(2),
  236. "op_3_project_0_records_out_total": int64(2),
  237. "sink_mockSink_0_exceptions_total": int64(0),
  238. "sink_mockSink_0_records_in_total": int64(2),
  239. "sink_mockSink_0_records_out_total": int64(2),
  240. "source_demo_0_exceptions_total": int64(0),
  241. "source_demo_0_records_in_total": int64(5),
  242. "source_demo_0_records_out_total": int64(5),
  243. "op_2_filter_0_exceptions_total": int64(0),
  244. "op_2_filter_0_process_latency_us": int64(0),
  245. "op_2_filter_0_records_in_total": int64(5),
  246. "op_2_filter_0_records_out_total": int64(2),
  247. },
  248. }, {
  249. Name: `TestSingleSQLRule7`,
  250. Sql: "SELECT `from` FROM demo1",
  251. R: [][]map[string]interface{}{
  252. {{
  253. "from": "device1",
  254. }},
  255. {{
  256. "from": "device2",
  257. }},
  258. {{
  259. "from": "device3",
  260. }},
  261. {{
  262. "from": "device1",
  263. }},
  264. {{
  265. "from": "device3",
  266. }},
  267. },
  268. M: map[string]interface{}{
  269. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  270. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  271. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  272. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  273. "op_2_project_0_exceptions_total": int64(0),
  274. "op_2_project_0_process_latency_us": int64(0),
  275. "op_2_project_0_records_in_total": int64(5),
  276. "op_2_project_0_records_out_total": int64(5),
  277. "sink_mockSink_0_exceptions_total": int64(0),
  278. "sink_mockSink_0_records_in_total": int64(5),
  279. "sink_mockSink_0_records_out_total": int64(5),
  280. "source_demo1_0_exceptions_total": int64(0),
  281. "source_demo1_0_records_in_total": int64(5),
  282. "source_demo1_0_records_out_total": int64(5),
  283. },
  284. }, {
  285. Name: `TestSingleSQLRule8`,
  286. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  287. R: [][]map[string]interface{}{
  288. {{
  289. "temp": float64(25.5),
  290. "hum": float64(65),
  291. "from": "device1",
  292. "ts": float64(1541152486013),
  293. }},
  294. {{
  295. "temp": float64(27.4),
  296. "hum": float64(80),
  297. "from": "device1",
  298. "ts": float64(1541152488442),
  299. }},
  300. },
  301. M: map[string]interface{}{
  302. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  303. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  304. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  305. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  306. "op_3_project_0_exceptions_total": int64(0),
  307. "op_3_project_0_process_latency_us": int64(0),
  308. "op_3_project_0_records_in_total": int64(2),
  309. "op_3_project_0_records_out_total": int64(2),
  310. "op_2_filter_0_exceptions_total": int64(0),
  311. "op_2_filter_0_process_latency_us": int64(0),
  312. "op_2_filter_0_records_in_total": int64(5),
  313. "op_2_filter_0_records_out_total": int64(2),
  314. "sink_mockSink_0_exceptions_total": int64(0),
  315. "sink_mockSink_0_records_in_total": int64(2),
  316. "sink_mockSink_0_records_out_total": int64(2),
  317. "source_demo1_0_exceptions_total": int64(0),
  318. "source_demo1_0_records_in_total": int64(5),
  319. "source_demo1_0_records_out_total": int64(5),
  320. },
  321. }, {
  322. Name: `TestSingleSQLRule9`,
  323. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  324. R: [][]map[string]interface{}{
  325. {{
  326. "color": "red",
  327. "s": "M",
  328. "ts": float64(1541152486013),
  329. }},
  330. {{
  331. "color": "blue",
  332. "s": "L",
  333. "ts": float64(1541152486822),
  334. }},
  335. {{
  336. "color": "blue",
  337. "s": "M",
  338. "ts": float64(1541152487632),
  339. }},
  340. {{
  341. "color": "yellow",
  342. "s": "L",
  343. "ts": float64(1541152488442),
  344. }},
  345. {{
  346. "color": "red",
  347. "s": "S",
  348. "ts": float64(1541152489252),
  349. }},
  350. },
  351. M: map[string]interface{}{
  352. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  353. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  354. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  355. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  356. "op_2_project_0_exceptions_total": int64(0),
  357. "op_2_project_0_process_latency_us": int64(0),
  358. "op_2_project_0_records_in_total": int64(5),
  359. "op_2_project_0_records_out_total": int64(5),
  360. "sink_mockSink_0_exceptions_total": int64(0),
  361. "sink_mockSink_0_records_in_total": int64(5),
  362. "sink_mockSink_0_records_out_total": int64(5),
  363. "source_demo_0_exceptions_total": int64(0),
  364. "source_demo_0_records_in_total": int64(5),
  365. "source_demo_0_records_out_total": int64(5),
  366. },
  367. T: &topo.PrintableTopo{
  368. Sources: []string{"source_demo"},
  369. Edges: map[string][]string{
  370. "source_demo": {"op_1_preprocessor_demo"},
  371. "op_1_preprocessor_demo": {"op_2_project"},
  372. "op_2_project": {"sink_mockSink"},
  373. },
  374. },
  375. }, {
  376. Name: `TestSingleSQLRule10`,
  377. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  378. R: [][]map[string]interface{}{
  379. {{
  380. "id": float64(1541152486013),
  381. "name": "name1",
  382. "color": "red",
  383. "size": float64(3),
  384. "ts": float64(1541152486013),
  385. }},
  386. {{
  387. "id": float64(1541152487632),
  388. "name": "name2",
  389. "color": "blue",
  390. "size": float64(2),
  391. "ts": float64(1541152487632),
  392. }},
  393. {{
  394. "id": float64(1541152489252),
  395. "name": "name3",
  396. "color": "red",
  397. "size": float64(1),
  398. "ts": float64(1541152489252),
  399. }},
  400. },
  401. W: 15,
  402. M: map[string]interface{}{
  403. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  404. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  405. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  406. "op_2_tableprocessor_table1_0_exceptions_total": int64(0),
  407. "op_2_tableprocessor_table1_0_records_in_total": int64(4),
  408. "op_2_tableprocessor_table1_0_records_out_total": int64(1),
  409. "op_3_join_aligner_0_records_in_total": int64(6),
  410. "op_3_join_aligner_0_records_out_total": int64(5),
  411. "op_4_join_0_exceptions_total": int64(0),
  412. "op_4_join_0_records_in_total": int64(5),
  413. "op_4_join_0_records_out_total": int64(3),
  414. "op_5_project_0_exceptions_total": int64(0),
  415. "op_5_project_0_records_in_total": int64(3),
  416. "op_5_project_0_records_out_total": int64(3),
  417. "sink_mockSink_0_exceptions_total": int64(0),
  418. "sink_mockSink_0_records_in_total": int64(3),
  419. "sink_mockSink_0_records_out_total": int64(3),
  420. "source_demo_0_exceptions_total": int64(0),
  421. "source_demo_0_records_in_total": int64(5),
  422. "source_demo_0_records_out_total": int64(5),
  423. "source_table1_0_exceptions_total": int64(0),
  424. "source_table1_0_records_in_total": int64(4),
  425. "source_table1_0_records_out_total": int64(4),
  426. },
  427. }, {
  428. Name: `TestSingleSQLRule11`,
  429. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  430. R: [][]map[string]interface{}{
  431. {{
  432. "device": "device2",
  433. }},
  434. {{
  435. "device": "device4",
  436. }},
  437. {{
  438. "device": "device5",
  439. }},
  440. },
  441. M: map[string]interface{}{
  442. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  443. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  444. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  445. "op_2_tableprocessor_demoTable_0_exceptions_total": int64(0),
  446. "op_2_tableprocessor_demoTable_0_records_in_total": int64(5),
  447. "op_2_tableprocessor_demoTable_0_records_out_total": int64(5),
  448. "op_3_join_aligner_0_records_in_total": int64(10),
  449. "op_3_join_aligner_0_records_out_total": int64(5),
  450. "op_4_join_0_exceptions_total": int64(0),
  451. "op_4_join_0_records_in_total": int64(5),
  452. "op_4_join_0_records_out_total": int64(3),
  453. "op_5_project_0_exceptions_total": int64(0),
  454. "op_5_project_0_records_in_total": int64(3),
  455. "op_5_project_0_records_out_total": int64(3),
  456. "sink_mockSink_0_exceptions_total": int64(0),
  457. "sink_mockSink_0_records_in_total": int64(3),
  458. "sink_mockSink_0_records_out_total": int64(3),
  459. "source_demo_0_exceptions_total": int64(0),
  460. "source_demo_0_records_in_total": int64(5),
  461. "source_demo_0_records_out_total": int64(5),
  462. "source_demoTable_0_exceptions_total": int64(0),
  463. "source_demoTable_0_records_in_total": int64(5),
  464. "source_demoTable_0_records_out_total": int64(5),
  465. },
  466. },
  467. }
  468. HandleStream(true, streamList, t)
  469. options := []*api.RuleOption{
  470. {
  471. BufferLength: 100,
  472. SendError: true,
  473. }, {
  474. BufferLength: 100,
  475. SendError: true,
  476. Qos: api.AtLeastOnce,
  477. CheckpointInterval: 5000,
  478. }, {
  479. BufferLength: 100,
  480. SendError: true,
  481. Qos: api.ExactlyOnce,
  482. CheckpointInterval: 5000,
  483. },
  484. }
  485. for j, opt := range options {
  486. DoRuleTest(t, tests, j, opt, 0)
  487. }
  488. }
  489. func TestSingleSQLError(t *testing.T) {
  490. //Reset
  491. streamList := []string{"ldemo"}
  492. HandleStream(false, streamList, t)
  493. //Data setup
  494. var tests = []RuleTest{
  495. {
  496. Name: `TestSingleSQLErrorRule1`,
  497. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  498. R: [][]map[string]interface{}{
  499. {{
  500. "color": "red",
  501. "ts": float64(1541152486013),
  502. }},
  503. {{
  504. "error": "run Where error: invalid operation string(string) >= int64(3)",
  505. }},
  506. {{
  507. "ts": float64(1541152487632),
  508. }},
  509. },
  510. M: map[string]interface{}{
  511. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  512. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  513. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  514. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  515. "op_3_project_0_exceptions_total": int64(1),
  516. "op_3_project_0_process_latency_us": int64(0),
  517. "op_3_project_0_records_in_total": int64(3),
  518. "op_3_project_0_records_out_total": int64(2),
  519. "sink_mockSink_0_exceptions_total": int64(0),
  520. "sink_mockSink_0_records_in_total": int64(3),
  521. "sink_mockSink_0_records_out_total": int64(3),
  522. "source_ldemo_0_exceptions_total": int64(0),
  523. "source_ldemo_0_records_in_total": int64(5),
  524. "source_ldemo_0_records_out_total": int64(5),
  525. "op_2_filter_0_exceptions_total": int64(1),
  526. "op_2_filter_0_process_latency_us": int64(0),
  527. "op_2_filter_0_records_in_total": int64(5),
  528. "op_2_filter_0_records_out_total": int64(2),
  529. },
  530. }, {
  531. Name: `TestSingleSQLErrorRule2`,
  532. Sql: `SELECT size * 5 FROM ldemo`,
  533. R: [][]map[string]interface{}{
  534. {{
  535. "kuiper_field_0": float64(15),
  536. }},
  537. {{
  538. "error": "run Select error: invalid operation string(string) * int64(5)",
  539. }},
  540. {{
  541. "kuiper_field_0": float64(15),
  542. }},
  543. {{
  544. "kuiper_field_0": float64(10),
  545. }},
  546. {{}},
  547. },
  548. M: map[string]interface{}{
  549. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  550. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  551. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  552. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  553. "op_2_project_0_exceptions_total": int64(1),
  554. "op_2_project_0_process_latency_us": int64(0),
  555. "op_2_project_0_records_in_total": int64(5),
  556. "op_2_project_0_records_out_total": int64(4),
  557. "sink_mockSink_0_exceptions_total": int64(0),
  558. "sink_mockSink_0_records_in_total": int64(5),
  559. "sink_mockSink_0_records_out_total": int64(5),
  560. "source_ldemo_0_exceptions_total": int64(0),
  561. "source_ldemo_0_records_in_total": int64(5),
  562. "source_ldemo_0_records_out_total": int64(5),
  563. },
  564. },
  565. }
  566. HandleStream(true, streamList, t)
  567. DoRuleTest(t, tests, 0, &api.RuleOption{
  568. BufferLength: 100,
  569. SendError: true,
  570. }, 0)
  571. }
  572. func TestSingleSQLOmitError(t *testing.T) {
  573. //Reset
  574. streamList := []string{"ldemo"}
  575. HandleStream(false, streamList, t)
  576. //Data setup
  577. var tests = []RuleTest{
  578. {
  579. Name: `TestSingleSQLErrorRule1`,
  580. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  581. R: [][]map[string]interface{}{
  582. {{
  583. "color": "red",
  584. "ts": float64(1541152486013),
  585. }},
  586. {{
  587. "ts": float64(1541152487632),
  588. }},
  589. },
  590. M: map[string]interface{}{
  591. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  592. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  593. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  594. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  595. "op_3_project_0_exceptions_total": int64(0),
  596. "op_3_project_0_process_latency_us": int64(0),
  597. "op_3_project_0_records_in_total": int64(2),
  598. "op_3_project_0_records_out_total": int64(2),
  599. "sink_mockSink_0_exceptions_total": int64(0),
  600. "sink_mockSink_0_records_in_total": int64(2),
  601. "sink_mockSink_0_records_out_total": int64(2),
  602. "source_ldemo_0_exceptions_total": int64(0),
  603. "source_ldemo_0_records_in_total": int64(5),
  604. "source_ldemo_0_records_out_total": int64(5),
  605. "op_2_filter_0_exceptions_total": int64(1),
  606. "op_2_filter_0_process_latency_us": int64(0),
  607. "op_2_filter_0_records_in_total": int64(5),
  608. "op_2_filter_0_records_out_total": int64(2),
  609. },
  610. }, {
  611. Name: `TestSingleSQLErrorRule2`,
  612. Sql: `SELECT size * 5 FROM ldemo`,
  613. R: [][]map[string]interface{}{
  614. {{
  615. "kuiper_field_0": float64(15),
  616. }},
  617. {{
  618. "kuiper_field_0": float64(15),
  619. }},
  620. {{
  621. "kuiper_field_0": float64(10),
  622. }},
  623. {{}},
  624. },
  625. M: map[string]interface{}{
  626. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  627. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  628. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  629. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  630. "op_2_project_0_exceptions_total": int64(1),
  631. "op_2_project_0_process_latency_us": int64(0),
  632. "op_2_project_0_records_in_total": int64(5),
  633. "op_2_project_0_records_out_total": int64(4),
  634. "sink_mockSink_0_exceptions_total": int64(0),
  635. "sink_mockSink_0_records_in_total": int64(4),
  636. "sink_mockSink_0_records_out_total": int64(4),
  637. "source_ldemo_0_exceptions_total": int64(0),
  638. "source_ldemo_0_records_in_total": int64(5),
  639. "source_ldemo_0_records_out_total": int64(5),
  640. },
  641. },
  642. }
  643. HandleStream(true, streamList, t)
  644. DoRuleTest(t, tests, 0, &api.RuleOption{
  645. BufferLength: 100,
  646. SendError: false,
  647. }, 0)
  648. }
  649. func TestSingleSQLTemplate(t *testing.T) {
  650. //Reset
  651. streamList := []string{"demo"}
  652. HandleStream(false, streamList, t)
  653. //Data setup
  654. var tests = []RuleTest{
  655. {
  656. Name: `TestSingleSQLTemplateRule1`,
  657. Sql: `SELECT * FROM demo`,
  658. R: []map[string]interface{}{
  659. {
  660. "c": "red",
  661. "wrapper": "w1",
  662. },
  663. {
  664. "c": "blue",
  665. "wrapper": "w1",
  666. },
  667. {
  668. "c": "blue",
  669. "wrapper": "w1",
  670. },
  671. {
  672. "c": "yellow",
  673. "wrapper": "w1",
  674. },
  675. {
  676. "c": "red",
  677. "wrapper": "w1",
  678. },
  679. },
  680. M: map[string]interface{}{
  681. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  682. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  683. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  684. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  685. "op_2_project_0_exceptions_total": int64(0),
  686. "op_2_project_0_process_latency_us": int64(0),
  687. "op_2_project_0_records_in_total": int64(5),
  688. "op_2_project_0_records_out_total": int64(5),
  689. "sink_mockSink_0_exceptions_total": int64(0),
  690. "sink_mockSink_0_records_in_total": int64(5),
  691. "sink_mockSink_0_records_out_total": int64(5),
  692. "source_demo_0_exceptions_total": int64(0),
  693. "source_demo_0_records_in_total": int64(5),
  694. "source_demo_0_records_out_total": int64(5),
  695. },
  696. },
  697. }
  698. HandleStream(true, streamList, t)
  699. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  700. BufferLength: 100,
  701. SendError: true,
  702. }, 0, map[string]interface{}{
  703. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  704. "sendSingle": true,
  705. }, func(result [][]byte) interface{} {
  706. var maps []map[string]interface{}
  707. for _, v := range result {
  708. var mapRes map[string]interface{}
  709. err := json.Unmarshal(v, &mapRes)
  710. if err != nil {
  711. t.Errorf("Failed to parse the input into map")
  712. continue
  713. }
  714. maps = append(maps, mapRes)
  715. }
  716. return maps
  717. })
  718. }
  719. func TestNoneSingleSQLTemplate(t *testing.T) {
  720. //Reset
  721. streamList := []string{"demo"}
  722. HandleStream(false, streamList, t)
  723. //Data setup
  724. var tests = []RuleTest{
  725. {
  726. Name: `TestNoneSingleSQLTemplateRule1`,
  727. Sql: `SELECT * FROM demo`,
  728. R: [][]byte{
  729. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  730. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  731. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  732. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  733. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  734. },
  735. M: map[string]interface{}{
  736. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  737. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  738. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  739. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  740. "op_2_project_0_exceptions_total": int64(0),
  741. "op_2_project_0_process_latency_us": int64(0),
  742. "op_2_project_0_records_in_total": int64(5),
  743. "op_2_project_0_records_out_total": int64(5),
  744. "sink_mockSink_0_exceptions_total": int64(0),
  745. "sink_mockSink_0_records_in_total": int64(5),
  746. "sink_mockSink_0_records_out_total": int64(5),
  747. "source_demo_0_exceptions_total": int64(0),
  748. "source_demo_0_records_in_total": int64(5),
  749. "source_demo_0_records_out_total": int64(5),
  750. },
  751. },
  752. }
  753. HandleStream(true, streamList, t)
  754. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  755. BufferLength: 100,
  756. SendError: true,
  757. }, 0, map[string]interface{}{
  758. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  759. }, func(result [][]byte) interface{} {
  760. return result
  761. })
  762. }
  763. func TestSingleSQLForBinary(t *testing.T) {
  764. //Reset
  765. streamList := []string{"binDemo"}
  766. HandleStream(false, streamList, t)
  767. //Data setup
  768. var tests = []RuleTest{
  769. {
  770. Name: `TestSingleSQLRule1`,
  771. Sql: `SELECT * FROM binDemo`,
  772. R: [][]map[string]interface{}{
  773. {{
  774. "self": mocknode.Image,
  775. }},
  776. },
  777. W: 50,
  778. M: map[string]interface{}{
  779. "op_1_preprocessor_binDemo_0_exceptions_total": int64(0),
  780. "op_1_preprocessor_binDemo_0_process_latency_us": int64(0),
  781. "op_1_preprocessor_binDemo_0_records_in_total": int64(1),
  782. "op_1_preprocessor_binDemo_0_records_out_total": int64(1),
  783. "op_2_project_0_exceptions_total": int64(0),
  784. "op_2_project_0_process_latency_us": int64(0),
  785. "op_2_project_0_records_in_total": int64(1),
  786. "op_2_project_0_records_out_total": int64(1),
  787. "sink_mockSink_0_exceptions_total": int64(0),
  788. "sink_mockSink_0_records_in_total": int64(1),
  789. "sink_mockSink_0_records_out_total": int64(1),
  790. "source_binDemo_0_exceptions_total": int64(0),
  791. "source_binDemo_0_records_in_total": int64(1),
  792. "source_binDemo_0_records_out_total": int64(1),
  793. },
  794. },
  795. }
  796. HandleStream(true, streamList, t)
  797. options := []*api.RuleOption{
  798. {
  799. BufferLength: 100,
  800. SendError: true,
  801. }, {
  802. BufferLength: 100,
  803. SendError: true,
  804. Qos: api.AtLeastOnce,
  805. CheckpointInterval: 5000,
  806. }, {
  807. BufferLength: 100,
  808. SendError: true,
  809. Qos: api.ExactlyOnce,
  810. CheckpointInterval: 5000,
  811. },
  812. }
  813. byteFunc := func(result [][]byte) interface{} {
  814. var maps [][]map[string]interface{}
  815. for _, v := range result {
  816. var mapRes []map[string][]byte
  817. err := json.Unmarshal(v, &mapRes)
  818. if err != nil {
  819. panic("Failed to parse the input into map")
  820. }
  821. mapInt := make([]map[string]interface{}, len(mapRes))
  822. for i, mv := range mapRes {
  823. mapInt[i] = make(map[string]interface{})
  824. //assume only one key
  825. for k, v := range mv {
  826. mapInt[i][k] = v
  827. }
  828. }
  829. maps = append(maps, mapInt)
  830. }
  831. return maps
  832. }
  833. for j, opt := range options {
  834. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  835. }
  836. }