project_test.go 46 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestProjectPlan_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "val_a",
  25. },
  26. Metadata: xsql.Metadata{
  27. "id": 45,
  28. "other": "mock",
  29. },
  30. },
  31. result: []map[string]interface{}{{
  32. "a": "val_a",
  33. "__meta": map[string]interface{}{
  34. "id": float64(45),
  35. "other": "mock",
  36. },
  37. }},
  38. },
  39. {
  40. sql: "SELECT b FROM test",
  41. data: &xsql.Tuple{
  42. Emitter: "test",
  43. Message: xsql.Message{
  44. "a": "val_a",
  45. },
  46. },
  47. result: []map[string]interface{}{{}},
  48. },
  49. {
  50. sql: "SELECT ts FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "val_a",
  55. "ts": common.TimeFromUnixMilli(1568854573431),
  56. },
  57. },
  58. result: []map[string]interface{}{{
  59. "ts": "2019-09-19T00:56:13.431Z",
  60. }},
  61. },
  62. //Schemaless may return a message without selecting column
  63. {
  64. sql: "SELECT ts FROM test",
  65. data: &xsql.Tuple{
  66. Emitter: "test",
  67. Message: xsql.Message{
  68. "a": "val_a",
  69. "ts2": common.TimeFromUnixMilli(1568854573431),
  70. },
  71. },
  72. result: []map[string]interface{}{{}},
  73. },
  74. {
  75. sql: "SELECT A FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "val_a",
  80. },
  81. },
  82. result: []map[string]interface{}{{
  83. "A": "val_a",
  84. }},
  85. },
  86. {
  87. sql: `SELECT "value" FROM test`,
  88. data: &xsql.Tuple{
  89. Emitter: "test",
  90. Message: xsql.Message{},
  91. },
  92. result: []map[string]interface{}{{
  93. DEFAULT_FIELD_NAME_PREFIX + "0": "value",
  94. }},
  95. },
  96. {
  97. sql: `SELECT 3.4 FROM test`,
  98. data: &xsql.Tuple{
  99. Emitter: "test",
  100. Message: xsql.Message{},
  101. },
  102. result: []map[string]interface{}{{
  103. DEFAULT_FIELD_NAME_PREFIX + "0": 3.4,
  104. }},
  105. },
  106. {
  107. sql: `SELECT 5 FROM test`,
  108. data: &xsql.Tuple{
  109. Emitter: "test",
  110. Message: xsql.Message{},
  111. },
  112. result: []map[string]interface{}{{
  113. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  114. }},
  115. },
  116. {
  117. sql: `SELECT a, "value" AS b FROM test`,
  118. data: &xsql.Tuple{
  119. Emitter: "test",
  120. Message: xsql.Message{
  121. "a": "val_a",
  122. },
  123. },
  124. result: []map[string]interface{}{{
  125. "a": "val_a",
  126. "b": "value",
  127. }},
  128. },
  129. {
  130. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  131. data: &xsql.Tuple{
  132. Emitter: "test",
  133. Message: xsql.Message{
  134. "a": "val_a",
  135. },
  136. },
  137. result: []map[string]interface{}{{
  138. "a": "val_a",
  139. "b": "value",
  140. "Pi": 3.14,
  141. "Zero": 0.0,
  142. }},
  143. },
  144. {
  145. sql: `SELECT a->b AS ab FROM test`,
  146. data: &xsql.Tuple{
  147. Emitter: "test",
  148. Message: xsql.Message{
  149. "a": map[string]interface{}{"b": "hello"},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "ab": "hello",
  154. }},
  155. },
  156. {
  157. sql: `SELECT a->b AS ab FROM test`,
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "a": map[string]interface{}(nil),
  162. },
  163. },
  164. result: []map[string]interface{}{{}},
  165. },
  166. {
  167. sql: `SELECT a->b AS ab FROM test`,
  168. data: &xsql.Tuple{
  169. Emitter: "test",
  170. Message: xsql.Message{
  171. "name": "name",
  172. },
  173. },
  174. result: []map[string]interface{}{{}},
  175. },
  176. {
  177. sql: `SELECT a->b AS ab FROM test`,
  178. data: &xsql.Tuple{
  179. Emitter: "test",
  180. Message: xsql.Message{
  181. "a": "commonstring",
  182. },
  183. },
  184. result: []map[string]interface{}{{}},
  185. },
  186. {
  187. sql: `SELECT a[0]->b AS ab FROM test`,
  188. data: &xsql.Tuple{
  189. Emitter: "test",
  190. Message: xsql.Message{
  191. "a": []interface{}{
  192. map[string]interface{}{"b": "hello1"},
  193. map[string]interface{}{"b": "hello2"},
  194. },
  195. },
  196. },
  197. result: []map[string]interface{}{{
  198. "ab": "hello1",
  199. }},
  200. },
  201. {
  202. sql: `SELECT a[0]->b AS ab FROM test`,
  203. data: &xsql.Tuple{
  204. Emitter: "test",
  205. Message: xsql.Message{
  206. "a": []map[string]interface{}{
  207. {"b": "hello1"},
  208. {"b": "hello2"},
  209. },
  210. },
  211. },
  212. result: []map[string]interface{}{{
  213. "ab": "hello1",
  214. }},
  215. },
  216. {
  217. sql: `SELECT a[2:4] AS ab FROM test`,
  218. data: &xsql.Tuple{
  219. Emitter: "test",
  220. Message: xsql.Message{
  221. "a": []map[string]interface{}{
  222. {"b": "hello1"},
  223. {"b": "hello2"},
  224. {"b": "hello3"},
  225. {"b": "hello4"},
  226. {"b": "hello5"},
  227. },
  228. },
  229. },
  230. result: []map[string]interface{}{{
  231. "ab": []interface{}{
  232. map[string]interface{}{"b": "hello3"},
  233. map[string]interface{}{"b": "hello4"},
  234. },
  235. }},
  236. },
  237. {
  238. sql: `SELECT a[2:] AS ab FROM test`,
  239. data: &xsql.Tuple{
  240. Emitter: "test",
  241. Message: xsql.Message{
  242. "a": []map[string]interface{}{
  243. {"b": "hello1"},
  244. {"b": "hello2"},
  245. {"b": "hello3"},
  246. {"b": "hello4"},
  247. {"b": "hello5"},
  248. },
  249. },
  250. },
  251. result: []map[string]interface{}{{
  252. "ab": []interface{}{
  253. map[string]interface{}{"b": "hello3"},
  254. map[string]interface{}{"b": "hello4"},
  255. map[string]interface{}{"b": "hello5"},
  256. },
  257. }},
  258. },
  259. {
  260. sql: `SELECT a[2:] AS ab FROM test`,
  261. data: &xsql.Tuple{
  262. Emitter: "test",
  263. Message: xsql.Message{
  264. "a": []interface{}{
  265. true, false, true, false, true, true,
  266. },
  267. },
  268. },
  269. result: []map[string]interface{}{{
  270. "ab": []interface{}{
  271. true, false, true, true,
  272. },
  273. }},
  274. },
  275. {
  276. sql: `SELECT a[:4] AS ab FROM test`,
  277. data: &xsql.Tuple{
  278. Emitter: "test",
  279. Message: xsql.Message{
  280. "a": []interface{}{
  281. true, false, true, false, true, true,
  282. },
  283. },
  284. },
  285. result: []map[string]interface{}{{
  286. "ab": []interface{}{
  287. true, false, true, false,
  288. },
  289. }},
  290. },
  291. {
  292. sql: `SELECT a[:4] AS ab FROM test`,
  293. data: &xsql.Tuple{
  294. Emitter: "test",
  295. Message: xsql.Message{
  296. "a": []interface{}{
  297. 3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
  298. },
  299. },
  300. },
  301. result: []map[string]interface{}{{
  302. "ab": []interface{}{
  303. 3.14, 3.141, 3.1415, 3.14159,
  304. },
  305. }},
  306. },
  307. {
  308. sql: `SELECT a->b[:4] AS ab FROM test`,
  309. data: &xsql.Tuple{
  310. Emitter: "test",
  311. Message: xsql.Message{
  312. "a": map[string]interface{}{
  313. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  314. },
  315. },
  316. },
  317. result: []map[string]interface{}{{
  318. "ab": []interface{}{
  319. 3.14, 3.141, 3.1415, 3.14159,
  320. },
  321. }},
  322. },
  323. {
  324. sql: `SELECT a->b[0:1] AS ab FROM test`,
  325. data: &xsql.Tuple{
  326. Emitter: "test",
  327. Message: xsql.Message{
  328. "a": map[string]interface{}{
  329. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  330. },
  331. },
  332. },
  333. result: []map[string]interface{}{{
  334. "ab": []interface{}{
  335. 3.14,
  336. },
  337. }},
  338. },
  339. {
  340. sql: `SELECT a->c->d AS f1 FROM test`,
  341. data: &xsql.Tuple{
  342. Emitter: "test",
  343. Message: xsql.Message{
  344. "a": map[string]interface{}{
  345. "b": "hello",
  346. "c": map[string]interface{}{
  347. "d": 35.2,
  348. },
  349. },
  350. },
  351. },
  352. result: []map[string]interface{}{{
  353. "f1": 35.2,
  354. }},
  355. },
  356. {
  357. sql: `SELECT a->c->d AS f1 FROM test`,
  358. data: &xsql.Tuple{
  359. Emitter: "test",
  360. Message: xsql.Message{
  361. "a": map[string]interface{}{
  362. "b": "hello",
  363. "c": map[string]interface{}{
  364. "e": 35.2,
  365. },
  366. },
  367. },
  368. },
  369. result: []map[string]interface{}{{}},
  370. },
  371. {
  372. sql: `SELECT a->c->d AS f1 FROM test`,
  373. data: &xsql.Tuple{
  374. Emitter: "test",
  375. Message: xsql.Message{
  376. "a": map[string]interface{}{
  377. "b": "hello",
  378. },
  379. },
  380. },
  381. result: []map[string]interface{}{{}},
  382. },
  383. //The int type is not supported yet, the json parser returns float64 for int values
  384. {
  385. sql: `SELECT a->c->d AS f1 FROM test`,
  386. data: &xsql.Tuple{
  387. Emitter: "test",
  388. Message: xsql.Message{
  389. "a": map[string]interface{}{
  390. "b": "hello",
  391. "c": map[string]interface{}{
  392. "d": float64(35),
  393. },
  394. },
  395. },
  396. },
  397. result: []map[string]interface{}{{
  398. "f1": float64(35),
  399. }},
  400. },
  401. {
  402. sql: "SELECT a FROM test",
  403. data: &xsql.Tuple{
  404. Emitter: "test",
  405. Message: xsql.Message{},
  406. },
  407. result: []map[string]interface{}{
  408. {},
  409. },
  410. },
  411. {
  412. sql: "SELECT * FROM test",
  413. data: &xsql.Tuple{
  414. Emitter: "test",
  415. Message: xsql.Message{},
  416. },
  417. result: []map[string]interface{}{
  418. {},
  419. },
  420. },
  421. {
  422. sql: `SELECT * FROM test`,
  423. data: &xsql.Tuple{
  424. Emitter: "test",
  425. Message: xsql.Message{
  426. "a": map[string]interface{}{
  427. "b": "hello",
  428. "c": map[string]interface{}{
  429. "d": 35.2,
  430. },
  431. },
  432. },
  433. },
  434. result: []map[string]interface{}{{
  435. "a": map[string]interface{}{
  436. "b": "hello",
  437. "c": map[string]interface{}{
  438. "d": 35.2,
  439. },
  440. },
  441. }},
  442. },
  443. {
  444. sql: `SELECT * FROM test`,
  445. data: &xsql.Tuple{
  446. Emitter: "test",
  447. Message: xsql.Message{
  448. "a": "val1",
  449. "b": 3.14,
  450. },
  451. },
  452. result: []map[string]interface{}{{
  453. "a": "val1",
  454. "b": 3.14,
  455. }},
  456. },
  457. {
  458. sql: `SELECT 3*4 AS f1 FROM test`,
  459. data: &xsql.Tuple{
  460. Emitter: "test",
  461. Message: xsql.Message{},
  462. },
  463. result: []map[string]interface{}{{
  464. "f1": float64(12),
  465. }},
  466. },
  467. {
  468. sql: `SELECT 4.5*2 AS f1 FROM test`,
  469. data: &xsql.Tuple{
  470. Emitter: "test",
  471. Message: xsql.Message{},
  472. },
  473. result: []map[string]interface{}{{
  474. "f1": float64(9),
  475. }},
  476. },
  477. {
  478. sql: "SELECT `a.b.c` FROM test",
  479. data: &xsql.Tuple{
  480. Emitter: "test",
  481. Message: xsql.Message{
  482. "a.b.c": "val_a",
  483. },
  484. },
  485. result: []map[string]interface{}{{
  486. "a.b.c": "val_a",
  487. }},
  488. },
  489. }
  490. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  491. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  492. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  493. for i, tt := range tests {
  494. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  495. pp := &ProjectPlan{Fields: stmt.Fields, SendMeta: true}
  496. pp.isTest = true
  497. fv, afv := xsql.NewFunctionValuersForOp(nil)
  498. result := pp.Apply(ctx, tt.data, fv, afv)
  499. var mapRes []map[string]interface{}
  500. if v, ok := result.([]byte); ok {
  501. err := json.Unmarshal(v, &mapRes)
  502. if err != nil {
  503. t.Errorf("Failed to parse the input into map.\n")
  504. continue
  505. }
  506. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  507. if !reflect.DeepEqual(tt.result, mapRes) {
  508. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  509. }
  510. } else {
  511. t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
  512. }
  513. }
  514. }
  515. func TestProjectPlan_MultiInput(t *testing.T) {
  516. var tests = []struct {
  517. sql string
  518. data interface{}
  519. result []map[string]interface{}
  520. }{
  521. {
  522. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  523. data: &xsql.Tuple{
  524. Emitter: "tbl",
  525. Message: xsql.Message{
  526. "abc": int64(6),
  527. },
  528. },
  529. result: []map[string]interface{}{{
  530. "abc": float64(6), //json marshall problem
  531. }},
  532. },
  533. {
  534. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  535. data: &xsql.Tuple{
  536. Emitter: "tbl",
  537. Message: xsql.Message{
  538. "abc": int64(34),
  539. "def": "hello",
  540. },
  541. },
  542. result: []map[string]interface{}{{
  543. "abc": float64(34),
  544. }},
  545. },
  546. {
  547. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  548. data: xsql.WindowTuplesSet{
  549. xsql.WindowTuples{
  550. Emitter: "src1",
  551. Tuples: []xsql.Tuple{
  552. {
  553. Emitter: "src1",
  554. Message: xsql.Message{"id1": 1, "f1": "v1"},
  555. }, {
  556. Emitter: "src1",
  557. Message: xsql.Message{"id1": 2, "f1": "v2"},
  558. }, {
  559. Emitter: "src1",
  560. Message: xsql.Message{"id1": 3, "f1": "v1"},
  561. },
  562. },
  563. },
  564. },
  565. result: []map[string]interface{}{{
  566. "id1": float64(1),
  567. }, {
  568. "id1": float64(2),
  569. }, {
  570. "id1": float64(3),
  571. }},
  572. },
  573. {
  574. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  575. data: xsql.WindowTuplesSet{
  576. xsql.WindowTuples{
  577. Emitter: "src1",
  578. Tuples: []xsql.Tuple{
  579. {
  580. Emitter: "src1",
  581. Message: xsql.Message{"id1": 1, "f1": "v1"},
  582. }, {
  583. Emitter: "src1",
  584. Message: xsql.Message{"id2": 2, "f1": "v2"},
  585. }, {
  586. Emitter: "src1",
  587. Message: xsql.Message{"id1": 3, "f1": "v1"},
  588. },
  589. },
  590. },
  591. },
  592. result: []map[string]interface{}{{
  593. "id1": float64(1),
  594. }, {}, {
  595. "id1": float64(3),
  596. }},
  597. },
  598. {
  599. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  600. data: xsql.WindowTuplesSet{
  601. xsql.WindowTuples{
  602. Emitter: "src1",
  603. Tuples: []xsql.Tuple{
  604. {
  605. Emitter: "src1",
  606. Message: xsql.Message{"id1": 1, "f1": "v1"},
  607. }, {
  608. Emitter: "src1",
  609. Message: xsql.Message{"id1": 2, "f1": "v2"},
  610. }, {
  611. Emitter: "src1",
  612. Message: xsql.Message{"id1": 3, "f1": "v1"},
  613. },
  614. },
  615. },
  616. },
  617. result: []map[string]interface{}{{
  618. "id1": float64(1),
  619. "f1": "v1",
  620. }, {
  621. "id1": float64(2),
  622. "f1": "v2",
  623. }, {
  624. "id1": float64(3),
  625. "f1": "v1",
  626. }},
  627. },
  628. {
  629. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  630. data: xsql.WindowTuplesSet{
  631. xsql.WindowTuples{
  632. Emitter: "src1",
  633. Tuples: []xsql.Tuple{
  634. {
  635. Emitter: "src1",
  636. Message: xsql.Message{"id1": 1, "f1": "v1"},
  637. }, {
  638. Emitter: "src1",
  639. Message: xsql.Message{"id2": 2, "f2": "v2"},
  640. }, {
  641. Emitter: "src1",
  642. Message: xsql.Message{"id1": 3, "f1": "v1"},
  643. },
  644. },
  645. },
  646. },
  647. result: []map[string]interface{}{{
  648. "id1": float64(1),
  649. "f1": "v1",
  650. }, {
  651. "id2": float64(2),
  652. "f2": "v2",
  653. }, {
  654. "id1": float64(3),
  655. "f1": "v1",
  656. }},
  657. },
  658. {
  659. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  660. data: xsql.WindowTuplesSet{
  661. xsql.WindowTuples{
  662. Emitter: "src1",
  663. Tuples: []xsql.Tuple{
  664. {
  665. Emitter: "src1",
  666. Message: xsql.Message{"id1": 1, "f1": "v1"},
  667. }, {
  668. Emitter: "src1",
  669. Message: xsql.Message{"id1": 2, "f1": "v2"},
  670. }, {
  671. Emitter: "src1",
  672. Message: xsql.Message{"id1": 3, "f1": "v1"},
  673. },
  674. },
  675. },
  676. },
  677. result: []map[string]interface{}{{
  678. "id1": float64(1),
  679. "f1": "v1",
  680. }, {
  681. "id1": float64(2),
  682. "f1": "v2",
  683. }, {
  684. "id1": float64(3),
  685. "f1": "v1",
  686. }},
  687. },
  688. {
  689. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  690. data: xsql.JoinTupleSets{
  691. xsql.JoinTuple{
  692. Tuples: []xsql.Tuple{
  693. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  694. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  695. },
  696. },
  697. xsql.JoinTuple{
  698. Tuples: []xsql.Tuple{
  699. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  700. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  701. },
  702. },
  703. xsql.JoinTuple{
  704. Tuples: []xsql.Tuple{
  705. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  706. },
  707. },
  708. },
  709. result: []map[string]interface{}{{
  710. "id1": float64(1),
  711. }, {
  712. "id1": float64(2),
  713. }, {
  714. "id1": float64(3),
  715. }},
  716. },
  717. {
  718. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  719. data: xsql.JoinTupleSets{
  720. xsql.JoinTuple{
  721. Tuples: []xsql.Tuple{
  722. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  723. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  724. },
  725. },
  726. xsql.JoinTuple{
  727. Tuples: []xsql.Tuple{
  728. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  729. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  730. },
  731. },
  732. xsql.JoinTuple{
  733. Tuples: []xsql.Tuple{
  734. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  735. },
  736. },
  737. },
  738. result: []map[string]interface{}{{
  739. "id1": float64(1),
  740. }, {
  741. "id1": float64(2),
  742. }, {}},
  743. },
  744. {
  745. sql: "SELECT abc FROM tbl group by abc",
  746. data: xsql.GroupedTuplesSet{
  747. {
  748. &xsql.Tuple{
  749. Emitter: "tbl",
  750. Message: xsql.Message{
  751. "abc": int64(6),
  752. "def": "hello",
  753. },
  754. },
  755. },
  756. },
  757. result: []map[string]interface{}{{
  758. "abc": float64(6),
  759. }},
  760. },
  761. {
  762. sql: "SELECT abc FROM tbl group by abc",
  763. data: xsql.GroupedTuplesSet{
  764. {
  765. &xsql.Tuple{
  766. Emitter: "tbl",
  767. Message: xsql.Message{
  768. "def": "hello",
  769. },
  770. },
  771. },
  772. },
  773. result: []map[string]interface{}{{}},
  774. },
  775. {
  776. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  777. data: xsql.GroupedTuplesSet{
  778. {
  779. &xsql.Tuple{
  780. Emitter: "src1",
  781. Message: xsql.Message{"id1": 1, "f1": "v1"},
  782. },
  783. &xsql.Tuple{
  784. Emitter: "src1",
  785. Message: xsql.Message{"id1": 3, "f1": "v1"},
  786. },
  787. },
  788. {
  789. &xsql.Tuple{
  790. Emitter: "src1",
  791. Message: xsql.Message{"id1": 2, "f1": "v2"},
  792. },
  793. },
  794. },
  795. result: []map[string]interface{}{{
  796. "id1": float64(1),
  797. }, {
  798. "id1": float64(2),
  799. }},
  800. },
  801. {
  802. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  803. data: xsql.GroupedTuplesSet{
  804. {
  805. &xsql.Tuple{
  806. Emitter: "src1",
  807. Message: xsql.Message{"id1": 1, "f1": "v1"},
  808. },
  809. &xsql.Tuple{
  810. Emitter: "src1",
  811. Message: xsql.Message{"id1": 3, "f1": "v1"},
  812. },
  813. },
  814. {
  815. &xsql.Tuple{
  816. Emitter: "src1",
  817. Message: xsql.Message{"id2": 2, "f1": "v2"},
  818. },
  819. },
  820. },
  821. result: []map[string]interface{}{{
  822. "id1": float64(1),
  823. }, {}},
  824. },
  825. {
  826. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  827. data: xsql.GroupedTuplesSet{
  828. {
  829. &xsql.JoinTuple{
  830. Tuples: []xsql.Tuple{
  831. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  832. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  833. },
  834. },
  835. },
  836. {
  837. &xsql.JoinTuple{
  838. Tuples: []xsql.Tuple{
  839. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  840. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  841. },
  842. },
  843. },
  844. {
  845. &xsql.JoinTuple{
  846. Tuples: []xsql.Tuple{
  847. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  848. },
  849. },
  850. },
  851. },
  852. result: []map[string]interface{}{{
  853. "id2": float64(2),
  854. }, {
  855. "id2": float64(4),
  856. }, {}},
  857. },
  858. {
  859. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  860. data: xsql.JoinTupleSets{
  861. xsql.JoinTuple{
  862. Tuples: []xsql.Tuple{
  863. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  864. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  865. },
  866. },
  867. xsql.JoinTuple{
  868. Tuples: []xsql.Tuple{
  869. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  870. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  871. },
  872. },
  873. xsql.JoinTuple{
  874. Tuples: []xsql.Tuple{
  875. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  876. },
  877. },
  878. },
  879. result: []map[string]interface{}{{
  880. "id1": float64(1),
  881. "f1": "v1",
  882. "f2": "w2",
  883. }, {
  884. "id1": float64(2),
  885. "f1": "v2",
  886. "f2": "w3",
  887. }, {
  888. "id1": float64(3),
  889. "f1": "v1",
  890. }},
  891. },
  892. {
  893. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  894. data: xsql.JoinTupleSets{
  895. xsql.JoinTuple{
  896. Tuples: []xsql.Tuple{
  897. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  898. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  899. },
  900. },
  901. xsql.JoinTuple{
  902. Tuples: []xsql.Tuple{
  903. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  904. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  905. },
  906. },
  907. xsql.JoinTuple{
  908. Tuples: []xsql.Tuple{
  909. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  910. },
  911. },
  912. },
  913. result: []map[string]interface{}{{
  914. "id": float64(1),
  915. "f1": "v1",
  916. "f2": "w2",
  917. }, {
  918. "id": float64(2),
  919. "f1": "v2",
  920. "f2": "w3",
  921. }, {
  922. "id": float64(3),
  923. "f1": "v1",
  924. }},
  925. },
  926. {
  927. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  928. data: xsql.GroupedTuplesSet{
  929. {
  930. &xsql.Tuple{
  931. Emitter: "src1",
  932. Message: xsql.Message{"id1": 1, "f1": "v1"},
  933. },
  934. &xsql.Tuple{
  935. Emitter: "src1",
  936. Message: xsql.Message{"id1": 3, "f1": "v1"},
  937. },
  938. },
  939. {
  940. &xsql.Tuple{
  941. Emitter: "src1",
  942. Message: xsql.Message{"id1": 2, "f1": "v2"},
  943. },
  944. },
  945. },
  946. result: []map[string]interface{}{{
  947. "id1": float64(1),
  948. "f1": "v1",
  949. }, {
  950. "id1": float64(2),
  951. "f1": "v2",
  952. }},
  953. },
  954. {
  955. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  956. data: xsql.GroupedTuplesSet{
  957. {
  958. &xsql.JoinTuple{
  959. Tuples: []xsql.Tuple{
  960. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  961. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  962. },
  963. },
  964. },
  965. {
  966. &xsql.JoinTuple{
  967. Tuples: []xsql.Tuple{
  968. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  969. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  970. },
  971. },
  972. },
  973. {
  974. &xsql.JoinTuple{
  975. Tuples: []xsql.Tuple{
  976. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  977. },
  978. },
  979. },
  980. },
  981. result: []map[string]interface{}{{
  982. "id2": float64(2),
  983. "id1": float64(1),
  984. "f1": "v1",
  985. }, {
  986. "id2": float64(4),
  987. "id1": float64(2),
  988. "f1": "v2",
  989. }, {
  990. "id1": float64(3),
  991. "f1": "v1",
  992. }},
  993. },
  994. {
  995. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  996. data: xsql.GroupedTuplesSet{
  997. {
  998. &xsql.JoinTuple{
  999. Tuples: []xsql.Tuple{
  1000. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1001. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1002. },
  1003. },
  1004. },
  1005. {
  1006. &xsql.JoinTuple{
  1007. Tuples: []xsql.Tuple{
  1008. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1009. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1010. },
  1011. },
  1012. },
  1013. {
  1014. &xsql.JoinTuple{
  1015. Tuples: []xsql.Tuple{
  1016. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1017. },
  1018. },
  1019. },
  1020. },
  1021. result: []map[string]interface{}{{
  1022. "id2": float64(2),
  1023. "id1": float64(1),
  1024. "f1": "v1",
  1025. }, {
  1026. "id2": float64(4),
  1027. "id1": float64(2),
  1028. "f1": "v2",
  1029. }, {
  1030. "id1": float64(3),
  1031. "f1": "v1",
  1032. }},
  1033. },
  1034. }
  1035. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1036. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  1037. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1038. for i, tt := range tests {
  1039. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1040. pp := &ProjectPlan{Fields: stmt.Fields}
  1041. pp.isTest = true
  1042. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1043. result := pp.Apply(ctx, tt.data, fv, afv)
  1044. var mapRes []map[string]interface{}
  1045. if v, ok := result.([]byte); ok {
  1046. err := json.Unmarshal(v, &mapRes)
  1047. if err != nil {
  1048. t.Errorf("Failed to parse the input into map.\n")
  1049. continue
  1050. }
  1051. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1052. if !reflect.DeepEqual(tt.result, mapRes) {
  1053. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1054. }
  1055. } else {
  1056. t.Errorf("The returned result is not type of []byte\n")
  1057. }
  1058. }
  1059. }
  1060. func TestProjectPlan_Funcs(t *testing.T) {
  1061. var tests = []struct {
  1062. sql string
  1063. data interface{}
  1064. result []map[string]interface{}
  1065. }{
  1066. {
  1067. sql: "SELECT round(a) as r FROM test",
  1068. data: &xsql.Tuple{
  1069. Emitter: "test",
  1070. Message: xsql.Message{
  1071. "a": 47.5,
  1072. },
  1073. },
  1074. result: []map[string]interface{}{{
  1075. "r": float64(48),
  1076. }},
  1077. }, {
  1078. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1079. data: xsql.WindowTuplesSet{
  1080. xsql.WindowTuples{
  1081. Emitter: "test",
  1082. Tuples: []xsql.Tuple{
  1083. {
  1084. Emitter: "src1",
  1085. Message: xsql.Message{"a": 53.1},
  1086. }, {
  1087. Emitter: "src1",
  1088. Message: xsql.Message{"a": 27.4},
  1089. }, {
  1090. Emitter: "src1",
  1091. Message: xsql.Message{"a": 123123.7},
  1092. },
  1093. },
  1094. },
  1095. },
  1096. result: []map[string]interface{}{{
  1097. "r": float64(53),
  1098. }, {
  1099. "r": float64(27),
  1100. }, {
  1101. "r": float64(123124),
  1102. }},
  1103. }, {
  1104. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1105. data: xsql.WindowTuplesSet{
  1106. xsql.WindowTuples{
  1107. Emitter: "test",
  1108. Tuples: []xsql.Tuple{
  1109. {
  1110. Emitter: "src1",
  1111. Message: xsql.Message{"a": 53.1},
  1112. }, {
  1113. Emitter: "src1",
  1114. Message: xsql.Message{"a": 27.4},
  1115. }, {
  1116. Emitter: "src1",
  1117. Message: xsql.Message{"a": 123123.7},
  1118. },
  1119. },
  1120. },
  1121. },
  1122. result: []map[string]interface{}{{
  1123. "r": float64(53),
  1124. }, {
  1125. "r": float64(27),
  1126. }, {
  1127. "r": float64(123124),
  1128. }},
  1129. }, {
  1130. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1131. data: xsql.JoinTupleSets{
  1132. xsql.JoinTuple{
  1133. Tuples: []xsql.Tuple{
  1134. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1135. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1136. },
  1137. },
  1138. xsql.JoinTuple{
  1139. Tuples: []xsql.Tuple{
  1140. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1141. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1142. },
  1143. },
  1144. xsql.JoinTuple{
  1145. Tuples: []xsql.Tuple{
  1146. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1147. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1148. },
  1149. },
  1150. },
  1151. result: []map[string]interface{}{{
  1152. "r": float64(66),
  1153. }, {
  1154. "r": float64(73),
  1155. }, {
  1156. "r": float64(89),
  1157. }},
  1158. }, {
  1159. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1160. data: xsql.JoinTupleSets{
  1161. xsql.JoinTuple{
  1162. Tuples: []xsql.Tuple{
  1163. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1164. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1165. },
  1166. },
  1167. xsql.JoinTuple{
  1168. Tuples: []xsql.Tuple{
  1169. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1170. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1171. },
  1172. },
  1173. xsql.JoinTuple{
  1174. Tuples: []xsql.Tuple{
  1175. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1176. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1177. },
  1178. },
  1179. },
  1180. result: []map[string]interface{}{{
  1181. "concat": "165.5512",
  1182. }, {
  1183. "concat": "273.49934",
  1184. }, {
  1185. "concat": "388.886",
  1186. }},
  1187. }, {
  1188. sql: "SELECT count(a) as r FROM test",
  1189. data: &xsql.Tuple{
  1190. Emitter: "test",
  1191. Message: xsql.Message{
  1192. "a": 47.5,
  1193. },
  1194. },
  1195. result: []map[string]interface{}{{
  1196. "r": float64(1),
  1197. }},
  1198. }, {
  1199. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1200. data: xsql.JoinTupleSets{
  1201. xsql.JoinTuple{
  1202. Tuples: []xsql.Tuple{
  1203. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1204. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1205. },
  1206. },
  1207. xsql.JoinTuple{
  1208. Tuples: []xsql.Tuple{
  1209. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1210. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1211. },
  1212. },
  1213. xsql.JoinTuple{
  1214. Tuples: []xsql.Tuple{
  1215. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1216. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1217. },
  1218. },
  1219. },
  1220. result: []map[string]interface{}{{
  1221. "d": "devicea",
  1222. }, {
  1223. "d": "deviceb",
  1224. }, {
  1225. "d": "devicec",
  1226. }},
  1227. },
  1228. }
  1229. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1230. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  1231. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1232. for i, tt := range tests {
  1233. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1234. if err != nil {
  1235. t.Error(err)
  1236. }
  1237. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1238. pp.isTest = true
  1239. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1240. result := pp.Apply(ctx, tt.data, fv, afv)
  1241. var mapRes []map[string]interface{}
  1242. if v, ok := result.([]byte); ok {
  1243. err := json.Unmarshal(v, &mapRes)
  1244. if err != nil {
  1245. t.Errorf("Failed to parse the input into map.\n")
  1246. continue
  1247. }
  1248. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1249. if !reflect.DeepEqual(tt.result, mapRes) {
  1250. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1251. }
  1252. } else {
  1253. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1254. }
  1255. }
  1256. }
  1257. func TestProjectPlan_AggFuncs(t *testing.T) {
  1258. var tests = []struct {
  1259. sql string
  1260. data interface{}
  1261. result []map[string]interface{}
  1262. }{
  1263. {
  1264. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1265. data: xsql.GroupedTuplesSet{
  1266. {
  1267. &xsql.JoinTuple{
  1268. Tuples: []xsql.Tuple{
  1269. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1270. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1271. },
  1272. },
  1273. &xsql.JoinTuple{
  1274. Tuples: []xsql.Tuple{
  1275. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1276. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1277. },
  1278. },
  1279. },
  1280. {
  1281. &xsql.JoinTuple{
  1282. Tuples: []xsql.Tuple{
  1283. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1284. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1285. },
  1286. },
  1287. &xsql.JoinTuple{
  1288. Tuples: []xsql.Tuple{
  1289. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1290. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1291. },
  1292. },
  1293. },
  1294. },
  1295. result: []map[string]interface{}{{
  1296. "c": float64(2),
  1297. "r": float64(122),
  1298. }, {
  1299. "c": float64(2),
  1300. "r": float64(89),
  1301. }},
  1302. },
  1303. {
  1304. sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1305. data: xsql.GroupedTuplesSet{
  1306. {
  1307. &xsql.JoinTuple{
  1308. Tuples: []xsql.Tuple{
  1309. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1310. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1311. },
  1312. },
  1313. &xsql.JoinTuple{
  1314. Tuples: []xsql.Tuple{
  1315. {Emitter: "test", Message: xsql.Message{"id": 5}},
  1316. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1317. },
  1318. },
  1319. },
  1320. {
  1321. &xsql.JoinTuple{
  1322. Tuples: []xsql.Tuple{
  1323. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1324. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1325. },
  1326. },
  1327. &xsql.JoinTuple{
  1328. Tuples: []xsql.Tuple{
  1329. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1330. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1331. },
  1332. },
  1333. },
  1334. },
  1335. result: []map[string]interface{}{{
  1336. "c": float64(1),
  1337. "a": 122.33,
  1338. "s": 122.33,
  1339. "min": 122.33,
  1340. "max": 122.33,
  1341. }, {
  1342. "c": float64(2),
  1343. "s": 103.63,
  1344. "a": 51.815,
  1345. "min": 14.6,
  1346. "max": 89.03,
  1347. }},
  1348. },
  1349. {
  1350. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1351. data: xsql.GroupedTuplesSet{
  1352. {
  1353. &xsql.JoinTuple{
  1354. Tuples: []xsql.Tuple{
  1355. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1356. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1357. },
  1358. },
  1359. &xsql.JoinTuple{
  1360. Tuples: []xsql.Tuple{
  1361. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1362. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1363. },
  1364. },
  1365. &xsql.JoinTuple{
  1366. Tuples: []xsql.Tuple{
  1367. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1368. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1369. },
  1370. },
  1371. &xsql.JoinTuple{
  1372. Tuples: []xsql.Tuple{
  1373. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1374. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1375. },
  1376. },
  1377. },
  1378. {
  1379. &xsql.JoinTuple{
  1380. Tuples: []xsql.Tuple{
  1381. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1382. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1383. },
  1384. },
  1385. &xsql.JoinTuple{
  1386. Tuples: []xsql.Tuple{
  1387. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1388. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1389. },
  1390. },
  1391. },
  1392. },
  1393. result: []map[string]interface{}{{
  1394. "avg": 116.68,
  1395. }, {
  1396. "avg": 51.815,
  1397. }},
  1398. },
  1399. {
  1400. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1401. data: xsql.GroupedTuplesSet{
  1402. {
  1403. &xsql.JoinTuple{
  1404. Tuples: []xsql.Tuple{
  1405. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1406. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1407. },
  1408. },
  1409. &xsql.JoinTuple{
  1410. Tuples: []xsql.Tuple{
  1411. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1412. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1413. },
  1414. },
  1415. &xsql.JoinTuple{
  1416. Tuples: []xsql.Tuple{
  1417. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1418. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1419. },
  1420. },
  1421. },
  1422. {
  1423. &xsql.JoinTuple{
  1424. Tuples: []xsql.Tuple{
  1425. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1426. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1427. },
  1428. },
  1429. &xsql.JoinTuple{
  1430. Tuples: []xsql.Tuple{
  1431. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1432. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1433. },
  1434. },
  1435. },
  1436. },
  1437. result: []map[string]interface{}{{
  1438. "max": 177.51,
  1439. }, {
  1440. "max": 89.03,
  1441. }},
  1442. },
  1443. {
  1444. sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1445. data: xsql.JoinTupleSets{
  1446. xsql.JoinTuple{
  1447. Tuples: []xsql.Tuple{
  1448. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1449. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1450. },
  1451. },
  1452. xsql.JoinTuple{
  1453. Tuples: []xsql.Tuple{
  1454. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1455. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1456. },
  1457. },
  1458. xsql.JoinTuple{
  1459. Tuples: []xsql.Tuple{
  1460. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1461. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1462. },
  1463. },
  1464. },
  1465. result: []map[string]interface{}{{
  1466. "min": 68.55,
  1467. }},
  1468. }, {
  1469. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1470. data: xsql.JoinTupleSets{
  1471. xsql.JoinTuple{
  1472. Tuples: []xsql.Tuple{
  1473. {Emitter: "test", Message: xsql.Message{"id": 1}},
  1474. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1475. },
  1476. },
  1477. xsql.JoinTuple{
  1478. Tuples: []xsql.Tuple{
  1479. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1480. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1481. },
  1482. },
  1483. xsql.JoinTuple{
  1484. Tuples: []xsql.Tuple{
  1485. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1486. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1487. },
  1488. },
  1489. },
  1490. result: []map[string]interface{}{{
  1491. "all": float64(3),
  1492. "c": float64(2),
  1493. "a": 123.03,
  1494. "s": 246.06,
  1495. "min": 68.55,
  1496. "max": 177.51,
  1497. }},
  1498. }, {
  1499. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1500. data: xsql.WindowTuplesSet{
  1501. xsql.WindowTuples{
  1502. Emitter: "test",
  1503. Tuples: []xsql.Tuple{
  1504. {
  1505. Emitter: "src1",
  1506. Message: xsql.Message{"a": 53},
  1507. }, {
  1508. Emitter: "src1",
  1509. Message: xsql.Message{"a": 27},
  1510. }, {
  1511. Emitter: "src1",
  1512. Message: xsql.Message{"a": 123123},
  1513. },
  1514. },
  1515. },
  1516. },
  1517. result: []map[string]interface{}{{
  1518. "sum": float64(123203),
  1519. }},
  1520. }, {
  1521. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1522. data: xsql.WindowTuplesSet{
  1523. xsql.WindowTuples{
  1524. Emitter: "test",
  1525. Tuples: []xsql.Tuple{
  1526. {
  1527. Emitter: "src1",
  1528. Message: xsql.Message{"a": 53, "s": 123203},
  1529. }, {
  1530. Emitter: "src1",
  1531. Message: xsql.Message{"a": 27},
  1532. }, {
  1533. Emitter: "src1",
  1534. Message: xsql.Message{"a": 123123},
  1535. },
  1536. },
  1537. },
  1538. },
  1539. result: []map[string]interface{}{{
  1540. "s": float64(123203),
  1541. }},
  1542. }, {
  1543. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1544. data: xsql.WindowTuplesSet{
  1545. xsql.WindowTuples{
  1546. Emitter: "test",
  1547. Tuples: []xsql.Tuple{
  1548. {
  1549. Emitter: "src1",
  1550. Message: xsql.Message{"a": 53},
  1551. }, {
  1552. Emitter: "src1",
  1553. Message: xsql.Message{"a": 27},
  1554. }, {
  1555. Emitter: "src1",
  1556. Message: xsql.Message{"a": 123123},
  1557. },
  1558. },
  1559. },
  1560. },
  1561. result: []map[string]interface{}{{
  1562. "sum": float64(123203),
  1563. }},
  1564. }, {
  1565. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test GROUP BY TumblingWindow(ss, 10)",
  1566. data: xsql.WindowTuplesSet{
  1567. xsql.WindowTuples{
  1568. Emitter: "test",
  1569. Tuples: []xsql.Tuple{
  1570. {
  1571. Emitter: "src1",
  1572. Message: xsql.Message{"a": 53},
  1573. }, {
  1574. Emitter: "src1",
  1575. Message: xsql.Message{"a": 27},
  1576. }, {
  1577. Emitter: "src1",
  1578. Message: xsql.Message{"s": 123123},
  1579. },
  1580. },
  1581. },
  1582. },
  1583. result: []map[string]interface{}{{
  1584. "all": float64(3),
  1585. "c": float64(2),
  1586. "a": float64(40),
  1587. "s": float64(80),
  1588. "min": float64(27),
  1589. "max": float64(53),
  1590. }},
  1591. },
  1592. {
  1593. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1594. data: xsql.GroupedTuplesSet{
  1595. {
  1596. &xsql.JoinTuple{
  1597. Tuples: []xsql.Tuple{
  1598. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1599. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1600. },
  1601. },
  1602. &xsql.JoinTuple{
  1603. Tuples: []xsql.Tuple{
  1604. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1605. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1606. },
  1607. },
  1608. },
  1609. {
  1610. &xsql.JoinTuple{
  1611. Tuples: []xsql.Tuple{
  1612. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1613. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1614. },
  1615. },
  1616. &xsql.JoinTuple{
  1617. Tuples: []xsql.Tuple{
  1618. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1619. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1620. },
  1621. },
  1622. },
  1623. },
  1624. result: []map[string]interface{}{{
  1625. "count": float64(2),
  1626. "meta": "devicea",
  1627. }, {
  1628. "count": float64(2),
  1629. "meta": "devicec",
  1630. }},
  1631. },
  1632. {
  1633. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1634. data: xsql.GroupedTuplesSet{
  1635. {
  1636. &xsql.JoinTuple{
  1637. Tuples: []xsql.Tuple{
  1638. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1639. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1640. },
  1641. },
  1642. &xsql.JoinTuple{
  1643. Tuples: []xsql.Tuple{
  1644. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1645. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1646. },
  1647. },
  1648. },
  1649. {
  1650. &xsql.JoinTuple{
  1651. Tuples: []xsql.Tuple{
  1652. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1653. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1654. },
  1655. },
  1656. &xsql.JoinTuple{
  1657. Tuples: []xsql.Tuple{
  1658. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1659. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1660. },
  1661. },
  1662. },
  1663. },
  1664. result: []map[string]interface{}{{
  1665. "c": float64(2),
  1666. "d": "devicea",
  1667. }, {
  1668. "c": float64(2),
  1669. "d": "devicec",
  1670. }},
  1671. },
  1672. }
  1673. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1674. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1675. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1676. for i, tt := range tests {
  1677. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1678. if err != nil {
  1679. t.Error(err)
  1680. }
  1681. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true, isTest: true}
  1682. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1683. result := pp.Apply(ctx, tt.data, fv, afv)
  1684. var mapRes []map[string]interface{}
  1685. if v, ok := result.([]byte); ok {
  1686. err := json.Unmarshal(v, &mapRes)
  1687. if err != nil {
  1688. t.Errorf("Failed to parse the input into map.\n")
  1689. continue
  1690. }
  1691. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1692. if !reflect.DeepEqual(tt.result, mapRes) {
  1693. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1694. }
  1695. } else {
  1696. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  1697. }
  1698. }
  1699. }
  1700. func TestProjectPlanError(t *testing.T) {
  1701. var tests = []struct {
  1702. sql string
  1703. data interface{}
  1704. result interface{}
  1705. }{
  1706. {
  1707. sql: "SELECT a FROM test",
  1708. data: errors.New("an error from upstream"),
  1709. result: errors.New("an error from upstream"),
  1710. }, {
  1711. sql: "SELECT a * 5 FROM test",
  1712. data: &xsql.Tuple{
  1713. Emitter: "test",
  1714. Message: xsql.Message{
  1715. "a": "val_a",
  1716. },
  1717. },
  1718. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  1719. }, {
  1720. sql: `SELECT a[0]->b AS ab FROM test`,
  1721. data: &xsql.Tuple{
  1722. Emitter: "test",
  1723. Message: xsql.Message{
  1724. "a": "common string",
  1725. },
  1726. },
  1727. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  1728. }, {
  1729. sql: `SELECT round(a) as r FROM test`,
  1730. data: &xsql.Tuple{
  1731. Emitter: "test",
  1732. Message: xsql.Message{
  1733. "a": "common string",
  1734. },
  1735. },
  1736. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1737. }, {
  1738. sql: `SELECT round(a) as r FROM test`,
  1739. data: &xsql.Tuple{
  1740. Emitter: "test",
  1741. Message: xsql.Message{
  1742. "abc": "common string",
  1743. },
  1744. },
  1745. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1746. }, {
  1747. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1748. data: xsql.GroupedTuplesSet{
  1749. {
  1750. &xsql.JoinTuple{
  1751. Tuples: []xsql.Tuple{
  1752. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1753. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1754. },
  1755. },
  1756. &xsql.JoinTuple{
  1757. Tuples: []xsql.Tuple{
  1758. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1759. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1760. },
  1761. },
  1762. &xsql.JoinTuple{
  1763. Tuples: []xsql.Tuple{
  1764. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  1765. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1766. },
  1767. },
  1768. &xsql.JoinTuple{
  1769. Tuples: []xsql.Tuple{
  1770. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1771. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1772. },
  1773. },
  1774. },
  1775. {
  1776. &xsql.JoinTuple{
  1777. Tuples: []xsql.Tuple{
  1778. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1779. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1780. },
  1781. },
  1782. &xsql.JoinTuple{
  1783. Tuples: []xsql.Tuple{
  1784. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1785. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1786. },
  1787. },
  1788. },
  1789. },
  1790. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  1791. }, {
  1792. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1793. data: xsql.WindowTuplesSet{
  1794. xsql.WindowTuples{
  1795. Emitter: "test",
  1796. Tuples: []xsql.Tuple{
  1797. {
  1798. Emitter: "src1",
  1799. Message: xsql.Message{"a": 53},
  1800. }, {
  1801. Emitter: "src1",
  1802. Message: xsql.Message{"a": "ddd"},
  1803. }, {
  1804. Emitter: "src1",
  1805. Message: xsql.Message{"a": 123123},
  1806. },
  1807. },
  1808. },
  1809. },
  1810. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  1811. }, {
  1812. sql: `SELECT a[0]->b AS ab FROM test`,
  1813. data: &xsql.Tuple{
  1814. Emitter: "test",
  1815. Message: xsql.Message{
  1816. "a": []map[string]interface{}(nil),
  1817. },
  1818. },
  1819. result: errors.New("run Select error: out of index: 0 of 0"),
  1820. },
  1821. }
  1822. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1823. contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
  1824. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1825. for i, tt := range tests {
  1826. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1827. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1828. pp.isTest = true
  1829. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1830. result := pp.Apply(ctx, tt.data, fv, afv)
  1831. if !reflect.DeepEqual(tt.result, result) {
  1832. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1833. }
  1834. }
  1835. }