rule_test.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package topotest
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/topo"
  18. "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "testing"
  21. )
  22. func TestSingleSQL(t *testing.T) {
  23. //Reset
  24. streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
  25. HandleStream(false, streamList, t)
  26. //Data setup
  27. var tests = []RuleTest{
  28. {
  29. Name: `TestSingleSQLRule1`,
  30. Sql: `SELECT * FROM demo`,
  31. R: [][]map[string]interface{}{
  32. {{
  33. "color": "red",
  34. "size": float64(3),
  35. "ts": float64(1541152486013),
  36. }},
  37. {{
  38. "color": "blue",
  39. "size": float64(6),
  40. "ts": float64(1541152486822),
  41. }},
  42. {{
  43. "color": "blue",
  44. "size": float64(2),
  45. "ts": float64(1541152487632),
  46. }},
  47. {{
  48. "color": "yellow",
  49. "size": float64(4),
  50. "ts": float64(1541152488442),
  51. }},
  52. {{
  53. "color": "red",
  54. "size": float64(1),
  55. "ts": float64(1541152489252),
  56. }},
  57. },
  58. M: map[string]interface{}{
  59. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  60. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  61. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  62. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  63. "op_2_project_0_exceptions_total": int64(0),
  64. "op_2_project_0_process_latency_us": int64(0),
  65. "op_2_project_0_records_in_total": int64(5),
  66. "op_2_project_0_records_out_total": int64(5),
  67. "sink_mockSink_0_exceptions_total": int64(0),
  68. "sink_mockSink_0_records_in_total": int64(5),
  69. "sink_mockSink_0_records_out_total": int64(5),
  70. "source_demo_0_exceptions_total": int64(0),
  71. "source_demo_0_records_in_total": int64(5),
  72. "source_demo_0_records_out_total": int64(5),
  73. },
  74. T: &topo.PrintableTopo{
  75. Sources: []string{"source_demo"},
  76. Edges: map[string][]string{
  77. "source_demo": {"op_1_preprocessor_demo"},
  78. "op_1_preprocessor_demo": {"op_2_project"},
  79. "op_2_project": {"sink_mockSink"},
  80. },
  81. },
  82. }, {
  83. Name: `TestSingleSQLRule2`,
  84. Sql: `SELECT color, ts FROM demo where size > 3`,
  85. R: [][]map[string]interface{}{
  86. {{
  87. "color": "blue",
  88. "ts": float64(1541152486822),
  89. }},
  90. {{
  91. "color": "yellow",
  92. "ts": float64(1541152488442),
  93. }},
  94. },
  95. M: map[string]interface{}{
  96. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  97. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  98. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  99. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  100. "op_3_project_0_exceptions_total": int64(0),
  101. "op_3_project_0_process_latency_us": int64(0),
  102. "op_3_project_0_records_in_total": int64(2),
  103. "op_3_project_0_records_out_total": int64(2),
  104. "sink_mockSink_0_exceptions_total": int64(0),
  105. "sink_mockSink_0_records_in_total": int64(2),
  106. "sink_mockSink_0_records_out_total": int64(2),
  107. "source_demo_0_exceptions_total": int64(0),
  108. "source_demo_0_records_in_total": int64(5),
  109. "source_demo_0_records_out_total": int64(5),
  110. "op_2_filter_0_exceptions_total": int64(0),
  111. "op_2_filter_0_process_latency_us": int64(0),
  112. "op_2_filter_0_records_in_total": int64(5),
  113. "op_2_filter_0_records_out_total": int64(2),
  114. },
  115. }, {
  116. Name: `TestSingleSQLRule3`,
  117. Sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  118. R: [][]map[string]interface{}{
  119. {{
  120. "Int8": float64(6),
  121. "ts": float64(1541152486822),
  122. }},
  123. {{
  124. "Int8": float64(4),
  125. "ts": float64(1541152488442),
  126. }},
  127. },
  128. M: map[string]interface{}{
  129. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  130. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  131. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  132. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  133. "op_3_project_0_exceptions_total": int64(0),
  134. "op_3_project_0_process_latency_us": int64(0),
  135. "op_3_project_0_records_in_total": int64(2),
  136. "op_3_project_0_records_out_total": int64(2),
  137. "sink_mockSink_0_exceptions_total": int64(0),
  138. "sink_mockSink_0_records_in_total": int64(2),
  139. "sink_mockSink_0_records_out_total": int64(2),
  140. "source_demo_0_exceptions_total": int64(0),
  141. "source_demo_0_records_in_total": int64(5),
  142. "source_demo_0_records_out_total": int64(5),
  143. "op_2_filter_0_exceptions_total": int64(0),
  144. "op_2_filter_0_process_latency_us": int64(0),
  145. "op_2_filter_0_records_in_total": int64(5),
  146. "op_2_filter_0_records_out_total": int64(2),
  147. },
  148. }, {
  149. Name: `TestSingleSQLRule4`,
  150. Sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  151. R: [][]map[string]interface{}{
  152. {{
  153. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
  154. }},
  155. {{
  156. "Int8": float64(6),
  157. "ts": float64(1541152486822),
  158. }},
  159. {{
  160. "Int8": float64(4),
  161. "ts": float64(1541152488442),
  162. }},
  163. {{
  164. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  165. }},
  166. },
  167. M: map[string]interface{}{
  168. "op_1_preprocessor_demoError_0_exceptions_total": int64(2),
  169. "op_1_preprocessor_demoError_0_process_latency_us": int64(0),
  170. "op_1_preprocessor_demoError_0_records_in_total": int64(5),
  171. "op_1_preprocessor_demoError_0_records_out_total": int64(3),
  172. "op_3_project_0_exceptions_total": int64(2),
  173. "op_3_project_0_process_latency_us": int64(0),
  174. "op_3_project_0_records_in_total": int64(4),
  175. "op_3_project_0_records_out_total": int64(2),
  176. "sink_mockSink_0_exceptions_total": int64(0),
  177. "sink_mockSink_0_records_in_total": int64(4),
  178. "sink_mockSink_0_records_out_total": int64(4),
  179. "source_demoError_0_exceptions_total": int64(0),
  180. "source_demoError_0_records_in_total": int64(5),
  181. "source_demoError_0_records_out_total": int64(5),
  182. "op_2_filter_0_exceptions_total": int64(2),
  183. "op_2_filter_0_process_latency_us": int64(0),
  184. "op_2_filter_0_records_in_total": int64(5),
  185. "op_2_filter_0_records_out_total": int64(2),
  186. },
  187. }, {
  188. Name: `TestSingleSQLRule5`,
  189. Sql: `SELECT meta(topic) as m, ts FROM demo`,
  190. R: [][]map[string]interface{}{
  191. {{
  192. "m": "mock",
  193. "ts": float64(1541152486013),
  194. }},
  195. {{
  196. "m": "mock",
  197. "ts": float64(1541152486822),
  198. }},
  199. {{
  200. "m": "mock",
  201. "ts": float64(1541152487632),
  202. }},
  203. {{
  204. "m": "mock",
  205. "ts": float64(1541152488442),
  206. }},
  207. {{
  208. "m": "mock",
  209. "ts": float64(1541152489252),
  210. }},
  211. },
  212. M: map[string]interface{}{
  213. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  214. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  215. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  216. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  217. "op_2_project_0_exceptions_total": int64(0),
  218. "op_2_project_0_process_latency_us": int64(0),
  219. "op_2_project_0_records_in_total": int64(5),
  220. "op_2_project_0_records_out_total": int64(5),
  221. "sink_mockSink_0_exceptions_total": int64(0),
  222. "sink_mockSink_0_records_in_total": int64(5),
  223. "sink_mockSink_0_records_out_total": int64(5),
  224. "source_demo_0_exceptions_total": int64(0),
  225. "source_demo_0_records_in_total": int64(5),
  226. "source_demo_0_records_out_total": int64(5),
  227. },
  228. }, {
  229. Name: `TestSingleSQLRule6`,
  230. Sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  231. R: [][]map[string]interface{}{
  232. {{
  233. "color": "blue",
  234. "ts": float64(1541152486822),
  235. }},
  236. {{
  237. "color": "yellow",
  238. "ts": float64(1541152488442),
  239. }},
  240. },
  241. M: map[string]interface{}{
  242. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  243. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  244. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  245. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  246. "op_3_project_0_exceptions_total": int64(0),
  247. "op_3_project_0_process_latency_us": int64(0),
  248. "op_3_project_0_records_in_total": int64(2),
  249. "op_3_project_0_records_out_total": int64(2),
  250. "sink_mockSink_0_exceptions_total": int64(0),
  251. "sink_mockSink_0_records_in_total": int64(2),
  252. "sink_mockSink_0_records_out_total": int64(2),
  253. "source_demo_0_exceptions_total": int64(0),
  254. "source_demo_0_records_in_total": int64(5),
  255. "source_demo_0_records_out_total": int64(5),
  256. "op_2_filter_0_exceptions_total": int64(0),
  257. "op_2_filter_0_process_latency_us": int64(0),
  258. "op_2_filter_0_records_in_total": int64(5),
  259. "op_2_filter_0_records_out_total": int64(2),
  260. },
  261. }, {
  262. Name: `TestSingleSQLRule7`,
  263. Sql: "SELECT `from` FROM demo1",
  264. R: [][]map[string]interface{}{
  265. {{
  266. "from": "device1",
  267. }},
  268. {{
  269. "from": "device2",
  270. }},
  271. {{
  272. "from": "device3",
  273. }},
  274. {{
  275. "from": "device1",
  276. }},
  277. {{
  278. "from": "device3",
  279. }},
  280. },
  281. M: map[string]interface{}{
  282. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  283. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  284. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  285. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  286. "op_2_project_0_exceptions_total": int64(0),
  287. "op_2_project_0_process_latency_us": int64(0),
  288. "op_2_project_0_records_in_total": int64(5),
  289. "op_2_project_0_records_out_total": int64(5),
  290. "sink_mockSink_0_exceptions_total": int64(0),
  291. "sink_mockSink_0_records_in_total": int64(5),
  292. "sink_mockSink_0_records_out_total": int64(5),
  293. "source_demo1_0_exceptions_total": int64(0),
  294. "source_demo1_0_records_in_total": int64(5),
  295. "source_demo1_0_records_out_total": int64(5),
  296. },
  297. }, {
  298. Name: `TestSingleSQLRule8`,
  299. Sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  300. R: [][]map[string]interface{}{
  301. {{
  302. "temp": float64(25.5),
  303. "hum": float64(65),
  304. "from": "device1",
  305. "ts": float64(1541152486013),
  306. }},
  307. {{
  308. "temp": float64(27.4),
  309. "hum": float64(80),
  310. "from": "device1",
  311. "ts": float64(1541152488442),
  312. }},
  313. },
  314. M: map[string]interface{}{
  315. "op_1_preprocessor_demo1_0_exceptions_total": int64(0),
  316. "op_1_preprocessor_demo1_0_process_latency_us": int64(0),
  317. "op_1_preprocessor_demo1_0_records_in_total": int64(5),
  318. "op_1_preprocessor_demo1_0_records_out_total": int64(5),
  319. "op_3_project_0_exceptions_total": int64(0),
  320. "op_3_project_0_process_latency_us": int64(0),
  321. "op_3_project_0_records_in_total": int64(2),
  322. "op_3_project_0_records_out_total": int64(2),
  323. "op_2_filter_0_exceptions_total": int64(0),
  324. "op_2_filter_0_process_latency_us": int64(0),
  325. "op_2_filter_0_records_in_total": int64(5),
  326. "op_2_filter_0_records_out_total": int64(2),
  327. "sink_mockSink_0_exceptions_total": int64(0),
  328. "sink_mockSink_0_records_in_total": int64(2),
  329. "sink_mockSink_0_records_out_total": int64(2),
  330. "source_demo1_0_exceptions_total": int64(0),
  331. "source_demo1_0_records_in_total": int64(5),
  332. "source_demo1_0_records_out_total": int64(5),
  333. },
  334. }, {
  335. Name: `TestSingleSQLRule9`,
  336. Sql: `SELECT color, CASE WHEN size < 2 THEN "S" WHEN size < 4 THEN "M" ELSE "L" END as s, ts FROM demo`,
  337. R: [][]map[string]interface{}{
  338. {{
  339. "color": "red",
  340. "s": "M",
  341. "ts": float64(1541152486013),
  342. }},
  343. {{
  344. "color": "blue",
  345. "s": "L",
  346. "ts": float64(1541152486822),
  347. }},
  348. {{
  349. "color": "blue",
  350. "s": "M",
  351. "ts": float64(1541152487632),
  352. }},
  353. {{
  354. "color": "yellow",
  355. "s": "L",
  356. "ts": float64(1541152488442),
  357. }},
  358. {{
  359. "color": "red",
  360. "s": "S",
  361. "ts": float64(1541152489252),
  362. }},
  363. },
  364. M: map[string]interface{}{
  365. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  366. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  367. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  368. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  369. "op_2_project_0_exceptions_total": int64(0),
  370. "op_2_project_0_process_latency_us": int64(0),
  371. "op_2_project_0_records_in_total": int64(5),
  372. "op_2_project_0_records_out_total": int64(5),
  373. "sink_mockSink_0_exceptions_total": int64(0),
  374. "sink_mockSink_0_records_in_total": int64(5),
  375. "sink_mockSink_0_records_out_total": int64(5),
  376. "source_demo_0_exceptions_total": int64(0),
  377. "source_demo_0_records_in_total": int64(5),
  378. "source_demo_0_records_out_total": int64(5),
  379. },
  380. T: &topo.PrintableTopo{
  381. Sources: []string{"source_demo"},
  382. Edges: map[string][]string{
  383. "source_demo": {"op_1_preprocessor_demo"},
  384. "op_1_preprocessor_demo": {"op_2_project"},
  385. "op_2_project": {"sink_mockSink"},
  386. },
  387. },
  388. }, {
  389. Name: `TestSingleSQLRule10`,
  390. Sql: "SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id",
  391. R: [][]map[string]interface{}{
  392. {{
  393. "id": float64(1541152486013),
  394. "name": "name1",
  395. "color": "red",
  396. "size": float64(3),
  397. "ts": float64(1541152486013),
  398. }},
  399. {{
  400. "id": float64(1541152487632),
  401. "name": "name2",
  402. "color": "blue",
  403. "size": float64(2),
  404. "ts": float64(1541152487632),
  405. }},
  406. {{
  407. "id": float64(1541152489252),
  408. "name": "name3",
  409. "color": "red",
  410. "size": float64(1),
  411. "ts": float64(1541152489252),
  412. }},
  413. },
  414. W: 15,
  415. M: map[string]interface{}{
  416. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  417. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  418. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  419. "op_2_tableprocessor_table1_0_exceptions_total": int64(0),
  420. "op_2_tableprocessor_table1_0_records_in_total": int64(4),
  421. "op_2_tableprocessor_table1_0_records_out_total": int64(1),
  422. "op_3_join_aligner_0_records_in_total": int64(6),
  423. "op_3_join_aligner_0_records_out_total": int64(5),
  424. "op_4_join_0_exceptions_total": int64(0),
  425. "op_4_join_0_records_in_total": int64(5),
  426. "op_4_join_0_records_out_total": int64(3),
  427. "op_5_project_0_exceptions_total": int64(0),
  428. "op_5_project_0_records_in_total": int64(3),
  429. "op_5_project_0_records_out_total": int64(3),
  430. "sink_mockSink_0_exceptions_total": int64(0),
  431. "sink_mockSink_0_records_in_total": int64(3),
  432. "sink_mockSink_0_records_out_total": int64(3),
  433. "source_demo_0_exceptions_total": int64(0),
  434. "source_demo_0_records_in_total": int64(5),
  435. "source_demo_0_records_out_total": int64(5),
  436. "source_table1_0_exceptions_total": int64(0),
  437. "source_table1_0_records_in_total": int64(4),
  438. "source_table1_0_records_out_total": int64(4),
  439. },
  440. }, {
  441. Name: `TestSingleSQLRule11`,
  442. Sql: "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
  443. R: [][]map[string]interface{}{
  444. {{
  445. "device": "device2",
  446. }},
  447. {{
  448. "device": "device4",
  449. }},
  450. {{
  451. "device": "device5",
  452. }},
  453. },
  454. M: map[string]interface{}{
  455. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  456. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  457. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  458. "op_2_tableprocessor_demoTable_0_exceptions_total": int64(0),
  459. "op_2_tableprocessor_demoTable_0_records_in_total": int64(5),
  460. "op_2_tableprocessor_demoTable_0_records_out_total": int64(5),
  461. "op_3_join_aligner_0_records_in_total": int64(10),
  462. "op_3_join_aligner_0_records_out_total": int64(5),
  463. "op_4_join_0_exceptions_total": int64(0),
  464. "op_4_join_0_records_in_total": int64(5),
  465. "op_4_join_0_records_out_total": int64(3),
  466. "op_5_project_0_exceptions_total": int64(0),
  467. "op_5_project_0_records_in_total": int64(3),
  468. "op_5_project_0_records_out_total": int64(3),
  469. "sink_mockSink_0_exceptions_total": int64(0),
  470. "sink_mockSink_0_records_in_total": int64(3),
  471. "sink_mockSink_0_records_out_total": int64(3),
  472. "source_demo_0_exceptions_total": int64(0),
  473. "source_demo_0_records_in_total": int64(5),
  474. "source_demo_0_records_out_total": int64(5),
  475. "source_demoTable_0_exceptions_total": int64(0),
  476. "source_demoTable_0_records_in_total": int64(5),
  477. "source_demoTable_0_records_out_total": int64(5),
  478. },
  479. }, {
  480. Name: `TestSingleSQLRule12`,
  481. Sql: "SELECT demo.ts as demoTs, table1.id as table1Id FROM demo INNER JOIN table1 on demoTs = table1Id",
  482. R: [][]map[string]interface{}{
  483. {{
  484. "table1Id": float64(1541152486013),
  485. "demoTs": float64(1541152486013),
  486. }},
  487. {{
  488. "table1Id": float64(1541152487632),
  489. "demoTs": float64(1541152487632),
  490. }},
  491. {{
  492. "table1Id": float64(1541152489252),
  493. "demoTs": float64(1541152489252),
  494. }},
  495. },
  496. W: 15,
  497. M: map[string]interface{}{
  498. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  499. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  500. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  501. "op_2_tableprocessor_table1_0_exceptions_total": int64(0),
  502. "op_2_tableprocessor_table1_0_records_in_total": int64(4),
  503. "op_2_tableprocessor_table1_0_records_out_total": int64(1),
  504. "op_3_join_aligner_0_records_in_total": int64(6),
  505. "op_3_join_aligner_0_records_out_total": int64(5),
  506. "op_4_join_0_exceptions_total": int64(0),
  507. "op_4_join_0_records_in_total": int64(5),
  508. "op_4_join_0_records_out_total": int64(3),
  509. "op_5_project_0_exceptions_total": int64(0),
  510. "op_5_project_0_records_in_total": int64(3),
  511. "op_5_project_0_records_out_total": int64(3),
  512. "sink_mockSink_0_exceptions_total": int64(0),
  513. "sink_mockSink_0_records_in_total": int64(3),
  514. "sink_mockSink_0_records_out_total": int64(3),
  515. "source_demo_0_exceptions_total": int64(0),
  516. "source_demo_0_records_in_total": int64(5),
  517. "source_demo_0_records_out_total": int64(5),
  518. "source_table1_0_exceptions_total": int64(0),
  519. "source_table1_0_records_in_total": int64(4),
  520. "source_table1_0_records_out_total": int64(4),
  521. },
  522. },
  523. }
  524. HandleStream(true, streamList, t)
  525. options := []*api.RuleOption{
  526. {
  527. BufferLength: 100,
  528. SendError: true,
  529. }, {
  530. BufferLength: 100,
  531. SendError: true,
  532. Qos: api.AtLeastOnce,
  533. CheckpointInterval: 5000,
  534. }, {
  535. BufferLength: 100,
  536. SendError: true,
  537. Qos: api.ExactlyOnce,
  538. CheckpointInterval: 5000,
  539. },
  540. }
  541. for j, opt := range options {
  542. DoRuleTest(t, tests, j, opt, 0)
  543. }
  544. }
  545. func TestSingleSQLError(t *testing.T) {
  546. //Reset
  547. streamList := []string{"ldemo"}
  548. HandleStream(false, streamList, t)
  549. //Data setup
  550. var tests = []RuleTest{
  551. {
  552. Name: `TestSingleSQLErrorRule1`,
  553. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  554. R: [][]map[string]interface{}{
  555. {{
  556. "color": "red",
  557. "ts": float64(1541152486013),
  558. }},
  559. {{
  560. "error": "run Where error: invalid operation string(string) >= int64(3)",
  561. }},
  562. {{
  563. "ts": float64(1541152487632),
  564. }},
  565. },
  566. M: map[string]interface{}{
  567. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  568. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  569. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  570. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  571. "op_3_project_0_exceptions_total": int64(1),
  572. "op_3_project_0_process_latency_us": int64(0),
  573. "op_3_project_0_records_in_total": int64(3),
  574. "op_3_project_0_records_out_total": int64(2),
  575. "sink_mockSink_0_exceptions_total": int64(0),
  576. "sink_mockSink_0_records_in_total": int64(3),
  577. "sink_mockSink_0_records_out_total": int64(3),
  578. "source_ldemo_0_exceptions_total": int64(0),
  579. "source_ldemo_0_records_in_total": int64(5),
  580. "source_ldemo_0_records_out_total": int64(5),
  581. "op_2_filter_0_exceptions_total": int64(1),
  582. "op_2_filter_0_process_latency_us": int64(0),
  583. "op_2_filter_0_records_in_total": int64(5),
  584. "op_2_filter_0_records_out_total": int64(2),
  585. },
  586. }, {
  587. Name: `TestSingleSQLErrorRule2`,
  588. Sql: `SELECT size * 5 FROM ldemo`,
  589. R: [][]map[string]interface{}{
  590. {{
  591. "kuiper_field_0": float64(15),
  592. }},
  593. {{
  594. "error": "run Select error: invalid operation string(string) * int64(5)",
  595. }},
  596. {{
  597. "kuiper_field_0": float64(15),
  598. }},
  599. {{
  600. "kuiper_field_0": float64(10),
  601. }},
  602. {{}},
  603. },
  604. M: map[string]interface{}{
  605. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  606. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  607. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  608. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  609. "op_2_project_0_exceptions_total": int64(1),
  610. "op_2_project_0_process_latency_us": int64(0),
  611. "op_2_project_0_records_in_total": int64(5),
  612. "op_2_project_0_records_out_total": int64(4),
  613. "sink_mockSink_0_exceptions_total": int64(0),
  614. "sink_mockSink_0_records_in_total": int64(5),
  615. "sink_mockSink_0_records_out_total": int64(5),
  616. "source_ldemo_0_exceptions_total": int64(0),
  617. "source_ldemo_0_records_in_total": int64(5),
  618. "source_ldemo_0_records_out_total": int64(5),
  619. },
  620. },
  621. }
  622. HandleStream(true, streamList, t)
  623. DoRuleTest(t, tests, 0, &api.RuleOption{
  624. BufferLength: 100,
  625. SendError: true,
  626. }, 0)
  627. }
  628. func TestSingleSQLOmitError(t *testing.T) {
  629. //Reset
  630. streamList := []string{"ldemo"}
  631. HandleStream(false, streamList, t)
  632. //Data setup
  633. var tests = []RuleTest{
  634. {
  635. Name: `TestSingleSQLErrorRule1`,
  636. Sql: `SELECT color, ts FROM ldemo where size >= 3`,
  637. R: [][]map[string]interface{}{
  638. {{
  639. "color": "red",
  640. "ts": float64(1541152486013),
  641. }},
  642. {{
  643. "ts": float64(1541152487632),
  644. }},
  645. },
  646. M: map[string]interface{}{
  647. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  648. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  649. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  650. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  651. "op_3_project_0_exceptions_total": int64(0),
  652. "op_3_project_0_process_latency_us": int64(0),
  653. "op_3_project_0_records_in_total": int64(2),
  654. "op_3_project_0_records_out_total": int64(2),
  655. "sink_mockSink_0_exceptions_total": int64(0),
  656. "sink_mockSink_0_records_in_total": int64(2),
  657. "sink_mockSink_0_records_out_total": int64(2),
  658. "source_ldemo_0_exceptions_total": int64(0),
  659. "source_ldemo_0_records_in_total": int64(5),
  660. "source_ldemo_0_records_out_total": int64(5),
  661. "op_2_filter_0_exceptions_total": int64(1),
  662. "op_2_filter_0_process_latency_us": int64(0),
  663. "op_2_filter_0_records_in_total": int64(5),
  664. "op_2_filter_0_records_out_total": int64(2),
  665. },
  666. }, {
  667. Name: `TestSingleSQLErrorRule2`,
  668. Sql: `SELECT size * 5 FROM ldemo`,
  669. R: [][]map[string]interface{}{
  670. {{
  671. "kuiper_field_0": float64(15),
  672. }},
  673. {{
  674. "kuiper_field_0": float64(15),
  675. }},
  676. {{
  677. "kuiper_field_0": float64(10),
  678. }},
  679. {{}},
  680. },
  681. M: map[string]interface{}{
  682. "op_1_preprocessor_ldemo_0_exceptions_total": int64(0),
  683. "op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
  684. "op_1_preprocessor_ldemo_0_records_in_total": int64(5),
  685. "op_1_preprocessor_ldemo_0_records_out_total": int64(5),
  686. "op_2_project_0_exceptions_total": int64(1),
  687. "op_2_project_0_process_latency_us": int64(0),
  688. "op_2_project_0_records_in_total": int64(5),
  689. "op_2_project_0_records_out_total": int64(4),
  690. "sink_mockSink_0_exceptions_total": int64(0),
  691. "sink_mockSink_0_records_in_total": int64(4),
  692. "sink_mockSink_0_records_out_total": int64(4),
  693. "source_ldemo_0_exceptions_total": int64(0),
  694. "source_ldemo_0_records_in_total": int64(5),
  695. "source_ldemo_0_records_out_total": int64(5),
  696. },
  697. },
  698. }
  699. HandleStream(true, streamList, t)
  700. DoRuleTest(t, tests, 0, &api.RuleOption{
  701. BufferLength: 100,
  702. SendError: false,
  703. }, 0)
  704. }
  705. func TestSingleSQLTemplate(t *testing.T) {
  706. //Reset
  707. streamList := []string{"demo"}
  708. HandleStream(false, streamList, t)
  709. //Data setup
  710. var tests = []RuleTest{
  711. {
  712. Name: `TestSingleSQLTemplateRule1`,
  713. Sql: `SELECT * FROM demo`,
  714. R: []map[string]interface{}{
  715. {
  716. "c": "red",
  717. "wrapper": "w1",
  718. },
  719. {
  720. "c": "blue",
  721. "wrapper": "w1",
  722. },
  723. {
  724. "c": "blue",
  725. "wrapper": "w1",
  726. },
  727. {
  728. "c": "yellow",
  729. "wrapper": "w1",
  730. },
  731. {
  732. "c": "red",
  733. "wrapper": "w1",
  734. },
  735. },
  736. M: map[string]interface{}{
  737. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  738. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  739. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  740. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  741. "op_2_project_0_exceptions_total": int64(0),
  742. "op_2_project_0_process_latency_us": int64(0),
  743. "op_2_project_0_records_in_total": int64(5),
  744. "op_2_project_0_records_out_total": int64(5),
  745. "sink_mockSink_0_exceptions_total": int64(0),
  746. "sink_mockSink_0_records_in_total": int64(5),
  747. "sink_mockSink_0_records_out_total": int64(5),
  748. "source_demo_0_exceptions_total": int64(0),
  749. "source_demo_0_records_in_total": int64(5),
  750. "source_demo_0_records_out_total": int64(5),
  751. },
  752. },
  753. }
  754. HandleStream(true, streamList, t)
  755. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  756. BufferLength: 100,
  757. SendError: true,
  758. }, 0, map[string]interface{}{
  759. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  760. "sendSingle": true,
  761. }, func(result [][]byte) interface{} {
  762. var maps []map[string]interface{}
  763. for _, v := range result {
  764. var mapRes map[string]interface{}
  765. err := json.Unmarshal(v, &mapRes)
  766. if err != nil {
  767. t.Errorf("Failed to parse the input into map")
  768. continue
  769. }
  770. maps = append(maps, mapRes)
  771. }
  772. return maps
  773. })
  774. }
  775. func TestNoneSingleSQLTemplate(t *testing.T) {
  776. //Reset
  777. streamList := []string{"demo"}
  778. HandleStream(false, streamList, t)
  779. //Data setup
  780. var tests = []RuleTest{
  781. {
  782. Name: `TestNoneSingleSQLTemplateRule1`,
  783. Sql: `SELECT * FROM demo`,
  784. R: [][]byte{
  785. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  786. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  787. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  788. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  789. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  790. },
  791. M: map[string]interface{}{
  792. "op_1_preprocessor_demo_0_exceptions_total": int64(0),
  793. "op_1_preprocessor_demo_0_process_latency_us": int64(0),
  794. "op_1_preprocessor_demo_0_records_in_total": int64(5),
  795. "op_1_preprocessor_demo_0_records_out_total": int64(5),
  796. "op_2_project_0_exceptions_total": int64(0),
  797. "op_2_project_0_process_latency_us": int64(0),
  798. "op_2_project_0_records_in_total": int64(5),
  799. "op_2_project_0_records_out_total": int64(5),
  800. "sink_mockSink_0_exceptions_total": int64(0),
  801. "sink_mockSink_0_records_in_total": int64(5),
  802. "sink_mockSink_0_records_out_total": int64(5),
  803. "source_demo_0_exceptions_total": int64(0),
  804. "source_demo_0_records_in_total": int64(5),
  805. "source_demo_0_records_out_total": int64(5),
  806. },
  807. },
  808. }
  809. HandleStream(true, streamList, t)
  810. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  811. BufferLength: 100,
  812. SendError: true,
  813. }, 0, map[string]interface{}{
  814. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  815. }, func(result [][]byte) interface{} {
  816. return result
  817. })
  818. }
  819. func TestSingleSQLForBinary(t *testing.T) {
  820. //Reset
  821. streamList := []string{"binDemo"}
  822. HandleStream(false, streamList, t)
  823. //Data setup
  824. var tests = []RuleTest{
  825. {
  826. Name: `TestSingleSQLRule1`,
  827. Sql: `SELECT * FROM binDemo`,
  828. R: [][]map[string]interface{}{
  829. {{
  830. "self": mocknode.Image,
  831. }},
  832. },
  833. W: 50,
  834. M: map[string]interface{}{
  835. "op_1_preprocessor_binDemo_0_exceptions_total": int64(0),
  836. "op_1_preprocessor_binDemo_0_process_latency_us": int64(0),
  837. "op_1_preprocessor_binDemo_0_records_in_total": int64(1),
  838. "op_1_preprocessor_binDemo_0_records_out_total": int64(1),
  839. "op_2_project_0_exceptions_total": int64(0),
  840. "op_2_project_0_process_latency_us": int64(0),
  841. "op_2_project_0_records_in_total": int64(1),
  842. "op_2_project_0_records_out_total": int64(1),
  843. "sink_mockSink_0_exceptions_total": int64(0),
  844. "sink_mockSink_0_records_in_total": int64(1),
  845. "sink_mockSink_0_records_out_total": int64(1),
  846. "source_binDemo_0_exceptions_total": int64(0),
  847. "source_binDemo_0_records_in_total": int64(1),
  848. "source_binDemo_0_records_out_total": int64(1),
  849. },
  850. },
  851. }
  852. HandleStream(true, streamList, t)
  853. options := []*api.RuleOption{
  854. {
  855. BufferLength: 100,
  856. SendError: true,
  857. }, {
  858. BufferLength: 100,
  859. SendError: true,
  860. Qos: api.AtLeastOnce,
  861. CheckpointInterval: 5000,
  862. }, {
  863. BufferLength: 100,
  864. SendError: true,
  865. Qos: api.ExactlyOnce,
  866. CheckpointInterval: 5000,
  867. },
  868. }
  869. byteFunc := func(result [][]byte) interface{} {
  870. var maps [][]map[string]interface{}
  871. for _, v := range result {
  872. var mapRes []map[string][]byte
  873. err := json.Unmarshal(v, &mapRes)
  874. if err != nil {
  875. panic("Failed to parse the input into map")
  876. }
  877. mapInt := make([]map[string]interface{}, len(mapRes))
  878. for i, mv := range mapRes {
  879. mapInt[i] = make(map[string]interface{})
  880. //assume only one key
  881. for k, v := range mv {
  882. mapInt[i][k] = v
  883. }
  884. }
  885. maps = append(maps, mapInt)
  886. }
  887. return maps
  888. }
  889. for j, opt := range options {
  890. doRuleTestBySinkProps(t, tests, j, opt, 0, nil, byteFunc)
  891. }
  892. }