rule_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. package processors
  2. import (
  3. "encoding/json"
  4. "github.com/emqx/kuiper/xstream"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "testing"
  7. )
  8. func TestSingleSQL(t *testing.T) {
  9. //Reset
  10. streamList := []string{"demo", "demoError", "demo1"}
  11. handleStream(false, streamList, t)
  12. //Data setup
  13. var tests = []ruleTest{
  14. {
  15. name: `TestSingleSQLRule1`,
  16. sql: `SELECT * FROM demo`,
  17. r: [][]map[string]interface{}{
  18. {{
  19. "color": "red",
  20. "size": float64(3),
  21. "ts": float64(1541152486013),
  22. }},
  23. {{
  24. "color": "blue",
  25. "size": float64(6),
  26. "ts": float64(1541152486822),
  27. }},
  28. {{
  29. "color": "blue",
  30. "size": float64(2),
  31. "ts": float64(1541152487632),
  32. }},
  33. {{
  34. "color": "yellow",
  35. "size": float64(4),
  36. "ts": float64(1541152488442),
  37. }},
  38. {{
  39. "color": "red",
  40. "size": float64(1),
  41. "ts": float64(1541152489252),
  42. }},
  43. },
  44. m: map[string]interface{}{
  45. "op_preprocessor_demo_0_exceptions_total": int64(0),
  46. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  47. "op_preprocessor_demo_0_records_in_total": int64(5),
  48. "op_preprocessor_demo_0_records_out_total": int64(5),
  49. "op_project_0_exceptions_total": int64(0),
  50. "op_project_0_process_latency_ms": int64(0),
  51. "op_project_0_records_in_total": int64(5),
  52. "op_project_0_records_out_total": int64(5),
  53. "sink_mockSink_0_exceptions_total": int64(0),
  54. "sink_mockSink_0_records_in_total": int64(5),
  55. "sink_mockSink_0_records_out_total": int64(5),
  56. "source_demo_0_exceptions_total": int64(0),
  57. "source_demo_0_records_in_total": int64(5),
  58. "source_demo_0_records_out_total": int64(5),
  59. },
  60. t: &xstream.PrintableTopo{
  61. Sources: []string{"source_demo"},
  62. Edges: map[string][]string{
  63. "source_demo": {"op_preprocessor_demo"},
  64. "op_preprocessor_demo": {"op_project"},
  65. "op_project": {"sink_mockSink"},
  66. },
  67. },
  68. }, {
  69. name: `TestSingleSQLRule2`,
  70. sql: `SELECT color, ts FROM demo where size > 3`,
  71. r: [][]map[string]interface{}{
  72. {{
  73. "color": "blue",
  74. "ts": float64(1541152486822),
  75. }},
  76. {{
  77. "color": "yellow",
  78. "ts": float64(1541152488442),
  79. }},
  80. },
  81. m: map[string]interface{}{
  82. "op_preprocessor_demo_0_exceptions_total": int64(0),
  83. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  84. "op_preprocessor_demo_0_records_in_total": int64(5),
  85. "op_preprocessor_demo_0_records_out_total": int64(5),
  86. "op_project_0_exceptions_total": int64(0),
  87. "op_project_0_process_latency_ms": int64(0),
  88. "op_project_0_records_in_total": int64(2),
  89. "op_project_0_records_out_total": int64(2),
  90. "sink_mockSink_0_exceptions_total": int64(0),
  91. "sink_mockSink_0_records_in_total": int64(2),
  92. "sink_mockSink_0_records_out_total": int64(2),
  93. "source_demo_0_exceptions_total": int64(0),
  94. "source_demo_0_records_in_total": int64(5),
  95. "source_demo_0_records_out_total": int64(5),
  96. "op_filter_0_exceptions_total": int64(0),
  97. "op_filter_0_process_latency_ms": int64(0),
  98. "op_filter_0_records_in_total": int64(5),
  99. "op_filter_0_records_out_total": int64(2),
  100. },
  101. }, {
  102. name: `TestSingleSQLRule3`,
  103. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  104. r: [][]map[string]interface{}{
  105. {{
  106. "Int8": float64(6),
  107. "ts": float64(1541152486822),
  108. }},
  109. {{
  110. "Int8": float64(4),
  111. "ts": float64(1541152488442),
  112. }},
  113. },
  114. m: map[string]interface{}{
  115. "op_preprocessor_demo_0_exceptions_total": int64(0),
  116. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  117. "op_preprocessor_demo_0_records_in_total": int64(5),
  118. "op_preprocessor_demo_0_records_out_total": int64(5),
  119. "op_project_0_exceptions_total": int64(0),
  120. "op_project_0_process_latency_ms": int64(0),
  121. "op_project_0_records_in_total": int64(2),
  122. "op_project_0_records_out_total": int64(2),
  123. "sink_mockSink_0_exceptions_total": int64(0),
  124. "sink_mockSink_0_records_in_total": int64(2),
  125. "sink_mockSink_0_records_out_total": int64(2),
  126. "source_demo_0_exceptions_total": int64(0),
  127. "source_demo_0_records_in_total": int64(5),
  128. "source_demo_0_records_out_total": int64(5),
  129. "op_filter_0_exceptions_total": int64(0),
  130. "op_filter_0_process_latency_ms": int64(0),
  131. "op_filter_0_records_in_total": int64(5),
  132. "op_filter_0_records_out_total": int64(2),
  133. },
  134. }, {
  135. name: `TestSingleSQLRule4`,
  136. sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  137. r: [][]map[string]interface{}{
  138. {{
  139. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  140. }},
  141. {{
  142. "Int8": float64(6),
  143. "ts": float64(1541152486822),
  144. }},
  145. {{
  146. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  147. }},
  148. {{
  149. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  150. }},
  151. },
  152. m: map[string]interface{}{
  153. "op_preprocessor_demoError_0_exceptions_total": int64(3),
  154. "op_preprocessor_demoError_0_process_latency_ms": int64(0),
  155. "op_preprocessor_demoError_0_records_in_total": int64(5),
  156. "op_preprocessor_demoError_0_records_out_total": int64(2),
  157. "op_project_0_exceptions_total": int64(3),
  158. "op_project_0_process_latency_ms": int64(0),
  159. "op_project_0_records_in_total": int64(4),
  160. "op_project_0_records_out_total": int64(1),
  161. "sink_mockSink_0_exceptions_total": int64(0),
  162. "sink_mockSink_0_records_in_total": int64(4),
  163. "sink_mockSink_0_records_out_total": int64(4),
  164. "source_demoError_0_exceptions_total": int64(0),
  165. "source_demoError_0_records_in_total": int64(5),
  166. "source_demoError_0_records_out_total": int64(5),
  167. "op_filter_0_exceptions_total": int64(3),
  168. "op_filter_0_process_latency_ms": int64(0),
  169. "op_filter_0_records_in_total": int64(5),
  170. "op_filter_0_records_out_total": int64(1),
  171. },
  172. }, {
  173. name: `TestSingleSQLRule4`,
  174. sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
  175. r: [][]map[string]interface{}{
  176. {{
  177. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  178. }},
  179. {{
  180. "Int8": float64(6),
  181. "ts": float64(1541152486822),
  182. }},
  183. {{
  184. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  185. }},
  186. {{
  187. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  188. }},
  189. },
  190. m: map[string]interface{}{
  191. "op_preprocessor_demoError_0_exceptions_total": int64(3),
  192. "op_preprocessor_demoError_0_process_latency_ms": int64(0),
  193. "op_preprocessor_demoError_0_records_in_total": int64(5),
  194. "op_preprocessor_demoError_0_records_out_total": int64(2),
  195. "op_project_0_exceptions_total": int64(3),
  196. "op_project_0_process_latency_ms": int64(0),
  197. "op_project_0_records_in_total": int64(4),
  198. "op_project_0_records_out_total": int64(1),
  199. "sink_mockSink_0_exceptions_total": int64(0),
  200. "sink_mockSink_0_records_in_total": int64(4),
  201. "sink_mockSink_0_records_out_total": int64(4),
  202. "source_demoError_0_exceptions_total": int64(0),
  203. "source_demoError_0_records_in_total": int64(5),
  204. "source_demoError_0_records_out_total": int64(5),
  205. "op_filter_0_exceptions_total": int64(3),
  206. "op_filter_0_process_latency_ms": int64(0),
  207. "op_filter_0_records_in_total": int64(5),
  208. "op_filter_0_records_out_total": int64(1),
  209. },
  210. }, {
  211. name: `TestSingleSQLRule5`,
  212. sql: `SELECT meta(topic) as m, ts FROM demo`,
  213. r: [][]map[string]interface{}{
  214. {{
  215. "m": "mock",
  216. "ts": float64(1541152486013),
  217. }},
  218. {{
  219. "m": "mock",
  220. "ts": float64(1541152486822),
  221. }},
  222. {{
  223. "m": "mock",
  224. "ts": float64(1541152487632),
  225. }},
  226. {{
  227. "m": "mock",
  228. "ts": float64(1541152488442),
  229. }},
  230. {{
  231. "m": "mock",
  232. "ts": float64(1541152489252),
  233. }},
  234. },
  235. m: map[string]interface{}{
  236. "op_preprocessor_demo_0_exceptions_total": int64(0),
  237. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  238. "op_preprocessor_demo_0_records_in_total": int64(5),
  239. "op_preprocessor_demo_0_records_out_total": int64(5),
  240. "op_project_0_exceptions_total": int64(0),
  241. "op_project_0_process_latency_ms": int64(0),
  242. "op_project_0_records_in_total": int64(5),
  243. "op_project_0_records_out_total": int64(5),
  244. "sink_mockSink_0_exceptions_total": int64(0),
  245. "sink_mockSink_0_records_in_total": int64(5),
  246. "sink_mockSink_0_records_out_total": int64(5),
  247. "source_demo_0_exceptions_total": int64(0),
  248. "source_demo_0_records_in_total": int64(5),
  249. "source_demo_0_records_out_total": int64(5),
  250. },
  251. }, {
  252. name: `TestSingleSQLRule6`,
  253. sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  254. r: [][]map[string]interface{}{
  255. {{
  256. "color": "blue",
  257. "ts": float64(1541152486822),
  258. }},
  259. {{
  260. "color": "yellow",
  261. "ts": float64(1541152488442),
  262. }},
  263. },
  264. m: map[string]interface{}{
  265. "op_preprocessor_demo_0_exceptions_total": int64(0),
  266. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  267. "op_preprocessor_demo_0_records_in_total": int64(5),
  268. "op_preprocessor_demo_0_records_out_total": int64(5),
  269. "op_project_0_exceptions_total": int64(0),
  270. "op_project_0_process_latency_ms": int64(0),
  271. "op_project_0_records_in_total": int64(2),
  272. "op_project_0_records_out_total": int64(2),
  273. "sink_mockSink_0_exceptions_total": int64(0),
  274. "sink_mockSink_0_records_in_total": int64(2),
  275. "sink_mockSink_0_records_out_total": int64(2),
  276. "source_demo_0_exceptions_total": int64(0),
  277. "source_demo_0_records_in_total": int64(5),
  278. "source_demo_0_records_out_total": int64(5),
  279. "op_filter_0_exceptions_total": int64(0),
  280. "op_filter_0_process_latency_ms": int64(0),
  281. "op_filter_0_records_in_total": int64(5),
  282. "op_filter_0_records_out_total": int64(2),
  283. },
  284. }, {
  285. name: `TestSingleSQLRule7`,
  286. sql: "SELECT `from` FROM demo1",
  287. r: [][]map[string]interface{}{
  288. {{
  289. "from": "device1",
  290. }},
  291. {{
  292. "from": "device2",
  293. }},
  294. {{
  295. "from": "device3",
  296. }},
  297. {{
  298. "from": "device1",
  299. }},
  300. {{
  301. "from": "device3",
  302. }},
  303. },
  304. m: map[string]interface{}{
  305. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  306. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  307. "op_preprocessor_demo1_0_records_in_total": int64(5),
  308. "op_preprocessor_demo1_0_records_out_total": int64(5),
  309. "op_project_0_exceptions_total": int64(0),
  310. "op_project_0_process_latency_ms": int64(0),
  311. "op_project_0_records_in_total": int64(5),
  312. "op_project_0_records_out_total": int64(5),
  313. "sink_mockSink_0_exceptions_total": int64(0),
  314. "sink_mockSink_0_records_in_total": int64(5),
  315. "sink_mockSink_0_records_out_total": int64(5),
  316. "source_demo1_0_exceptions_total": int64(0),
  317. "source_demo1_0_records_in_total": int64(5),
  318. "source_demo1_0_records_out_total": int64(5),
  319. },
  320. }, {
  321. name: `TestSingleSQLRule8`,
  322. sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  323. r: [][]map[string]interface{}{
  324. {{
  325. "temp": float64(25.5),
  326. "hum": float64(65),
  327. "from": "device1",
  328. "ts": float64(1541152486013),
  329. }},
  330. {{
  331. "temp": float64(27.4),
  332. "hum": float64(80),
  333. "from": "device1",
  334. "ts": float64(1541152488442),
  335. }},
  336. },
  337. m: map[string]interface{}{
  338. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  339. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  340. "op_preprocessor_demo1_0_records_in_total": int64(5),
  341. "op_preprocessor_demo1_0_records_out_total": int64(5),
  342. "op_project_0_exceptions_total": int64(0),
  343. "op_project_0_process_latency_ms": int64(0),
  344. "op_project_0_records_in_total": int64(2),
  345. "op_project_0_records_out_total": int64(2),
  346. "op_filter_0_exceptions_total": int64(0),
  347. "op_filter_0_process_latency_ms": int64(0),
  348. "op_filter_0_records_in_total": int64(5),
  349. "op_filter_0_records_out_total": int64(2),
  350. "sink_mockSink_0_exceptions_total": int64(0),
  351. "sink_mockSink_0_records_in_total": int64(2),
  352. "sink_mockSink_0_records_out_total": int64(2),
  353. "source_demo1_0_exceptions_total": int64(0),
  354. "source_demo1_0_records_in_total": int64(5),
  355. "source_demo1_0_records_out_total": int64(5),
  356. },
  357. },
  358. }
  359. handleStream(true, streamList, t)
  360. options := []*api.RuleOption{
  361. {
  362. BufferLength: 100,
  363. }, {
  364. BufferLength: 100,
  365. Qos: api.AtLeastOnce,
  366. CheckpointInterval: 5000,
  367. }, {
  368. BufferLength: 100,
  369. Qos: api.ExactlyOnce,
  370. CheckpointInterval: 5000,
  371. },
  372. }
  373. for j, opt := range options {
  374. doRuleTest(t, tests, j, opt)
  375. }
  376. }
  377. func TestSingleSQLError(t *testing.T) {
  378. //Reset
  379. streamList := []string{"ldemo"}
  380. handleStream(false, streamList, t)
  381. //Data setup
  382. var tests = []ruleTest{
  383. {
  384. name: `TestSingleSQLErrorRule1`,
  385. sql: `SELECT color, ts FROM ldemo where size >= 3`,
  386. r: [][]map[string]interface{}{
  387. {{
  388. "color": "red",
  389. "ts": float64(1541152486013),
  390. }},
  391. {{
  392. "error": "run Where error: invalid operation string(string) >= int64(3)",
  393. }},
  394. {{
  395. "ts": float64(1541152487632),
  396. }},
  397. },
  398. m: map[string]interface{}{
  399. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  400. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  401. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  402. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  403. "op_project_0_exceptions_total": int64(1),
  404. "op_project_0_process_latency_ms": int64(0),
  405. "op_project_0_records_in_total": int64(3),
  406. "op_project_0_records_out_total": int64(2),
  407. "sink_mockSink_0_exceptions_total": int64(0),
  408. "sink_mockSink_0_records_in_total": int64(3),
  409. "sink_mockSink_0_records_out_total": int64(3),
  410. "source_ldemo_0_exceptions_total": int64(0),
  411. "source_ldemo_0_records_in_total": int64(5),
  412. "source_ldemo_0_records_out_total": int64(5),
  413. "op_filter_0_exceptions_total": int64(1),
  414. "op_filter_0_process_latency_ms": int64(0),
  415. "op_filter_0_records_in_total": int64(5),
  416. "op_filter_0_records_out_total": int64(2),
  417. },
  418. }, {
  419. name: `TestSingleSQLErrorRule2`,
  420. sql: `SELECT size * 5 FROM ldemo`,
  421. r: [][]map[string]interface{}{
  422. {{
  423. "rengine_field_0": float64(15),
  424. }},
  425. {{
  426. "error": "run Select error: invalid operation string(string) * int64(5)",
  427. }},
  428. {{
  429. "rengine_field_0": float64(15),
  430. }},
  431. {{
  432. "rengine_field_0": float64(10),
  433. }},
  434. {{}},
  435. },
  436. m: map[string]interface{}{
  437. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  438. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  439. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  440. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  441. "op_project_0_exceptions_total": int64(1),
  442. "op_project_0_process_latency_ms": int64(0),
  443. "op_project_0_records_in_total": int64(5),
  444. "op_project_0_records_out_total": int64(4),
  445. "sink_mockSink_0_exceptions_total": int64(0),
  446. "sink_mockSink_0_records_in_total": int64(5),
  447. "sink_mockSink_0_records_out_total": int64(5),
  448. "source_ldemo_0_exceptions_total": int64(0),
  449. "source_ldemo_0_records_in_total": int64(5),
  450. "source_ldemo_0_records_out_total": int64(5),
  451. },
  452. },
  453. }
  454. handleStream(true, streamList, t)
  455. doRuleTest(t, tests, 0, &api.RuleOption{
  456. BufferLength: 100,
  457. })
  458. }
  459. func TestSingleSQLTemplate(t *testing.T) {
  460. //Reset
  461. streamList := []string{"demo"}
  462. handleStream(false, streamList, t)
  463. //Data setup
  464. var tests = []ruleTest{
  465. {
  466. name: `TestSingleSQLTemplateRule1`,
  467. sql: `SELECT * FROM demo`,
  468. r: []map[string]interface{}{
  469. {
  470. "c": "red",
  471. "wrapper": "w1",
  472. },
  473. {
  474. "c": "blue",
  475. "wrapper": "w1",
  476. },
  477. {
  478. "c": "blue",
  479. "wrapper": "w1",
  480. },
  481. {
  482. "c": "yellow",
  483. "wrapper": "w1",
  484. },
  485. {
  486. "c": "red",
  487. "wrapper": "w1",
  488. },
  489. },
  490. m: map[string]interface{}{
  491. "op_preprocessor_demo_0_exceptions_total": int64(0),
  492. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  493. "op_preprocessor_demo_0_records_in_total": int64(5),
  494. "op_preprocessor_demo_0_records_out_total": int64(5),
  495. "op_project_0_exceptions_total": int64(0),
  496. "op_project_0_process_latency_ms": int64(0),
  497. "op_project_0_records_in_total": int64(5),
  498. "op_project_0_records_out_total": int64(5),
  499. "sink_mockSink_0_exceptions_total": int64(0),
  500. "sink_mockSink_0_records_in_total": int64(5),
  501. "sink_mockSink_0_records_out_total": int64(5),
  502. "source_demo_0_exceptions_total": int64(0),
  503. "source_demo_0_records_in_total": int64(5),
  504. "source_demo_0_records_out_total": int64(5),
  505. },
  506. },
  507. }
  508. handleStream(true, streamList, t)
  509. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  510. BufferLength: 100,
  511. }, map[string]interface{}{
  512. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  513. "sendSingle": true,
  514. }, func(result [][]byte) interface{} {
  515. var maps []map[string]interface{}
  516. for _, v := range result {
  517. var mapRes map[string]interface{}
  518. err := json.Unmarshal(v, &mapRes)
  519. if err != nil {
  520. t.Errorf("Failed to parse the input into map")
  521. continue
  522. }
  523. maps = append(maps, mapRes)
  524. }
  525. return maps
  526. })
  527. }
  528. func TestNoneSingleSQLTemplate(t *testing.T) {
  529. //Reset
  530. streamList := []string{"demo"}
  531. handleStream(false, streamList, t)
  532. //Data setup
  533. var tests = []ruleTest{
  534. {
  535. name: `TestNoneSingleSQLTemplateRule1`,
  536. sql: `SELECT * FROM demo`,
  537. r: [][]byte{
  538. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  539. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  540. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  541. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  542. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  543. },
  544. m: map[string]interface{}{
  545. "op_preprocessor_demo_0_exceptions_total": int64(0),
  546. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  547. "op_preprocessor_demo_0_records_in_total": int64(5),
  548. "op_preprocessor_demo_0_records_out_total": int64(5),
  549. "op_project_0_exceptions_total": int64(0),
  550. "op_project_0_process_latency_ms": int64(0),
  551. "op_project_0_records_in_total": int64(5),
  552. "op_project_0_records_out_total": int64(5),
  553. "sink_mockSink_0_exceptions_total": int64(0),
  554. "sink_mockSink_0_records_in_total": int64(5),
  555. "sink_mockSink_0_records_out_total": int64(5),
  556. "source_demo_0_exceptions_total": int64(0),
  557. "source_demo_0_records_in_total": int64(5),
  558. "source_demo_0_records_out_total": int64(5),
  559. },
  560. },
  561. }
  562. handleStream(true, streamList, t)
  563. doRuleTestBySinkProps(t, tests, 0, &api.RuleOption{
  564. BufferLength: 100,
  565. }, map[string]interface{}{
  566. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  567. }, func(result [][]byte) interface{} {
  568. return result
  569. })
  570. }