project_test.go 42 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestProjectPlan_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "val_a",
  25. },
  26. },
  27. result: []map[string]interface{}{{
  28. "a": "val_a",
  29. }},
  30. },
  31. {
  32. sql: "SELECT b FROM test",
  33. data: &xsql.Tuple{
  34. Emitter: "test",
  35. Message: xsql.Message{
  36. "a": "val_a",
  37. },
  38. },
  39. result: []map[string]interface{}{{}},
  40. },
  41. {
  42. sql: "SELECT ts FROM test",
  43. data: &xsql.Tuple{
  44. Emitter: "test",
  45. Message: xsql.Message{
  46. "a": "val_a",
  47. "ts": common.TimeFromUnixMilli(1568854573431),
  48. },
  49. },
  50. result: []map[string]interface{}{{
  51. "ts": "2019-09-19T00:56:13.431Z",
  52. }},
  53. },
  54. //Schemaless may return a message without selecting column
  55. {
  56. sql: "SELECT ts FROM test",
  57. data: &xsql.Tuple{
  58. Emitter: "test",
  59. Message: xsql.Message{
  60. "a": "val_a",
  61. "ts2": common.TimeFromUnixMilli(1568854573431),
  62. },
  63. },
  64. result: []map[string]interface{}{{}},
  65. },
  66. {
  67. sql: "SELECT A FROM test",
  68. data: &xsql.Tuple{
  69. Emitter: "test",
  70. Message: xsql.Message{
  71. "a": "val_a",
  72. },
  73. },
  74. result: []map[string]interface{}{{
  75. "A": "val_a",
  76. }},
  77. },
  78. {
  79. sql: `SELECT "value" FROM test`,
  80. data: &xsql.Tuple{
  81. Emitter: "test",
  82. Message: xsql.Message{},
  83. },
  84. result: []map[string]interface{}{{
  85. DEFAULT_FIELD_NAME_PREFIX + "0": "value",
  86. }},
  87. },
  88. {
  89. sql: `SELECT 3.4 FROM test`,
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. },
  94. result: []map[string]interface{}{{
  95. DEFAULT_FIELD_NAME_PREFIX + "0": 3.4,
  96. }},
  97. },
  98. {
  99. sql: `SELECT 5 FROM test`,
  100. data: &xsql.Tuple{
  101. Emitter: "test",
  102. Message: xsql.Message{},
  103. },
  104. result: []map[string]interface{}{{
  105. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  106. }},
  107. },
  108. {
  109. sql: `SELECT a, "value" AS b FROM test`,
  110. data: &xsql.Tuple{
  111. Emitter: "test",
  112. Message: xsql.Message{
  113. "a": "val_a",
  114. },
  115. },
  116. result: []map[string]interface{}{{
  117. "a": "val_a",
  118. "b": "value",
  119. }},
  120. },
  121. {
  122. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  123. data: &xsql.Tuple{
  124. Emitter: "test",
  125. Message: xsql.Message{
  126. "a": "val_a",
  127. },
  128. },
  129. result: []map[string]interface{}{{
  130. "a": "val_a",
  131. "b": "value",
  132. "Pi": 3.14,
  133. "Zero": 0.0,
  134. }},
  135. },
  136. {
  137. sql: `SELECT a->b AS ab FROM test`,
  138. data: &xsql.Tuple{
  139. Emitter: "test",
  140. Message: xsql.Message{
  141. "a": map[string]interface{}{"b": "hello"},
  142. },
  143. },
  144. result: []map[string]interface{}{{
  145. "ab": "hello",
  146. }},
  147. },
  148. {
  149. sql: `SELECT a->b AS ab FROM test`,
  150. data: &xsql.Tuple{
  151. Emitter: "test",
  152. Message: xsql.Message{
  153. "a": map[string]interface{}(nil),
  154. },
  155. },
  156. result: []map[string]interface{}{{}},
  157. },
  158. {
  159. sql: `SELECT a->b AS ab FROM test`,
  160. data: &xsql.Tuple{
  161. Emitter: "test",
  162. Message: xsql.Message{
  163. "name": "name",
  164. },
  165. },
  166. result: []map[string]interface{}{{}},
  167. },
  168. {
  169. sql: `SELECT a->b AS ab FROM test`,
  170. data: &xsql.Tuple{
  171. Emitter: "test",
  172. Message: xsql.Message{
  173. "a": "commonstring",
  174. },
  175. },
  176. result: []map[string]interface{}{{}},
  177. },
  178. {
  179. sql: `SELECT a[0]->b AS ab FROM test`,
  180. data: &xsql.Tuple{
  181. Emitter: "test",
  182. Message: xsql.Message{
  183. "a": []interface{}{
  184. map[string]interface{}{"b": "hello1"},
  185. map[string]interface{}{"b": "hello2"},
  186. },
  187. },
  188. },
  189. result: []map[string]interface{}{{
  190. "ab": "hello1",
  191. }},
  192. },
  193. {
  194. sql: `SELECT a[0]->b AS ab FROM test`,
  195. data: &xsql.Tuple{
  196. Emitter: "test",
  197. Message: xsql.Message{
  198. "a": []map[string]interface{}{
  199. {"b": "hello1"},
  200. {"b": "hello2"},
  201. },
  202. },
  203. },
  204. result: []map[string]interface{}{{
  205. "ab": "hello1",
  206. }},
  207. },
  208. {
  209. sql: `SELECT a[2:4] AS ab FROM test`,
  210. data: &xsql.Tuple{
  211. Emitter: "test",
  212. Message: xsql.Message{
  213. "a": []map[string]interface{}{
  214. {"b": "hello1"},
  215. {"b": "hello2"},
  216. {"b": "hello3"},
  217. {"b": "hello4"},
  218. {"b": "hello5"},
  219. },
  220. },
  221. },
  222. result: []map[string]interface{}{{
  223. "ab": []interface{}{
  224. map[string]interface{}{"b": "hello3"},
  225. map[string]interface{}{"b": "hello4"},
  226. },
  227. }},
  228. },
  229. {
  230. sql: `SELECT a->c->d AS f1 FROM test`,
  231. data: &xsql.Tuple{
  232. Emitter: "test",
  233. Message: xsql.Message{
  234. "a": map[string]interface{}{
  235. "b": "hello",
  236. "c": map[string]interface{}{
  237. "d": 35.2,
  238. },
  239. },
  240. },
  241. },
  242. result: []map[string]interface{}{{
  243. "f1": 35.2,
  244. }},
  245. },
  246. {
  247. sql: `SELECT a->c->d AS f1 FROM test`,
  248. data: &xsql.Tuple{
  249. Emitter: "test",
  250. Message: xsql.Message{
  251. "a": map[string]interface{}{
  252. "b": "hello",
  253. "c": map[string]interface{}{
  254. "e": 35.2,
  255. },
  256. },
  257. },
  258. },
  259. result: []map[string]interface{}{{}},
  260. },
  261. {
  262. sql: `SELECT a->c->d AS f1 FROM test`,
  263. data: &xsql.Tuple{
  264. Emitter: "test",
  265. Message: xsql.Message{
  266. "a": map[string]interface{}{
  267. "b": "hello",
  268. },
  269. },
  270. },
  271. result: []map[string]interface{}{{}},
  272. },
  273. //The int type is not supported yet, the json parser returns float64 for int values
  274. {
  275. sql: `SELECT a->c->d AS f1 FROM test`,
  276. data: &xsql.Tuple{
  277. Emitter: "test",
  278. Message: xsql.Message{
  279. "a": map[string]interface{}{
  280. "b": "hello",
  281. "c": map[string]interface{}{
  282. "d": float64(35),
  283. },
  284. },
  285. },
  286. },
  287. result: []map[string]interface{}{{
  288. "f1": float64(35),
  289. }},
  290. },
  291. {
  292. sql: "SELECT a FROM test",
  293. data: &xsql.Tuple{
  294. Emitter: "test",
  295. Message: xsql.Message{},
  296. },
  297. result: []map[string]interface{}{
  298. {},
  299. },
  300. },
  301. {
  302. sql: "SELECT * FROM test",
  303. data: &xsql.Tuple{
  304. Emitter: "test",
  305. Message: xsql.Message{},
  306. },
  307. result: []map[string]interface{}{
  308. {},
  309. },
  310. },
  311. {
  312. sql: `SELECT * FROM test`,
  313. data: &xsql.Tuple{
  314. Emitter: "test",
  315. Message: xsql.Message{
  316. "a": map[string]interface{}{
  317. "b": "hello",
  318. "c": map[string]interface{}{
  319. "d": 35.2,
  320. },
  321. },
  322. },
  323. },
  324. result: []map[string]interface{}{{
  325. "a": map[string]interface{}{
  326. "b": "hello",
  327. "c": map[string]interface{}{
  328. "d": 35.2,
  329. },
  330. },
  331. }},
  332. },
  333. {
  334. sql: `SELECT * FROM test`,
  335. data: &xsql.Tuple{
  336. Emitter: "test",
  337. Message: xsql.Message{
  338. "a": "val1",
  339. "b": 3.14,
  340. },
  341. },
  342. result: []map[string]interface{}{{
  343. "a": "val1",
  344. "b": 3.14,
  345. }},
  346. },
  347. {
  348. sql: `SELECT 3*4 AS f1 FROM test`,
  349. data: &xsql.Tuple{
  350. Emitter: "test",
  351. Message: xsql.Message{},
  352. },
  353. result: []map[string]interface{}{{
  354. "f1": float64(12),
  355. }},
  356. },
  357. {
  358. sql: `SELECT 4.5*2 AS f1 FROM test`,
  359. data: &xsql.Tuple{
  360. Emitter: "test",
  361. Message: xsql.Message{},
  362. },
  363. result: []map[string]interface{}{{
  364. "f1": float64(9),
  365. }},
  366. },
  367. }
  368. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  369. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  370. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  371. for i, tt := range tests {
  372. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  373. pp := &ProjectPlan{Fields: stmt.Fields}
  374. pp.isTest = true
  375. fv, afv := xsql.NewAggregateFunctionValuers()
  376. result := pp.Apply(ctx, tt.data, fv, afv)
  377. var mapRes []map[string]interface{}
  378. if v, ok := result.([]byte); ok {
  379. err := json.Unmarshal(v, &mapRes)
  380. if err != nil {
  381. t.Errorf("Failed to parse the input into map.\n")
  382. continue
  383. }
  384. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  385. if !reflect.DeepEqual(tt.result, mapRes) {
  386. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  387. }
  388. } else {
  389. t.Errorf("%d. The returned result is not type of []byte\n", i)
  390. }
  391. }
  392. }
  393. func TestProjectPlan_MultiInput(t *testing.T) {
  394. var tests = []struct {
  395. sql string
  396. data interface{}
  397. result []map[string]interface{}
  398. }{
  399. {
  400. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  401. data: &xsql.Tuple{
  402. Emitter: "tbl",
  403. Message: xsql.Message{
  404. "abc": int64(6),
  405. },
  406. },
  407. result: []map[string]interface{}{{
  408. "abc": float64(6), //json marshall problem
  409. }},
  410. },
  411. {
  412. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  413. data: &xsql.Tuple{
  414. Emitter: "tbl",
  415. Message: xsql.Message{
  416. "abc": int64(34),
  417. "def": "hello",
  418. },
  419. },
  420. result: []map[string]interface{}{{
  421. "abc": float64(34),
  422. }},
  423. },
  424. {
  425. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  426. data: xsql.WindowTuplesSet{
  427. xsql.WindowTuples{
  428. Emitter: "src1",
  429. Tuples: []xsql.Tuple{
  430. {
  431. Emitter: "src1",
  432. Message: xsql.Message{"id1": 1, "f1": "v1"},
  433. }, {
  434. Emitter: "src1",
  435. Message: xsql.Message{"id1": 2, "f1": "v2"},
  436. }, {
  437. Emitter: "src1",
  438. Message: xsql.Message{"id1": 3, "f1": "v1"},
  439. },
  440. },
  441. },
  442. },
  443. result: []map[string]interface{}{{
  444. "id1": float64(1),
  445. }, {
  446. "id1": float64(2),
  447. }, {
  448. "id1": float64(3),
  449. }},
  450. },
  451. {
  452. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  453. data: xsql.WindowTuplesSet{
  454. xsql.WindowTuples{
  455. Emitter: "src1",
  456. Tuples: []xsql.Tuple{
  457. {
  458. Emitter: "src1",
  459. Message: xsql.Message{"id1": 1, "f1": "v1"},
  460. }, {
  461. Emitter: "src1",
  462. Message: xsql.Message{"id2": 2, "f1": "v2"},
  463. }, {
  464. Emitter: "src1",
  465. Message: xsql.Message{"id1": 3, "f1": "v1"},
  466. },
  467. },
  468. },
  469. },
  470. result: []map[string]interface{}{{
  471. "id1": float64(1),
  472. }, {}, {
  473. "id1": float64(3),
  474. }},
  475. },
  476. {
  477. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  478. data: xsql.WindowTuplesSet{
  479. xsql.WindowTuples{
  480. Emitter: "src1",
  481. Tuples: []xsql.Tuple{
  482. {
  483. Emitter: "src1",
  484. Message: xsql.Message{"id1": 1, "f1": "v1"},
  485. }, {
  486. Emitter: "src1",
  487. Message: xsql.Message{"id1": 2, "f1": "v2"},
  488. }, {
  489. Emitter: "src1",
  490. Message: xsql.Message{"id1": 3, "f1": "v1"},
  491. },
  492. },
  493. },
  494. },
  495. result: []map[string]interface{}{{
  496. "id1": float64(1),
  497. "f1": "v1",
  498. }, {
  499. "id1": float64(2),
  500. "f1": "v2",
  501. }, {
  502. "id1": float64(3),
  503. "f1": "v1",
  504. }},
  505. },
  506. {
  507. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  508. data: xsql.WindowTuplesSet{
  509. xsql.WindowTuples{
  510. Emitter: "src1",
  511. Tuples: []xsql.Tuple{
  512. {
  513. Emitter: "src1",
  514. Message: xsql.Message{"id1": 1, "f1": "v1"},
  515. }, {
  516. Emitter: "src1",
  517. Message: xsql.Message{"id2": 2, "f2": "v2"},
  518. }, {
  519. Emitter: "src1",
  520. Message: xsql.Message{"id1": 3, "f1": "v1"},
  521. },
  522. },
  523. },
  524. },
  525. result: []map[string]interface{}{{
  526. "id1": float64(1),
  527. "f1": "v1",
  528. }, {
  529. "id2": float64(2),
  530. "f2": "v2",
  531. }, {
  532. "id1": float64(3),
  533. "f1": "v1",
  534. }},
  535. },
  536. {
  537. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  538. data: xsql.WindowTuplesSet{
  539. xsql.WindowTuples{
  540. Emitter: "src1",
  541. Tuples: []xsql.Tuple{
  542. {
  543. Emitter: "src1",
  544. Message: xsql.Message{"id1": 1, "f1": "v1"},
  545. }, {
  546. Emitter: "src1",
  547. Message: xsql.Message{"id1": 2, "f1": "v2"},
  548. }, {
  549. Emitter: "src1",
  550. Message: xsql.Message{"id1": 3, "f1": "v1"},
  551. },
  552. },
  553. },
  554. },
  555. result: []map[string]interface{}{{
  556. "id1": float64(1),
  557. "f1": "v1",
  558. }, {
  559. "id1": float64(2),
  560. "f1": "v2",
  561. }, {
  562. "id1": float64(3),
  563. "f1": "v1",
  564. }},
  565. },
  566. {
  567. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  568. data: xsql.JoinTupleSets{
  569. xsql.JoinTuple{
  570. Tuples: []xsql.Tuple{
  571. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  572. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  573. },
  574. },
  575. xsql.JoinTuple{
  576. Tuples: []xsql.Tuple{
  577. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  578. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  579. },
  580. },
  581. xsql.JoinTuple{
  582. Tuples: []xsql.Tuple{
  583. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  584. },
  585. },
  586. },
  587. result: []map[string]interface{}{{
  588. "id1": float64(1),
  589. }, {
  590. "id1": float64(2),
  591. }, {
  592. "id1": float64(3),
  593. }},
  594. },
  595. {
  596. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  597. data: xsql.JoinTupleSets{
  598. xsql.JoinTuple{
  599. Tuples: []xsql.Tuple{
  600. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  601. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  602. },
  603. },
  604. xsql.JoinTuple{
  605. Tuples: []xsql.Tuple{
  606. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  607. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  608. },
  609. },
  610. xsql.JoinTuple{
  611. Tuples: []xsql.Tuple{
  612. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  613. },
  614. },
  615. },
  616. result: []map[string]interface{}{{
  617. "id1": float64(1),
  618. }, {
  619. "id1": float64(2),
  620. }, {}},
  621. },
  622. {
  623. sql: "SELECT abc FROM tbl group by abc",
  624. data: xsql.GroupedTuplesSet{
  625. {
  626. &xsql.Tuple{
  627. Emitter: "tbl",
  628. Message: xsql.Message{
  629. "abc": int64(6),
  630. "def": "hello",
  631. },
  632. },
  633. },
  634. },
  635. result: []map[string]interface{}{{
  636. "abc": float64(6),
  637. }},
  638. },
  639. {
  640. sql: "SELECT abc FROM tbl group by abc",
  641. data: xsql.GroupedTuplesSet{
  642. {
  643. &xsql.Tuple{
  644. Emitter: "tbl",
  645. Message: xsql.Message{
  646. "def": "hello",
  647. },
  648. },
  649. },
  650. },
  651. result: []map[string]interface{}{{}},
  652. },
  653. {
  654. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  655. data: xsql.GroupedTuplesSet{
  656. {
  657. &xsql.Tuple{
  658. Emitter: "src1",
  659. Message: xsql.Message{"id1": 1, "f1": "v1"},
  660. },
  661. &xsql.Tuple{
  662. Emitter: "src1",
  663. Message: xsql.Message{"id1": 3, "f1": "v1"},
  664. },
  665. },
  666. {
  667. &xsql.Tuple{
  668. Emitter: "src1",
  669. Message: xsql.Message{"id1": 2, "f1": "v2"},
  670. },
  671. },
  672. },
  673. result: []map[string]interface{}{{
  674. "id1": float64(1),
  675. }, {
  676. "id1": float64(2),
  677. }},
  678. },
  679. {
  680. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  681. data: xsql.GroupedTuplesSet{
  682. {
  683. &xsql.Tuple{
  684. Emitter: "src1",
  685. Message: xsql.Message{"id1": 1, "f1": "v1"},
  686. },
  687. &xsql.Tuple{
  688. Emitter: "src1",
  689. Message: xsql.Message{"id1": 3, "f1": "v1"},
  690. },
  691. },
  692. {
  693. &xsql.Tuple{
  694. Emitter: "src1",
  695. Message: xsql.Message{"id2": 2, "f1": "v2"},
  696. },
  697. },
  698. },
  699. result: []map[string]interface{}{{
  700. "id1": float64(1),
  701. }, {}},
  702. },
  703. {
  704. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  705. data: xsql.GroupedTuplesSet{
  706. {
  707. &xsql.JoinTuple{
  708. Tuples: []xsql.Tuple{
  709. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  710. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  711. },
  712. },
  713. },
  714. {
  715. &xsql.JoinTuple{
  716. Tuples: []xsql.Tuple{
  717. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  718. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  719. },
  720. },
  721. },
  722. {
  723. &xsql.JoinTuple{
  724. Tuples: []xsql.Tuple{
  725. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  726. },
  727. },
  728. },
  729. },
  730. result: []map[string]interface{}{{
  731. "id2": float64(2),
  732. }, {
  733. "id2": float64(4),
  734. }, {}},
  735. },
  736. {
  737. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  738. data: xsql.JoinTupleSets{
  739. xsql.JoinTuple{
  740. Tuples: []xsql.Tuple{
  741. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  742. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  743. },
  744. },
  745. xsql.JoinTuple{
  746. Tuples: []xsql.Tuple{
  747. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  748. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  749. },
  750. },
  751. xsql.JoinTuple{
  752. Tuples: []xsql.Tuple{
  753. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  754. },
  755. },
  756. },
  757. result: []map[string]interface{}{{
  758. "id1": float64(1),
  759. "f1": "v1",
  760. "f2": "w2",
  761. }, {
  762. "id1": float64(2),
  763. "f1": "v2",
  764. "f2": "w3",
  765. }, {
  766. "id1": float64(3),
  767. "f1": "v1",
  768. }},
  769. },
  770. {
  771. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  772. data: xsql.JoinTupleSets{
  773. xsql.JoinTuple{
  774. Tuples: []xsql.Tuple{
  775. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  776. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  777. },
  778. },
  779. xsql.JoinTuple{
  780. Tuples: []xsql.Tuple{
  781. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  782. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  783. },
  784. },
  785. xsql.JoinTuple{
  786. Tuples: []xsql.Tuple{
  787. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  788. },
  789. },
  790. },
  791. result: []map[string]interface{}{{
  792. "id": float64(1),
  793. "f1": "v1",
  794. "f2": "w2",
  795. }, {
  796. "id": float64(2),
  797. "f1": "v2",
  798. "f2": "w3",
  799. }, {
  800. "id": float64(3),
  801. "f1": "v1",
  802. }},
  803. },
  804. {
  805. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  806. data: xsql.GroupedTuplesSet{
  807. {
  808. &xsql.Tuple{
  809. Emitter: "src1",
  810. Message: xsql.Message{"id1": 1, "f1": "v1"},
  811. },
  812. &xsql.Tuple{
  813. Emitter: "src1",
  814. Message: xsql.Message{"id1": 3, "f1": "v1"},
  815. },
  816. },
  817. {
  818. &xsql.Tuple{
  819. Emitter: "src1",
  820. Message: xsql.Message{"id1": 2, "f1": "v2"},
  821. },
  822. },
  823. },
  824. result: []map[string]interface{}{{
  825. "id1": float64(1),
  826. "f1": "v1",
  827. }, {
  828. "id1": float64(2),
  829. "f1": "v2",
  830. }},
  831. },
  832. {
  833. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  834. data: xsql.GroupedTuplesSet{
  835. {
  836. &xsql.JoinTuple{
  837. Tuples: []xsql.Tuple{
  838. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  839. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  840. },
  841. },
  842. },
  843. {
  844. &xsql.JoinTuple{
  845. Tuples: []xsql.Tuple{
  846. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  847. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  848. },
  849. },
  850. },
  851. {
  852. &xsql.JoinTuple{
  853. Tuples: []xsql.Tuple{
  854. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  855. },
  856. },
  857. },
  858. },
  859. result: []map[string]interface{}{{
  860. "id2": float64(2),
  861. "id1": float64(1),
  862. "f1": "v1",
  863. }, {
  864. "id2": float64(4),
  865. "id1": float64(2),
  866. "f1": "v2",
  867. }, {
  868. "id1": float64(3),
  869. "f1": "v1",
  870. }},
  871. },
  872. {
  873. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  874. data: xsql.GroupedTuplesSet{
  875. {
  876. &xsql.JoinTuple{
  877. Tuples: []xsql.Tuple{
  878. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  879. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  880. },
  881. },
  882. },
  883. {
  884. &xsql.JoinTuple{
  885. Tuples: []xsql.Tuple{
  886. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  887. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  888. },
  889. },
  890. },
  891. {
  892. &xsql.JoinTuple{
  893. Tuples: []xsql.Tuple{
  894. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  895. },
  896. },
  897. },
  898. },
  899. result: []map[string]interface{}{{
  900. "id2": float64(2),
  901. "id1": float64(1),
  902. "f1": "v1",
  903. }, {
  904. "id2": float64(4),
  905. "id1": float64(2),
  906. "f1": "v2",
  907. }, {
  908. "id1": float64(3),
  909. "f1": "v1",
  910. }},
  911. },
  912. }
  913. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  914. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  915. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  916. for i, tt := range tests {
  917. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  918. pp := &ProjectPlan{Fields: stmt.Fields}
  919. pp.isTest = true
  920. fv, afv := xsql.NewAggregateFunctionValuers()
  921. result := pp.Apply(ctx, tt.data, fv, afv)
  922. var mapRes []map[string]interface{}
  923. if v, ok := result.([]byte); ok {
  924. err := json.Unmarshal(v, &mapRes)
  925. if err != nil {
  926. t.Errorf("Failed to parse the input into map.\n")
  927. continue
  928. }
  929. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  930. if !reflect.DeepEqual(tt.result, mapRes) {
  931. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  932. }
  933. } else {
  934. t.Errorf("The returned result is not type of []byte\n")
  935. }
  936. }
  937. }
  938. func TestProjectPlan_Funcs(t *testing.T) {
  939. var tests = []struct {
  940. sql string
  941. data interface{}
  942. result []map[string]interface{}
  943. }{
  944. {
  945. sql: "SELECT round(a) as r FROM test",
  946. data: &xsql.Tuple{
  947. Emitter: "test",
  948. Message: xsql.Message{
  949. "a": 47.5,
  950. },
  951. },
  952. result: []map[string]interface{}{{
  953. "r": float64(48),
  954. }},
  955. }, {
  956. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  957. data: xsql.WindowTuplesSet{
  958. xsql.WindowTuples{
  959. Emitter: "test",
  960. Tuples: []xsql.Tuple{
  961. {
  962. Emitter: "src1",
  963. Message: xsql.Message{"a": 53.1},
  964. }, {
  965. Emitter: "src1",
  966. Message: xsql.Message{"a": 27.4},
  967. }, {
  968. Emitter: "src1",
  969. Message: xsql.Message{"a": 123123.7},
  970. },
  971. },
  972. },
  973. },
  974. result: []map[string]interface{}{{
  975. "r": float64(53),
  976. }, {
  977. "r": float64(27),
  978. }, {
  979. "r": float64(123124),
  980. }},
  981. }, {
  982. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  983. data: xsql.WindowTuplesSet{
  984. xsql.WindowTuples{
  985. Emitter: "test",
  986. Tuples: []xsql.Tuple{
  987. {
  988. Emitter: "src1",
  989. Message: xsql.Message{"a": 53.1},
  990. }, {
  991. Emitter: "src1",
  992. Message: xsql.Message{"a": 27.4},
  993. }, {
  994. Emitter: "src1",
  995. Message: xsql.Message{"a": 123123.7},
  996. },
  997. },
  998. },
  999. },
  1000. result: []map[string]interface{}{{
  1001. "r": float64(53),
  1002. }, {
  1003. "r": float64(27),
  1004. }, {
  1005. "r": float64(123124),
  1006. }},
  1007. }, {
  1008. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1009. data: xsql.JoinTupleSets{
  1010. xsql.JoinTuple{
  1011. Tuples: []xsql.Tuple{
  1012. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1013. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1014. },
  1015. },
  1016. xsql.JoinTuple{
  1017. Tuples: []xsql.Tuple{
  1018. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1019. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1020. },
  1021. },
  1022. xsql.JoinTuple{
  1023. Tuples: []xsql.Tuple{
  1024. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1025. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1026. },
  1027. },
  1028. },
  1029. result: []map[string]interface{}{{
  1030. "r": float64(66),
  1031. }, {
  1032. "r": float64(73),
  1033. }, {
  1034. "r": float64(89),
  1035. }},
  1036. }, {
  1037. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1038. data: xsql.JoinTupleSets{
  1039. xsql.JoinTuple{
  1040. Tuples: []xsql.Tuple{
  1041. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1042. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1043. },
  1044. },
  1045. xsql.JoinTuple{
  1046. Tuples: []xsql.Tuple{
  1047. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1048. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1049. },
  1050. },
  1051. xsql.JoinTuple{
  1052. Tuples: []xsql.Tuple{
  1053. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1054. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1055. },
  1056. },
  1057. },
  1058. result: []map[string]interface{}{{
  1059. "concat": "165.5512",
  1060. }, {
  1061. "concat": "273.49934",
  1062. }, {
  1063. "concat": "388.886",
  1064. }},
  1065. }, {
  1066. sql: "SELECT count(a) as r FROM test",
  1067. data: &xsql.Tuple{
  1068. Emitter: "test",
  1069. Message: xsql.Message{
  1070. "a": 47.5,
  1071. },
  1072. },
  1073. result: []map[string]interface{}{{
  1074. "r": float64(1),
  1075. }},
  1076. }, {
  1077. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1078. data: xsql.JoinTupleSets{
  1079. xsql.JoinTuple{
  1080. Tuples: []xsql.Tuple{
  1081. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1082. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1083. },
  1084. },
  1085. xsql.JoinTuple{
  1086. Tuples: []xsql.Tuple{
  1087. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1088. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1089. },
  1090. },
  1091. xsql.JoinTuple{
  1092. Tuples: []xsql.Tuple{
  1093. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1094. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1095. },
  1096. },
  1097. },
  1098. result: []map[string]interface{}{{
  1099. "d": "devicea",
  1100. }, {
  1101. "d": "deviceb",
  1102. }, {
  1103. "d": "devicec",
  1104. }},
  1105. },
  1106. }
  1107. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1108. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  1109. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1110. for i, tt := range tests {
  1111. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1112. if err != nil {
  1113. t.Error(err)
  1114. }
  1115. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1116. pp.isTest = true
  1117. fv, afv := xsql.NewAggregateFunctionValuers()
  1118. result := pp.Apply(ctx, tt.data, fv, afv)
  1119. var mapRes []map[string]interface{}
  1120. if v, ok := result.([]byte); ok {
  1121. err := json.Unmarshal(v, &mapRes)
  1122. if err != nil {
  1123. t.Errorf("Failed to parse the input into map.\n")
  1124. continue
  1125. }
  1126. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1127. if !reflect.DeepEqual(tt.result, mapRes) {
  1128. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1129. }
  1130. } else {
  1131. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1132. }
  1133. }
  1134. }
  1135. func TestProjectPlan_AggFuncs(t *testing.T) {
  1136. var tests = []struct {
  1137. sql string
  1138. data interface{}
  1139. result []map[string]interface{}
  1140. }{
  1141. {
  1142. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1143. data: xsql.GroupedTuplesSet{
  1144. {
  1145. &xsql.JoinTuple{
  1146. Tuples: []xsql.Tuple{
  1147. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1148. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1149. },
  1150. },
  1151. &xsql.JoinTuple{
  1152. Tuples: []xsql.Tuple{
  1153. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1154. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1155. },
  1156. },
  1157. },
  1158. {
  1159. &xsql.JoinTuple{
  1160. Tuples: []xsql.Tuple{
  1161. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1162. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1163. },
  1164. },
  1165. &xsql.JoinTuple{
  1166. Tuples: []xsql.Tuple{
  1167. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1168. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1169. },
  1170. },
  1171. },
  1172. },
  1173. result: []map[string]interface{}{{
  1174. "c": float64(2),
  1175. "r": float64(122),
  1176. }, {
  1177. "c": float64(2),
  1178. "r": float64(89),
  1179. }},
  1180. },
  1181. {
  1182. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1183. data: xsql.GroupedTuplesSet{
  1184. {
  1185. &xsql.JoinTuple{
  1186. Tuples: []xsql.Tuple{
  1187. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1188. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1189. },
  1190. },
  1191. &xsql.JoinTuple{
  1192. Tuples: []xsql.Tuple{
  1193. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1194. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1195. },
  1196. },
  1197. &xsql.JoinTuple{
  1198. Tuples: []xsql.Tuple{
  1199. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1200. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1201. },
  1202. },
  1203. &xsql.JoinTuple{
  1204. Tuples: []xsql.Tuple{
  1205. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1206. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1207. },
  1208. },
  1209. },
  1210. {
  1211. &xsql.JoinTuple{
  1212. Tuples: []xsql.Tuple{
  1213. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1214. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1215. },
  1216. },
  1217. &xsql.JoinTuple{
  1218. Tuples: []xsql.Tuple{
  1219. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1220. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1221. },
  1222. },
  1223. },
  1224. },
  1225. result: []map[string]interface{}{{
  1226. "avg": 116.68,
  1227. }, {
  1228. "avg": 51.815,
  1229. }},
  1230. },
  1231. {
  1232. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1233. data: xsql.GroupedTuplesSet{
  1234. {
  1235. &xsql.JoinTuple{
  1236. Tuples: []xsql.Tuple{
  1237. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1238. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1239. },
  1240. },
  1241. &xsql.JoinTuple{
  1242. Tuples: []xsql.Tuple{
  1243. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1244. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1245. },
  1246. },
  1247. &xsql.JoinTuple{
  1248. Tuples: []xsql.Tuple{
  1249. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1250. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1251. },
  1252. },
  1253. },
  1254. {
  1255. &xsql.JoinTuple{
  1256. Tuples: []xsql.Tuple{
  1257. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1258. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1259. },
  1260. },
  1261. &xsql.JoinTuple{
  1262. Tuples: []xsql.Tuple{
  1263. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1264. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1265. },
  1266. },
  1267. },
  1268. },
  1269. result: []map[string]interface{}{{
  1270. "max": 177.51,
  1271. }, {
  1272. "max": 89.03,
  1273. }},
  1274. },
  1275. {
  1276. sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1277. data: xsql.JoinTupleSets{
  1278. xsql.JoinTuple{
  1279. Tuples: []xsql.Tuple{
  1280. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1281. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1282. },
  1283. },
  1284. xsql.JoinTuple{
  1285. Tuples: []xsql.Tuple{
  1286. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1287. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1288. },
  1289. },
  1290. xsql.JoinTuple{
  1291. Tuples: []xsql.Tuple{
  1292. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1293. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1294. },
  1295. },
  1296. },
  1297. result: []map[string]interface{}{{
  1298. "min": 68.55,
  1299. }},
  1300. }, {
  1301. sql: "SELECT min(a) as m FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1302. data: xsql.JoinTupleSets{
  1303. xsql.JoinTuple{
  1304. Tuples: []xsql.Tuple{
  1305. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "m": 68.55}},
  1306. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1307. },
  1308. },
  1309. xsql.JoinTuple{
  1310. Tuples: []xsql.Tuple{
  1311. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1312. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1313. },
  1314. },
  1315. xsql.JoinTuple{
  1316. Tuples: []xsql.Tuple{
  1317. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1318. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1319. },
  1320. },
  1321. },
  1322. result: []map[string]interface{}{{
  1323. "m": 68.55,
  1324. }},
  1325. }, {
  1326. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1327. data: xsql.WindowTuplesSet{
  1328. xsql.WindowTuples{
  1329. Emitter: "test",
  1330. Tuples: []xsql.Tuple{
  1331. {
  1332. Emitter: "src1",
  1333. Message: xsql.Message{"a": 53},
  1334. }, {
  1335. Emitter: "src1",
  1336. Message: xsql.Message{"a": 27},
  1337. }, {
  1338. Emitter: "src1",
  1339. Message: xsql.Message{"a": 123123},
  1340. },
  1341. },
  1342. },
  1343. },
  1344. result: []map[string]interface{}{{
  1345. "sum": float64(123203),
  1346. }},
  1347. }, {
  1348. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1349. data: xsql.WindowTuplesSet{
  1350. xsql.WindowTuples{
  1351. Emitter: "test",
  1352. Tuples: []xsql.Tuple{
  1353. {
  1354. Emitter: "src1",
  1355. Message: xsql.Message{"a": 53, "s": 123203},
  1356. }, {
  1357. Emitter: "src1",
  1358. Message: xsql.Message{"a": 27},
  1359. }, {
  1360. Emitter: "src1",
  1361. Message: xsql.Message{"a": 123123},
  1362. },
  1363. },
  1364. },
  1365. },
  1366. result: []map[string]interface{}{{
  1367. "s": float64(123203),
  1368. }},
  1369. }, {
  1370. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1371. data: xsql.WindowTuplesSet{
  1372. xsql.WindowTuples{
  1373. Emitter: "test",
  1374. Tuples: []xsql.Tuple{
  1375. {
  1376. Emitter: "src1",
  1377. Message: xsql.Message{"a": 53},
  1378. }, {
  1379. Emitter: "src1",
  1380. Message: xsql.Message{"a": 27},
  1381. }, {
  1382. Emitter: "src1",
  1383. Message: xsql.Message{"a": 123123},
  1384. },
  1385. },
  1386. },
  1387. },
  1388. result: []map[string]interface{}{{
  1389. "sum": float64(123203),
  1390. }},
  1391. },
  1392. {
  1393. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1394. data: xsql.GroupedTuplesSet{
  1395. {
  1396. &xsql.JoinTuple{
  1397. Tuples: []xsql.Tuple{
  1398. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1399. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1400. },
  1401. },
  1402. &xsql.JoinTuple{
  1403. Tuples: []xsql.Tuple{
  1404. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1405. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1406. },
  1407. },
  1408. },
  1409. {
  1410. &xsql.JoinTuple{
  1411. Tuples: []xsql.Tuple{
  1412. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1413. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1414. },
  1415. },
  1416. &xsql.JoinTuple{
  1417. Tuples: []xsql.Tuple{
  1418. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1419. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1420. },
  1421. },
  1422. },
  1423. },
  1424. result: []map[string]interface{}{{
  1425. "count": float64(2),
  1426. "meta": "devicea",
  1427. }, {
  1428. "count": float64(2),
  1429. "meta": "devicec",
  1430. }},
  1431. },
  1432. {
  1433. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1434. data: xsql.GroupedTuplesSet{
  1435. {
  1436. &xsql.JoinTuple{
  1437. Tuples: []xsql.Tuple{
  1438. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1439. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1440. },
  1441. },
  1442. &xsql.JoinTuple{
  1443. Tuples: []xsql.Tuple{
  1444. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1445. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1446. },
  1447. },
  1448. },
  1449. {
  1450. &xsql.JoinTuple{
  1451. Tuples: []xsql.Tuple{
  1452. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1453. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1454. },
  1455. },
  1456. &xsql.JoinTuple{
  1457. Tuples: []xsql.Tuple{
  1458. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1459. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1460. },
  1461. },
  1462. },
  1463. },
  1464. result: []map[string]interface{}{{
  1465. "c": float64(2),
  1466. "d": "devicea",
  1467. }, {
  1468. "c": float64(2),
  1469. "d": "devicec",
  1470. }},
  1471. },
  1472. }
  1473. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1474. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1475. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1476. for i, tt := range tests {
  1477. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1478. if err != nil {
  1479. t.Error(err)
  1480. }
  1481. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
  1482. fv, afv := xsql.NewAggregateFunctionValuers()
  1483. result := pp.Apply(ctx, tt.data, fv, afv)
  1484. var mapRes []map[string]interface{}
  1485. if v, ok := result.([]byte); ok {
  1486. err := json.Unmarshal(v, &mapRes)
  1487. if err != nil {
  1488. t.Errorf("Failed to parse the input into map.\n")
  1489. continue
  1490. }
  1491. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1492. if !reflect.DeepEqual(tt.result, mapRes) {
  1493. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1494. }
  1495. } else {
  1496. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  1497. }
  1498. }
  1499. }
  1500. func TestProjectPlanError(t *testing.T) {
  1501. var tests = []struct {
  1502. sql string
  1503. data interface{}
  1504. result interface{}
  1505. }{
  1506. {
  1507. sql: "SELECT a FROM test",
  1508. data: errors.New("an error from upstream"),
  1509. result: errors.New("an error from upstream"),
  1510. }, {
  1511. sql: "SELECT a * 5 FROM test",
  1512. data: &xsql.Tuple{
  1513. Emitter: "test",
  1514. Message: xsql.Message{
  1515. "a": "val_a",
  1516. },
  1517. },
  1518. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  1519. }, {
  1520. sql: `SELECT a[0]->b AS ab FROM test`,
  1521. data: &xsql.Tuple{
  1522. Emitter: "test",
  1523. Message: xsql.Message{
  1524. "a": "common string",
  1525. },
  1526. },
  1527. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  1528. }, {
  1529. sql: `SELECT round(a) as r FROM test`,
  1530. data: &xsql.Tuple{
  1531. Emitter: "test",
  1532. Message: xsql.Message{
  1533. "a": "common string",
  1534. },
  1535. },
  1536. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1537. }, {
  1538. sql: `SELECT round(a) as r FROM test`,
  1539. data: &xsql.Tuple{
  1540. Emitter: "test",
  1541. Message: xsql.Message{
  1542. "abc": "common string",
  1543. },
  1544. },
  1545. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1546. }, {
  1547. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1548. data: xsql.GroupedTuplesSet{
  1549. {
  1550. &xsql.JoinTuple{
  1551. Tuples: []xsql.Tuple{
  1552. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1553. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1554. },
  1555. },
  1556. &xsql.JoinTuple{
  1557. Tuples: []xsql.Tuple{
  1558. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1559. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1560. },
  1561. },
  1562. &xsql.JoinTuple{
  1563. Tuples: []xsql.Tuple{
  1564. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  1565. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1566. },
  1567. },
  1568. &xsql.JoinTuple{
  1569. Tuples: []xsql.Tuple{
  1570. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1571. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1572. },
  1573. },
  1574. },
  1575. {
  1576. &xsql.JoinTuple{
  1577. Tuples: []xsql.Tuple{
  1578. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1579. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1580. },
  1581. },
  1582. &xsql.JoinTuple{
  1583. Tuples: []xsql.Tuple{
  1584. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1585. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1586. },
  1587. },
  1588. },
  1589. },
  1590. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  1591. }, {
  1592. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1593. data: xsql.WindowTuplesSet{
  1594. xsql.WindowTuples{
  1595. Emitter: "test",
  1596. Tuples: []xsql.Tuple{
  1597. {
  1598. Emitter: "src1",
  1599. Message: xsql.Message{"a": 53},
  1600. }, {
  1601. Emitter: "src1",
  1602. Message: xsql.Message{"a": "ddd"},
  1603. }, {
  1604. Emitter: "src1",
  1605. Message: xsql.Message{"a": 123123},
  1606. },
  1607. },
  1608. },
  1609. },
  1610. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  1611. }, {
  1612. sql: `SELECT a[0]->b AS ab FROM test`,
  1613. data: &xsql.Tuple{
  1614. Emitter: "test",
  1615. Message: xsql.Message{
  1616. "a": []map[string]interface{}(nil),
  1617. },
  1618. },
  1619. result: errors.New("run Select error: out of index: 0 of 0"),
  1620. },
  1621. }
  1622. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1623. contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
  1624. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1625. for i, tt := range tests {
  1626. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1627. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1628. pp.isTest = true
  1629. fv, afv := xsql.NewAggregateFunctionValuers()
  1630. result := pp.Apply(ctx, tt.data, fv, afv)
  1631. if !reflect.DeepEqual(tt.result, result) {
  1632. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1633. }
  1634. }
  1635. }