project_test.go 26 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestProjectPlan_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a" : "val_a",
  24. },
  25. },
  26. result: []map[string]interface{}{{
  27. "a": "val_a",
  28. }},
  29. },
  30. {
  31. sql: "SELECT ts FROM test",
  32. data: &xsql.Tuple{
  33. Emitter: "test",
  34. Message: xsql.Message{
  35. "a" : "val_a",
  36. "ts" : common.TimeFromUnixMilli(1568854573431),
  37. },
  38. },
  39. result: []map[string]interface{}{{
  40. "ts": "2019-09-19T00:56:13.000000431Z",
  41. }},
  42. },
  43. {
  44. sql: "SELECT A FROM test",
  45. data: &xsql.Tuple{
  46. Emitter: "test",
  47. Message: xsql.Message{
  48. "a" : "val_a",
  49. },
  50. },
  51. result: []map[string]interface{}{{
  52. "A": "val_a",
  53. }},
  54. },
  55. {
  56. sql: `SELECT "value" FROM test`,
  57. data: &xsql.Tuple{
  58. Emitter: "test",
  59. Message: xsql.Message{
  60. },
  61. },
  62. result: []map[string]interface{}{{
  63. DEFAULT_FIELD_NAME_PREFIX + "0" : "value",
  64. }},
  65. },
  66. {
  67. sql: `SELECT 3.4 FROM test`,
  68. data: &xsql.Tuple{
  69. Emitter: "test",
  70. Message: xsql.Message{
  71. },
  72. },
  73. result: []map[string]interface{}{{
  74. DEFAULT_FIELD_NAME_PREFIX + "0" : 3.4,
  75. }},
  76. },
  77. {
  78. sql: `SELECT 5 FROM test`,
  79. data: &xsql.Tuple{
  80. Emitter: "test",
  81. Message: xsql.Message{
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  86. }},
  87. },
  88. {
  89. sql: `SELECT a, "value" AS b FROM test`,
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{
  93. "a" : "val_a",
  94. },
  95. },
  96. result: []map[string]interface{}{{
  97. "a" : "val_a",
  98. "b" : "value",
  99. }},
  100. },
  101. {
  102. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{
  106. "a" : "val_a",
  107. },
  108. },
  109. result: []map[string]interface{}{{
  110. "a" : "val_a",
  111. "b" : "value",
  112. "Pi" : 3.14,
  113. "Zero" : 0.0,
  114. }},
  115. },
  116. {
  117. sql: `SELECT a->b AS ab FROM test`,
  118. data: &xsql.Tuple{
  119. Emitter: "test",
  120. Message: xsql.Message{
  121. "a" : map[string]interface{}{"b" : "hello"},
  122. },
  123. },
  124. result: []map[string]interface{}{{
  125. "ab" : "hello",
  126. }},
  127. },
  128. {
  129. sql: `SELECT a[0]->b AS ab FROM test`,
  130. data: &xsql.Tuple{
  131. Emitter: "test",
  132. Message: xsql.Message{
  133. "a" : []interface{}{
  134. map[string]interface{}{"b" : "hello1"},
  135. map[string]interface{}{"b" : "hello2"},
  136. },
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "ab" : "hello1",
  141. }},
  142. },
  143. {
  144. sql: `SELECT a->c->d AS f1 FROM test`,
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "a" : map[string]interface{}{
  149. "b" : "hello",
  150. "c" : map[string]interface{}{
  151. "d": 35.2,
  152. },
  153. },
  154. },
  155. },
  156. result: []map[string]interface{}{{
  157. "f1" : 35.2,
  158. }},
  159. },
  160. //The int type is not supported yet, the json parser returns float64 for int values
  161. {
  162. sql: `SELECT a->c->d AS f1 FROM test`,
  163. data: &xsql.Tuple{
  164. Emitter: "test",
  165. Message: xsql.Message{
  166. "a" : map[string]interface{}{
  167. "b" : "hello",
  168. "c" : map[string]interface{}{
  169. "d": float64(35),
  170. },
  171. },
  172. },
  173. },
  174. result: []map[string]interface{}{{
  175. "f1" : float64(35),
  176. }},
  177. },
  178. {
  179. sql: "SELECT a FROM test",
  180. data: &xsql.Tuple{
  181. Emitter: "test",
  182. Message: xsql.Message{
  183. },
  184. },
  185. result: []map[string]interface{}{
  186. {},
  187. },
  188. },
  189. {
  190. sql: "SELECT * FROM test",
  191. data: &xsql.Tuple{
  192. Emitter: "test",
  193. Message: xsql.Message{
  194. },
  195. },
  196. result: []map[string]interface{}{
  197. {},
  198. },
  199. },
  200. {
  201. sql: `SELECT * FROM test`,
  202. data: &xsql.Tuple{
  203. Emitter: "test",
  204. Message: xsql.Message{
  205. "a" : map[string]interface{}{
  206. "b" : "hello",
  207. "c" : map[string]interface{}{
  208. "d": 35.2,
  209. },
  210. },
  211. },
  212. },
  213. result: []map[string]interface{}{{
  214. "a" : map[string]interface{} {
  215. "b" : "hello",
  216. "c" : map[string]interface{} {
  217. "d" : 35.2,
  218. },
  219. },
  220. }},
  221. },
  222. {
  223. sql: `SELECT * FROM test`,
  224. data: &xsql.Tuple{
  225. Emitter: "test",
  226. Message: xsql.Message{
  227. "a" : "val1",
  228. "b" : 3.14,
  229. },
  230. },
  231. result: []map[string]interface{}{{
  232. "a" : "val1",
  233. "b" : 3.14,
  234. }},
  235. },
  236. {
  237. sql: `SELECT 3*4 AS f1 FROM test`,
  238. data: &xsql.Tuple{
  239. Emitter: "test",
  240. Message: xsql.Message{
  241. },
  242. },
  243. result: []map[string]interface{}{{
  244. "f1" : float64(12),
  245. }},
  246. },
  247. {
  248. sql: `SELECT 4.5*2 AS f1 FROM test`,
  249. data: &xsql.Tuple{
  250. Emitter: "test",
  251. Message: xsql.Message{
  252. },
  253. },
  254. result: []map[string]interface{}{{
  255. "f1" : float64(9),
  256. }},
  257. },
  258. }
  259. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  260. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  261. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  262. for i, tt := range tests {
  263. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  264. pp := &ProjectPlan{Fields:stmt.Fields}
  265. pp.isTest = true
  266. result := pp.Apply(ctx, tt.data)
  267. var mapRes []map[string]interface{}
  268. if v, ok := result.([]byte); ok {
  269. err := json.Unmarshal(v, &mapRes)
  270. if err != nil {
  271. t.Errorf("Failed to parse the input into map.\n")
  272. continue
  273. }
  274. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  275. if !reflect.DeepEqual(tt.result, mapRes) {
  276. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  277. }
  278. } else {
  279. t.Errorf("The returned result is not type of []byte\n")
  280. }
  281. }
  282. }
  283. func TestProjectPlan_MultiInput(t *testing.T) {
  284. var tests = []struct {
  285. sql string
  286. data interface{}
  287. result []map[string]interface{}
  288. }{
  289. {
  290. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  291. data: &xsql.Tuple{
  292. Emitter: "tbl",
  293. Message: xsql.Message{
  294. "abc" : int64(6),
  295. },
  296. },
  297. result: []map[string]interface{}{{
  298. "abc" : float64(6), //json marshall problem
  299. }},
  300. },
  301. {
  302. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  303. data: &xsql.Tuple{
  304. Emitter: "tbl",
  305. Message: xsql.Message{
  306. "abc" : int64(34),
  307. "def" : "hello",
  308. },
  309. },
  310. result: []map[string]interface{}{{
  311. "abc" : float64(34),
  312. }},
  313. },
  314. {
  315. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  316. data: xsql.WindowTuplesSet{
  317. xsql.WindowTuples{
  318. Emitter:"src1",
  319. Tuples:[]xsql.Tuple{
  320. {
  321. Emitter: "src1",
  322. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  323. },{
  324. Emitter: "src1",
  325. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  326. },{
  327. Emitter: "src1",
  328. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  329. },
  330. },
  331. },
  332. },
  333. result: []map[string]interface{}{{
  334. "id1" : float64(1),
  335. },{
  336. "id1" : float64(2),
  337. },{
  338. "id1" : float64(3),
  339. }},
  340. },
  341. {
  342. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  343. data: xsql.WindowTuplesSet{
  344. xsql.WindowTuples{
  345. Emitter:"src1",
  346. Tuples:[]xsql.Tuple{
  347. {
  348. Emitter: "src1",
  349. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  350. },{
  351. Emitter: "src1",
  352. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  353. },{
  354. Emitter: "src1",
  355. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  356. },
  357. },
  358. },
  359. },
  360. result: []map[string]interface{}{{
  361. "id1" : float64(1),
  362. "f1" : "v1",
  363. },{
  364. "id1" : float64(2),
  365. "f1" : "v2",
  366. },{
  367. "id1" : float64(3),
  368. "f1" : "v1",
  369. }},
  370. },
  371. {
  372. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  373. data: xsql.WindowTuplesSet{
  374. xsql.WindowTuples{
  375. Emitter:"src1",
  376. Tuples:[]xsql.Tuple{
  377. {
  378. Emitter: "src1",
  379. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  380. },{
  381. Emitter: "src1",
  382. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  383. },{
  384. Emitter: "src1",
  385. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  386. },
  387. },
  388. },
  389. },
  390. result: []map[string]interface{}{{
  391. "id1" : float64(1),
  392. "f1" : "v1",
  393. },{
  394. "id1" : float64(2),
  395. "f1" : "v2",
  396. },{
  397. "id1" : float64(3),
  398. "f1" : "v1",
  399. }},
  400. },
  401. {
  402. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  403. data: xsql.JoinTupleSets{
  404. xsql.JoinTuple{
  405. Tuples: []xsql.Tuple{
  406. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  407. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  408. },
  409. },
  410. xsql.JoinTuple{
  411. Tuples: []xsql.Tuple{
  412. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  413. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  414. },
  415. },
  416. xsql.JoinTuple{
  417. Tuples: []xsql.Tuple{
  418. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  419. },
  420. },
  421. },
  422. result: []map[string]interface{}{{
  423. "id1" : float64(1),
  424. },{
  425. "id1" : float64(2),
  426. },{
  427. "id1" : float64(3),
  428. }},
  429. },
  430. {
  431. sql: "SELECT abc FROM tbl group by abc",
  432. data: xsql.GroupedTuplesSet{
  433. {
  434. &xsql.Tuple{
  435. Emitter: "tbl",
  436. Message: xsql.Message{
  437. "abc" : int64(6),
  438. "def" : "hello",
  439. },
  440. },
  441. },
  442. },
  443. result: []map[string]interface{}{{
  444. "abc" : float64(6),
  445. },},
  446. },
  447. {
  448. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  449. data: xsql.GroupedTuplesSet{
  450. {
  451. &xsql.Tuple{
  452. Emitter: "src1",
  453. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  454. },
  455. &xsql.Tuple{
  456. Emitter: "src1",
  457. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  458. },
  459. },
  460. {
  461. &xsql.Tuple{
  462. Emitter: "src1",
  463. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  464. },
  465. },
  466. },
  467. result: []map[string]interface{}{{
  468. "id1": float64(1),
  469. },{
  470. "id1": float64(2),
  471. },},
  472. },
  473. {
  474. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  475. data: xsql.GroupedTuplesSet{
  476. {
  477. &xsql.JoinTuple{
  478. Tuples: []xsql.Tuple{
  479. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  480. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  481. },
  482. },
  483. },
  484. {
  485. &xsql.JoinTuple{
  486. Tuples: []xsql.Tuple{
  487. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  488. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  489. },
  490. },
  491. },
  492. {
  493. &xsql.JoinTuple{
  494. Tuples: []xsql.Tuple{
  495. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  496. },
  497. },
  498. },
  499. },
  500. result: []map[string]interface{}{{
  501. "id2": float64(2),
  502. }, {
  503. "id2": float64(4),
  504. },{
  505. },},
  506. },
  507. {
  508. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  509. data: xsql.JoinTupleSets{
  510. xsql.JoinTuple{
  511. Tuples: []xsql.Tuple{
  512. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  513. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  514. },
  515. },
  516. xsql.JoinTuple{
  517. Tuples: []xsql.Tuple{
  518. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  519. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  520. },
  521. },
  522. xsql.JoinTuple{
  523. Tuples: []xsql.Tuple{
  524. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  525. },
  526. },
  527. },
  528. result: []map[string]interface{}{{
  529. "id1" : float64(1),
  530. "f1" : "v1",
  531. "f2" : "w2",
  532. },{
  533. "id1" : float64(2),
  534. "f1" : "v2",
  535. "f2" : "w3",
  536. },{
  537. "id1" : float64(3),
  538. "f1" : "v1",
  539. }},
  540. },
  541. {
  542. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  543. data: xsql.JoinTupleSets{
  544. xsql.JoinTuple{
  545. Tuples: []xsql.Tuple{
  546. {Emitter: "src1", Message: xsql.Message{ "id" : 1, "f1" : "v1",},},
  547. {Emitter: "src2", Message: xsql.Message{ "id" : 2, "f2" : "w2",},},
  548. },
  549. },
  550. xsql.JoinTuple{
  551. Tuples: []xsql.Tuple{
  552. {Emitter: "src1", Message: xsql.Message{ "id" : 2, "f1" : "v2",},},
  553. {Emitter: "src2", Message: xsql.Message{ "id" : 4, "f2" : "w3",},},
  554. },
  555. },
  556. xsql.JoinTuple{
  557. Tuples: []xsql.Tuple{
  558. {Emitter: "src1", Message: xsql.Message{ "id" : 3, "f1" : "v1",},},
  559. },
  560. },
  561. },
  562. result: []map[string]interface{}{{
  563. "id" : float64(1),
  564. "f1" : "v1",
  565. "f2" : "w2",
  566. },{
  567. "id" : float64(2),
  568. "f1" : "v2",
  569. "f2" : "w3",
  570. },{
  571. "id" : float64(3),
  572. "f1" : "v1",
  573. }},
  574. },
  575. {
  576. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  577. data: xsql.GroupedTuplesSet{
  578. {
  579. &xsql.Tuple{
  580. Emitter: "src1",
  581. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  582. },
  583. &xsql.Tuple{
  584. Emitter: "src1",
  585. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  586. },
  587. },
  588. {
  589. &xsql.Tuple{
  590. Emitter: "src1",
  591. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  592. },
  593. },
  594. },
  595. result: []map[string]interface{}{{
  596. "id1": float64(1),
  597. "f1": "v1",
  598. },{
  599. "id1": float64(2),
  600. "f1": "v2",
  601. },},
  602. },
  603. {
  604. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  605. data: xsql.GroupedTuplesSet{
  606. {
  607. &xsql.JoinTuple{
  608. Tuples: []xsql.Tuple{
  609. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  610. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  611. },
  612. },
  613. },
  614. {
  615. &xsql.JoinTuple{
  616. Tuples: []xsql.Tuple{
  617. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  618. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  619. },
  620. },
  621. },
  622. {
  623. &xsql.JoinTuple{
  624. Tuples: []xsql.Tuple{
  625. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  626. },
  627. },
  628. },
  629. },
  630. result: []map[string]interface{}{{
  631. "id2": float64(2),
  632. "id1": float64(1),
  633. "f1" : "v1",
  634. }, {
  635. "id2": float64(4),
  636. "id1": float64(2),
  637. "f1" : "v2",
  638. },{
  639. "id1": float64(3),
  640. "f1" : "v1",
  641. },},
  642. },
  643. {
  644. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  645. data: xsql.GroupedTuplesSet{
  646. {
  647. &xsql.JoinTuple{
  648. Tuples: []xsql.Tuple{
  649. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  650. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  651. },
  652. },
  653. },
  654. {
  655. &xsql.JoinTuple{
  656. Tuples: []xsql.Tuple{
  657. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  658. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  659. },
  660. },
  661. },
  662. {
  663. &xsql.JoinTuple{
  664. Tuples: []xsql.Tuple{
  665. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  666. },
  667. },
  668. },
  669. },
  670. result: []map[string]interface{}{{
  671. "id2": float64(2),
  672. "id1": float64(1),
  673. "f1" : "v1",
  674. }, {
  675. "id2": float64(4),
  676. "id1": float64(2),
  677. "f1" : "v2",
  678. },{
  679. "id1": float64(3),
  680. "f1" : "v1",
  681. },},
  682. },
  683. }
  684. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  685. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  686. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  687. for i, tt := range tests {
  688. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  689. pp := &ProjectPlan{Fields:stmt.Fields}
  690. pp.isTest = true
  691. result := pp.Apply(ctx, tt.data)
  692. var mapRes []map[string]interface{}
  693. if v, ok := result.([]byte); ok {
  694. err := json.Unmarshal(v, &mapRes)
  695. if err != nil {
  696. t.Errorf("Failed to parse the input into map.\n")
  697. continue
  698. }
  699. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  700. if !reflect.DeepEqual(tt.result, mapRes) {
  701. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  702. }
  703. } else {
  704. t.Errorf("The returned result is not type of []byte\n")
  705. }
  706. }
  707. }
  708. func TestProjectPlan_Funcs(t *testing.T) {
  709. var tests = []struct {
  710. sql string
  711. data interface{}
  712. result []map[string]interface{}
  713. }{
  714. {
  715. sql: "SELECT round(a) as r FROM test",
  716. data: &xsql.Tuple{
  717. Emitter: "test",
  718. Message: xsql.Message{
  719. "a": 47.5,
  720. },
  721. },
  722. result: []map[string]interface{}{{
  723. "r": float64(48),
  724. }},
  725. }, {
  726. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  727. data: xsql.WindowTuplesSet{
  728. xsql.WindowTuples{
  729. Emitter:"test",
  730. Tuples:[]xsql.Tuple{
  731. {
  732. Emitter: "src1",
  733. Message: xsql.Message{ "a": 53.1 },
  734. },{
  735. Emitter: "src1",
  736. Message: xsql.Message{ "a": 27.4 },
  737. },{
  738. Emitter: "src1",
  739. Message: xsql.Message{ "a": 123123.7 },
  740. },
  741. },
  742. },
  743. },
  744. result: []map[string]interface{}{{
  745. "r": float64(53),
  746. },{
  747. "r": float64(27),
  748. },{
  749. "r": float64(123124),
  750. }},
  751. }, {
  752. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  753. data: xsql.JoinTupleSets{
  754. xsql.JoinTuple{
  755. Tuples: []xsql.Tuple{
  756. {Emitter: "test", Message: xsql.Message{ "id": 1, "a": 65.55},},
  757. {Emitter: "test1", Message: xsql.Message{ "id": 1, "b": 12},},
  758. },
  759. },
  760. xsql.JoinTuple{
  761. Tuples: []xsql.Tuple{
  762. {Emitter: "test", Message: xsql.Message{ "id": 2, "a": 73.499},},
  763. {Emitter: "test1", Message: xsql.Message{ "id": 2, "b": 34},},
  764. },
  765. },
  766. xsql.JoinTuple{
  767. Tuples: []xsql.Tuple{
  768. {Emitter: "test", Message: xsql.Message{ "id": 3, "a": 88.88},},
  769. {Emitter: "test1", Message: xsql.Message{ "id": 3, "b": 6},},
  770. },
  771. },
  772. },
  773. result: []map[string]interface{}{{
  774. "r": float64(66),
  775. },{
  776. "r": float64(73),
  777. },{
  778. "r": float64(89),
  779. }},
  780. }, {
  781. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  782. data: xsql.JoinTupleSets{
  783. xsql.JoinTuple{
  784. Tuples: []xsql.Tuple{
  785. {Emitter: "test", Message: xsql.Message{ "id": 1, "a": 65.55},},
  786. {Emitter: "test1", Message: xsql.Message{ "id": 1, "b": 12},},
  787. },
  788. },
  789. xsql.JoinTuple{
  790. Tuples: []xsql.Tuple{
  791. {Emitter: "test", Message: xsql.Message{ "id": 2, "a": 73.499},},
  792. {Emitter: "test1", Message: xsql.Message{ "id": 2, "b": 34},},
  793. },
  794. },
  795. xsql.JoinTuple{
  796. Tuples: []xsql.Tuple{
  797. {Emitter: "test", Message: xsql.Message{ "id": 3, "a": 88.88},},
  798. {Emitter: "test1", Message: xsql.Message{ "id": 3, "b": 6},},
  799. },
  800. },
  801. },
  802. result: []map[string]interface{}{{
  803. "concat": "165.5512",
  804. },{
  805. "concat": "273.49934",
  806. },{
  807. "concat": "388.886",
  808. }},
  809. },
  810. }
  811. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  812. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  813. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  814. for i, tt := range tests {
  815. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  816. if err != nil{
  817. t.Error(err)
  818. }
  819. pp := &ProjectPlan{Fields:stmt.Fields}
  820. pp.isTest = true
  821. result := pp.Apply(ctx, tt.data)
  822. var mapRes []map[string]interface{}
  823. if v, ok := result.([]byte); ok {
  824. err := json.Unmarshal(v, &mapRes)
  825. if err != nil {
  826. t.Errorf("Failed to parse the input into map.\n")
  827. continue
  828. }
  829. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  830. if !reflect.DeepEqual(tt.result, mapRes) {
  831. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  832. }
  833. } else {
  834. t.Errorf("The returned result is not type of []byte\n")
  835. }
  836. }
  837. }
  838. func TestProjectPlan_AggFuncs(t *testing.T) {
  839. var tests = []struct {
  840. sql string
  841. data interface{}
  842. result []map[string]interface{}
  843. }{
  844. {
  845. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  846. data: xsql.GroupedTuplesSet{
  847. {
  848. &xsql.JoinTuple{
  849. Tuples: []xsql.Tuple{
  850. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 122.33,},},
  851. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  852. },
  853. },
  854. &xsql.JoinTuple{
  855. Tuples: []xsql.Tuple{
  856. {Emitter: "test", Message: xsql.Message{ "id" : 5, "a": 177.51,},},
  857. {Emitter: "src2", Message: xsql.Message{ "id" : 5, "color" : "w2",},},
  858. },
  859. },
  860. },
  861. {
  862. &xsql.JoinTuple{
  863. Tuples: []xsql.Tuple{
  864. {Emitter: "test", Message: xsql.Message{ "id" : 2, "a": 89.03,},},
  865. {Emitter: "src2", Message: xsql.Message{ "id" : 2, "color" : "w1",},},
  866. },
  867. },
  868. &xsql.JoinTuple{
  869. Tuples: []xsql.Tuple{
  870. {Emitter: "test", Message: xsql.Message{ "id" : 4, "a": 14.6,},},
  871. {Emitter: "src2", Message: xsql.Message{ "id" : 4, "color" : "w1",},},
  872. },
  873. },
  874. },
  875. },
  876. result: []map[string]interface{}{{
  877. "c": float64(2),
  878. "r": float64(122),
  879. },{
  880. "c": float64(2),
  881. "r": float64(89),
  882. }},
  883. },
  884. {
  885. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  886. data: xsql.GroupedTuplesSet{
  887. {
  888. &xsql.JoinTuple{
  889. Tuples: []xsql.Tuple{
  890. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 122.33,},},
  891. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  892. },
  893. },
  894. &xsql.JoinTuple{
  895. Tuples: []xsql.Tuple{
  896. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 68.54,},},
  897. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  898. },
  899. },
  900. &xsql.JoinTuple{
  901. Tuples: []xsql.Tuple{
  902. {Emitter: "test", Message: xsql.Message{ "id" : 4, "a": 98.31,},},
  903. {Emitter: "src2", Message: xsql.Message{ "id" : 4, "color" : "w2",},},
  904. },
  905. },
  906. &xsql.JoinTuple{
  907. Tuples: []xsql.Tuple{
  908. {Emitter: "test", Message: xsql.Message{ "id" : 5, "a": 177.54,},},
  909. {Emitter: "src2", Message: xsql.Message{ "id" : 5, "color" : "w2",},},
  910. },
  911. },
  912. },
  913. {
  914. &xsql.JoinTuple{
  915. Tuples: []xsql.Tuple{
  916. {Emitter: "test", Message: xsql.Message{ "id" : 2, "a": 89.03,},},
  917. {Emitter: "src2", Message: xsql.Message{ "id" : 2, "color" : "w1",},},
  918. },
  919. },
  920. &xsql.JoinTuple{
  921. Tuples: []xsql.Tuple{
  922. {Emitter: "test", Message: xsql.Message{ "id" : 4, "a": 14.6,},},
  923. {Emitter: "src2", Message: xsql.Message{ "id" : 4, "color" : "w1",},},
  924. },
  925. },
  926. },
  927. },
  928. result: []map[string]interface{}{{
  929. "avg": 116.68,
  930. },{
  931. "avg": 51.815,
  932. }},
  933. },
  934. {
  935. sql: "SELECT max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  936. data: xsql.GroupedTuplesSet{
  937. {
  938. &xsql.JoinTuple{
  939. Tuples: []xsql.Tuple{
  940. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 122.33,},},
  941. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  942. },
  943. },
  944. &xsql.JoinTuple{
  945. Tuples: []xsql.Tuple{
  946. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 68.55,},},
  947. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  948. },
  949. },
  950. &xsql.JoinTuple{
  951. Tuples: []xsql.Tuple{
  952. {Emitter: "test", Message: xsql.Message{ "id" : 5, "a": 177.51,},},
  953. {Emitter: "src2", Message: xsql.Message{ "id" : 5, "color" : "w2",},},
  954. },
  955. },
  956. },
  957. {
  958. &xsql.JoinTuple{
  959. Tuples: []xsql.Tuple{
  960. {Emitter: "test", Message: xsql.Message{ "id" : 2, "a": 89.03,},},
  961. {Emitter: "src2", Message: xsql.Message{ "id" : 2, "color" : "w1",},},
  962. },
  963. },
  964. &xsql.JoinTuple{
  965. Tuples: []xsql.Tuple{
  966. {Emitter: "test", Message: xsql.Message{ "id" : 4, "a": 14.6,},},
  967. {Emitter: "src2", Message: xsql.Message{ "id" : 4, "color" : "w1",},},
  968. },
  969. },
  970. },
  971. },
  972. result: []map[string]interface{}{{
  973. "max": 177.51,
  974. },{
  975. "max": 89.03,
  976. }},
  977. },
  978. {
  979. sql: "SELECT min(a) as min FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  980. data: xsql.JoinTupleSets{
  981. xsql.JoinTuple{
  982. Tuples: []xsql.Tuple{
  983. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 122.33,},},
  984. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  985. },
  986. },
  987. xsql.JoinTuple{
  988. Tuples: []xsql.Tuple{
  989. {Emitter: "test", Message: xsql.Message{ "id" : 1, "a": 68.55,},},
  990. {Emitter: "src2", Message: xsql.Message{ "id" : 1, "color" : "w2",},},
  991. },
  992. },
  993. xsql.JoinTuple{
  994. Tuples: []xsql.Tuple{
  995. {Emitter: "test", Message: xsql.Message{ "id" : 5, "a": 177.51,},},
  996. {Emitter: "src2", Message: xsql.Message{ "id" : 5, "color" : "w2",},},
  997. },
  998. },
  999. },
  1000. result: []map[string]interface{}{{
  1001. "min": 68.55,
  1002. }},
  1003. },{
  1004. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1005. data: xsql.WindowTuplesSet{
  1006. xsql.WindowTuples{
  1007. Emitter:"test",
  1008. Tuples:[]xsql.Tuple{
  1009. {
  1010. Emitter: "src1",
  1011. Message: xsql.Message{ "a": 53 },
  1012. },{
  1013. Emitter: "src1",
  1014. Message: xsql.Message{ "a": 27 },
  1015. },{
  1016. Emitter: "src1",
  1017. Message: xsql.Message{ "a": 123123 },
  1018. },
  1019. },
  1020. },
  1021. },
  1022. result: []map[string]interface{}{{
  1023. "sum": float64(123203),
  1024. }},
  1025. },
  1026. }
  1027. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1028. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1029. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1030. for i, tt := range tests {
  1031. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1032. if err != nil{
  1033. t.Error(err)
  1034. }
  1035. pp := &ProjectPlan{Fields:stmt.Fields, IsAggregate: true}
  1036. pp.isTest = true
  1037. result := pp.Apply(ctx, tt.data)
  1038. var mapRes []map[string]interface{}
  1039. if v, ok := result.([]byte); ok {
  1040. err := json.Unmarshal(v, &mapRes)
  1041. if err != nil {
  1042. t.Errorf("Failed to parse the input into map.\n")
  1043. continue
  1044. }
  1045. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1046. if !reflect.DeepEqual(tt.result, mapRes) {
  1047. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1048. }
  1049. } else {
  1050. t.Errorf("The returned result is not type of []byte\n")
  1051. }
  1052. }
  1053. }