rule_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911
  1. package topotest
  2. import (
  3. "encoding/json"
  4. "github.com/emqx/kuiper/xstream"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "testing"
  7. )
  8. func TestSingleSQL(t *testing.T) {
  9. //Reset
  10. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  11. HandleStream(false, streamList, t)
  12. //Data setup
  13. var tests = []RuleTest{
  14. {
  15. Name: `TestSingleSQLRule1`,
  16. Sql: `SELECT * FROM demo`,
  17. R: [][]map[string]interface{}{
  18. {{
  19. "color": "red",
  20. "size": float64(3),
  21. "ts": float64(1541152486013),
  22. }},
  23. {{
  24. "color": "blue",
  25. "size": float64(6),
  26. "ts": float64(1541152486822),
  27. }},
  28. {{
  29. "color": "blue",
  30. "size": float64(2),
  31. "ts": float64(1541152487632),
  32. }},
  33. {{
  34. "color": "yellow",
  35. "size": float64(4),
  36. "ts": float64(1541152488442),
  37. }},
  38. {{
  39. "color": "red",
  40. "size": float64(1),
  41. "ts": float64(1541152489252),
  42. }},
  43. },
  44. M: map[string]interface{}{
  45. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  46. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  47. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  48. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  49. "op_2_project_0_exceptions_total": int64(0),
  50. "op_2_project_0_process_latency_us": int64(0),
  51. "op_2_project_0_records_in_total": int64(5),
  52. "op_2_project_0_records_out_total": int64(5),
  53. "sink_mockSink_0_exceptions_total": int64(0),
  54. "sink_mockSink_0_records_in_total": int64(5),
  55. "sink_mockSink_0_records_out_total": int64(5),
  56. "source_demo_0_exceptions_total": int64(0),
  57. "source_demo_0_records_in_total": int64(5),
  58. "source_demo_0_records_out_total": int64(5),
  59. },
  60. T: &xstream.PrintableTopo{
  61. Sources: []string{"source_demo"},
  62. Edges: map[string][]string{
  63. "source_demo": {"op_1_preprocessor_demo"},
  64. "op_1_preprocessor_demo": {"op_2_project"},
  65. "op_2_project": {"sink_mockSink"},
  66. },
  67. },
  68. }, {
  69. Name: `TestSingleSQLRule2`,
  70. Sql: `SELECT color, ts FROM demo where size > 3`,
  71. R: [][]map[string]interface{}{
  72. {{
  73. "color": "blue",
  74. "ts": float64(1541152486822),
  75. }},
  76. {{
  77. "color": "yellow",
  78. "ts": float64(1541152488442),
  79. }},
  80. },
  81. M: map[string]interface{}{
  82. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  83. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  84. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  85. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  86. "op_3_project_0_exceptions_total": int64(0),
  87. "op_3_project_0_process_latency_us": int64(0),
  88. "op_3_project_0_records_in_total": int64(2),
  89. "op_3_project_0_records_out_total": int64(2),
  90. "sink_mockSink_0_exceptions_total": int64(0),
  91. "sink_mockSink_0_records_in_total": int64(2),
  92. "sink_mockSink_0_records_out_total": int64(2),
  93. "source_demo_0_exceptions_total": int64(0),
  94. "source_demo_0_records_in_total": int64(5),
  95. "source_demo_0_records_out_total": int64(5),
  96. "op_2_filter_0_exceptions_total": int64(0),
  97. "op_2_filter_0_process_latency_us": int64(0),
  98. "op_2_filter_0_records_in_total": int64(5),
  99. "op_2_filter_0_records_out_total": int64(2),
  100. },
  101. }, {
  102. Name: `TestSingleSQLRule3`,
  103. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  104. R: [][]map[string]interface{}{
  105. {{
  106. "Int8": float64(6),
  107. "ts": float64(1541152486822),
  108. }},
  109. {{
  110. "Int8": float64(4),
  111. "ts": float64(1541152488442),
  112. }},
  113. },
  114. M: map[string]interface{}{
  115. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  116. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  117. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  118. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  119. "op_3_project_0_exceptions_total": int64(0),
  120. "op_3_project_0_process_latency_us": int64(0),
  121. "op_3_project_0_records_in_total": int64(2),
  122. "op_3_project_0_records_out_total": int64(2),
  123. "sink_mockSink_0_exceptions_total": int64(0),
  124. "sink_mockSink_0_records_in_total": int64(2),
  125. "sink_mockSink_0_records_out_total": int64(2),
  126. "source_demo_0_exceptions_total": int64(0),
  127. "source_demo_0_records_in_total": int64(5),
  128. "source_demo_0_records_out_total": int64(5),
  129. "op_2_filter_0_exceptions_total": int64(0),
  130. "op_2_filter_0_process_latency_us": int64(0),
  131. "op_2_filter_0_records_in_total": int64(5),
  132. "op_2_filter_0_records_out_total": int64(2),
  133. },
  134. }, {
  135. Name: `TestSingleSQLRule4`,
  136. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  137. R: [][]map[string]interface{}{
  138. {{
  139. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  140. }},
  141. {{
  142. "Int8": float64(6),
  143. "ts": float64(1541152486822),
  144. }},
  145. {{
  146. "Int8": float64(4),
  147. "ts": float64(1541152488442),
  148. }},
  149. {{
  150. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  151. }},
  152. },
  153. M: map[string]interface{}{
  154. "op_1_preprocessor_demoError_0_exceptions_total": int64(2),
  155. "op_1_preprocessor_demoError_0_process_latency_us": int64(0),
  156. "op_1_preprocessor_demoError_0_records_in_total": int64(5),
  157. "op_1_preprocessor_demoError_0_records_out_total": int64(3),
  158. "op_3_project_0_exceptions_total": int64(2),
  159. "op_3_project_0_process_latency_us": int64(0),
  160. "op_3_project_0_records_in_total": int64(4),
  161. "op_3_project_0_records_out_total": int64(2),
  162. "sink_mockSink_0_exceptions_total": int64(0),
  163. "sink_mockSink_0_records_in_total": int64(4),
  164. "sink_mockSink_0_records_out_total": int64(4),
  165. "source_demoError_0_exceptions_total": int64(0),
  166. "source_demoError_0_records_in_total": int64(5),
  167. "source_demoError_0_records_out_total": int64(5),
  168. "op_2_filter_0_exceptions_total": int64(2),
  169. "op_2_filter_0_process_latency_us": int64(0),
  170. "op_2_filter_0_records_in_total": int64(5),
  171. "op_2_filter_0_records_out_total": int64(2),
  172. },
  173. }, {
  174. Name: `TestSingleSQLRule5`,
  175. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  176. R: [][]map[string]interface{}{
  177. {{
  178. "m": "mock",
  179. "ts": float64(1541152486013),
  180. }},
  181. {{
  182. "m": "mock",
  183. "ts": float64(1541152486822),
  184. }},
  185. {{
  186. "m": "mock",
  187. "ts": float64(1541152487632),
  188. }},
  189. {{
  190. "m": "mock",
  191. "ts": float64(1541152488442),
  192. }},
  193. {{
  194. "m": "mock",
  195. "ts": float64(1541152489252),
  196. }},
  197. },
  198. M: map[string]interface{}{
  199. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  200. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  201. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  202. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  203. "op_2_project_0_exceptions_total": int64(0),
  204. "op_2_project_0_process_latency_us": int64(0),
  205. "op_2_project_0_records_in_total": int64(5),
  206. "op_2_project_0_records_out_total": int64(5),
  207. "sink_mockSink_0_exceptions_total": int64(0),
  208. "sink_mockSink_0_records_in_total": int64(5),
  209. "sink_mockSink_0_records_out_total": int64(5),
  210. "source_demo_0_exceptions_total": int64(0),
  211. "source_demo_0_records_in_total": int64(5),
  212. "source_demo_0_records_out_total": int64(5),
  213. },
  214. }, {
  215. Name: `TestSingleSQLRule6`,
  216. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  217. R: [][]map[string]interface{}{
  218. {{
  219. "color": "blue",
  220. "ts": float64(1541152486822),
  221. }},
  222. {{
  223. "color": "yellow",
  224. "ts": float64(1541152488442),
  225. }},
  226. },
  227. M: map[string]interface{}{
  228. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  229. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  230. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  231. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  232. "op_3_project_0_exceptions_total": int64(0),
  233. "op_3_project_0_process_latency_us": int64(0),
  234. "op_3_project_0_records_in_total": int64(2),
  235. "op_3_project_0_records_out_total": int64(2),
  236. "sink_mockSink_0_exceptions_total": int64(0),
  237. "sink_mockSink_0_records_in_total": int64(2),
  238. "sink_mockSink_0_records_out_total": int64(2),
  239. "source_demo_0_exceptions_total": int64(0),
  240. "source_demo_0_records_in_total": int64(5),
  241. "source_demo_0_records_out_total": int64(5),
  242. "op_2_filter_0_exceptions_total": int64(0),
  243. "op_2_filter_0_process_latency_us": int64(0),
  244. "op_2_filter_0_records_in_total": int64(5),
  245. "op_2_filter_0_records_out_total": int64(2),
  246. },
  247. }, {
  248. Name: `TestSingleSQLRule7`,
  249. Sql: "SELECT `from` FROM demo1",
  250. R: [][]map[string]interface{}{
  251. {{
  252. "from": "device1",
  253. }},
  254. {{
  255. "from": "device2",
  256. }},
  257. {{
  258. "from": "device3",
  259. }},
  260. {{
  261. "from": "device1",
  262. }},
  263. {{
  264. "from": "device3",
  265. }},
  266. },
  267. M: map[string]interface{}{
  268. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  269. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  270. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  271. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  272. "op_2_project_0_exceptions_total": int64(0),
  273. "op_2_project_0_process_latency_us": int64(0),
  274. "op_2_project_0_records_in_total": int64(5),
  275. "op_2_project_0_records_out_total": int64(5),
  276. "sink_mockSink_0_exceptions_total": int64(0),
  277. "sink_mockSink_0_records_in_total": int64(5),
  278. "sink_mockSink_0_records_out_total": int64(5),
  279. "source_demo1_0_exceptions_total": int64(0),
  280. "source_demo1_0_records_in_total": int64(5),
  281. "source_demo1_0_records_out_total": int64(5),
  282. },
  283. }, {
  284. Name: `TestSingleSQLRule8`,
  285. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  286. R: [][]map[string]interface{}{
  287. {{
  288. "temp": float64(25.5),
  289. "hum": float64(65),
  290. "from": "device1",
  291. "ts": float64(1541152486013),
  292. }},
  293. {{
  294. "temp": float64(27.4),
  295. "hum": float64(80),
  296. "from": "device1",
  297. "ts": float64(1541152488442),
  298. }},
  299. },
  300. M: map[string]interface{}{
  301. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  302. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  303. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  304. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  305. "op_3_project_0_exceptions_total": int64(0),
  306. "op_3_project_0_process_latency_us": int64(0),
  307. "op_3_project_0_records_in_total": int64(2),
  308. "op_3_project_0_records_out_total": int64(2),
  309. "op_2_filter_0_exceptions_total": int64(0),
  310. "op_2_filter_0_process_latency_us": int64(0),
  311. "op_2_filter_0_records_in_total": int64(5),
  312. "op_2_filter_0_records_out_total": int64(2),
  313. "sink_mockSink_0_exceptions_total": int64(0),
  314. "sink_mockSink_0_records_in_total": int64(2),
  315. "sink_mockSink_0_records_out_total": int64(2),
  316. "source_demo1_0_exceptions_total": int64(0),
  317. "source_demo1_0_records_in_total": int64(5),
  318. "source_demo1_0_records_out_total": int64(5),
  319. },
  320. }, {
  321. Name: `TestSingleSQLRule9`,
  322. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  323. R: [][]map[string]interface{}{
  324. {{
  325. "color": "red",
  326. "s": "M",
  327. "ts": float64(1541152486013),
  328. }},
  329. {{
  330. "color": "blue",
  331. "s": "L",
  332. "ts": float64(1541152486822),
  333. }},
  334. {{
  335. "color": "blue",
  336. "s": "M",
  337. "ts": float64(1541152487632),
  338. }},
  339. {{
  340. "color": "yellow",
  341. "s": "L",
  342. "ts": float64(1541152488442),
  343. }},
  344. {{
  345. "color": "red",
  346. "s": "S",
  347. "ts": float64(1541152489252),
  348. }},
  349. },
  350. M: map[string]interface{}{
  351. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  352. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  353. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  354. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  355. "op_2_project_0_exceptions_total": int64(0),
  356. "op_2_project_0_process_latency_us": int64(0),
  357. "op_2_project_0_records_in_total": int64(5),
  358. "op_2_project_0_records_out_total": int64(5),
  359. "sink_mockSink_0_exceptions_total": int64(0),
  360. "sink_mockSink_0_records_in_total": int64(5),
  361. "sink_mockSink_0_records_out_total": int64(5),
  362. "source_demo_0_exceptions_total": int64(0),
  363. "source_demo_0_records_in_total": int64(5),
  364. "source_demo_0_records_out_total": int64(5),
  365. },
  366. T: &xstream.PrintableTopo{
  367. Sources: []string{"source_demo"},
  368. Edges: map[string][]string{
  369. "source_demo": {"op_1_preprocessor_demo"},
  370. "op_1_preprocessor_demo": {"op_2_project"},
  371. "op_2_project": {"sink_mockSink"},
  372. },
  373. },
  374. }, {
  375. Name: `TestSingleSQLRule10`,
  376. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  377. R: [][]map[string]interface{}{
  378. {{
  379. "id": float64(1541152486013),
  380. "name": "name1",
  381. "color": "red",
  382. "size": float64(3),
  383. "ts": float64(1541152486013),
  384. }},
  385. {{
  386. "id": float64(1541152487632),
  387. "name": "name2",
  388. "color": "blue",
  389. "size": float64(2),
  390. "ts": float64(1541152487632),
  391. }},
  392. {{
  393. "id": float64(1541152489252),
  394. "name": "name3",
  395. "color": "red",
  396. "size": float64(1),
  397. "ts": float64(1541152489252),
  398. }},
  399. },
  400. W: 15,
  401. M: map[string]interface{}{
  402. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  403. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  404. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  405. "op_2_tableprocessor_table1_0_exceptions_total": int64(0),
  406. "op_2_tableprocessor_table1_0_records_in_total": int64(4),
  407. "op_2_tableprocessor_table1_0_records_out_total": int64(1),
  408. "op_3_join_aligner_0_records_in_total": int64(6),
  409. "op_3_join_aligner_0_records_out_total": int64(5),
  410. "op_4_join_0_exceptions_total": int64(0),
  411. "op_4_join_0_records_in_total": int64(5),
  412. "op_4_join_0_records_out_total": int64(3),
  413. "op_5_project_0_exceptions_total": int64(0),
  414. "op_5_project_0_records_in_total": int64(3),
  415. "op_5_project_0_records_out_total": int64(3),
  416. "sink_mockSink_0_exceptions_total": int64(0),
  417. "sink_mockSink_0_records_in_total": int64(3),
  418. "sink_mockSink_0_records_out_total": int64(3),
  419. "source_demo_0_exceptions_total": int64(0),
  420. "source_demo_0_records_in_total": int64(5),
  421. "source_demo_0_records_out_total": int64(5),
  422. "source_table1_0_exceptions_total": int64(0),
  423. "source_table1_0_records_in_total": int64(4),
  424. "source_table1_0_records_out_total": int64(4),
  425. },
  426. }, {
  427. Name: `TestSingleSQLRule11`,
  428. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  429. R: [][]map[string]interface{}{
  430. {{
  431. "device": "device2",
  432. }},
  433. {{
  434. "device": "device4",
  435. }},
  436. {{
  437. "device": "device5",
  438. }},
  439. },
  440. M: map[string]interface{}{
  441. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  442. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  443. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  444. "op_2_tableprocessor_demoTable_0_exceptions_total": int64(0),
  445. "op_2_tableprocessor_demoTable_0_records_in_total": int64(5),
  446. "op_2_tableprocessor_demoTable_0_records_out_total": int64(5),
  447. "op_3_join_aligner_0_records_in_total": int64(10),
  448. "op_3_join_aligner_0_records_out_total": int64(5),
  449. "op_4_join_0_exceptions_total": int64(0),
  450. "op_4_join_0_records_in_total": int64(5),
  451. "op_4_join_0_records_out_total": int64(3),
  452. "op_5_project_0_exceptions_total": int64(0),
  453. "op_5_project_0_records_in_total": int64(3),
  454. "op_5_project_0_records_out_total": int64(3),
  455. "sink_mockSink_0_exceptions_total": int64(0),
  456. "sink_mockSink_0_records_in_total": int64(3),
  457. "sink_mockSink_0_records_out_total": int64(3),
  458. "source_demo_0_exceptions_total": int64(0),
  459. "source_demo_0_records_in_total": int64(5),
  460. "source_demo_0_records_out_total": int64(5),
  461. "source_demoTable_0_exceptions_total": int64(0),
  462. "source_demoTable_0_records_in_total": int64(5),
  463. "source_demoTable_0_records_out_total": int64(5),
  464. },
  465. },
  466. }
  467. HandleStream(true, streamList, t)
  468. options := []*api.RuleOption{
  469. {
  470. BufferLength: 100,
  471. SendError: true,
  472. }, {
  473. BufferLength: 100,
  474. SendError: true,
  475. Qos: api.AtLeastOnce,
  476. CheckpointInterval: 5000,
  477. }, {
  478. BufferLength: 100,
  479. SendError: true,
  480. Qos: api.ExactlyOnce,
  481. CheckpointInterval: 5000,
  482. },
  483. }
  484. for j, opt := range options {
  485. DoRuleTest(t, tests, j, opt, 0)
  486. }
  487. }
  488. func TestSingleSQLError(t *testing.T) {
  489. //Reset
  490. streamList := []string{"ldemo"}
  491. HandleStream(false, streamList, t)
  492. //Data setup
  493. var tests = []RuleTest{
  494. {
  495. Name: `TestSingleSQLErrorRule1`,
  496. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  497. R: [][]map[string]interface{}{
  498. {{
  499. "color": "red",
  500. "ts": float64(1541152486013),
  501. }},
  502. {{
  503. "error": "run Where error: invalid operation string(string) >= int64(3)",
  504. }},
  505. {{
  506. "ts": float64(1541152487632),
  507. }},
  508. },
  509. M: map[string]interface{}{
  510. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  511. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  512. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  513. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  514. "op_3_project_0_exceptions_total": int64(1),
  515. "op_3_project_0_process_latency_us": int64(0),
  516. "op_3_project_0_records_in_total": int64(3),
  517. "op_3_project_0_records_out_total": int64(2),
  518. "sink_mockSink_0_exceptions_total": int64(0),
  519. "sink_mockSink_0_records_in_total": int64(3),
  520. "sink_mockSink_0_records_out_total": int64(3),
  521. "source_ldemo_0_exceptions_total": int64(0),
  522. "source_ldemo_0_records_in_total": int64(5),
  523. "source_ldemo_0_records_out_total": int64(5),
  524. "op_2_filter_0_exceptions_total": int64(1),
  525. "op_2_filter_0_process_latency_us": int64(0),
  526. "op_2_filter_0_records_in_total": int64(5),
  527. "op_2_filter_0_records_out_total": int64(2),
  528. },
  529. }, {
  530. Name: `TestSingleSQLErrorRule2`,
  531. Sql: `SELECT size * 5 FROM ldemo`,
  532. R: [][]map[string]interface{}{
  533. {{
  534. "kuiper_field_0": float64(15),
  535. }},
  536. {{
  537. "error": "run Select error: invalid operation string(string) * int64(5)",
  538. }},
  539. {{
  540. "kuiper_field_0": float64(15),
  541. }},
  542. {{
  543. "kuiper_field_0": float64(10),
  544. }},
  545. {{}},
  546. },
  547. M: map[string]interface{}{
  548. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  549. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  550. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  551. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  552. "op_2_project_0_exceptions_total": int64(1),
  553. "op_2_project_0_process_latency_us": int64(0),
  554. "op_2_project_0_records_in_total": int64(5),
  555. "op_2_project_0_records_out_total": int64(4),
  556. "sink_mockSink_0_exceptions_total": int64(0),
  557. "sink_mockSink_0_records_in_total": int64(5),
  558. "sink_mockSink_0_records_out_total": int64(5),
  559. "source_ldemo_0_exceptions_total": int64(0),
  560. "source_ldemo_0_records_in_total": int64(5),
  561. "source_ldemo_0_records_out_total": int64(5),
  562. },
  563. },
  564. }
  565. HandleStream(true, streamList, t)
  566. DoRuleTest(t, tests, 0, &api.RuleOption{
  567. BufferLength: 100,
  568. SendError: true,
  569. }, 0)
  570. }
  571. func TestSingleSQLOmitError(t *testing.T) {
  572. //Reset
  573. streamList := []string{"ldemo"}
  574. HandleStream(false, streamList, t)
  575. //Data setup
  576. var tests = []RuleTest{
  577. {
  578. Name: `TestSingleSQLErrorRule1`,
  579. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  580. R: [][]map[string]interface{}{
  581. {{
  582. "color": "red",
  583. "ts": float64(1541152486013),
  584. }},
  585. {{
  586. "ts": float64(1541152487632),
  587. }},
  588. },
  589. M: map[string]interface{}{
  590. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  591. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  592. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  593. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  594. "op_3_project_0_exceptions_total": int64(0),
  595. "op_3_project_0_process_latency_us": int64(0),
  596. "op_3_project_0_records_in_total": int64(2),
  597. "op_3_project_0_records_out_total": int64(2),
  598. "sink_mockSink_0_exceptions_total": int64(0),
  599. "sink_mockSink_0_records_in_total": int64(2),
  600. "sink_mockSink_0_records_out_total": int64(2),
  601. "source_ldemo_0_exceptions_total": int64(0),
  602. "source_ldemo_0_records_in_total": int64(5),
  603. "source_ldemo_0_records_out_total": int64(5),
  604. "op_2_filter_0_exceptions_total": int64(1),
  605. "op_2_filter_0_process_latency_us": int64(0),
  606. "op_2_filter_0_records_in_total": int64(5),
  607. "op_2_filter_0_records_out_total": int64(2),
  608. },
  609. }, {
  610. Name: `TestSingleSQLErrorRule2`,
  611. Sql: `SELECT size * 5 FROM ldemo`,
  612. R: [][]map[string]interface{}{
  613. {{
  614. "kuiper_field_0": float64(15),
  615. }},
  616. {{
  617. "kuiper_field_0": float64(15),
  618. }},
  619. {{
  620. "kuiper_field_0": float64(10),
  621. }},
  622. {{}},
  623. },
  624. M: map[string]interface{}{
  625. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  626. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  627. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  628. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  629. "op_2_project_0_exceptions_total": int64(1),
  630. "op_2_project_0_process_latency_us": int64(0),
  631. "op_2_project_0_records_in_total": int64(5),
  632. "op_2_project_0_records_out_total": int64(4),
  633. "sink_mockSink_0_exceptions_total": int64(0),
  634. "sink_mockSink_0_records_in_total": int64(4),
  635. "sink_mockSink_0_records_out_total": int64(4),
  636. "source_ldemo_0_exceptions_total": int64(0),
  637. "source_ldemo_0_records_in_total": int64(5),
  638. "source_ldemo_0_records_out_total": int64(5),
  639. },
  640. },
  641. }
  642. HandleStream(true, streamList, t)
  643. DoRuleTest(t, tests, 0, &api.RuleOption{
  644. BufferLength: 100,
  645. SendError: false,
  646. }, 0)
  647. }
  648. func TestSingleSQLTemplate(t *testing.T) {
  649. //Reset
  650. streamList := []string{"demo"}
  651. HandleStream(false, streamList, t)
  652. //Data setup
  653. var tests = []RuleTest{
  654. {
  655. Name: `TestSingleSQLTemplateRule1`,
  656. Sql: `SELECT * FROM demo`,
  657. R: []map[string]interface{}{
  658. {
  659. "c": "red",
  660. "wrapper": "w1",
  661. },
  662. {
  663. "c": "blue",
  664. "wrapper": "w1",
  665. },
  666. {
  667. "c": "blue",
  668. "wrapper": "w1",
  669. },
  670. {
  671. "c": "yellow",
  672. "wrapper": "w1",
  673. },
  674. {
  675. "c": "red",
  676. "wrapper": "w1",
  677. },
  678. },
  679. M: map[string]interface{}{
  680. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  681. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  682. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  683. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  684. "op_2_project_0_exceptions_total": int64(0),
  685. "op_2_project_0_process_latency_us": int64(0),
  686. "op_2_project_0_records_in_total": int64(5),
  687. "op_2_project_0_records_out_total": int64(5),
  688. "sink_mockSink_0_exceptions_total": int64(0),
  689. "sink_mockSink_0_records_in_total": int64(5),
  690. "sink_mockSink_0_records_out_total": int64(5),
  691. "source_demo_0_exceptions_total": int64(0),
  692. "source_demo_0_records_in_total": int64(5),
  693. "source_demo_0_records_out_total": int64(5),
  694. },
  695. },
  696. }
  697. HandleStream(true, streamList, t)
  698. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  699. BufferLength: 100,
  700. SendError: true,
  701. }, 0, map[string]interface{}{
  702. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  703. "sendSingle": true,
  704. }, func(result [][]byte) interface{} {
  705. var maps []map[string]interface{}
  706. for _, v := range result {
  707. var mapRes map[string]interface{}
  708. err := json.Unmarshal(v, &mapRes)
  709. if err != nil {
  710. t.Errorf("Failed to parse the input into map")
  711. continue
  712. }
  713. maps = append(maps, mapRes)
  714. }
  715. return maps
  716. })
  717. }
  718. func TestNoneSingleSQLTemplate(t *testing.T) {
  719. //Reset
  720. streamList := []string{"demo"}
  721. HandleStream(false, streamList, t)
  722. //Data setup
  723. var tests = []RuleTest{
  724. {
  725. Name: `TestNoneSingleSQLTemplateRule1`,
  726. Sql: `SELECT * FROM demo`,
  727. R: [][]byte{
  728. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  729. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  730. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  731. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  732. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  733. },
  734. M: map[string]interface{}{
  735. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  736. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  737. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  738. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  739. "op_2_project_0_exceptions_total": int64(0),
  740. "op_2_project_0_process_latency_us": int64(0),
  741. "op_2_project_0_records_in_total": int64(5),
  742. "op_2_project_0_records_out_total": int64(5),
  743. "sink_mockSink_0_exceptions_total": int64(0),
  744. "sink_mockSink_0_records_in_total": int64(5),
  745. "sink_mockSink_0_records_out_total": int64(5),
  746. "source_demo_0_exceptions_total": int64(0),
  747. "source_demo_0_records_in_total": int64(5),
  748. "source_demo_0_records_out_total": int64(5),
  749. },
  750. },
  751. }
  752. HandleStream(true, streamList, t)
  753. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  754. BufferLength: 100,
  755. SendError: true,
  756. }, 0, map[string]interface{}{
  757. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  758. }, func(result [][]byte) interface{} {
  759. return result
  760. })
  761. }
  762. func TestSingleSQLForBinary(t *testing.T) {
  763. //Reset
  764. streamList := []string{"binDemo"}
  765. HandleStream(false, streamList, t)
  766. //Data setup
  767. var tests = []RuleTest{
  768. {
  769. Name: `TestSingleSQLRule1`,
  770. Sql: `SELECT * FROM binDemo`,
  771. R: [][]map[string]interface{}{
  772. {{
  773. "self": image,
  774. }},
  775. },
  776. W: 50,
  777. M: map[string]interface{}{
  778. "op_1_preprocessor_binDemo_0_exceptions_total": int64(0),
  779. "op_1_preprocessor_binDemo_0_process_latency_us": int64(0),
  780. "op_1_preprocessor_binDemo_0_records_in_total": int64(1),
  781. "op_1_preprocessor_binDemo_0_records_out_total": int64(1),
  782. "op_2_project_0_exceptions_total": int64(0),
  783. "op_2_project_0_process_latency_us": int64(0),
  784. "op_2_project_0_records_in_total": int64(1),
  785. "op_2_project_0_records_out_total": int64(1),
  786. "sink_mockSink_0_exceptions_total": int64(0),
  787. "sink_mockSink_0_records_in_total": int64(1),
  788. "sink_mockSink_0_records_out_total": int64(1),
  789. "source_binDemo_0_exceptions_total": int64(0),
  790. "source_binDemo_0_records_in_total": int64(1),
  791. "source_binDemo_0_records_out_total": int64(1),
  792. },
  793. },
  794. }
  795. HandleStream(true, streamList, t)
  796. options := []*api.RuleOption{
  797. {
  798. BufferLength: 100,
  799. SendError: true,
  800. }, {
  801. BufferLength: 100,
  802. SendError: true,
  803. Qos: api.AtLeastOnce,
  804. CheckpointInterval: 5000,
  805. }, {
  806. BufferLength: 100,
  807. SendError: true,
  808. Qos: api.ExactlyOnce,
  809. CheckpointInterval: 5000,
  810. },
  811. }
  812. byteFunc := func(result [][]byte) interface{} {
  813. var maps [][]map[string]interface{}
  814. for _, v := range result {
  815. var mapRes []map[string][]byte
  816. err := json.Unmarshal(v, &mapRes)
  817. if err != nil {
  818. panic("Failed to parse the input into map")
  819. }
  820. mapInt := make([]map[string]interface{}, len(mapRes))
  821. for i, mv := range mapRes {
  822. mapInt[i] = make(map[string]interface{})
  823. //assume only one key
  824. for k, v := range mv {
  825. mapInt[i][k] = v
  826. }
  827. }
  828. maps = append(maps, mapInt)
  829. }
  830. return maps
  831. }
  832. for j, opt := range options {
  833. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  834. }
  835. }