project_test.go 60 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. func TestProjectPlan_Apply1(t *testing.T) {
  29. var tests = []struct {
  30. sql string
  31. data *xsql.Tuple
  32. result []map[string]interface{}
  33. }{
  34. { //0
  35. sql: "SELECT a FROM test",
  36. data: &xsql.Tuple{
  37. Emitter: "test",
  38. Message: xsql.Message{
  39. "a": "val_a",
  40. },
  41. Metadata: xsql.Metadata{
  42. "id": 45,
  43. "other": "mock",
  44. },
  45. },
  46. result: []map[string]interface{}{{
  47. "a": "val_a",
  48. "__meta": map[string]interface{}{
  49. "id": float64(45),
  50. "other": "mock",
  51. },
  52. }},
  53. },
  54. { //1
  55. sql: "SELECT b FROM test",
  56. data: &xsql.Tuple{
  57. Emitter: "test",
  58. Message: xsql.Message{
  59. "a": "val_a",
  60. },
  61. },
  62. result: []map[string]interface{}{{}},
  63. },
  64. { //2
  65. sql: "SELECT ts FROM test",
  66. data: &xsql.Tuple{
  67. Emitter: "test",
  68. Message: xsql.Message{
  69. "a": "val_a",
  70. "ts": cast.TimeFromUnixMilli(1568854573431),
  71. },
  72. },
  73. result: []map[string]interface{}{{
  74. "ts": "2019-09-19T00:56:13.431Z",
  75. }},
  76. },
  77. //Schemaless may return a message without selecting column
  78. { //3
  79. sql: "SELECT ts FROM test",
  80. data: &xsql.Tuple{
  81. Emitter: "test",
  82. Message: xsql.Message{
  83. "a": "val_a",
  84. "ts2": cast.TimeFromUnixMilli(1568854573431),
  85. },
  86. },
  87. result: []map[string]interface{}{{}},
  88. },
  89. { //4
  90. sql: "SELECT A FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "val_a",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "A": "val_a",
  99. }},
  100. },
  101. //5
  102. {
  103. sql: `SELECT "value" FROM test`,
  104. data: &xsql.Tuple{
  105. Emitter: "test",
  106. Message: xsql.Message{},
  107. },
  108. result: []map[string]interface{}{{
  109. "": "value",
  110. }},
  111. },
  112. //6
  113. {
  114. sql: `SELECT 3.4 FROM test`,
  115. data: &xsql.Tuple{
  116. Emitter: "test",
  117. Message: xsql.Message{},
  118. },
  119. result: []map[string]interface{}{{
  120. "": 3.4,
  121. }},
  122. },
  123. //7
  124. {
  125. sql: `SELECT 5 FROM test`,
  126. data: &xsql.Tuple{
  127. Emitter: "test",
  128. Message: xsql.Message{},
  129. },
  130. result: []map[string]interface{}{{
  131. "": 5.0,
  132. }},
  133. },
  134. //8
  135. {
  136. sql: `SELECT a, "value" AS b FROM test`,
  137. data: &xsql.Tuple{
  138. Emitter: "test",
  139. Message: xsql.Message{
  140. "a": "val_a",
  141. },
  142. },
  143. result: []map[string]interface{}{{
  144. "a": "val_a",
  145. "b": "value",
  146. }},
  147. },
  148. //9
  149. {
  150. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  151. data: &xsql.Tuple{
  152. Emitter: "test",
  153. Message: xsql.Message{
  154. "a": "val_a",
  155. },
  156. },
  157. result: []map[string]interface{}{{
  158. "a": "val_a",
  159. "b": "value",
  160. "Pi": 3.14,
  161. "Zero": 0.0,
  162. }},
  163. },
  164. //10
  165. {
  166. sql: `SELECT a->b AS ab FROM test`,
  167. data: &xsql.Tuple{
  168. Emitter: "test",
  169. Message: xsql.Message{
  170. "a": map[string]interface{}{"b": "hello"},
  171. },
  172. },
  173. result: []map[string]interface{}{{
  174. "ab": "hello",
  175. }},
  176. },
  177. //11
  178. {
  179. sql: `SELECT a->b AS ab FROM test`,
  180. data: &xsql.Tuple{
  181. Emitter: "test",
  182. Message: xsql.Message{
  183. "a": map[string]interface{}(nil),
  184. },
  185. },
  186. result: []map[string]interface{}{{}},
  187. },
  188. //12
  189. {
  190. sql: `SELECT a->b AS ab FROM test`,
  191. data: &xsql.Tuple{
  192. Emitter: "test",
  193. Message: xsql.Message{
  194. "name": "name",
  195. },
  196. },
  197. result: []map[string]interface{}{{}},
  198. },
  199. //13
  200. {
  201. sql: `SELECT a->b AS ab FROM test`,
  202. data: &xsql.Tuple{
  203. Emitter: "test",
  204. Message: xsql.Message{
  205. "a": "commonstring",
  206. },
  207. },
  208. result: []map[string]interface{}{{}},
  209. },
  210. //14
  211. {
  212. sql: `SELECT a[0]->b AS ab FROM test`,
  213. data: &xsql.Tuple{
  214. Emitter: "test",
  215. Message: xsql.Message{
  216. "a": []interface{}{
  217. map[string]interface{}{"b": "hello1"},
  218. map[string]interface{}{"b": "hello2"},
  219. },
  220. },
  221. },
  222. result: []map[string]interface{}{{
  223. "ab": "hello1",
  224. }},
  225. },
  226. //15
  227. {
  228. sql: `SELECT a[0]->b AS ab FROM test`,
  229. data: &xsql.Tuple{
  230. Emitter: "test",
  231. Message: xsql.Message{
  232. "a": []map[string]interface{}{
  233. {"b": "hello1"},
  234. {"b": "hello2"},
  235. },
  236. },
  237. },
  238. result: []map[string]interface{}{{
  239. "ab": "hello1",
  240. }},
  241. },
  242. //16
  243. {
  244. sql: `SELECT a[2:4] AS ab FROM test`,
  245. data: &xsql.Tuple{
  246. Emitter: "test",
  247. Message: xsql.Message{
  248. "a": []map[string]interface{}{
  249. {"b": "hello1"},
  250. {"b": "hello2"},
  251. {"b": "hello3"},
  252. {"b": "hello4"},
  253. {"b": "hello5"},
  254. },
  255. },
  256. },
  257. result: []map[string]interface{}{{
  258. "ab": []interface{}{
  259. map[string]interface{}{"b": "hello3"},
  260. map[string]interface{}{"b": "hello4"},
  261. },
  262. }},
  263. },
  264. //17
  265. {
  266. sql: `SELECT a[2:] AS ab FROM test`,
  267. data: &xsql.Tuple{
  268. Emitter: "test",
  269. Message: xsql.Message{
  270. "a": []map[string]interface{}{
  271. {"b": "hello1"},
  272. {"b": "hello2"},
  273. {"b": "hello3"},
  274. {"b": "hello4"},
  275. {"b": "hello5"},
  276. },
  277. },
  278. },
  279. result: []map[string]interface{}{{
  280. "ab": []interface{}{
  281. map[string]interface{}{"b": "hello3"},
  282. map[string]interface{}{"b": "hello4"},
  283. map[string]interface{}{"b": "hello5"},
  284. },
  285. }},
  286. },
  287. //18
  288. {
  289. sql: `SELECT a[2:] AS ab FROM test`,
  290. data: &xsql.Tuple{
  291. Emitter: "test",
  292. Message: xsql.Message{
  293. "a": []interface{}{
  294. true, false, true, false, true, true,
  295. },
  296. },
  297. },
  298. result: []map[string]interface{}{{
  299. "ab": []interface{}{
  300. true, false, true, true,
  301. },
  302. }},
  303. },
  304. //19
  305. {
  306. sql: `SELECT a[:4] AS ab FROM test`,
  307. data: &xsql.Tuple{
  308. Emitter: "test",
  309. Message: xsql.Message{
  310. "a": []interface{}{
  311. true, false, true, false, true, true,
  312. },
  313. },
  314. },
  315. result: []map[string]interface{}{{
  316. "ab": []interface{}{
  317. true, false, true, false,
  318. },
  319. }},
  320. },
  321. //20
  322. {
  323. sql: `SELECT a[:4] AS ab FROM test`,
  324. data: &xsql.Tuple{
  325. Emitter: "test",
  326. Message: xsql.Message{
  327. "a": []interface{}{
  328. 3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
  329. },
  330. },
  331. },
  332. result: []map[string]interface{}{{
  333. "ab": []interface{}{
  334. 3.14, 3.141, 3.1415, 3.14159,
  335. },
  336. }},
  337. },
  338. //21
  339. {
  340. sql: `SELECT a->b[:4] AS ab FROM test`,
  341. data: &xsql.Tuple{
  342. Emitter: "test",
  343. Message: xsql.Message{
  344. "a": map[string]interface{}{
  345. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  346. },
  347. },
  348. },
  349. result: []map[string]interface{}{{
  350. "ab": []interface{}{
  351. 3.14, 3.141, 3.1415, 3.14159,
  352. },
  353. }},
  354. },
  355. //22
  356. {
  357. sql: `SELECT a->b[0:1] AS ab FROM test`,
  358. data: &xsql.Tuple{
  359. Emitter: "test",
  360. Message: xsql.Message{
  361. "a": map[string]interface{}{
  362. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  363. },
  364. },
  365. },
  366. result: []map[string]interface{}{{
  367. "ab": []interface{}{
  368. 3.14,
  369. },
  370. }},
  371. },
  372. //23
  373. {
  374. sql: `SELECT a->c->d AS f1 FROM test`,
  375. data: &xsql.Tuple{
  376. Emitter: "test",
  377. Message: xsql.Message{
  378. "a": map[string]interface{}{
  379. "b": "hello",
  380. "c": map[string]interface{}{
  381. "d": 35.2,
  382. },
  383. },
  384. },
  385. },
  386. result: []map[string]interface{}{{
  387. "f1": 35.2,
  388. }},
  389. },
  390. //24
  391. {
  392. sql: `SELECT a->c->d AS f1 FROM test`,
  393. data: &xsql.Tuple{
  394. Emitter: "test",
  395. Message: xsql.Message{
  396. "a": map[string]interface{}{
  397. "b": "hello",
  398. "c": map[string]interface{}{
  399. "e": 35.2,
  400. },
  401. },
  402. },
  403. },
  404. result: []map[string]interface{}{{}},
  405. },
  406. //25
  407. {
  408. sql: `SELECT a->c->d AS f1 FROM test`,
  409. data: &xsql.Tuple{
  410. Emitter: "test",
  411. Message: xsql.Message{
  412. "a": map[string]interface{}{
  413. "b": "hello",
  414. },
  415. },
  416. },
  417. result: []map[string]interface{}{{}},
  418. },
  419. //26
  420. //The int type is not supported yet, the json parser returns float64 for int values
  421. {
  422. sql: `SELECT a->c->d AS f1 FROM test`,
  423. data: &xsql.Tuple{
  424. Emitter: "test",
  425. Message: xsql.Message{
  426. "a": map[string]interface{}{
  427. "b": "hello",
  428. "c": map[string]interface{}{
  429. "d": float64(35),
  430. },
  431. },
  432. },
  433. },
  434. result: []map[string]interface{}{{
  435. "f1": float64(35),
  436. }},
  437. },
  438. //27
  439. {
  440. sql: "SELECT a FROM test",
  441. data: &xsql.Tuple{
  442. Emitter: "test",
  443. Message: xsql.Message{},
  444. },
  445. result: []map[string]interface{}{
  446. {},
  447. },
  448. },
  449. //28
  450. {
  451. sql: "SELECT * FROM test",
  452. data: &xsql.Tuple{
  453. Emitter: "test",
  454. Message: xsql.Message{},
  455. },
  456. result: []map[string]interface{}{
  457. {},
  458. },
  459. },
  460. //29
  461. {
  462. sql: `SELECT * FROM test`,
  463. data: &xsql.Tuple{
  464. Emitter: "test",
  465. Message: xsql.Message{
  466. "a": map[string]interface{}{
  467. "b": "hello",
  468. "c": map[string]interface{}{
  469. "d": 35.2,
  470. },
  471. },
  472. },
  473. },
  474. result: []map[string]interface{}{{
  475. "a": map[string]interface{}{
  476. "b": "hello",
  477. "c": map[string]interface{}{
  478. "d": 35.2,
  479. },
  480. },
  481. }},
  482. },
  483. //30
  484. {
  485. sql: `SELECT * FROM test`,
  486. data: &xsql.Tuple{
  487. Emitter: "test",
  488. Message: xsql.Message{
  489. "a": "val1",
  490. "b": 3.14,
  491. },
  492. },
  493. result: []map[string]interface{}{{
  494. "a": "val1",
  495. "b": 3.14,
  496. }},
  497. },
  498. //31
  499. {
  500. sql: `SELECT 3*4 AS f1 FROM test`,
  501. data: &xsql.Tuple{
  502. Emitter: "test",
  503. Message: xsql.Message{},
  504. },
  505. result: []map[string]interface{}{{
  506. "f1": float64(12),
  507. }},
  508. },
  509. //32
  510. {
  511. sql: `SELECT 4.5*2 AS f1 FROM test`,
  512. data: &xsql.Tuple{
  513. Emitter: "test",
  514. Message: xsql.Message{},
  515. },
  516. result: []map[string]interface{}{{
  517. "f1": float64(9),
  518. }},
  519. },
  520. //33
  521. {
  522. sql: "SELECT `a.b.c` FROM test",
  523. data: &xsql.Tuple{
  524. Emitter: "test",
  525. Message: xsql.Message{
  526. "a.b.c": "val_a",
  527. },
  528. },
  529. result: []map[string]interface{}{{
  530. "a.b.c": "val_a",
  531. }},
  532. },
  533. //34
  534. {
  535. sql: `SELECT CASE a WHEN 10 THEN "true" END AS b FROM test`,
  536. data: &xsql.Tuple{
  537. Emitter: "test",
  538. Message: xsql.Message{
  539. "a": int64(10),
  540. },
  541. },
  542. result: []map[string]interface{}{{
  543. "b": "true",
  544. }},
  545. },
  546. }
  547. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  548. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Apply1")
  549. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  550. for i, tt := range tests {
  551. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  552. if err != nil {
  553. t.Errorf("parse sql error: %s", err)
  554. continue
  555. }
  556. pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
  557. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  558. result := pp.Apply(ctx, tt.data, fv, afv)
  559. var mapRes []map[string]interface{}
  560. if v, ok := result.([]byte); ok {
  561. err := json.Unmarshal(v, &mapRes)
  562. if err != nil {
  563. t.Errorf("Failed to parse the input into map.\n")
  564. continue
  565. }
  566. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  567. if !reflect.DeepEqual(tt.result, mapRes) {
  568. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  569. }
  570. } else {
  571. t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
  572. }
  573. }
  574. }
  575. func TestProjectPlan_MultiInput(t *testing.T) {
  576. var tests = []struct {
  577. sql string
  578. data interface{}
  579. result []map[string]interface{}
  580. }{ //0
  581. {
  582. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  583. data: &xsql.Tuple{
  584. Emitter: "tbl",
  585. Message: xsql.Message{
  586. "abc": int64(6),
  587. },
  588. },
  589. result: []map[string]interface{}{{
  590. "abc": float64(6), //json marshall problem
  591. }},
  592. },
  593. //1
  594. {
  595. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  596. data: &xsql.Tuple{
  597. Emitter: "tbl",
  598. Message: xsql.Message{
  599. "abc": int64(34),
  600. "def": "hello",
  601. },
  602. },
  603. result: []map[string]interface{}{{
  604. "abc": float64(34),
  605. }},
  606. },
  607. //2
  608. {
  609. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  610. data: xsql.WindowTuplesSet{
  611. Content: []xsql.WindowTuples{
  612. {
  613. Emitter: "src1",
  614. Tuples: []xsql.Tuple{
  615. {
  616. Emitter: "src1",
  617. Message: xsql.Message{"id1": 1, "f1": "v1"},
  618. }, {
  619. Emitter: "src1",
  620. Message: xsql.Message{"id1": 2, "f1": "v2"},
  621. }, {
  622. Emitter: "src1",
  623. Message: xsql.Message{"id1": 3, "f1": "v1"},
  624. },
  625. },
  626. },
  627. },
  628. },
  629. result: []map[string]interface{}{{
  630. "id1": float64(1),
  631. }, {
  632. "id1": float64(2),
  633. }, {
  634. "id1": float64(3),
  635. }},
  636. },
  637. //3
  638. {
  639. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  640. data: xsql.WindowTuplesSet{
  641. Content: []xsql.WindowTuples{
  642. {
  643. Emitter: "src1",
  644. Tuples: []xsql.Tuple{
  645. {
  646. Emitter: "src1",
  647. Message: xsql.Message{"id1": 1, "f1": "v1"},
  648. }, {
  649. Emitter: "src1",
  650. Message: xsql.Message{"id2": 2, "f1": "v2"},
  651. }, {
  652. Emitter: "src1",
  653. Message: xsql.Message{"id1": 3, "f1": "v1"},
  654. },
  655. },
  656. },
  657. },
  658. },
  659. result: []map[string]interface{}{{
  660. "id1": float64(1),
  661. }, {}, {
  662. "id1": float64(3),
  663. }},
  664. },
  665. //4
  666. {
  667. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  668. data: xsql.WindowTuplesSet{
  669. Content: []xsql.WindowTuples{
  670. {
  671. Emitter: "src1",
  672. Tuples: []xsql.Tuple{
  673. {
  674. Emitter: "src1",
  675. Message: xsql.Message{"id1": 1, "f1": "v1"},
  676. }, {
  677. Emitter: "src1",
  678. Message: xsql.Message{"id1": 2, "f1": "v2"},
  679. }, {
  680. Emitter: "src1",
  681. Message: xsql.Message{"id1": 3, "f1": "v1"},
  682. },
  683. },
  684. },
  685. },
  686. },
  687. result: []map[string]interface{}{{
  688. "id1": float64(1),
  689. "f1": "v1",
  690. }, {
  691. "id1": float64(2),
  692. "f1": "v2",
  693. }, {
  694. "id1": float64(3),
  695. "f1": "v1",
  696. }},
  697. },
  698. //5
  699. {
  700. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  701. data: xsql.WindowTuplesSet{
  702. Content: []xsql.WindowTuples{
  703. {
  704. Emitter: "src1",
  705. Tuples: []xsql.Tuple{
  706. {
  707. Emitter: "src1",
  708. Message: xsql.Message{"id1": 1, "f1": "v1"},
  709. }, {
  710. Emitter: "src1",
  711. Message: xsql.Message{"id2": 2, "f2": "v2"},
  712. }, {
  713. Emitter: "src1",
  714. Message: xsql.Message{"id1": 3, "f1": "v1"},
  715. },
  716. },
  717. },
  718. },
  719. },
  720. result: []map[string]interface{}{{
  721. "id1": float64(1),
  722. "f1": "v1",
  723. }, {
  724. "id2": float64(2),
  725. "f2": "v2",
  726. }, {
  727. "id1": float64(3),
  728. "f1": "v1",
  729. }},
  730. },
  731. //6
  732. {
  733. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  734. data: xsql.WindowTuplesSet{
  735. Content: []xsql.WindowTuples{
  736. {
  737. Emitter: "src1",
  738. Tuples: []xsql.Tuple{
  739. {
  740. Emitter: "src1",
  741. Message: xsql.Message{"id1": 1, "f1": "v1"},
  742. }, {
  743. Emitter: "src1",
  744. Message: xsql.Message{"id1": 2, "f1": "v2"},
  745. }, {
  746. Emitter: "src1",
  747. Message: xsql.Message{"id1": 3, "f1": "v1"},
  748. },
  749. },
  750. },
  751. },
  752. },
  753. result: []map[string]interface{}{{
  754. "id1": float64(1),
  755. "f1": "v1",
  756. }, {
  757. "id1": float64(2),
  758. "f1": "v2",
  759. }, {
  760. "id1": float64(3),
  761. "f1": "v1",
  762. }},
  763. },
  764. //7
  765. {
  766. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  767. data: &xsql.JoinTupleSets{
  768. Content: []xsql.JoinTuple{
  769. {
  770. Tuples: []xsql.Tuple{
  771. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  772. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  773. },
  774. },
  775. {
  776. Tuples: []xsql.Tuple{
  777. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  778. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  779. },
  780. },
  781. {
  782. Tuples: []xsql.Tuple{
  783. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  784. },
  785. },
  786. },
  787. },
  788. result: []map[string]interface{}{{
  789. "id1": float64(1),
  790. }, {
  791. "id1": float64(2),
  792. }, {
  793. "id1": float64(3),
  794. }},
  795. },
  796. //8
  797. {
  798. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  799. data: &xsql.JoinTupleSets{
  800. Content: []xsql.JoinTuple{
  801. {
  802. Tuples: []xsql.Tuple{
  803. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  804. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  805. },
  806. },
  807. {
  808. Tuples: []xsql.Tuple{
  809. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  810. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  811. },
  812. },
  813. {
  814. Tuples: []xsql.Tuple{
  815. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  816. },
  817. },
  818. },
  819. },
  820. result: []map[string]interface{}{{
  821. "id1": float64(1),
  822. }, {
  823. "id1": float64(2),
  824. }, {}},
  825. },
  826. //9
  827. {
  828. sql: "SELECT abc FROM tbl group by abc",
  829. data: xsql.GroupedTuplesSet{
  830. {
  831. Content: []xsql.DataValuer{
  832. &xsql.Tuple{
  833. Emitter: "tbl",
  834. Message: xsql.Message{
  835. "abc": int64(6),
  836. "def": "hello",
  837. },
  838. },
  839. },
  840. },
  841. },
  842. result: []map[string]interface{}{{
  843. "abc": float64(6),
  844. }},
  845. },
  846. //10
  847. {
  848. sql: "SELECT abc FROM tbl group by abc",
  849. data: xsql.GroupedTuplesSet{
  850. {
  851. Content: []xsql.DataValuer{
  852. &xsql.Tuple{
  853. Emitter: "tbl",
  854. Message: xsql.Message{
  855. "def": "hello",
  856. },
  857. },
  858. },
  859. },
  860. },
  861. result: []map[string]interface{}{{}},
  862. },
  863. //11
  864. {
  865. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  866. data: xsql.GroupedTuplesSet{
  867. {
  868. Content: []xsql.DataValuer{
  869. &xsql.Tuple{
  870. Emitter: "src1",
  871. Message: xsql.Message{"id1": 1, "f1": "v1"},
  872. },
  873. &xsql.Tuple{
  874. Emitter: "src1",
  875. Message: xsql.Message{"id1": 3, "f1": "v1"},
  876. },
  877. },
  878. },
  879. {
  880. Content: []xsql.DataValuer{
  881. &xsql.Tuple{
  882. Emitter: "src1",
  883. Message: xsql.Message{"id1": 2, "f1": "v2"},
  884. },
  885. },
  886. },
  887. },
  888. result: []map[string]interface{}{{
  889. "id1": float64(1),
  890. }, {
  891. "id1": float64(2),
  892. }},
  893. },
  894. //12
  895. {
  896. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  897. data: xsql.GroupedTuplesSet{
  898. {
  899. Content: []xsql.DataValuer{
  900. &xsql.Tuple{
  901. Emitter: "src1",
  902. Message: xsql.Message{"id1": 1, "f1": "v1"},
  903. },
  904. &xsql.Tuple{
  905. Emitter: "src1",
  906. Message: xsql.Message{"id1": 3, "f1": "v1"},
  907. },
  908. },
  909. },
  910. {
  911. Content: []xsql.DataValuer{
  912. &xsql.Tuple{
  913. Emitter: "src1",
  914. Message: xsql.Message{"id2": 2, "f1": "v2"},
  915. },
  916. },
  917. },
  918. },
  919. result: []map[string]interface{}{{
  920. "id1": float64(1),
  921. }, {}},
  922. },
  923. //13
  924. {
  925. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  926. data: xsql.GroupedTuplesSet{
  927. {
  928. Content: []xsql.DataValuer{
  929. &xsql.JoinTuple{
  930. Tuples: []xsql.Tuple{
  931. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  932. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  933. },
  934. },
  935. },
  936. },
  937. {
  938. Content: []xsql.DataValuer{
  939. &xsql.JoinTuple{
  940. Tuples: []xsql.Tuple{
  941. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  942. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  943. },
  944. },
  945. },
  946. },
  947. {
  948. Content: []xsql.DataValuer{
  949. &xsql.JoinTuple{
  950. Tuples: []xsql.Tuple{
  951. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  952. },
  953. },
  954. },
  955. },
  956. },
  957. result: []map[string]interface{}{{
  958. "id2": float64(2),
  959. }, {
  960. "id2": float64(4),
  961. }, {}},
  962. },
  963. //14
  964. {
  965. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  966. data: &xsql.JoinTupleSets{
  967. Content: []xsql.JoinTuple{
  968. {
  969. Tuples: []xsql.Tuple{
  970. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  971. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  972. },
  973. },
  974. {
  975. Tuples: []xsql.Tuple{
  976. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  977. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  978. },
  979. },
  980. {
  981. Tuples: []xsql.Tuple{
  982. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  983. },
  984. },
  985. },
  986. },
  987. result: []map[string]interface{}{{
  988. "id1": float64(1),
  989. "f1": "v1",
  990. "f2": "w2",
  991. }, {
  992. "id1": float64(2),
  993. "f1": "v2",
  994. "f2": "w3",
  995. }, {
  996. "id1": float64(3),
  997. "f1": "v1",
  998. }},
  999. },
  1000. //15
  1001. {
  1002. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  1003. data: &xsql.JoinTupleSets{
  1004. Content: []xsql.JoinTuple{
  1005. {
  1006. Tuples: []xsql.Tuple{
  1007. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  1008. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  1009. },
  1010. },
  1011. {
  1012. Tuples: []xsql.Tuple{
  1013. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  1014. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  1015. },
  1016. },
  1017. {
  1018. Tuples: []xsql.Tuple{
  1019. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  1020. },
  1021. },
  1022. },
  1023. },
  1024. result: []map[string]interface{}{{
  1025. "id": float64(1),
  1026. "f1": "v1",
  1027. "f2": "w2",
  1028. }, {
  1029. "id": float64(2),
  1030. "f1": "v2",
  1031. "f2": "w3",
  1032. }, {
  1033. "id": float64(3),
  1034. "f1": "v1",
  1035. }},
  1036. },
  1037. //16
  1038. {
  1039. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  1040. data: xsql.GroupedTuplesSet{
  1041. {
  1042. Content: []xsql.DataValuer{
  1043. &xsql.Tuple{
  1044. Emitter: "src1",
  1045. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1046. },
  1047. &xsql.Tuple{
  1048. Emitter: "src1",
  1049. Message: xsql.Message{"id1": 3, "f1": "v1"},
  1050. },
  1051. },
  1052. },
  1053. {
  1054. Content: []xsql.DataValuer{
  1055. &xsql.Tuple{
  1056. Emitter: "src1",
  1057. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1058. },
  1059. },
  1060. },
  1061. },
  1062. result: []map[string]interface{}{{
  1063. "id1": float64(1),
  1064. "f1": "v1",
  1065. }, {
  1066. "id1": float64(2),
  1067. "f1": "v2",
  1068. }},
  1069. },
  1070. //17
  1071. {
  1072. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1073. data: xsql.GroupedTuplesSet{
  1074. {
  1075. Content: []xsql.DataValuer{
  1076. &xsql.JoinTuple{
  1077. Tuples: []xsql.Tuple{
  1078. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1079. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1080. },
  1081. },
  1082. },
  1083. },
  1084. {
  1085. Content: []xsql.DataValuer{
  1086. &xsql.JoinTuple{
  1087. Tuples: []xsql.Tuple{
  1088. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1089. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1090. },
  1091. },
  1092. },
  1093. },
  1094. {
  1095. Content: []xsql.DataValuer{
  1096. &xsql.JoinTuple{
  1097. Tuples: []xsql.Tuple{
  1098. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1099. },
  1100. },
  1101. },
  1102. },
  1103. },
  1104. result: []map[string]interface{}{{
  1105. "id2": float64(2),
  1106. "id1": float64(1),
  1107. "f1": "v1",
  1108. }, {
  1109. "id2": float64(4),
  1110. "id1": float64(2),
  1111. "f1": "v2",
  1112. }, {
  1113. "id1": float64(3),
  1114. "f1": "v1",
  1115. }},
  1116. },
  1117. //18
  1118. {
  1119. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1120. data: xsql.GroupedTuplesSet{
  1121. {
  1122. Content: []xsql.DataValuer{
  1123. &xsql.JoinTuple{
  1124. Tuples: []xsql.Tuple{
  1125. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1126. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1127. },
  1128. },
  1129. },
  1130. },
  1131. {
  1132. Content: []xsql.DataValuer{
  1133. &xsql.JoinTuple{
  1134. Tuples: []xsql.Tuple{
  1135. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1136. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1137. },
  1138. },
  1139. },
  1140. },
  1141. {
  1142. Content: []xsql.DataValuer{
  1143. &xsql.JoinTuple{
  1144. Tuples: []xsql.Tuple{
  1145. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1146. },
  1147. },
  1148. },
  1149. },
  1150. },
  1151. result: []map[string]interface{}{{
  1152. "id2": float64(2),
  1153. "id1": float64(1),
  1154. "f1": "v1",
  1155. }, {
  1156. "id2": float64(4),
  1157. "id1": float64(2),
  1158. "f1": "v2",
  1159. }, {
  1160. "id1": float64(3),
  1161. "f1": "v1",
  1162. }},
  1163. },
  1164. }
  1165. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1166. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_MultiInput")
  1167. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1168. for i, tt := range tests {
  1169. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1170. pp := &ProjectOp{Fields: stmt.Fields}
  1171. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1172. result := pp.Apply(ctx, tt.data, fv, afv)
  1173. var mapRes []map[string]interface{}
  1174. if v, ok := result.([]byte); ok {
  1175. err := json.Unmarshal(v, &mapRes)
  1176. if err != nil {
  1177. t.Errorf("Failed to parse the input into map.\n")
  1178. continue
  1179. }
  1180. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1181. if !reflect.DeepEqual(tt.result, mapRes) {
  1182. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1183. }
  1184. } else {
  1185. t.Errorf("The returned result is not type of []byte\n")
  1186. }
  1187. }
  1188. }
  1189. func TestProjectPlan_Funcs(t *testing.T) {
  1190. var tests = []struct {
  1191. sql string
  1192. data interface{}
  1193. result []map[string]interface{}
  1194. }{
  1195. //0
  1196. {
  1197. sql: "SELECT round(a) as r FROM test",
  1198. data: &xsql.Tuple{
  1199. Emitter: "test",
  1200. Message: xsql.Message{
  1201. "a": 47.5,
  1202. },
  1203. },
  1204. result: []map[string]interface{}{{
  1205. "r": float64(48),
  1206. }},
  1207. },
  1208. //1
  1209. {
  1210. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1211. data: xsql.WindowTuplesSet{
  1212. Content: []xsql.WindowTuples{
  1213. {
  1214. Emitter: "test",
  1215. Tuples: []xsql.Tuple{
  1216. {
  1217. Emitter: "src1",
  1218. Message: xsql.Message{"a": 53.1},
  1219. }, {
  1220. Emitter: "src1",
  1221. Message: xsql.Message{"a": 27.4},
  1222. }, {
  1223. Emitter: "src1",
  1224. Message: xsql.Message{"a": 123123.7},
  1225. },
  1226. },
  1227. },
  1228. },
  1229. },
  1230. result: []map[string]interface{}{{
  1231. "r": float64(53),
  1232. }, {
  1233. "r": float64(27),
  1234. }, {
  1235. "r": float64(123124),
  1236. }},
  1237. },
  1238. //2
  1239. {
  1240. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1241. data: xsql.WindowTuplesSet{
  1242. Content: []xsql.WindowTuples{
  1243. {
  1244. Emitter: "test",
  1245. Tuples: []xsql.Tuple{
  1246. {
  1247. Emitter: "src1",
  1248. Message: xsql.Message{"a": 53.1},
  1249. }, {
  1250. Emitter: "src1",
  1251. Message: xsql.Message{"a": 27.4},
  1252. }, {
  1253. Emitter: "src1",
  1254. Message: xsql.Message{"a": 123123.7},
  1255. },
  1256. },
  1257. },
  1258. },
  1259. },
  1260. result: []map[string]interface{}{{
  1261. "r": float64(53),
  1262. }, {
  1263. "r": float64(27),
  1264. }, {
  1265. "r": float64(123124),
  1266. }},
  1267. },
  1268. //3
  1269. {
  1270. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1271. data: &xsql.JoinTupleSets{
  1272. Content: []xsql.JoinTuple{
  1273. {
  1274. Tuples: []xsql.Tuple{
  1275. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1276. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1277. },
  1278. },
  1279. {
  1280. Tuples: []xsql.Tuple{
  1281. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1282. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1283. },
  1284. },
  1285. {
  1286. Tuples: []xsql.Tuple{
  1287. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1288. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1289. },
  1290. },
  1291. },
  1292. },
  1293. result: []map[string]interface{}{{
  1294. "r": float64(66),
  1295. }, {
  1296. "r": float64(73),
  1297. }, {
  1298. "r": float64(89),
  1299. }},
  1300. },
  1301. //4
  1302. {
  1303. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1304. data: &xsql.JoinTupleSets{
  1305. Content: []xsql.JoinTuple{
  1306. {
  1307. Tuples: []xsql.Tuple{
  1308. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1309. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1310. },
  1311. },
  1312. {
  1313. Tuples: []xsql.Tuple{
  1314. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1315. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1316. },
  1317. },
  1318. {
  1319. Tuples: []xsql.Tuple{
  1320. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1321. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1322. },
  1323. },
  1324. },
  1325. },
  1326. result: []map[string]interface{}{{
  1327. "concat": "165.5512",
  1328. }, {
  1329. "concat": "273.49934",
  1330. }, {
  1331. "concat": "388.886",
  1332. }},
  1333. },
  1334. //5
  1335. {
  1336. sql: "SELECT count(a) as r FROM test",
  1337. data: &xsql.Tuple{
  1338. Emitter: "test",
  1339. Message: xsql.Message{
  1340. "a": 47.5,
  1341. },
  1342. },
  1343. result: []map[string]interface{}{{
  1344. "r": float64(1),
  1345. }},
  1346. },
  1347. //6
  1348. {
  1349. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1350. data: &xsql.JoinTupleSets{
  1351. Content: []xsql.JoinTuple{
  1352. {
  1353. Tuples: []xsql.Tuple{
  1354. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1355. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1356. },
  1357. },
  1358. {
  1359. Tuples: []xsql.Tuple{
  1360. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1361. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1362. },
  1363. },
  1364. {
  1365. Tuples: []xsql.Tuple{
  1366. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1367. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1368. },
  1369. },
  1370. },
  1371. },
  1372. result: []map[string]interface{}{{
  1373. "d": "devicea",
  1374. }, {
  1375. "d": "deviceb",
  1376. }, {
  1377. "d": "devicec",
  1378. }},
  1379. },
  1380. }
  1381. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1382. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Funcs")
  1383. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1384. for i, tt := range tests {
  1385. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1386. if err != nil {
  1387. t.Error(err)
  1388. }
  1389. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  1390. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1391. result := pp.Apply(ctx, tt.data, fv, afv)
  1392. var mapRes []map[string]interface{}
  1393. if v, ok := result.([]byte); ok {
  1394. err := json.Unmarshal(v, &mapRes)
  1395. if err != nil {
  1396. t.Errorf("Failed to parse the input into map.\n")
  1397. continue
  1398. }
  1399. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1400. if !reflect.DeepEqual(tt.result, mapRes) {
  1401. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1402. }
  1403. } else {
  1404. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1405. }
  1406. }
  1407. }
  1408. func TestProjectPlan_AggFuncs(t *testing.T) {
  1409. var tests = []struct {
  1410. sql string
  1411. data interface{}
  1412. result []map[string]interface{}
  1413. }{
  1414. //0
  1415. {
  1416. sql: "SELECT count(*) as c, round(a) as r, window_start() as ws, window_end() as we FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1417. data: xsql.GroupedTuplesSet{
  1418. {
  1419. Content: []xsql.DataValuer{
  1420. &xsql.JoinTuple{
  1421. Tuples: []xsql.Tuple{
  1422. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1423. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1424. },
  1425. },
  1426. &xsql.JoinTuple{
  1427. Tuples: []xsql.Tuple{
  1428. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1429. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1430. },
  1431. },
  1432. },
  1433. WindowRange: &xsql.WindowRange{
  1434. WindowStart: 1541152486013,
  1435. WindowEnd: 1541152487013,
  1436. },
  1437. },
  1438. {
  1439. Content: []xsql.DataValuer{
  1440. &xsql.JoinTuple{
  1441. Tuples: []xsql.Tuple{
  1442. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1443. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1444. },
  1445. },
  1446. &xsql.JoinTuple{
  1447. Tuples: []xsql.Tuple{
  1448. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1449. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1450. },
  1451. },
  1452. },
  1453. WindowRange: &xsql.WindowRange{
  1454. WindowStart: 1541152486013,
  1455. WindowEnd: 1541152487013,
  1456. },
  1457. },
  1458. },
  1459. result: []map[string]interface{}{{
  1460. "c": float64(2),
  1461. "r": float64(122),
  1462. "ws": float64(1541152486013),
  1463. "we": float64(1541152487013),
  1464. }, {
  1465. "c": float64(2),
  1466. "r": float64(89),
  1467. "ws": float64(1541152486013),
  1468. "we": float64(1541152487013),
  1469. }},
  1470. },
  1471. //1
  1472. {
  1473. sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1474. data: xsql.GroupedTuplesSet{
  1475. {
  1476. Content: []xsql.DataValuer{
  1477. &xsql.JoinTuple{
  1478. Tuples: []xsql.Tuple{
  1479. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1480. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1481. },
  1482. },
  1483. &xsql.JoinTuple{
  1484. Tuples: []xsql.Tuple{
  1485. {Emitter: "test", Message: xsql.Message{"id": 5}},
  1486. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1487. },
  1488. },
  1489. },
  1490. },
  1491. {
  1492. Content: []xsql.DataValuer{
  1493. &xsql.JoinTuple{
  1494. Tuples: []xsql.Tuple{
  1495. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1496. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1497. },
  1498. },
  1499. &xsql.JoinTuple{
  1500. Tuples: []xsql.Tuple{
  1501. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1502. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1503. },
  1504. },
  1505. },
  1506. },
  1507. },
  1508. result: []map[string]interface{}{{
  1509. "c": float64(1),
  1510. "a": 122.33,
  1511. "s": 122.33,
  1512. "min": 122.33,
  1513. "max": 122.33,
  1514. }, {
  1515. "c": float64(2),
  1516. "s": 103.63,
  1517. "a": 51.815,
  1518. "min": 14.6,
  1519. "max": 89.03,
  1520. }},
  1521. },
  1522. //2
  1523. {
  1524. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1525. data: xsql.GroupedTuplesSet{
  1526. {
  1527. Content: []xsql.DataValuer{
  1528. &xsql.JoinTuple{
  1529. Tuples: []xsql.Tuple{
  1530. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1531. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1532. },
  1533. },
  1534. &xsql.JoinTuple{
  1535. Tuples: []xsql.Tuple{
  1536. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1537. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1538. },
  1539. },
  1540. &xsql.JoinTuple{
  1541. Tuples: []xsql.Tuple{
  1542. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1543. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1544. },
  1545. },
  1546. &xsql.JoinTuple{
  1547. Tuples: []xsql.Tuple{
  1548. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1549. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1550. },
  1551. },
  1552. },
  1553. },
  1554. {
  1555. Content: []xsql.DataValuer{
  1556. &xsql.JoinTuple{
  1557. Tuples: []xsql.Tuple{
  1558. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1559. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1560. },
  1561. },
  1562. &xsql.JoinTuple{
  1563. Tuples: []xsql.Tuple{
  1564. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1565. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1566. },
  1567. },
  1568. },
  1569. },
  1570. },
  1571. result: []map[string]interface{}{{
  1572. "avg": 116.68,
  1573. }, {
  1574. "avg": 51.815,
  1575. }},
  1576. },
  1577. //3
  1578. {
  1579. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1580. data: xsql.GroupedTuplesSet{
  1581. {
  1582. Content: []xsql.DataValuer{
  1583. &xsql.JoinTuple{
  1584. Tuples: []xsql.Tuple{
  1585. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1586. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1587. },
  1588. },
  1589. &xsql.JoinTuple{
  1590. Tuples: []xsql.Tuple{
  1591. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1592. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1593. },
  1594. },
  1595. &xsql.JoinTuple{
  1596. Tuples: []xsql.Tuple{
  1597. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1598. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1599. },
  1600. },
  1601. },
  1602. },
  1603. {
  1604. Content: []xsql.DataValuer{
  1605. &xsql.JoinTuple{
  1606. Tuples: []xsql.Tuple{
  1607. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1608. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1609. },
  1610. },
  1611. &xsql.JoinTuple{
  1612. Tuples: []xsql.Tuple{
  1613. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1614. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1615. },
  1616. },
  1617. },
  1618. },
  1619. },
  1620. result: []map[string]interface{}{{
  1621. "max": 177.51,
  1622. }, {
  1623. "max": 89.03,
  1624. }},
  1625. },
  1626. //4
  1627. {
  1628. sql: "SELECT min(a), window_start(), window_end() FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1629. data: &xsql.JoinTupleSets{
  1630. Content: []xsql.JoinTuple{
  1631. {
  1632. Tuples: []xsql.Tuple{
  1633. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1634. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1635. },
  1636. },
  1637. {
  1638. Tuples: []xsql.Tuple{
  1639. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1640. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1641. },
  1642. },
  1643. {
  1644. Tuples: []xsql.Tuple{
  1645. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1646. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1647. },
  1648. },
  1649. },
  1650. WindowRange: &xsql.WindowRange{
  1651. WindowStart: 1541152486013,
  1652. WindowEnd: 1541152487013,
  1653. },
  1654. },
  1655. result: []map[string]interface{}{{
  1656. "min": 68.55,
  1657. "window_start": float64(1541152486013),
  1658. "window_end": float64(1541152487013),
  1659. }},
  1660. },
  1661. //5
  1662. {
  1663. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1664. data: &xsql.JoinTupleSets{
  1665. Content: []xsql.JoinTuple{
  1666. {
  1667. Tuples: []xsql.Tuple{
  1668. {Emitter: "test", Message: xsql.Message{"id": 1}},
  1669. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1670. },
  1671. },
  1672. {
  1673. Tuples: []xsql.Tuple{
  1674. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1675. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1676. },
  1677. },
  1678. {
  1679. Tuples: []xsql.Tuple{
  1680. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1681. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1682. },
  1683. },
  1684. },
  1685. },
  1686. result: []map[string]interface{}{{
  1687. "all": float64(3),
  1688. "c": float64(2),
  1689. "a": 123.03,
  1690. "s": 246.06,
  1691. "min": 68.55,
  1692. "max": 177.51,
  1693. }},
  1694. },
  1695. //6
  1696. {
  1697. sql: "SELECT sum(a), window_start() as ws, window_end() FROM test GROUP BY TumblingWindow(ss, 10)",
  1698. data: xsql.WindowTuplesSet{
  1699. Content: []xsql.WindowTuples{{
  1700. Emitter: "test",
  1701. Tuples: []xsql.Tuple{
  1702. {
  1703. Emitter: "src1",
  1704. Message: xsql.Message{"a": 53},
  1705. }, {
  1706. Emitter: "src1",
  1707. Message: xsql.Message{"a": 27},
  1708. }, {
  1709. Emitter: "src1",
  1710. Message: xsql.Message{"a": 123123},
  1711. },
  1712. },
  1713. },
  1714. },
  1715. WindowRange: &xsql.WindowRange{
  1716. WindowStart: 1541152486013,
  1717. WindowEnd: 1541152487013,
  1718. },
  1719. },
  1720. result: []map[string]interface{}{{
  1721. "sum": float64(123203),
  1722. "ws": float64(1541152486013),
  1723. "window_end": float64(1541152487013),
  1724. }},
  1725. },
  1726. //7
  1727. {
  1728. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1729. data: xsql.WindowTuplesSet{
  1730. Content: []xsql.WindowTuples{{
  1731. Emitter: "test",
  1732. Tuples: []xsql.Tuple{
  1733. {
  1734. Emitter: "src1",
  1735. Message: xsql.Message{"a": 53, "s": 123203},
  1736. }, {
  1737. Emitter: "src1",
  1738. Message: xsql.Message{"a": 27},
  1739. }, {
  1740. Emitter: "src1",
  1741. Message: xsql.Message{"a": 123123},
  1742. },
  1743. },
  1744. },
  1745. },
  1746. },
  1747. result: []map[string]interface{}{{
  1748. "s": float64(123203),
  1749. }},
  1750. },
  1751. //8
  1752. {
  1753. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1754. data: xsql.WindowTuplesSet{
  1755. Content: []xsql.WindowTuples{{
  1756. Emitter: "test",
  1757. Tuples: []xsql.Tuple{
  1758. {
  1759. Emitter: "src1",
  1760. Message: xsql.Message{"a": 53},
  1761. }, {
  1762. Emitter: "src1",
  1763. Message: xsql.Message{"a": 27},
  1764. }, {
  1765. Emitter: "src1",
  1766. Message: xsql.Message{"a": 123123},
  1767. },
  1768. },
  1769. },
  1770. },
  1771. },
  1772. result: []map[string]interface{}{{
  1773. "sum": float64(123203),
  1774. }},
  1775. },
  1776. //9
  1777. {
  1778. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test GROUP BY TumblingWindow(ss, 10)",
  1779. data: xsql.WindowTuplesSet{
  1780. Content: []xsql.WindowTuples{{
  1781. Emitter: "test",
  1782. Tuples: []xsql.Tuple{
  1783. {
  1784. Emitter: "src1",
  1785. Message: xsql.Message{"a": 53},
  1786. }, {
  1787. Emitter: "src1",
  1788. Message: xsql.Message{"a": 27},
  1789. }, {
  1790. Emitter: "src1",
  1791. Message: xsql.Message{"s": 123123},
  1792. },
  1793. },
  1794. },
  1795. },
  1796. },
  1797. result: []map[string]interface{}{{
  1798. "all": float64(3),
  1799. "c": float64(2),
  1800. "a": float64(40),
  1801. "s": float64(80),
  1802. "min": float64(27),
  1803. "max": float64(53),
  1804. }},
  1805. },
  1806. //10
  1807. {
  1808. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1809. data: xsql.GroupedTuplesSet{
  1810. {
  1811. Content: []xsql.DataValuer{
  1812. &xsql.JoinTuple{
  1813. Tuples: []xsql.Tuple{
  1814. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1815. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1816. },
  1817. },
  1818. &xsql.JoinTuple{
  1819. Tuples: []xsql.Tuple{
  1820. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1821. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1822. },
  1823. },
  1824. },
  1825. },
  1826. {
  1827. Content: []xsql.DataValuer{
  1828. &xsql.JoinTuple{
  1829. Tuples: []xsql.Tuple{
  1830. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1831. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1832. },
  1833. },
  1834. &xsql.JoinTuple{
  1835. Tuples: []xsql.Tuple{
  1836. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1837. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1838. },
  1839. },
  1840. },
  1841. },
  1842. },
  1843. result: []map[string]interface{}{{
  1844. "count": float64(2),
  1845. "meta": "devicea",
  1846. }, {
  1847. "count": float64(2),
  1848. "meta": "devicec",
  1849. }},
  1850. },
  1851. //11
  1852. {
  1853. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1854. data: xsql.GroupedTuplesSet{
  1855. {
  1856. Content: []xsql.DataValuer{
  1857. &xsql.JoinTuple{
  1858. Tuples: []xsql.Tuple{
  1859. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1860. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1861. },
  1862. },
  1863. &xsql.JoinTuple{
  1864. Tuples: []xsql.Tuple{
  1865. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1866. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1867. },
  1868. },
  1869. },
  1870. },
  1871. {
  1872. Content: []xsql.DataValuer{
  1873. &xsql.JoinTuple{
  1874. Tuples: []xsql.Tuple{
  1875. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1876. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1877. },
  1878. },
  1879. &xsql.JoinTuple{
  1880. Tuples: []xsql.Tuple{
  1881. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1882. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1883. },
  1884. },
  1885. },
  1886. },
  1887. },
  1888. result: []map[string]interface{}{{
  1889. "c": float64(2),
  1890. "d": "devicea",
  1891. }, {
  1892. "c": float64(2),
  1893. "d": "devicec",
  1894. }},
  1895. },
  1896. //12
  1897. {
  1898. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1899. data: xsql.GroupedTuplesSet{
  1900. {
  1901. Content: []xsql.DataValuer{
  1902. &xsql.JoinTuple{
  1903. Tuples: []xsql.Tuple{
  1904. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1905. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1906. },
  1907. },
  1908. &xsql.JoinTuple{
  1909. Tuples: []xsql.Tuple{
  1910. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1911. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1912. },
  1913. },
  1914. },
  1915. WindowRange: &xsql.WindowRange{
  1916. WindowStart: 1541152486013,
  1917. WindowEnd: 1541152487013,
  1918. },
  1919. },
  1920. {
  1921. Content: []xsql.DataValuer{
  1922. &xsql.JoinTuple{
  1923. Tuples: []xsql.Tuple{
  1924. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1925. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1926. },
  1927. },
  1928. &xsql.JoinTuple{
  1929. Tuples: []xsql.Tuple{
  1930. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1931. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1932. },
  1933. },
  1934. },
  1935. WindowRange: &xsql.WindowRange{
  1936. WindowStart: 1541152486013,
  1937. WindowEnd: 1541152487013,
  1938. },
  1939. },
  1940. },
  1941. result: []map[string]interface{}{{
  1942. "a": 122.33,
  1943. "c": float64(2),
  1944. "color": "w2",
  1945. "id": float64(1),
  1946. "r": float64(122),
  1947. }, {
  1948. "a": 89.03,
  1949. "c": float64(2),
  1950. "color": "w1",
  1951. "id": float64(2),
  1952. "r": float64(89),
  1953. }},
  1954. },
  1955. //13
  1956. {
  1957. sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1958. data: xsql.GroupedTuplesSet{
  1959. {
  1960. Content: []xsql.DataValuer{
  1961. &xsql.JoinTuple{
  1962. Tuples: []xsql.Tuple{
  1963. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1964. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1965. },
  1966. },
  1967. &xsql.JoinTuple{
  1968. Tuples: []xsql.Tuple{
  1969. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1970. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1971. },
  1972. },
  1973. },
  1974. },
  1975. {
  1976. Content: []xsql.DataValuer{
  1977. &xsql.JoinTuple{
  1978. Tuples: []xsql.Tuple{
  1979. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1980. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1981. },
  1982. },
  1983. &xsql.JoinTuple{
  1984. Tuples: []xsql.Tuple{
  1985. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1986. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1987. },
  1988. },
  1989. },
  1990. },
  1991. },
  1992. result: []map[string]interface{}{{
  1993. "r1": []interface{}{122.33, 177.51},
  1994. }, {"r1": []interface{}{89.03, 14.6}}},
  1995. },
  1996. //14
  1997. {
  1998. sql: "SELECT collect(*)[1] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1999. data: xsql.WindowTuplesSet{
  2000. Content: []xsql.WindowTuples{
  2001. {
  2002. Emitter: "test",
  2003. Tuples: []xsql.Tuple{
  2004. {
  2005. Emitter: "src1",
  2006. Message: xsql.Message{"a": 53, "s": 123203},
  2007. }, {
  2008. Emitter: "src1",
  2009. Message: xsql.Message{"a": 27},
  2010. }, {
  2011. Emitter: "src1",
  2012. Message: xsql.Message{"a": 123123},
  2013. },
  2014. },
  2015. },
  2016. },
  2017. WindowRange: &xsql.WindowRange{
  2018. WindowStart: 1541152486013,
  2019. WindowEnd: 1541152487013,
  2020. },
  2021. },
  2022. result: []map[string]interface{}{{
  2023. "c1": map[string]interface{}{
  2024. "a": float64(27),
  2025. },
  2026. }},
  2027. },
  2028. //15
  2029. {
  2030. sql: "SELECT collect(*)[1]->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2031. data: xsql.WindowTuplesSet{
  2032. Content: []xsql.WindowTuples{{
  2033. Emitter: "test",
  2034. Tuples: []xsql.Tuple{
  2035. {
  2036. Emitter: "src1",
  2037. Message: xsql.Message{"a": 53, "s": 123203},
  2038. }, {
  2039. Emitter: "src1",
  2040. Message: xsql.Message{"a": 27},
  2041. }, {
  2042. Emitter: "src1",
  2043. Message: xsql.Message{"a": 123123},
  2044. },
  2045. },
  2046. },
  2047. },
  2048. },
  2049. result: []map[string]interface{}{{
  2050. "c1": float64(27),
  2051. }},
  2052. },
  2053. //16
  2054. {
  2055. sql: "SELECT collect(*)[1]->sl[0] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2056. data: xsql.WindowTuplesSet{
  2057. Content: []xsql.WindowTuples{{
  2058. Emitter: "test",
  2059. Tuples: []xsql.Tuple{
  2060. {
  2061. Emitter: "src1",
  2062. Message: xsql.Message{"a": 53, "sl": []string{"hello", "world"}},
  2063. }, {
  2064. Emitter: "src1",
  2065. Message: xsql.Message{"a": 27, "sl": []string{"new", "horizon"}},
  2066. }, {
  2067. Emitter: "src1",
  2068. Message: xsql.Message{"a": 123123, "sl": []string{"south", "africa"}},
  2069. },
  2070. },
  2071. },
  2072. },
  2073. },
  2074. result: []map[string]interface{}{{
  2075. "c1": "new",
  2076. }},
  2077. },
  2078. //17
  2079. {
  2080. sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  2081. data: xsql.GroupedTuplesSet{
  2082. {
  2083. Content: []xsql.DataValuer{
  2084. &xsql.JoinTuple{
  2085. Tuples: []xsql.Tuple{
  2086. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  2087. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2088. },
  2089. },
  2090. &xsql.JoinTuple{
  2091. Tuples: []xsql.Tuple{
  2092. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  2093. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2094. },
  2095. },
  2096. },
  2097. },
  2098. {
  2099. Content: []xsql.DataValuer{
  2100. &xsql.JoinTuple{
  2101. Tuples: []xsql.Tuple{
  2102. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  2103. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2104. },
  2105. },
  2106. &xsql.JoinTuple{
  2107. Tuples: []xsql.Tuple{
  2108. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2109. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2110. },
  2111. },
  2112. },
  2113. },
  2114. },
  2115. result: []map[string]interface{}{
  2116. {
  2117. "r1": []interface{}{
  2118. map[string]interface{}{"a": 122.33, "c": float64(2), "color": "w2", "id": float64(1), "r": float64(122)},
  2119. map[string]interface{}{"a": 177.51, "color": "w2", "id": float64(5)}},
  2120. }, {
  2121. "r1": []interface{}{
  2122. map[string]interface{}{"a": 89.03, "c": float64(2), "color": "w1", "id": float64(2), "r": float64(89)},
  2123. map[string]interface{}{"a": 14.6, "color": "w1", "id": float64(4)}},
  2124. },
  2125. },
  2126. },
  2127. //18
  2128. {
  2129. sql: "SELECT deduplicate(a, false)->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2130. data: xsql.WindowTuplesSet{
  2131. Content: []xsql.WindowTuples{{
  2132. Emitter: "test",
  2133. Tuples: []xsql.Tuple{
  2134. {
  2135. Emitter: "src1",
  2136. Message: xsql.Message{"a": 53, "s": 123203},
  2137. }, {
  2138. Emitter: "src1",
  2139. Message: xsql.Message{"a": 27},
  2140. }, {
  2141. Emitter: "src1",
  2142. Message: xsql.Message{"a": 123123},
  2143. },
  2144. },
  2145. },
  2146. },
  2147. },
  2148. result: []map[string]interface{}{{
  2149. "c1": float64(123123),
  2150. }},
  2151. },
  2152. //19
  2153. {
  2154. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2155. data: xsql.WindowTuplesSet{
  2156. Content: []xsql.WindowTuples{{
  2157. Emitter: "test",
  2158. Tuples: []xsql.Tuple{
  2159. {
  2160. Emitter: "src1",
  2161. Message: xsql.Message{"a": 53, "s": 123203},
  2162. }, {
  2163. Emitter: "src1",
  2164. Message: xsql.Message{"a": 27},
  2165. }, {
  2166. Emitter: "src1",
  2167. Message: xsql.Message{"a": 53},
  2168. },
  2169. },
  2170. },
  2171. },
  2172. },
  2173. result: []map[string]interface{}{{}},
  2174. },
  2175. //20
  2176. {
  2177. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2178. data: xsql.WindowTuplesSet{
  2179. Content: []xsql.WindowTuples{{
  2180. Emitter: "test",
  2181. Tuples: []xsql.Tuple{
  2182. {
  2183. Emitter: "src1",
  2184. Message: xsql.Message{"a": 53, "s": 123203},
  2185. }, {
  2186. Emitter: "src1",
  2187. Message: xsql.Message{"a": 27},
  2188. }, {
  2189. Emitter: "src1",
  2190. Message: xsql.Message{"a": 53},
  2191. },
  2192. },
  2193. },
  2194. },
  2195. },
  2196. result: []map[string]interface{}{{}},
  2197. },
  2198. //21 when got column after group by operation, return the first tuple's column
  2199. {
  2200. sql: "SELECT A.module, A.topic , max(A.value), B.topic as var2, max(B.value) as max2, C.topic as var3, max(C.value) as max3 FROM A FULL JOIN B on A.module=B.module FULL JOIN C on A.module=C.module GROUP BY A.module, TUMBLINGWINDOW(ss, 10)",
  2201. data: xsql.GroupedTuplesSet{
  2202. {
  2203. Content: []xsql.DataValuer{
  2204. &xsql.JoinTuple{
  2205. Tuples: []xsql.Tuple{
  2206. {Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},
  2207. },
  2208. },
  2209. &xsql.JoinTuple{
  2210. Tuples: []xsql.Tuple{
  2211. {Emitter: "C", Message: xsql.Message{"module": 1, "topic": "moduleC topic", "value": 100}},
  2212. },
  2213. },
  2214. },
  2215. },
  2216. },
  2217. result: []map[string]interface{}{{
  2218. "var2": "moduleB topic",
  2219. "max2": float64(1),
  2220. "max3": float64(100),
  2221. }},
  2222. },
  2223. }
  2224. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2225. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  2226. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2227. for i, tt := range tests {
  2228. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2229. if err != nil {
  2230. t.Error(err)
  2231. }
  2232. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
  2233. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2234. result := pp.Apply(ctx, tt.data, fv, afv)
  2235. var mapRes []map[string]interface{}
  2236. if v, ok := result.([]byte); ok {
  2237. err := json.Unmarshal(v, &mapRes)
  2238. if err != nil {
  2239. t.Errorf("Failed to parse the input into map.\n")
  2240. continue
  2241. }
  2242. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  2243. if !reflect.DeepEqual(tt.result, mapRes) {
  2244. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  2245. }
  2246. } else {
  2247. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  2248. }
  2249. }
  2250. }
  2251. func TestProjectPlanError(t *testing.T) {
  2252. var tests = []struct {
  2253. sql string
  2254. data interface{}
  2255. result interface{}
  2256. }{
  2257. //0
  2258. {
  2259. sql: "SELECT a FROM test",
  2260. data: errors.New("an error from upstream"),
  2261. result: errors.New("an error from upstream"),
  2262. },
  2263. //1
  2264. {
  2265. sql: "SELECT a * 5 FROM test",
  2266. data: &xsql.Tuple{
  2267. Emitter: "test",
  2268. Message: xsql.Message{
  2269. "a": "val_a",
  2270. },
  2271. },
  2272. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  2273. },
  2274. //2
  2275. {
  2276. sql: `SELECT a[0]->b AS ab FROM test`,
  2277. data: &xsql.Tuple{
  2278. Emitter: "test",
  2279. Message: xsql.Message{
  2280. "a": "common string",
  2281. },
  2282. },
  2283. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  2284. },
  2285. //3
  2286. {
  2287. sql: `SELECT round(a) as r FROM test`,
  2288. data: &xsql.Tuple{
  2289. Emitter: "test",
  2290. Message: xsql.Message{
  2291. "a": "common string",
  2292. },
  2293. },
  2294. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2295. },
  2296. //4
  2297. {
  2298. sql: `SELECT round(a) as r FROM test`,
  2299. data: &xsql.Tuple{
  2300. Emitter: "test",
  2301. Message: xsql.Message{
  2302. "abc": "common string",
  2303. },
  2304. },
  2305. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2306. },
  2307. //5
  2308. {
  2309. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  2310. data: xsql.GroupedTuplesSet{
  2311. {
  2312. Content: []xsql.DataValuer{
  2313. &xsql.JoinTuple{
  2314. Tuples: []xsql.Tuple{
  2315. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  2316. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2317. },
  2318. },
  2319. &xsql.JoinTuple{
  2320. Tuples: []xsql.Tuple{
  2321. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  2322. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2323. },
  2324. },
  2325. &xsql.JoinTuple{
  2326. Tuples: []xsql.Tuple{
  2327. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  2328. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  2329. },
  2330. },
  2331. &xsql.JoinTuple{
  2332. Tuples: []xsql.Tuple{
  2333. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  2334. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2335. },
  2336. },
  2337. },
  2338. },
  2339. {
  2340. Content: []xsql.DataValuer{
  2341. &xsql.JoinTuple{
  2342. Tuples: []xsql.Tuple{
  2343. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  2344. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2345. },
  2346. },
  2347. &xsql.JoinTuple{
  2348. Tuples: []xsql.Tuple{
  2349. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2350. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2351. },
  2352. },
  2353. },
  2354. },
  2355. },
  2356. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  2357. },
  2358. //6
  2359. {
  2360. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  2361. data: xsql.WindowTuplesSet{
  2362. Content: []xsql.WindowTuples{{
  2363. Emitter: "test",
  2364. Tuples: []xsql.Tuple{
  2365. {
  2366. Emitter: "src1",
  2367. Message: xsql.Message{"a": 53},
  2368. }, {
  2369. Emitter: "src1",
  2370. Message: xsql.Message{"a": "ddd"},
  2371. }, {
  2372. Emitter: "src1",
  2373. Message: xsql.Message{"a": 123123},
  2374. },
  2375. },
  2376. },
  2377. },
  2378. },
  2379. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  2380. },
  2381. //7
  2382. {
  2383. sql: `SELECT a[0]->b AS ab FROM test`,
  2384. data: &xsql.Tuple{
  2385. Emitter: "test",
  2386. Message: xsql.Message{
  2387. "a": []map[string]interface{}(nil),
  2388. },
  2389. },
  2390. result: errors.New("run Select error: out of index: 0 of 0"),
  2391. },
  2392. }
  2393. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2394. contextLogger := conf.Log.WithField("rule", "TestProjectPlanError")
  2395. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2396. for i, tt := range tests {
  2397. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2398. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  2399. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2400. result := pp.Apply(ctx, tt.data, fv, afv)
  2401. if !reflect.DeepEqual(tt.result, result) {
  2402. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2403. }
  2404. }
  2405. }