project_test.go 42 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestProjectPlan_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "val_a",
  25. },
  26. Metadata: xsql.Metadata{
  27. "id": 45,
  28. "other": "mock",
  29. },
  30. },
  31. result: []map[string]interface{}{{
  32. "a": "val_a",
  33. "__meta": map[string]interface{}{
  34. "id": float64(45),
  35. "other": "mock",
  36. },
  37. }},
  38. },
  39. {
  40. sql: "SELECT b FROM test",
  41. data: &xsql.Tuple{
  42. Emitter: "test",
  43. Message: xsql.Message{
  44. "a": "val_a",
  45. },
  46. },
  47. result: []map[string]interface{}{{}},
  48. },
  49. {
  50. sql: "SELECT ts FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "val_a",
  55. "ts": common.TimeFromUnixMilli(1568854573431),
  56. },
  57. },
  58. result: []map[string]interface{}{{
  59. "ts": "2019-09-19T00:56:13.431Z",
  60. }},
  61. },
  62. //Schemaless may return a message without selecting column
  63. {
  64. sql: "SELECT ts FROM test",
  65. data: &xsql.Tuple{
  66. Emitter: "test",
  67. Message: xsql.Message{
  68. "a": "val_a",
  69. "ts2": common.TimeFromUnixMilli(1568854573431),
  70. },
  71. },
  72. result: []map[string]interface{}{{}},
  73. },
  74. {
  75. sql: "SELECT A FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "val_a",
  80. },
  81. },
  82. result: []map[string]interface{}{{
  83. "A": "val_a",
  84. }},
  85. },
  86. {
  87. sql: `SELECT "value" FROM test`,
  88. data: &xsql.Tuple{
  89. Emitter: "test",
  90. Message: xsql.Message{},
  91. },
  92. result: []map[string]interface{}{{
  93. DEFAULT_FIELD_NAME_PREFIX + "0": "value",
  94. }},
  95. },
  96. {
  97. sql: `SELECT 3.4 FROM test`,
  98. data: &xsql.Tuple{
  99. Emitter: "test",
  100. Message: xsql.Message{},
  101. },
  102. result: []map[string]interface{}{{
  103. DEFAULT_FIELD_NAME_PREFIX + "0": 3.4,
  104. }},
  105. },
  106. {
  107. sql: `SELECT 5 FROM test`,
  108. data: &xsql.Tuple{
  109. Emitter: "test",
  110. Message: xsql.Message{},
  111. },
  112. result: []map[string]interface{}{{
  113. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  114. }},
  115. },
  116. {
  117. sql: `SELECT a, "value" AS b FROM test`,
  118. data: &xsql.Tuple{
  119. Emitter: "test",
  120. Message: xsql.Message{
  121. "a": "val_a",
  122. },
  123. },
  124. result: []map[string]interface{}{{
  125. "a": "val_a",
  126. "b": "value",
  127. }},
  128. },
  129. {
  130. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  131. data: &xsql.Tuple{
  132. Emitter: "test",
  133. Message: xsql.Message{
  134. "a": "val_a",
  135. },
  136. },
  137. result: []map[string]interface{}{{
  138. "a": "val_a",
  139. "b": "value",
  140. "Pi": 3.14,
  141. "Zero": 0.0,
  142. }},
  143. },
  144. {
  145. sql: `SELECT a->b AS ab FROM test`,
  146. data: &xsql.Tuple{
  147. Emitter: "test",
  148. Message: xsql.Message{
  149. "a": map[string]interface{}{"b": "hello"},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "ab": "hello",
  154. }},
  155. },
  156. {
  157. sql: `SELECT a->b AS ab FROM test`,
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "a": map[string]interface{}(nil),
  162. },
  163. },
  164. result: []map[string]interface{}{{}},
  165. },
  166. {
  167. sql: `SELECT a->b AS ab FROM test`,
  168. data: &xsql.Tuple{
  169. Emitter: "test",
  170. Message: xsql.Message{
  171. "name": "name",
  172. },
  173. },
  174. result: []map[string]interface{}{{}},
  175. },
  176. {
  177. sql: `SELECT a->b AS ab FROM test`,
  178. data: &xsql.Tuple{
  179. Emitter: "test",
  180. Message: xsql.Message{
  181. "a": "commonstring",
  182. },
  183. },
  184. result: []map[string]interface{}{{}},
  185. },
  186. {
  187. sql: `SELECT a[0]->b AS ab FROM test`,
  188. data: &xsql.Tuple{
  189. Emitter: "test",
  190. Message: xsql.Message{
  191. "a": []interface{}{
  192. map[string]interface{}{"b": "hello1"},
  193. map[string]interface{}{"b": "hello2"},
  194. },
  195. },
  196. },
  197. result: []map[string]interface{}{{
  198. "ab": "hello1",
  199. }},
  200. },
  201. {
  202. sql: `SELECT a[0]->b AS ab FROM test`,
  203. data: &xsql.Tuple{
  204. Emitter: "test",
  205. Message: xsql.Message{
  206. "a": []map[string]interface{}{
  207. {"b": "hello1"},
  208. {"b": "hello2"},
  209. },
  210. },
  211. },
  212. result: []map[string]interface{}{{
  213. "ab": "hello1",
  214. }},
  215. },
  216. {
  217. sql: `SELECT a[2:4] AS ab FROM test`,
  218. data: &xsql.Tuple{
  219. Emitter: "test",
  220. Message: xsql.Message{
  221. "a": []map[string]interface{}{
  222. {"b": "hello1"},
  223. {"b": "hello2"},
  224. {"b": "hello3"},
  225. {"b": "hello4"},
  226. {"b": "hello5"},
  227. },
  228. },
  229. },
  230. result: []map[string]interface{}{{
  231. "ab": []interface{}{
  232. map[string]interface{}{"b": "hello3"},
  233. map[string]interface{}{"b": "hello4"},
  234. },
  235. }},
  236. },
  237. {
  238. sql: `SELECT a->c->d AS f1 FROM test`,
  239. data: &xsql.Tuple{
  240. Emitter: "test",
  241. Message: xsql.Message{
  242. "a": map[string]interface{}{
  243. "b": "hello",
  244. "c": map[string]interface{}{
  245. "d": 35.2,
  246. },
  247. },
  248. },
  249. },
  250. result: []map[string]interface{}{{
  251. "f1": 35.2,
  252. }},
  253. },
  254. {
  255. sql: `SELECT a->c->d AS f1 FROM test`,
  256. data: &xsql.Tuple{
  257. Emitter: "test",
  258. Message: xsql.Message{
  259. "a": map[string]interface{}{
  260. "b": "hello",
  261. "c": map[string]interface{}{
  262. "e": 35.2,
  263. },
  264. },
  265. },
  266. },
  267. result: []map[string]interface{}{{}},
  268. },
  269. {
  270. sql: `SELECT a->c->d AS f1 FROM test`,
  271. data: &xsql.Tuple{
  272. Emitter: "test",
  273. Message: xsql.Message{
  274. "a": map[string]interface{}{
  275. "b": "hello",
  276. },
  277. },
  278. },
  279. result: []map[string]interface{}{{}},
  280. },
  281. //The int type is not supported yet, the json parser returns float64 for int values
  282. {
  283. sql: `SELECT a->c->d AS f1 FROM test`,
  284. data: &xsql.Tuple{
  285. Emitter: "test",
  286. Message: xsql.Message{
  287. "a": map[string]interface{}{
  288. "b": "hello",
  289. "c": map[string]interface{}{
  290. "d": float64(35),
  291. },
  292. },
  293. },
  294. },
  295. result: []map[string]interface{}{{
  296. "f1": float64(35),
  297. }},
  298. },
  299. {
  300. sql: "SELECT a FROM test",
  301. data: &xsql.Tuple{
  302. Emitter: "test",
  303. Message: xsql.Message{},
  304. },
  305. result: []map[string]interface{}{
  306. {},
  307. },
  308. },
  309. {
  310. sql: "SELECT * FROM test",
  311. data: &xsql.Tuple{
  312. Emitter: "test",
  313. Message: xsql.Message{},
  314. },
  315. result: []map[string]interface{}{
  316. {},
  317. },
  318. },
  319. {
  320. sql: `SELECT * FROM test`,
  321. data: &xsql.Tuple{
  322. Emitter: "test",
  323. Message: xsql.Message{
  324. "a": map[string]interface{}{
  325. "b": "hello",
  326. "c": map[string]interface{}{
  327. "d": 35.2,
  328. },
  329. },
  330. },
  331. },
  332. result: []map[string]interface{}{{
  333. "a": map[string]interface{}{
  334. "b": "hello",
  335. "c": map[string]interface{}{
  336. "d": 35.2,
  337. },
  338. },
  339. }},
  340. },
  341. {
  342. sql: `SELECT * FROM test`,
  343. data: &xsql.Tuple{
  344. Emitter: "test",
  345. Message: xsql.Message{
  346. "a": "val1",
  347. "b": 3.14,
  348. },
  349. },
  350. result: []map[string]interface{}{{
  351. "a": "val1",
  352. "b": 3.14,
  353. }},
  354. },
  355. {
  356. sql: `SELECT 3*4 AS f1 FROM test`,
  357. data: &xsql.Tuple{
  358. Emitter: "test",
  359. Message: xsql.Message{},
  360. },
  361. result: []map[string]interface{}{{
  362. "f1": float64(12),
  363. }},
  364. },
  365. {
  366. sql: `SELECT 4.5*2 AS f1 FROM test`,
  367. data: &xsql.Tuple{
  368. Emitter: "test",
  369. Message: xsql.Message{},
  370. },
  371. result: []map[string]interface{}{{
  372. "f1": float64(9),
  373. }},
  374. },
  375. {
  376. sql: "SELECT `a.b.c` FROM test",
  377. data: &xsql.Tuple{
  378. Emitter: "test",
  379. Message: xsql.Message{
  380. "a.b.c": "val_a",
  381. },
  382. },
  383. result: []map[string]interface{}{{
  384. "a.b.c": "val_a",
  385. }},
  386. },
  387. }
  388. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  389. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  390. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  391. for i, tt := range tests {
  392. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  393. pp := &ProjectPlan{Fields: stmt.Fields, SendMeta: true}
  394. pp.isTest = true
  395. fv, afv := xsql.NewAggregateFunctionValuers()
  396. result := pp.Apply(ctx, tt.data, fv, afv)
  397. var mapRes []map[string]interface{}
  398. if v, ok := result.([]byte); ok {
  399. err := json.Unmarshal(v, &mapRes)
  400. if err != nil {
  401. t.Errorf("Failed to parse the input into map.\n")
  402. continue
  403. }
  404. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  405. if !reflect.DeepEqual(tt.result, mapRes) {
  406. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  407. }
  408. } else {
  409. t.Errorf("%d. The returned result is not type of []byte\n", i)
  410. }
  411. }
  412. }
  413. func TestProjectPlan_MultiInput(t *testing.T) {
  414. var tests = []struct {
  415. sql string
  416. data interface{}
  417. result []map[string]interface{}
  418. }{
  419. {
  420. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  421. data: &xsql.Tuple{
  422. Emitter: "tbl",
  423. Message: xsql.Message{
  424. "abc": int64(6),
  425. },
  426. },
  427. result: []map[string]interface{}{{
  428. "abc": float64(6), //json marshall problem
  429. }},
  430. },
  431. {
  432. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  433. data: &xsql.Tuple{
  434. Emitter: "tbl",
  435. Message: xsql.Message{
  436. "abc": int64(34),
  437. "def": "hello",
  438. },
  439. },
  440. result: []map[string]interface{}{{
  441. "abc": float64(34),
  442. }},
  443. },
  444. {
  445. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  446. data: xsql.WindowTuplesSet{
  447. xsql.WindowTuples{
  448. Emitter: "src1",
  449. Tuples: []xsql.Tuple{
  450. {
  451. Emitter: "src1",
  452. Message: xsql.Message{"id1": 1, "f1": "v1"},
  453. }, {
  454. Emitter: "src1",
  455. Message: xsql.Message{"id1": 2, "f1": "v2"},
  456. }, {
  457. Emitter: "src1",
  458. Message: xsql.Message{"id1": 3, "f1": "v1"},
  459. },
  460. },
  461. },
  462. },
  463. result: []map[string]interface{}{{
  464. "id1": float64(1),
  465. }, {
  466. "id1": float64(2),
  467. }, {
  468. "id1": float64(3),
  469. }},
  470. },
  471. {
  472. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  473. data: xsql.WindowTuplesSet{
  474. xsql.WindowTuples{
  475. Emitter: "src1",
  476. Tuples: []xsql.Tuple{
  477. {
  478. Emitter: "src1",
  479. Message: xsql.Message{"id1": 1, "f1": "v1"},
  480. }, {
  481. Emitter: "src1",
  482. Message: xsql.Message{"id2": 2, "f1": "v2"},
  483. }, {
  484. Emitter: "src1",
  485. Message: xsql.Message{"id1": 3, "f1": "v1"},
  486. },
  487. },
  488. },
  489. },
  490. result: []map[string]interface{}{{
  491. "id1": float64(1),
  492. }, {}, {
  493. "id1": float64(3),
  494. }},
  495. },
  496. {
  497. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  498. data: xsql.WindowTuplesSet{
  499. xsql.WindowTuples{
  500. Emitter: "src1",
  501. Tuples: []xsql.Tuple{
  502. {
  503. Emitter: "src1",
  504. Message: xsql.Message{"id1": 1, "f1": "v1"},
  505. }, {
  506. Emitter: "src1",
  507. Message: xsql.Message{"id1": 2, "f1": "v2"},
  508. }, {
  509. Emitter: "src1",
  510. Message: xsql.Message{"id1": 3, "f1": "v1"},
  511. },
  512. },
  513. },
  514. },
  515. result: []map[string]interface{}{{
  516. "id1": float64(1),
  517. "f1": "v1",
  518. }, {
  519. "id1": float64(2),
  520. "f1": "v2",
  521. }, {
  522. "id1": float64(3),
  523. "f1": "v1",
  524. }},
  525. },
  526. {
  527. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  528. data: xsql.WindowTuplesSet{
  529. xsql.WindowTuples{
  530. Emitter: "src1",
  531. Tuples: []xsql.Tuple{
  532. {
  533. Emitter: "src1",
  534. Message: xsql.Message{"id1": 1, "f1": "v1"},
  535. }, {
  536. Emitter: "src1",
  537. Message: xsql.Message{"id2": 2, "f2": "v2"},
  538. }, {
  539. Emitter: "src1",
  540. Message: xsql.Message{"id1": 3, "f1": "v1"},
  541. },
  542. },
  543. },
  544. },
  545. result: []map[string]interface{}{{
  546. "id1": float64(1),
  547. "f1": "v1",
  548. }, {
  549. "id2": float64(2),
  550. "f2": "v2",
  551. }, {
  552. "id1": float64(3),
  553. "f1": "v1",
  554. }},
  555. },
  556. {
  557. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  558. data: xsql.WindowTuplesSet{
  559. xsql.WindowTuples{
  560. Emitter: "src1",
  561. Tuples: []xsql.Tuple{
  562. {
  563. Emitter: "src1",
  564. Message: xsql.Message{"id1": 1, "f1": "v1"},
  565. }, {
  566. Emitter: "src1",
  567. Message: xsql.Message{"id1": 2, "f1": "v2"},
  568. }, {
  569. Emitter: "src1",
  570. Message: xsql.Message{"id1": 3, "f1": "v1"},
  571. },
  572. },
  573. },
  574. },
  575. result: []map[string]interface{}{{
  576. "id1": float64(1),
  577. "f1": "v1",
  578. }, {
  579. "id1": float64(2),
  580. "f1": "v2",
  581. }, {
  582. "id1": float64(3),
  583. "f1": "v1",
  584. }},
  585. },
  586. {
  587. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  588. data: xsql.JoinTupleSets{
  589. xsql.JoinTuple{
  590. Tuples: []xsql.Tuple{
  591. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  592. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  593. },
  594. },
  595. xsql.JoinTuple{
  596. Tuples: []xsql.Tuple{
  597. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  598. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  599. },
  600. },
  601. xsql.JoinTuple{
  602. Tuples: []xsql.Tuple{
  603. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  604. },
  605. },
  606. },
  607. result: []map[string]interface{}{{
  608. "id1": float64(1),
  609. }, {
  610. "id1": float64(2),
  611. }, {
  612. "id1": float64(3),
  613. }},
  614. },
  615. {
  616. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  617. data: xsql.JoinTupleSets{
  618. xsql.JoinTuple{
  619. Tuples: []xsql.Tuple{
  620. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  621. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  622. },
  623. },
  624. xsql.JoinTuple{
  625. Tuples: []xsql.Tuple{
  626. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  627. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  628. },
  629. },
  630. xsql.JoinTuple{
  631. Tuples: []xsql.Tuple{
  632. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  633. },
  634. },
  635. },
  636. result: []map[string]interface{}{{
  637. "id1": float64(1),
  638. }, {
  639. "id1": float64(2),
  640. }, {}},
  641. },
  642. {
  643. sql: "SELECT abc FROM tbl group by abc",
  644. data: xsql.GroupedTuplesSet{
  645. {
  646. &xsql.Tuple{
  647. Emitter: "tbl",
  648. Message: xsql.Message{
  649. "abc": int64(6),
  650. "def": "hello",
  651. },
  652. },
  653. },
  654. },
  655. result: []map[string]interface{}{{
  656. "abc": float64(6),
  657. }},
  658. },
  659. {
  660. sql: "SELECT abc FROM tbl group by abc",
  661. data: xsql.GroupedTuplesSet{
  662. {
  663. &xsql.Tuple{
  664. Emitter: "tbl",
  665. Message: xsql.Message{
  666. "def": "hello",
  667. },
  668. },
  669. },
  670. },
  671. result: []map[string]interface{}{{}},
  672. },
  673. {
  674. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  675. data: xsql.GroupedTuplesSet{
  676. {
  677. &xsql.Tuple{
  678. Emitter: "src1",
  679. Message: xsql.Message{"id1": 1, "f1": "v1"},
  680. },
  681. &xsql.Tuple{
  682. Emitter: "src1",
  683. Message: xsql.Message{"id1": 3, "f1": "v1"},
  684. },
  685. },
  686. {
  687. &xsql.Tuple{
  688. Emitter: "src1",
  689. Message: xsql.Message{"id1": 2, "f1": "v2"},
  690. },
  691. },
  692. },
  693. result: []map[string]interface{}{{
  694. "id1": float64(1),
  695. }, {
  696. "id1": float64(2),
  697. }},
  698. },
  699. {
  700. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  701. data: xsql.GroupedTuplesSet{
  702. {
  703. &xsql.Tuple{
  704. Emitter: "src1",
  705. Message: xsql.Message{"id1": 1, "f1": "v1"},
  706. },
  707. &xsql.Tuple{
  708. Emitter: "src1",
  709. Message: xsql.Message{"id1": 3, "f1": "v1"},
  710. },
  711. },
  712. {
  713. &xsql.Tuple{
  714. Emitter: "src1",
  715. Message: xsql.Message{"id2": 2, "f1": "v2"},
  716. },
  717. },
  718. },
  719. result: []map[string]interface{}{{
  720. "id1": float64(1),
  721. }, {}},
  722. },
  723. {
  724. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  725. data: xsql.GroupedTuplesSet{
  726. {
  727. &xsql.JoinTuple{
  728. Tuples: []xsql.Tuple{
  729. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  730. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  731. },
  732. },
  733. },
  734. {
  735. &xsql.JoinTuple{
  736. Tuples: []xsql.Tuple{
  737. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  738. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  739. },
  740. },
  741. },
  742. {
  743. &xsql.JoinTuple{
  744. Tuples: []xsql.Tuple{
  745. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  746. },
  747. },
  748. },
  749. },
  750. result: []map[string]interface{}{{
  751. "id2": float64(2),
  752. }, {
  753. "id2": float64(4),
  754. }, {}},
  755. },
  756. {
  757. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  758. data: xsql.JoinTupleSets{
  759. xsql.JoinTuple{
  760. Tuples: []xsql.Tuple{
  761. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  762. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  763. },
  764. },
  765. xsql.JoinTuple{
  766. Tuples: []xsql.Tuple{
  767. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  768. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  769. },
  770. },
  771. xsql.JoinTuple{
  772. Tuples: []xsql.Tuple{
  773. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  774. },
  775. },
  776. },
  777. result: []map[string]interface{}{{
  778. "id1": float64(1),
  779. "f1": "v1",
  780. "f2": "w2",
  781. }, {
  782. "id1": float64(2),
  783. "f1": "v2",
  784. "f2": "w3",
  785. }, {
  786. "id1": float64(3),
  787. "f1": "v1",
  788. }},
  789. },
  790. {
  791. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  792. data: xsql.JoinTupleSets{
  793. xsql.JoinTuple{
  794. Tuples: []xsql.Tuple{
  795. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  796. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  797. },
  798. },
  799. xsql.JoinTuple{
  800. Tuples: []xsql.Tuple{
  801. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  802. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  803. },
  804. },
  805. xsql.JoinTuple{
  806. Tuples: []xsql.Tuple{
  807. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  808. },
  809. },
  810. },
  811. result: []map[string]interface{}{{
  812. "id": float64(1),
  813. "f1": "v1",
  814. "f2": "w2",
  815. }, {
  816. "id": float64(2),
  817. "f1": "v2",
  818. "f2": "w3",
  819. }, {
  820. "id": float64(3),
  821. "f1": "v1",
  822. }},
  823. },
  824. {
  825. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  826. data: xsql.GroupedTuplesSet{
  827. {
  828. &xsql.Tuple{
  829. Emitter: "src1",
  830. Message: xsql.Message{"id1": 1, "f1": "v1"},
  831. },
  832. &xsql.Tuple{
  833. Emitter: "src1",
  834. Message: xsql.Message{"id1": 3, "f1": "v1"},
  835. },
  836. },
  837. {
  838. &xsql.Tuple{
  839. Emitter: "src1",
  840. Message: xsql.Message{"id1": 2, "f1": "v2"},
  841. },
  842. },
  843. },
  844. result: []map[string]interface{}{{
  845. "id1": float64(1),
  846. "f1": "v1",
  847. }, {
  848. "id1": float64(2),
  849. "f1": "v2",
  850. }},
  851. },
  852. {
  853. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  854. data: xsql.GroupedTuplesSet{
  855. {
  856. &xsql.JoinTuple{
  857. Tuples: []xsql.Tuple{
  858. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  859. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  860. },
  861. },
  862. },
  863. {
  864. &xsql.JoinTuple{
  865. Tuples: []xsql.Tuple{
  866. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  867. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  868. },
  869. },
  870. },
  871. {
  872. &xsql.JoinTuple{
  873. Tuples: []xsql.Tuple{
  874. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  875. },
  876. },
  877. },
  878. },
  879. result: []map[string]interface{}{{
  880. "id2": float64(2),
  881. "id1": float64(1),
  882. "f1": "v1",
  883. }, {
  884. "id2": float64(4),
  885. "id1": float64(2),
  886. "f1": "v2",
  887. }, {
  888. "id1": float64(3),
  889. "f1": "v1",
  890. }},
  891. },
  892. {
  893. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  894. data: xsql.GroupedTuplesSet{
  895. {
  896. &xsql.JoinTuple{
  897. Tuples: []xsql.Tuple{
  898. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  899. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  900. },
  901. },
  902. },
  903. {
  904. &xsql.JoinTuple{
  905. Tuples: []xsql.Tuple{
  906. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  907. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  908. },
  909. },
  910. },
  911. {
  912. &xsql.JoinTuple{
  913. Tuples: []xsql.Tuple{
  914. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  915. },
  916. },
  917. },
  918. },
  919. result: []map[string]interface{}{{
  920. "id2": float64(2),
  921. "id1": float64(1),
  922. "f1": "v1",
  923. }, {
  924. "id2": float64(4),
  925. "id1": float64(2),
  926. "f1": "v2",
  927. }, {
  928. "id1": float64(3),
  929. "f1": "v1",
  930. }},
  931. },
  932. }
  933. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  934. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  935. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  936. for i, tt := range tests {
  937. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  938. pp := &ProjectPlan{Fields: stmt.Fields}
  939. pp.isTest = true
  940. fv, afv := xsql.NewAggregateFunctionValuers()
  941. result := pp.Apply(ctx, tt.data, fv, afv)
  942. var mapRes []map[string]interface{}
  943. if v, ok := result.([]byte); ok {
  944. err := json.Unmarshal(v, &mapRes)
  945. if err != nil {
  946. t.Errorf("Failed to parse the input into map.\n")
  947. continue
  948. }
  949. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  950. if !reflect.DeepEqual(tt.result, mapRes) {
  951. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  952. }
  953. } else {
  954. t.Errorf("The returned result is not type of []byte\n")
  955. }
  956. }
  957. }
  958. func TestProjectPlan_Funcs(t *testing.T) {
  959. var tests = []struct {
  960. sql string
  961. data interface{}
  962. result []map[string]interface{}
  963. }{
  964. {
  965. sql: "SELECT round(a) as r FROM test",
  966. data: &xsql.Tuple{
  967. Emitter: "test",
  968. Message: xsql.Message{
  969. "a": 47.5,
  970. },
  971. },
  972. result: []map[string]interface{}{{
  973. "r": float64(48),
  974. }},
  975. }, {
  976. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  977. data: xsql.WindowTuplesSet{
  978. xsql.WindowTuples{
  979. Emitter: "test",
  980. Tuples: []xsql.Tuple{
  981. {
  982. Emitter: "src1",
  983. Message: xsql.Message{"a": 53.1},
  984. }, {
  985. Emitter: "src1",
  986. Message: xsql.Message{"a": 27.4},
  987. }, {
  988. Emitter: "src1",
  989. Message: xsql.Message{"a": 123123.7},
  990. },
  991. },
  992. },
  993. },
  994. result: []map[string]interface{}{{
  995. "r": float64(53),
  996. }, {
  997. "r": float64(27),
  998. }, {
  999. "r": float64(123124),
  1000. }},
  1001. }, {
  1002. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1003. data: xsql.WindowTuplesSet{
  1004. xsql.WindowTuples{
  1005. Emitter: "test",
  1006. Tuples: []xsql.Tuple{
  1007. {
  1008. Emitter: "src1",
  1009. Message: xsql.Message{"a": 53.1},
  1010. }, {
  1011. Emitter: "src1",
  1012. Message: xsql.Message{"a": 27.4},
  1013. }, {
  1014. Emitter: "src1",
  1015. Message: xsql.Message{"a": 123123.7},
  1016. },
  1017. },
  1018. },
  1019. },
  1020. result: []map[string]interface{}{{
  1021. "r": float64(53),
  1022. }, {
  1023. "r": float64(27),
  1024. }, {
  1025. "r": float64(123124),
  1026. }},
  1027. }, {
  1028. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1029. data: xsql.JoinTupleSets{
  1030. xsql.JoinTuple{
  1031. Tuples: []xsql.Tuple{
  1032. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1033. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1034. },
  1035. },
  1036. xsql.JoinTuple{
  1037. Tuples: []xsql.Tuple{
  1038. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1039. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1040. },
  1041. },
  1042. xsql.JoinTuple{
  1043. Tuples: []xsql.Tuple{
  1044. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1045. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1046. },
  1047. },
  1048. },
  1049. result: []map[string]interface{}{{
  1050. "r": float64(66),
  1051. }, {
  1052. "r": float64(73),
  1053. }, {
  1054. "r": float64(89),
  1055. }},
  1056. }, {
  1057. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1058. data: xsql.JoinTupleSets{
  1059. xsql.JoinTuple{
  1060. Tuples: []xsql.Tuple{
  1061. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1062. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1063. },
  1064. },
  1065. xsql.JoinTuple{
  1066. Tuples: []xsql.Tuple{
  1067. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1068. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1069. },
  1070. },
  1071. xsql.JoinTuple{
  1072. Tuples: []xsql.Tuple{
  1073. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1074. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1075. },
  1076. },
  1077. },
  1078. result: []map[string]interface{}{{
  1079. "concat": "165.5512",
  1080. }, {
  1081. "concat": "273.49934",
  1082. }, {
  1083. "concat": "388.886",
  1084. }},
  1085. }, {
  1086. sql: "SELECT count(a) as r FROM test",
  1087. data: &xsql.Tuple{
  1088. Emitter: "test",
  1089. Message: xsql.Message{
  1090. "a": 47.5,
  1091. },
  1092. },
  1093. result: []map[string]interface{}{{
  1094. "r": float64(1),
  1095. }},
  1096. }, {
  1097. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1098. data: xsql.JoinTupleSets{
  1099. xsql.JoinTuple{
  1100. Tuples: []xsql.Tuple{
  1101. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1102. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1103. },
  1104. },
  1105. xsql.JoinTuple{
  1106. Tuples: []xsql.Tuple{
  1107. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1108. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1109. },
  1110. },
  1111. xsql.JoinTuple{
  1112. Tuples: []xsql.Tuple{
  1113. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1114. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1115. },
  1116. },
  1117. },
  1118. result: []map[string]interface{}{{
  1119. "d": "devicea",
  1120. }, {
  1121. "d": "deviceb",
  1122. }, {
  1123. "d": "devicec",
  1124. }},
  1125. },
  1126. }
  1127. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1128. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  1129. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1130. for i, tt := range tests {
  1131. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1132. if err != nil {
  1133. t.Error(err)
  1134. }
  1135. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1136. pp.isTest = true
  1137. fv, afv := xsql.NewAggregateFunctionValuers()
  1138. result := pp.Apply(ctx, tt.data, fv, afv)
  1139. var mapRes []map[string]interface{}
  1140. if v, ok := result.([]byte); ok {
  1141. err := json.Unmarshal(v, &mapRes)
  1142. if err != nil {
  1143. t.Errorf("Failed to parse the input into map.\n")
  1144. continue
  1145. }
  1146. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1147. if !reflect.DeepEqual(tt.result, mapRes) {
  1148. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1149. }
  1150. } else {
  1151. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1152. }
  1153. }
  1154. }
  1155. func TestProjectPlan_AggFuncs(t *testing.T) {
  1156. var tests = []struct {
  1157. sql string
  1158. data interface{}
  1159. result []map[string]interface{}
  1160. }{
  1161. {
  1162. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1163. data: xsql.GroupedTuplesSet{
  1164. {
  1165. &xsql.JoinTuple{
  1166. Tuples: []xsql.Tuple{
  1167. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1168. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1169. },
  1170. },
  1171. &xsql.JoinTuple{
  1172. Tuples: []xsql.Tuple{
  1173. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1174. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1175. },
  1176. },
  1177. },
  1178. {
  1179. &xsql.JoinTuple{
  1180. Tuples: []xsql.Tuple{
  1181. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1182. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1183. },
  1184. },
  1185. &xsql.JoinTuple{
  1186. Tuples: []xsql.Tuple{
  1187. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1188. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1189. },
  1190. },
  1191. },
  1192. },
  1193. result: []map[string]interface{}{{
  1194. "c": float64(2),
  1195. "r": float64(122),
  1196. }, {
  1197. "c": float64(2),
  1198. "r": float64(89),
  1199. }},
  1200. },
  1201. {
  1202. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1203. data: xsql.GroupedTuplesSet{
  1204. {
  1205. &xsql.JoinTuple{
  1206. Tuples: []xsql.Tuple{
  1207. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1208. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1209. },
  1210. },
  1211. &xsql.JoinTuple{
  1212. Tuples: []xsql.Tuple{
  1213. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1214. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1215. },
  1216. },
  1217. &xsql.JoinTuple{
  1218. Tuples: []xsql.Tuple{
  1219. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1220. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1221. },
  1222. },
  1223. &xsql.JoinTuple{
  1224. Tuples: []xsql.Tuple{
  1225. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1226. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1227. },
  1228. },
  1229. },
  1230. {
  1231. &xsql.JoinTuple{
  1232. Tuples: []xsql.Tuple{
  1233. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1234. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1235. },
  1236. },
  1237. &xsql.JoinTuple{
  1238. Tuples: []xsql.Tuple{
  1239. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1240. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1241. },
  1242. },
  1243. },
  1244. },
  1245. result: []map[string]interface{}{{
  1246. "avg": 116.68,
  1247. }, {
  1248. "avg": 51.815,
  1249. }},
  1250. },
  1251. {
  1252. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1253. data: xsql.GroupedTuplesSet{
  1254. {
  1255. &xsql.JoinTuple{
  1256. Tuples: []xsql.Tuple{
  1257. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1258. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1259. },
  1260. },
  1261. &xsql.JoinTuple{
  1262. Tuples: []xsql.Tuple{
  1263. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1264. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1265. },
  1266. },
  1267. &xsql.JoinTuple{
  1268. Tuples: []xsql.Tuple{
  1269. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1270. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1271. },
  1272. },
  1273. },
  1274. {
  1275. &xsql.JoinTuple{
  1276. Tuples: []xsql.Tuple{
  1277. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1278. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1279. },
  1280. },
  1281. &xsql.JoinTuple{
  1282. Tuples: []xsql.Tuple{
  1283. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1284. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1285. },
  1286. },
  1287. },
  1288. },
  1289. result: []map[string]interface{}{{
  1290. "max": 177.51,
  1291. }, {
  1292. "max": 89.03,
  1293. }},
  1294. },
  1295. {
  1296. sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1297. data: xsql.JoinTupleSets{
  1298. xsql.JoinTuple{
  1299. Tuples: []xsql.Tuple{
  1300. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1301. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1302. },
  1303. },
  1304. xsql.JoinTuple{
  1305. Tuples: []xsql.Tuple{
  1306. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1307. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1308. },
  1309. },
  1310. xsql.JoinTuple{
  1311. Tuples: []xsql.Tuple{
  1312. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1313. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1314. },
  1315. },
  1316. },
  1317. result: []map[string]interface{}{{
  1318. "min": 68.55,
  1319. }},
  1320. }, {
  1321. sql: "SELECT min(a) as m FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1322. data: xsql.JoinTupleSets{
  1323. xsql.JoinTuple{
  1324. Tuples: []xsql.Tuple{
  1325. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "m": 68.55}},
  1326. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1327. },
  1328. },
  1329. xsql.JoinTuple{
  1330. Tuples: []xsql.Tuple{
  1331. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1332. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1333. },
  1334. },
  1335. xsql.JoinTuple{
  1336. Tuples: []xsql.Tuple{
  1337. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1338. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1339. },
  1340. },
  1341. },
  1342. result: []map[string]interface{}{{
  1343. "m": 68.55,
  1344. }},
  1345. }, {
  1346. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1347. data: xsql.WindowTuplesSet{
  1348. xsql.WindowTuples{
  1349. Emitter: "test",
  1350. Tuples: []xsql.Tuple{
  1351. {
  1352. Emitter: "src1",
  1353. Message: xsql.Message{"a": 53},
  1354. }, {
  1355. Emitter: "src1",
  1356. Message: xsql.Message{"a": 27},
  1357. }, {
  1358. Emitter: "src1",
  1359. Message: xsql.Message{"a": 123123},
  1360. },
  1361. },
  1362. },
  1363. },
  1364. result: []map[string]interface{}{{
  1365. "sum": float64(123203),
  1366. }},
  1367. }, {
  1368. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1369. data: xsql.WindowTuplesSet{
  1370. xsql.WindowTuples{
  1371. Emitter: "test",
  1372. Tuples: []xsql.Tuple{
  1373. {
  1374. Emitter: "src1",
  1375. Message: xsql.Message{"a": 53, "s": 123203},
  1376. }, {
  1377. Emitter: "src1",
  1378. Message: xsql.Message{"a": 27},
  1379. }, {
  1380. Emitter: "src1",
  1381. Message: xsql.Message{"a": 123123},
  1382. },
  1383. },
  1384. },
  1385. },
  1386. result: []map[string]interface{}{{
  1387. "s": float64(123203),
  1388. }},
  1389. }, {
  1390. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1391. data: xsql.WindowTuplesSet{
  1392. xsql.WindowTuples{
  1393. Emitter: "test",
  1394. Tuples: []xsql.Tuple{
  1395. {
  1396. Emitter: "src1",
  1397. Message: xsql.Message{"a": 53},
  1398. }, {
  1399. Emitter: "src1",
  1400. Message: xsql.Message{"a": 27},
  1401. }, {
  1402. Emitter: "src1",
  1403. Message: xsql.Message{"a": 123123},
  1404. },
  1405. },
  1406. },
  1407. },
  1408. result: []map[string]interface{}{{
  1409. "sum": float64(123203),
  1410. }},
  1411. },
  1412. {
  1413. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1414. data: xsql.GroupedTuplesSet{
  1415. {
  1416. &xsql.JoinTuple{
  1417. Tuples: []xsql.Tuple{
  1418. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1419. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1420. },
  1421. },
  1422. &xsql.JoinTuple{
  1423. Tuples: []xsql.Tuple{
  1424. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1425. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1426. },
  1427. },
  1428. },
  1429. {
  1430. &xsql.JoinTuple{
  1431. Tuples: []xsql.Tuple{
  1432. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1433. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1434. },
  1435. },
  1436. &xsql.JoinTuple{
  1437. Tuples: []xsql.Tuple{
  1438. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1439. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1440. },
  1441. },
  1442. },
  1443. },
  1444. result: []map[string]interface{}{{
  1445. "count": float64(2),
  1446. "meta": "devicea",
  1447. }, {
  1448. "count": float64(2),
  1449. "meta": "devicec",
  1450. }},
  1451. },
  1452. {
  1453. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1454. data: xsql.GroupedTuplesSet{
  1455. {
  1456. &xsql.JoinTuple{
  1457. Tuples: []xsql.Tuple{
  1458. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1459. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1460. },
  1461. },
  1462. &xsql.JoinTuple{
  1463. Tuples: []xsql.Tuple{
  1464. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1465. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1466. },
  1467. },
  1468. },
  1469. {
  1470. &xsql.JoinTuple{
  1471. Tuples: []xsql.Tuple{
  1472. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1473. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1474. },
  1475. },
  1476. &xsql.JoinTuple{
  1477. Tuples: []xsql.Tuple{
  1478. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1479. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1480. },
  1481. },
  1482. },
  1483. },
  1484. result: []map[string]interface{}{{
  1485. "c": float64(2),
  1486. "d": "devicea",
  1487. }, {
  1488. "c": float64(2),
  1489. "d": "devicec",
  1490. }},
  1491. },
  1492. }
  1493. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1494. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1495. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1496. for i, tt := range tests {
  1497. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1498. if err != nil {
  1499. t.Error(err)
  1500. }
  1501. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
  1502. fv, afv := xsql.NewAggregateFunctionValuers()
  1503. result := pp.Apply(ctx, tt.data, fv, afv)
  1504. var mapRes []map[string]interface{}
  1505. if v, ok := result.([]byte); ok {
  1506. err := json.Unmarshal(v, &mapRes)
  1507. if err != nil {
  1508. t.Errorf("Failed to parse the input into map.\n")
  1509. continue
  1510. }
  1511. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1512. if !reflect.DeepEqual(tt.result, mapRes) {
  1513. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1514. }
  1515. } else {
  1516. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  1517. }
  1518. }
  1519. }
  1520. func TestProjectPlanError(t *testing.T) {
  1521. var tests = []struct {
  1522. sql string
  1523. data interface{}
  1524. result interface{}
  1525. }{
  1526. {
  1527. sql: "SELECT a FROM test",
  1528. data: errors.New("an error from upstream"),
  1529. result: errors.New("an error from upstream"),
  1530. }, {
  1531. sql: "SELECT a * 5 FROM test",
  1532. data: &xsql.Tuple{
  1533. Emitter: "test",
  1534. Message: xsql.Message{
  1535. "a": "val_a",
  1536. },
  1537. },
  1538. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  1539. }, {
  1540. sql: `SELECT a[0]->b AS ab FROM test`,
  1541. data: &xsql.Tuple{
  1542. Emitter: "test",
  1543. Message: xsql.Message{
  1544. "a": "common string",
  1545. },
  1546. },
  1547. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  1548. }, {
  1549. sql: `SELECT round(a) as r FROM test`,
  1550. data: &xsql.Tuple{
  1551. Emitter: "test",
  1552. Message: xsql.Message{
  1553. "a": "common string",
  1554. },
  1555. },
  1556. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1557. }, {
  1558. sql: `SELECT round(a) as r FROM test`,
  1559. data: &xsql.Tuple{
  1560. Emitter: "test",
  1561. Message: xsql.Message{
  1562. "abc": "common string",
  1563. },
  1564. },
  1565. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1566. }, {
  1567. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1568. data: xsql.GroupedTuplesSet{
  1569. {
  1570. &xsql.JoinTuple{
  1571. Tuples: []xsql.Tuple{
  1572. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1573. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1574. },
  1575. },
  1576. &xsql.JoinTuple{
  1577. Tuples: []xsql.Tuple{
  1578. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1579. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1580. },
  1581. },
  1582. &xsql.JoinTuple{
  1583. Tuples: []xsql.Tuple{
  1584. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  1585. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1586. },
  1587. },
  1588. &xsql.JoinTuple{
  1589. Tuples: []xsql.Tuple{
  1590. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1591. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1592. },
  1593. },
  1594. },
  1595. {
  1596. &xsql.JoinTuple{
  1597. Tuples: []xsql.Tuple{
  1598. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1599. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1600. },
  1601. },
  1602. &xsql.JoinTuple{
  1603. Tuples: []xsql.Tuple{
  1604. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1605. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1606. },
  1607. },
  1608. },
  1609. },
  1610. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  1611. }, {
  1612. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1613. data: xsql.WindowTuplesSet{
  1614. xsql.WindowTuples{
  1615. Emitter: "test",
  1616. Tuples: []xsql.Tuple{
  1617. {
  1618. Emitter: "src1",
  1619. Message: xsql.Message{"a": 53},
  1620. }, {
  1621. Emitter: "src1",
  1622. Message: xsql.Message{"a": "ddd"},
  1623. }, {
  1624. Emitter: "src1",
  1625. Message: xsql.Message{"a": 123123},
  1626. },
  1627. },
  1628. },
  1629. },
  1630. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  1631. }, {
  1632. sql: `SELECT a[0]->b AS ab FROM test`,
  1633. data: &xsql.Tuple{
  1634. Emitter: "test",
  1635. Message: xsql.Message{
  1636. "a": []map[string]interface{}(nil),
  1637. },
  1638. },
  1639. result: errors.New("run Select error: out of index: 0 of 0"),
  1640. },
  1641. }
  1642. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1643. contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
  1644. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1645. for i, tt := range tests {
  1646. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1647. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1648. pp.isTest = true
  1649. fv, afv := xsql.NewAggregateFunctionValuers()
  1650. result := pp.Apply(ctx, tt.data, fv, afv)
  1651. if !reflect.DeepEqual(tt.result, result) {
  1652. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1653. }
  1654. }
  1655. }