planner_test.go 93 KB


  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "reflect"
  19. "strings"
  20. "testing"
  21. "github.com/gdexlab/go-render/render"
  22. "github.com/lf-edge/ekuiper/internal/pkg/store"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/node"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/ast"
  28. )
  // init is executed by the Go runtime when this test package is initialized,
  // i.e. before any Test function in the package runs. Its only action is to
  // call testx.InitEnv.
  // NOTE(review): InitEnv presumably prepares the shared test environment
  // (configuration / storage) that the tests in this file depend on — its
  // implementation is not visible here; confirm in internal/testx.
  29. func init() {
  30. testx.InitEnv()
  31. }
  32. func Test_createLogicalPlan(t *testing.T) {
  33. kv, err := store.GetKV("stream")
  34. if err != nil {
  35. t.Error(err)
  36. return
  37. }
  38. streamSqls := map[string]string{
  39. "src1": `CREATE STREAM src1 (
  40. id1 BIGINT,
  41. temp BIGINT,
  42. name string,
  43. myarray array(string)
  44. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  45. "src2": `CREATE STREAM src2 (
  46. id2 BIGINT,
  47. hum BIGINT
  48. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  49. "tableInPlanner": `CREATE TABLE tableInPlanner (
  50. id BIGINT,
  51. name STRING,
  52. value STRING,
  53. hum BIGINT
  54. ) WITH (TYPE="file");`,
  55. }
  56. types := map[string]ast.StreamType{
  57. "src1": ast.TypeStream,
  58. "src2": ast.TypeStream,
  59. "tableInPlanner": ast.TypeTable,
  60. }
  61. for name, sql := range streamSqls {
  62. s, err := json.Marshal(&xsql.StreamInfo{
  63. StreamType: types[name],
  64. Statement: sql,
  65. })
  66. if err != nil {
  67. t.Error(err)
  68. t.Fail()
  69. }
  70. err = kv.Set(name, string(s))
  71. if err != nil {
  72. t.Error(err)
  73. t.Fail()
  74. }
  75. }
  76. streams := make(map[string]*ast.StreamStmt)
  77. for n := range streamSqls {
  78. streamStmt, err := xsql.GetDataSource(kv, n)
  79. if err != nil {
  80. t.Errorf("fail to get stream %s, please check if stream is created", n)
  81. return
  82. }
  83. streams[n] = streamStmt
  84. }
  85. // boolTrue = true
  86. boolFalse := false
  87. tests := []struct {
  88. sql string
  89. p LogicalPlan
  90. err string
  91. }{
  92. {
  93. sql: "select name from src1 where true limit 1",
  94. p: ProjectPlan{
  95. baseLogicalPlan: baseLogicalPlan{
  96. children: []LogicalPlan{
  97. FilterPlan{
  98. baseLogicalPlan: baseLogicalPlan{
  99. children: []LogicalPlan{
  100. DataSourcePlan{
  101. baseLogicalPlan: baseLogicalPlan{},
  102. name: "src1",
  103. streamFields: map[string]*ast.JsonStreamField{
  104. "name": {
  105. Type: "string",
  106. },
  107. },
  108. streamStmt: streams["src1"],
  109. metaFields: []string{},
  110. }.Init(),
  111. },
  112. },
  113. condition: &ast.BooleanLiteral{
  114. Val: true,
  115. },
  116. }.Init(),
  117. },
  118. },
  119. fields: []ast.Field{
  120. {
  121. Name: "name",
  122. Expr: &ast.FieldRef{
  123. StreamName: "src1",
  124. Name: "name",
  125. },
  126. },
  127. },
  128. limitCount: 1,
  129. enableLimit: true,
  130. }.Init(),
  131. },
  132. {
  133. sql: "select name from src1 limit 1",
  134. p: ProjectPlan{
  135. baseLogicalPlan: baseLogicalPlan{
  136. children: []LogicalPlan{
  137. DataSourcePlan{
  138. baseLogicalPlan: baseLogicalPlan{},
  139. name: "src1",
  140. streamFields: map[string]*ast.JsonStreamField{
  141. "name": {
  142. Type: "string",
  143. },
  144. },
  145. streamStmt: streams["src1"],
  146. metaFields: []string{},
  147. }.Init(),
  148. },
  149. },
  150. fields: []ast.Field{
  151. {
  152. Name: "name",
  153. Expr: &ast.FieldRef{
  154. StreamName: "src1",
  155. Name: "name",
  156. },
  157. },
  158. },
  159. limitCount: 1,
  160. enableLimit: true,
  161. }.Init(),
  162. },
  163. {
  164. sql: "select unnest(myarray) as col from src1 limit 1",
  165. p: ProjectSetPlan{
  166. SrfMapping: map[string]struct{}{
  167. "col": {},
  168. },
  169. limitCount: 1,
  170. enableLimit: true,
  171. baseLogicalPlan: baseLogicalPlan{
  172. children: []LogicalPlan{
  173. ProjectPlan{
  174. baseLogicalPlan: baseLogicalPlan{
  175. children: []LogicalPlan{
  176. DataSourcePlan{
  177. baseLogicalPlan: baseLogicalPlan{},
  178. name: "src1",
  179. streamFields: map[string]*ast.JsonStreamField{
  180. "myarray": {
  181. Type: "array",
  182. Items: &ast.JsonStreamField{
  183. Type: "string",
  184. },
  185. },
  186. },
  187. streamStmt: streams["src1"],
  188. metaFields: []string{},
  189. }.Init(),
  190. },
  191. },
  192. fields: []ast.Field{
  193. {
  194. Name: "unnest",
  195. AName: "col",
  196. Expr: func() *ast.FieldRef {
  197. fr := &ast.FieldRef{
  198. StreamName: ast.AliasStream,
  199. Name: "col",
  200. AliasRef: &ast.AliasRef{
  201. Expression: &ast.Call{
  202. Name: "unnest",
  203. FuncType: ast.FuncTypeSrf,
  204. Args: []ast.Expr{
  205. &ast.FieldRef{
  206. StreamName: "src1",
  207. Name: "myarray",
  208. },
  209. },
  210. },
  211. },
  212. }
  213. fr.SetRefSource([]ast.StreamName{"src1"})
  214. return fr
  215. }(),
  216. },
  217. },
  218. }.Init(),
  219. },
  220. },
  221. }.Init(),
  222. },
  223. { // 0
  224. sql: "SELECT unnest(myarray), name from src1",
  225. p: ProjectSetPlan{
  226. SrfMapping: map[string]struct{}{
  227. "unnest": {},
  228. },
  229. baseLogicalPlan: baseLogicalPlan{
  230. children: []LogicalPlan{
  231. ProjectPlan{
  232. baseLogicalPlan: baseLogicalPlan{
  233. children: []LogicalPlan{
  234. DataSourcePlan{
  235. baseLogicalPlan: baseLogicalPlan{},
  236. name: "src1",
  237. streamFields: map[string]*ast.JsonStreamField{
  238. "myarray": {
  239. Type: "array",
  240. Items: &ast.JsonStreamField{
  241. Type: "string",
  242. },
  243. },
  244. "name": {
  245. Type: "string",
  246. },
  247. },
  248. streamStmt: streams["src1"],
  249. metaFields: []string{},
  250. }.Init(),
  251. },
  252. },
  253. fields: []ast.Field{
  254. {
  255. Expr: &ast.Call{
  256. Name: "unnest",
  257. FuncType: ast.FuncTypeSrf,
  258. Args: []ast.Expr{
  259. &ast.FieldRef{
  260. StreamName: "src1",
  261. Name: "myarray",
  262. },
  263. },
  264. },
  265. Name: "unnest",
  266. },
  267. {
  268. Name: "name",
  269. Expr: &ast.FieldRef{
  270. StreamName: "src1",
  271. Name: "name",
  272. },
  273. },
  274. },
  275. }.Init(),
  276. },
  277. },
  278. }.Init(),
  279. },
  280. { // 0
  281. sql: `SELECT myarray[temp] FROM src1`,
  282. p: ProjectPlan{
  283. baseLogicalPlan: baseLogicalPlan{
  284. children: []LogicalPlan{
  285. DataSourcePlan{
  286. baseLogicalPlan: baseLogicalPlan{},
  287. name: "src1",
  288. streamFields: map[string]*ast.JsonStreamField{
  289. "myarray": {
  290. Type: "array",
  291. Items: &ast.JsonStreamField{
  292. Type: "string",
  293. },
  294. },
  295. "temp": {
  296. Type: "bigint",
  297. },
  298. },
  299. streamStmt: streams["src1"],
  300. metaFields: []string{},
  301. }.Init(),
  302. },
  303. },
  304. fields: []ast.Field{
  305. {
  306. Expr: &ast.BinaryExpr{
  307. OP: ast.SUBSET,
  308. LHS: &ast.FieldRef{
  309. StreamName: "src1",
  310. Name: "myarray",
  311. },
  312. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  313. StreamName: "src1",
  314. Name: "temp",
  315. }},
  316. },
  317. Name: "kuiper_field_0",
  318. AName: "",
  319. },
  320. },
  321. isAggregate: false,
  322. sendMeta: false,
  323. }.Init(),
  324. },
  325. { // 1 optimize where to data source
  326. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  327. p: ProjectPlan{
  328. baseLogicalPlan: baseLogicalPlan{
  329. children: []LogicalPlan{
  330. WindowPlan{
  331. baseLogicalPlan: baseLogicalPlan{
  332. children: []LogicalPlan{
  333. FilterPlan{
  334. baseLogicalPlan: baseLogicalPlan{
  335. children: []LogicalPlan{
  336. DataSourcePlan{
  337. name: "src1",
  338. streamFields: map[string]*ast.JsonStreamField{
  339. "name": {
  340. Type: "string",
  341. },
  342. "temp": {
  343. Type: "bigint",
  344. },
  345. },
  346. streamStmt: streams["src1"],
  347. metaFields: []string{},
  348. }.Init(),
  349. },
  350. },
  351. condition: &ast.BinaryExpr{
  352. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  353. OP: ast.EQ,
  354. RHS: &ast.StringLiteral{Val: "v1"},
  355. },
  356. }.Init(),
  357. },
  358. },
  359. condition: nil,
  360. wtype: ast.TUMBLING_WINDOW,
  361. length: 10,
  362. timeUnit: ast.SS,
  363. interval: 0,
  364. limit: 0,
  365. }.Init(),
  366. },
  367. },
  368. fields: []ast.Field{
  369. {
  370. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  371. Name: "temp",
  372. AName: "",
  373. },
  374. },
  375. isAggregate: false,
  376. sendMeta: false,
  377. }.Init(),
  378. },
  379. { // 2 condition that cannot be optimized
  380. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  381. p: ProjectPlan{
  382. baseLogicalPlan: baseLogicalPlan{
  383. children: []LogicalPlan{
  384. JoinPlan{
  385. baseLogicalPlan: baseLogicalPlan{
  386. children: []LogicalPlan{
  387. WindowPlan{
  388. baseLogicalPlan: baseLogicalPlan{
  389. children: []LogicalPlan{
  390. DataSourcePlan{
  391. name: "src1",
  392. streamFields: map[string]*ast.JsonStreamField{
  393. "id1": {
  394. Type: "bigint",
  395. },
  396. "temp": {
  397. Type: "bigint",
  398. },
  399. },
  400. streamStmt: streams["src1"],
  401. metaFields: []string{},
  402. }.Init(),
  403. DataSourcePlan{
  404. name: "src2",
  405. streamFields: map[string]*ast.JsonStreamField{
  406. "hum": {
  407. Type: "bigint",
  408. },
  409. "id2": {
  410. Type: "bigint",
  411. },
  412. },
  413. streamStmt: streams["src2"],
  414. metaFields: []string{},
  415. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  416. }.Init(),
  417. },
  418. },
  419. condition: nil,
  420. wtype: ast.TUMBLING_WINDOW,
  421. length: 10,
  422. timeUnit: ast.SS,
  423. interval: 0,
  424. limit: 0,
  425. }.Init(),
  426. },
  427. },
  428. from: &ast.Table{Name: "src1"},
  429. joins: ast.Joins{ast.Join{
  430. Name: "src2",
  431. JoinType: ast.INNER_JOIN,
  432. Expr: &ast.BinaryExpr{
  433. OP: ast.AND,
  434. LHS: &ast.BinaryExpr{
  435. LHS: &ast.BinaryExpr{
  436. OP: ast.GT,
  437. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  438. RHS: &ast.IntegerLiteral{Val: 20},
  439. },
  440. OP: ast.OR,
  441. RHS: &ast.BinaryExpr{
  442. OP: ast.GT,
  443. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  444. RHS: &ast.IntegerLiteral{Val: 60},
  445. },
  446. },
  447. RHS: &ast.BinaryExpr{
  448. OP: ast.EQ,
  449. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  450. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  451. },
  452. },
  453. }},
  454. }.Init(),
  455. },
  456. },
  457. fields: []ast.Field{
  458. {
  459. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  460. Name: "id1",
  461. AName: "",
  462. },
  463. },
  464. isAggregate: false,
  465. sendMeta: false,
  466. }.Init(),
  467. },
  468. { // 3 optimize window filter
  469. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  470. p: ProjectPlan{
  471. baseLogicalPlan: baseLogicalPlan{
  472. children: []LogicalPlan{
  473. WindowPlan{
  474. baseLogicalPlan: baseLogicalPlan{
  475. children: []LogicalPlan{
  476. FilterPlan{
  477. baseLogicalPlan: baseLogicalPlan{
  478. children: []LogicalPlan{
  479. DataSourcePlan{
  480. name: "src1",
  481. streamFields: map[string]*ast.JsonStreamField{
  482. "id1": {
  483. Type: "bigint",
  484. },
  485. "name": {
  486. Type: "string",
  487. },
  488. "temp": {
  489. Type: "bigint",
  490. },
  491. },
  492. streamStmt: streams["src1"],
  493. metaFields: []string{},
  494. }.Init(),
  495. },
  496. },
  497. condition: &ast.BinaryExpr{
  498. OP: ast.AND,
  499. LHS: &ast.BinaryExpr{
  500. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  501. OP: ast.EQ,
  502. RHS: &ast.StringLiteral{Val: "v1"},
  503. },
  504. RHS: &ast.BinaryExpr{
  505. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  506. OP: ast.GT,
  507. RHS: &ast.IntegerLiteral{Val: 2},
  508. },
  509. },
  510. }.Init(),
  511. },
  512. },
  513. condition: nil,
  514. wtype: ast.TUMBLING_WINDOW,
  515. length: 10,
  516. timeUnit: ast.SS,
  517. interval: 0,
  518. limit: 0,
  519. }.Init(),
  520. },
  521. },
  522. fields: []ast.Field{
  523. {
  524. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  525. Name: "id1",
  526. AName: "",
  527. },
  528. },
  529. isAggregate: false,
  530. sendMeta: false,
  531. }.Init(),
  532. },
  533. { // 4. do not optimize count window
  534. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  535. p: ProjectPlan{
  536. baseLogicalPlan: baseLogicalPlan{
  537. children: []LogicalPlan{
  538. HavingPlan{
  539. baseLogicalPlan: baseLogicalPlan{
  540. children: []LogicalPlan{
  541. FilterPlan{
  542. baseLogicalPlan: baseLogicalPlan{
  543. children: []LogicalPlan{
  544. WindowPlan{
  545. baseLogicalPlan: baseLogicalPlan{
  546. children: []LogicalPlan{
  547. DataSourcePlan{
  548. name: "src1",
  549. isWildCard: true,
  550. streamFields: map[string]*ast.JsonStreamField{
  551. "id1": {
  552. Type: "bigint",
  553. },
  554. "temp": {
  555. Type: "bigint",
  556. },
  557. "name": {
  558. Type: "string",
  559. },
  560. "myarray": {
  561. Type: "array",
  562. Items: &ast.JsonStreamField{
  563. Type: "string",
  564. },
  565. },
  566. },
  567. streamStmt: streams["src1"],
  568. metaFields: []string{},
  569. }.Init(),
  570. },
  571. },
  572. condition: nil,
  573. wtype: ast.COUNT_WINDOW,
  574. length: 5,
  575. interval: 1,
  576. limit: 0,
  577. }.Init(),
  578. },
  579. },
  580. condition: &ast.BinaryExpr{
  581. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  582. OP: ast.GT,
  583. RHS: &ast.IntegerLiteral{Val: 20},
  584. },
  585. }.Init(),
  586. },
  587. },
  588. condition: &ast.BinaryExpr{
  589. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  590. Token: ast.ASTERISK,
  591. }}, FuncType: ast.FuncTypeAgg},
  592. OP: ast.GT,
  593. RHS: &ast.IntegerLiteral{Val: 2},
  594. },
  595. }.Init(),
  596. },
  597. },
  598. fields: []ast.Field{
  599. {
  600. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  601. Name: "*",
  602. AName: "",
  603. },
  604. },
  605. isAggregate: false,
  606. sendMeta: false,
  607. }.Init(),
  608. },
  609. { // 5. optimize join on
  610. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  611. p: ProjectPlan{
  612. baseLogicalPlan: baseLogicalPlan{
  613. children: []LogicalPlan{
  614. JoinPlan{
  615. baseLogicalPlan: baseLogicalPlan{
  616. children: []LogicalPlan{
  617. WindowPlan{
  618. baseLogicalPlan: baseLogicalPlan{
  619. children: []LogicalPlan{
  620. FilterPlan{
  621. baseLogicalPlan: baseLogicalPlan{
  622. children: []LogicalPlan{
  623. DataSourcePlan{
  624. name: "src1",
  625. streamFields: map[string]*ast.JsonStreamField{
  626. "id1": {
  627. Type: "bigint",
  628. },
  629. "temp": {
  630. Type: "bigint",
  631. },
  632. },
  633. streamStmt: streams["src1"],
  634. metaFields: []string{},
  635. }.Init(),
  636. },
  637. },
  638. condition: &ast.BinaryExpr{
  639. RHS: &ast.BinaryExpr{
  640. OP: ast.GT,
  641. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  642. RHS: &ast.IntegerLiteral{Val: 20},
  643. },
  644. OP: ast.AND,
  645. LHS: &ast.BinaryExpr{
  646. OP: ast.GT,
  647. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  648. RHS: &ast.IntegerLiteral{Val: 111},
  649. },
  650. },
  651. }.Init(),
  652. FilterPlan{
  653. baseLogicalPlan: baseLogicalPlan{
  654. children: []LogicalPlan{
  655. DataSourcePlan{
  656. name: "src2",
  657. streamFields: map[string]*ast.JsonStreamField{
  658. "hum": {
  659. Type: "bigint",
  660. },
  661. "id2": {
  662. Type: "bigint",
  663. },
  664. },
  665. streamStmt: streams["src2"],
  666. metaFields: []string{},
  667. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  668. }.Init(),
  669. },
  670. },
  671. condition: &ast.BinaryExpr{
  672. OP: ast.LT,
  673. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  674. RHS: &ast.IntegerLiteral{Val: 60},
  675. },
  676. }.Init(),
  677. },
  678. },
  679. condition: nil,
  680. wtype: ast.TUMBLING_WINDOW,
  681. length: 10,
  682. timeUnit: ast.SS,
  683. interval: 0,
  684. limit: 0,
  685. }.Init(),
  686. },
  687. },
  688. from: &ast.Table{
  689. Name: "src1",
  690. },
  691. joins: []ast.Join{
  692. {
  693. Name: "src2",
  694. Alias: "",
  695. JoinType: ast.INNER_JOIN,
  696. Expr: &ast.BinaryExpr{
  697. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  698. OP: ast.EQ,
  699. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  700. },
  701. },
  702. },
  703. }.Init(),
  704. },
  705. },
  706. fields: []ast.Field{
  707. {
  708. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  709. Name: "id1",
  710. AName: "",
  711. },
  712. },
  713. isAggregate: false,
  714. sendMeta: false,
  715. }.Init(),
  716. },
  717. { // 6. optimize outer join on
  718. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  719. p: ProjectPlan{
  720. baseLogicalPlan: baseLogicalPlan{
  721. children: []LogicalPlan{
  722. JoinPlan{
  723. baseLogicalPlan: baseLogicalPlan{
  724. children: []LogicalPlan{
  725. WindowPlan{
  726. baseLogicalPlan: baseLogicalPlan{
  727. children: []LogicalPlan{
  728. FilterPlan{
  729. baseLogicalPlan: baseLogicalPlan{
  730. children: []LogicalPlan{
  731. DataSourcePlan{
  732. name: "src1",
  733. streamFields: map[string]*ast.JsonStreamField{
  734. "id1": {
  735. Type: "bigint",
  736. },
  737. "temp": {
  738. Type: "bigint",
  739. },
  740. },
  741. streamStmt: streams["src1"],
  742. metaFields: []string{},
  743. }.Init(),
  744. },
  745. },
  746. condition: &ast.BinaryExpr{
  747. OP: ast.GT,
  748. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  749. RHS: &ast.IntegerLiteral{Val: 111},
  750. },
  751. }.Init(),
  752. DataSourcePlan{
  753. name: "src2",
  754. streamFields: map[string]*ast.JsonStreamField{
  755. "hum": {
  756. Type: "bigint",
  757. },
  758. "id2": {
  759. Type: "bigint",
  760. },
  761. },
  762. streamStmt: streams["src2"],
  763. metaFields: []string{},
  764. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  765. }.Init(),
  766. },
  767. },
  768. condition: nil,
  769. wtype: ast.TUMBLING_WINDOW,
  770. length: 10,
  771. timeUnit: ast.SS,
  772. interval: 0,
  773. limit: 0,
  774. }.Init(),
  775. },
  776. },
  777. from: &ast.Table{
  778. Name: "src1",
  779. },
  780. joins: []ast.Join{
  781. {
  782. Name: "src2",
  783. Alias: "",
  784. JoinType: ast.FULL_JOIN,
  785. Expr: &ast.BinaryExpr{
  786. OP: ast.AND,
  787. LHS: &ast.BinaryExpr{
  788. OP: ast.AND,
  789. LHS: &ast.BinaryExpr{
  790. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  791. OP: ast.EQ,
  792. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  793. },
  794. RHS: &ast.BinaryExpr{
  795. OP: ast.GT,
  796. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  797. RHS: &ast.IntegerLiteral{Val: 20},
  798. },
  799. },
  800. RHS: &ast.BinaryExpr{
  801. OP: ast.LT,
  802. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  803. RHS: &ast.IntegerLiteral{Val: 60},
  804. },
  805. },
  806. },
  807. },
  808. }.Init(),
  809. },
  810. },
  811. fields: []ast.Field{
  812. {
  813. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  814. Name: "id1",
  815. AName: "",
  816. },
  817. },
  818. isAggregate: false,
  819. sendMeta: false,
  820. }.Init(),
  821. },
  822. { // 7 window error for table
  823. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  824. p: nil,
  825. err: "cannot run window for TABLE sources",
  826. },
  827. { // 8 join table without window
  828. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  829. p: ProjectPlan{
  830. baseLogicalPlan: baseLogicalPlan{
  831. children: []LogicalPlan{
  832. JoinPlan{
  833. baseLogicalPlan: baseLogicalPlan{
  834. children: []LogicalPlan{
  835. JoinAlignPlan{
  836. baseLogicalPlan: baseLogicalPlan{
  837. children: []LogicalPlan{
  838. FilterPlan{
  839. baseLogicalPlan: baseLogicalPlan{
  840. children: []LogicalPlan{
  841. DataSourcePlan{
  842. name: "src1",
  843. streamFields: map[string]*ast.JsonStreamField{
  844. "id1": {
  845. Type: "bigint",
  846. },
  847. "temp": {
  848. Type: "bigint",
  849. },
  850. },
  851. streamStmt: streams["src1"],
  852. metaFields: []string{},
  853. }.Init(),
  854. },
  855. },
  856. condition: &ast.BinaryExpr{
  857. RHS: &ast.BinaryExpr{
  858. OP: ast.GT,
  859. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  860. RHS: &ast.IntegerLiteral{Val: 20},
  861. },
  862. OP: ast.AND,
  863. LHS: &ast.BinaryExpr{
  864. OP: ast.GT,
  865. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  866. RHS: &ast.IntegerLiteral{Val: 111},
  867. },
  868. },
  869. }.Init(),
  870. DataSourcePlan{
  871. name: "tableInPlanner",
  872. streamFields: map[string]*ast.JsonStreamField{
  873. "hum": {
  874. Type: "bigint",
  875. },
  876. "id": {
  877. Type: "bigint",
  878. },
  879. },
  880. streamStmt: streams["tableInPlanner"],
  881. metaFields: []string{},
  882. }.Init(),
  883. },
  884. },
  885. Emitters: []string{"tableInPlanner"},
  886. }.Init(),
  887. },
  888. },
  889. from: &ast.Table{
  890. Name: "src1",
  891. },
  892. joins: []ast.Join{
  893. {
  894. Name: "tableInPlanner",
  895. Alias: "",
  896. JoinType: ast.INNER_JOIN,
  897. Expr: &ast.BinaryExpr{
  898. OP: ast.AND,
  899. LHS: &ast.BinaryExpr{
  900. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  901. OP: ast.EQ,
  902. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  903. },
  904. RHS: &ast.BinaryExpr{
  905. OP: ast.LT,
  906. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  907. RHS: &ast.IntegerLiteral{Val: 60},
  908. },
  909. },
  910. },
  911. },
  912. }.Init(),
  913. },
  914. },
  915. fields: []ast.Field{
  916. {
  917. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  918. Name: "id1",
  919. AName: "",
  920. },
  921. },
  922. isAggregate: false,
  923. sendMeta: false,
  924. }.Init(),
  925. },
  926. { // 9 join table with window
  927. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  928. p: ProjectPlan{
  929. baseLogicalPlan: baseLogicalPlan{
  930. children: []LogicalPlan{
  931. JoinPlan{
  932. baseLogicalPlan: baseLogicalPlan{
  933. children: []LogicalPlan{
  934. JoinAlignPlan{
  935. baseLogicalPlan: baseLogicalPlan{
  936. children: []LogicalPlan{
  937. WindowPlan{
  938. baseLogicalPlan: baseLogicalPlan{
  939. children: []LogicalPlan{
  940. DataSourcePlan{
  941. name: "src1",
  942. streamFields: map[string]*ast.JsonStreamField{
  943. "id1": {
  944. Type: "bigint",
  945. },
  946. "temp": {
  947. Type: "bigint",
  948. },
  949. },
  950. streamStmt: streams["src1"],
  951. metaFields: []string{},
  952. }.Init(),
  953. },
  954. },
  955. condition: nil,
  956. wtype: ast.TUMBLING_WINDOW,
  957. length: 10,
  958. timeUnit: ast.SS,
  959. interval: 0,
  960. limit: 0,
  961. }.Init(),
  962. DataSourcePlan{
  963. name: "tableInPlanner",
  964. streamFields: map[string]*ast.JsonStreamField{
  965. "hum": {
  966. Type: "bigint",
  967. },
  968. "id": {
  969. Type: "bigint",
  970. },
  971. },
  972. streamStmt: streams["tableInPlanner"],
  973. metaFields: []string{},
  974. }.Init(),
  975. },
  976. },
  977. Emitters: []string{"tableInPlanner"},
  978. }.Init(),
  979. },
  980. },
  981. from: &ast.Table{
  982. Name: "src1",
  983. },
  984. joins: []ast.Join{
  985. {
  986. Name: "tableInPlanner",
  987. Alias: "",
  988. JoinType: ast.INNER_JOIN,
  989. Expr: &ast.BinaryExpr{
  990. OP: ast.AND,
  991. LHS: &ast.BinaryExpr{
  992. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  993. OP: ast.EQ,
  994. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  995. },
  996. RHS: &ast.BinaryExpr{
  997. RHS: &ast.BinaryExpr{
  998. OP: ast.AND,
  999. LHS: &ast.BinaryExpr{
  1000. OP: ast.GT,
  1001. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1002. RHS: &ast.IntegerLiteral{Val: 20},
  1003. },
  1004. RHS: &ast.BinaryExpr{
  1005. OP: ast.LT,
  1006. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1007. RHS: &ast.IntegerLiteral{Val: 60},
  1008. },
  1009. },
  1010. OP: ast.AND,
  1011. LHS: &ast.BinaryExpr{
  1012. OP: ast.GT,
  1013. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1014. RHS: &ast.IntegerLiteral{Val: 111},
  1015. },
  1016. },
  1017. },
  1018. },
  1019. },
  1020. }.Init(),
  1021. },
  1022. },
  1023. fields: []ast.Field{
  1024. {
  1025. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1026. Name: "id1",
  1027. AName: "",
  1028. },
  1029. },
  1030. isAggregate: false,
  1031. sendMeta: false,
  1032. }.Init(),
  1033. },
  1034. { // 10 meta
  1035. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1036. p: ProjectPlan{
  1037. baseLogicalPlan: baseLogicalPlan{
  1038. children: []LogicalPlan{
  1039. FilterPlan{
  1040. baseLogicalPlan: baseLogicalPlan{
  1041. children: []LogicalPlan{
  1042. DataSourcePlan{
  1043. name: "src1",
  1044. streamFields: map[string]*ast.JsonStreamField{
  1045. "temp": {
  1046. Type: "bigint",
  1047. },
  1048. },
  1049. streamStmt: streams["src1"],
  1050. metaFields: []string{"Humidity", "device", "id"},
  1051. }.Init(),
  1052. },
  1053. },
  1054. condition: &ast.BinaryExpr{
  1055. LHS: &ast.Call{
  1056. Name: "meta",
  1057. FuncId: 2,
  1058. Args: []ast.Expr{&ast.MetaRef{
  1059. Name: "device",
  1060. StreamName: ast.DefaultStream,
  1061. }},
  1062. },
  1063. OP: ast.EQ,
  1064. RHS: &ast.StringLiteral{
  1065. Val: "demo2",
  1066. },
  1067. },
  1068. }.Init(),
  1069. },
  1070. },
  1071. fields: []ast.Field{
  1072. {
  1073. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1074. Name: "temp",
  1075. AName: "",
  1076. }, {
  1077. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1078. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1079. Name: "id",
  1080. StreamName: ast.DefaultStream,
  1081. }}},
  1082. []ast.StreamName{},
  1083. nil,
  1084. )},
  1085. Name: "meta",
  1086. AName: "eid",
  1087. }, {
  1088. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1089. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1090. &ast.BinaryExpr{
  1091. OP: ast.ARROW,
  1092. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1093. RHS: &ast.JsonFieldRef{Name: "Device"},
  1094. },
  1095. }},
  1096. []ast.StreamName{},
  1097. nil,
  1098. )},
  1099. Name: "meta",
  1100. AName: "hdevice",
  1101. },
  1102. },
  1103. isAggregate: false,
  1104. sendMeta: false,
  1105. }.Init(),
  1106. },
  1107. { // 11 join with same name field and aliased
  1108. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1109. p: ProjectPlan{
  1110. baseLogicalPlan: baseLogicalPlan{
  1111. children: []LogicalPlan{
  1112. JoinPlan{
  1113. baseLogicalPlan: baseLogicalPlan{
  1114. children: []LogicalPlan{
  1115. JoinAlignPlan{
  1116. baseLogicalPlan: baseLogicalPlan{
  1117. children: []LogicalPlan{
  1118. DataSourcePlan{
  1119. name: "src2",
  1120. streamFields: map[string]*ast.JsonStreamField{
  1121. "hum": {
  1122. Type: "bigint",
  1123. },
  1124. "id2": {
  1125. Type: "bigint",
  1126. },
  1127. },
  1128. streamStmt: streams["src2"],
  1129. metaFields: []string{},
  1130. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  1131. }.Init(),
  1132. DataSourcePlan{
  1133. name: "tableInPlanner",
  1134. streamFields: map[string]*ast.JsonStreamField{
  1135. "hum": {
  1136. Type: "bigint",
  1137. },
  1138. "id": {
  1139. Type: "bigint",
  1140. },
  1141. },
  1142. streamStmt: streams["tableInPlanner"],
  1143. metaFields: []string{},
  1144. }.Init(),
  1145. },
  1146. },
  1147. Emitters: []string{"tableInPlanner"},
  1148. }.Init(),
  1149. },
  1150. },
  1151. from: &ast.Table{
  1152. Name: "src2",
  1153. },
  1154. joins: []ast.Join{
  1155. {
  1156. Name: "tableInPlanner",
  1157. Alias: "",
  1158. JoinType: ast.INNER_JOIN,
  1159. Expr: &ast.BinaryExpr{
  1160. RHS: &ast.BinaryExpr{
  1161. OP: ast.EQ,
  1162. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1163. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1164. },
  1165. OP: ast.AND,
  1166. LHS: &ast.BinaryExpr{
  1167. OP: ast.GT,
  1168. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1169. &ast.FieldRef{
  1170. Name: "hum",
  1171. StreamName: "src2",
  1172. },
  1173. []ast.StreamName{"src2"},
  1174. &boolFalse,
  1175. )},
  1176. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1177. &ast.FieldRef{
  1178. Name: "hum",
  1179. StreamName: "tableInPlanner",
  1180. },
  1181. []ast.StreamName{"tableInPlanner"},
  1182. &boolFalse,
  1183. )},
  1184. },
  1185. },
  1186. },
  1187. },
  1188. }.Init(),
  1189. },
  1190. },
  1191. fields: []ast.Field{
  1192. {
  1193. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1194. &ast.FieldRef{
  1195. Name: "hum",
  1196. StreamName: "src2",
  1197. },
  1198. []ast.StreamName{"src2"},
  1199. &boolFalse,
  1200. )},
  1201. Name: "hum",
  1202. AName: "hum1",
  1203. }, {
  1204. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1205. &ast.FieldRef{
  1206. Name: "hum",
  1207. StreamName: "tableInPlanner",
  1208. },
  1209. []ast.StreamName{"tableInPlanner"},
  1210. &boolFalse,
  1211. )},
  1212. Name: "hum",
  1213. AName: "hum2",
  1214. },
  1215. },
  1216. isAggregate: false,
  1217. sendMeta: false,
  1218. }.Init(),
  1219. },
  1220. { // 12 meta with more fields
  1221. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1222. p: ProjectPlan{
  1223. baseLogicalPlan: baseLogicalPlan{
  1224. children: []LogicalPlan{
  1225. FilterPlan{
  1226. baseLogicalPlan: baseLogicalPlan{
  1227. children: []LogicalPlan{
  1228. DataSourcePlan{
  1229. name: "src1",
  1230. streamFields: map[string]*ast.JsonStreamField{
  1231. "temp": {
  1232. Type: "bigint",
  1233. },
  1234. },
  1235. streamStmt: streams["src1"],
  1236. metaFields: []string{},
  1237. allMeta: true,
  1238. }.Init(),
  1239. },
  1240. },
  1241. condition: &ast.BinaryExpr{
  1242. LHS: &ast.Call{
  1243. Name: "meta",
  1244. FuncId: 1,
  1245. Args: []ast.Expr{&ast.MetaRef{
  1246. Name: "device",
  1247. StreamName: ast.DefaultStream,
  1248. }},
  1249. },
  1250. OP: ast.EQ,
  1251. RHS: &ast.StringLiteral{
  1252. Val: "demo2",
  1253. },
  1254. },
  1255. }.Init(),
  1256. },
  1257. },
  1258. fields: []ast.Field{
  1259. {
  1260. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1261. Name: "temp",
  1262. AName: "",
  1263. }, {
  1264. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1265. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1266. Name: "*",
  1267. StreamName: ast.DefaultStream,
  1268. }}},
  1269. []ast.StreamName{},
  1270. nil,
  1271. )},
  1272. Name: "meta",
  1273. AName: "m",
  1274. },
  1275. },
  1276. isAggregate: false,
  1277. sendMeta: false,
  1278. }.Init(),
  1279. },
  1280. { // 13 analytic function plan
  1281. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1282. p: ProjectPlan{
  1283. baseLogicalPlan: baseLogicalPlan{
  1284. children: []LogicalPlan{
  1285. FilterPlan{
  1286. baseLogicalPlan: baseLogicalPlan{
  1287. children: []LogicalPlan{
  1288. AnalyticFuncsPlan{
  1289. baseLogicalPlan: baseLogicalPlan{
  1290. children: []LogicalPlan{
  1291. DataSourcePlan{
  1292. name: "src1",
  1293. streamFields: map[string]*ast.JsonStreamField{
  1294. "id1": {
  1295. Type: "bigint",
  1296. },
  1297. "name": {
  1298. Type: "string",
  1299. },
  1300. "temp": {
  1301. Type: "bigint",
  1302. },
  1303. },
  1304. streamStmt: streams["src1"],
  1305. metaFields: []string{},
  1306. }.Init(),
  1307. },
  1308. },
  1309. funcs: []*ast.Call{
  1310. {
  1311. Name: "lag",
  1312. FuncId: 2,
  1313. CachedField: "$$a_lag_2",
  1314. Args: []ast.Expr{&ast.FieldRef{
  1315. Name: "temp",
  1316. StreamName: "src1",
  1317. }},
  1318. },
  1319. {
  1320. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1321. },
  1322. {
  1323. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1324. },
  1325. },
  1326. }.Init(),
  1327. },
  1328. },
  1329. condition: &ast.BinaryExpr{
  1330. LHS: &ast.Call{
  1331. Name: "lag",
  1332. FuncId: 2,
  1333. Args: []ast.Expr{&ast.FieldRef{
  1334. Name: "temp",
  1335. StreamName: "src1",
  1336. }},
  1337. CachedField: "$$a_lag_2",
  1338. Cached: true,
  1339. },
  1340. OP: ast.GT,
  1341. RHS: &ast.FieldRef{
  1342. Name: "temp",
  1343. StreamName: "src1",
  1344. },
  1345. },
  1346. }.Init(),
  1347. },
  1348. },
  1349. fields: []ast.Field{
  1350. {
  1351. Expr: &ast.Call{
  1352. Name: "latest",
  1353. FuncId: 1,
  1354. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1355. CachedField: "$$a_latest_1",
  1356. Cached: true,
  1357. },
  1358. Name: "latest",
  1359. }, {
  1360. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1361. Name: "id1",
  1362. },
  1363. },
  1364. isAggregate: false,
  1365. sendMeta: false,
  1366. }.Init(),
  1367. },
  1368. { // 14
  1369. sql: `SELECT name, *, meta(device) FROM src1`,
  1370. p: ProjectPlan{
  1371. baseLogicalPlan: baseLogicalPlan{
  1372. children: []LogicalPlan{
  1373. DataSourcePlan{
  1374. baseLogicalPlan: baseLogicalPlan{},
  1375. name: "src1",
  1376. streamFields: map[string]*ast.JsonStreamField{
  1377. "id1": {
  1378. Type: "bigint",
  1379. },
  1380. "temp": {
  1381. Type: "bigint",
  1382. },
  1383. "name": {
  1384. Type: "string",
  1385. },
  1386. "myarray": {
  1387. Type: "array",
  1388. Items: &ast.JsonStreamField{
  1389. Type: "string",
  1390. },
  1391. },
  1392. },
  1393. streamStmt: streams["src1"],
  1394. metaFields: []string{"device"},
  1395. isWildCard: true,
  1396. }.Init(),
  1397. },
  1398. },
  1399. fields: []ast.Field{
  1400. {
  1401. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1402. Name: "name",
  1403. AName: "",
  1404. },
  1405. {
  1406. Name: "*",
  1407. Expr: &ast.Wildcard{
  1408. Token: ast.ASTERISK,
  1409. },
  1410. },
  1411. {
  1412. Name: "meta",
  1413. Expr: &ast.Call{
  1414. Name: "meta",
  1415. Args: []ast.Expr{
  1416. &ast.MetaRef{
  1417. StreamName: ast.DefaultStream,
  1418. Name: "device",
  1419. },
  1420. },
  1421. },
  1422. },
  1423. },
  1424. isAggregate: false,
  1425. allWildcard: true,
  1426. sendMeta: false,
  1427. }.Init(),
  1428. },
  1429. { // 15 analytic function over partition plan
  1430. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1431. p: ProjectPlan{
  1432. baseLogicalPlan: baseLogicalPlan{
  1433. children: []LogicalPlan{
  1434. FilterPlan{
  1435. baseLogicalPlan: baseLogicalPlan{
  1436. children: []LogicalPlan{
  1437. AnalyticFuncsPlan{
  1438. baseLogicalPlan: baseLogicalPlan{
  1439. children: []LogicalPlan{
  1440. DataSourcePlan{
  1441. name: "src1",
  1442. streamFields: map[string]*ast.JsonStreamField{
  1443. "id1": {
  1444. Type: "bigint",
  1445. },
  1446. "name": {
  1447. Type: "string",
  1448. },
  1449. "temp": {
  1450. Type: "bigint",
  1451. },
  1452. },
  1453. streamStmt: streams["src1"],
  1454. metaFields: []string{},
  1455. }.Init(),
  1456. },
  1457. },
  1458. funcs: []*ast.Call{
  1459. {
  1460. Name: "lag",
  1461. FuncId: 2,
  1462. CachedField: "$$a_lag_2",
  1463. Args: []ast.Expr{&ast.FieldRef{
  1464. Name: "temp",
  1465. StreamName: "src1",
  1466. }},
  1467. },
  1468. {
  1469. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1470. },
  1471. {
  1472. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1473. },
  1474. },
  1475. }.Init(),
  1476. },
  1477. },
  1478. condition: &ast.BinaryExpr{
  1479. LHS: &ast.Call{
  1480. Name: "lag",
  1481. FuncId: 2,
  1482. Args: []ast.Expr{&ast.FieldRef{
  1483. Name: "temp",
  1484. StreamName: "src1",
  1485. }},
  1486. CachedField: "$$a_lag_2",
  1487. Cached: true,
  1488. },
  1489. OP: ast.GT,
  1490. RHS: &ast.FieldRef{
  1491. Name: "temp",
  1492. StreamName: "src1",
  1493. },
  1494. },
  1495. }.Init(),
  1496. },
  1497. },
  1498. fields: []ast.Field{
  1499. {
  1500. Expr: &ast.Call{
  1501. Name: "latest",
  1502. FuncId: 1,
  1503. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1504. CachedField: "$$a_latest_1",
  1505. Cached: true,
  1506. Partition: &ast.PartitionExpr{
  1507. Exprs: []ast.Expr{
  1508. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1509. },
  1510. },
  1511. },
  1512. Name: "latest",
  1513. }, {
  1514. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1515. Name: "id1",
  1516. },
  1517. },
  1518. isAggregate: false,
  1519. sendMeta: false,
  1520. }.Init(),
  1521. },
  1522. { // 16 analytic function over partition when plan
  1523. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1524. p: ProjectPlan{
  1525. baseLogicalPlan: baseLogicalPlan{
  1526. children: []LogicalPlan{
  1527. FilterPlan{
  1528. baseLogicalPlan: baseLogicalPlan{
  1529. children: []LogicalPlan{
  1530. AnalyticFuncsPlan{
  1531. baseLogicalPlan: baseLogicalPlan{
  1532. children: []LogicalPlan{
  1533. DataSourcePlan{
  1534. name: "src1",
  1535. streamFields: map[string]*ast.JsonStreamField{
  1536. "id1": {
  1537. Type: "bigint",
  1538. },
  1539. "name": {
  1540. Type: "string",
  1541. },
  1542. "temp": {
  1543. Type: "bigint",
  1544. },
  1545. },
  1546. streamStmt: streams["src1"],
  1547. metaFields: []string{},
  1548. }.Init(),
  1549. },
  1550. },
  1551. funcs: []*ast.Call{
  1552. {
  1553. Name: "lag",
  1554. FuncId: 2,
  1555. CachedField: "$$a_lag_2",
  1556. Args: []ast.Expr{&ast.FieldRef{
  1557. Name: "temp",
  1558. StreamName: "src1",
  1559. }},
  1560. },
  1561. {
  1562. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1563. },
  1564. {
  1565. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1566. },
  1567. },
  1568. }.Init(),
  1569. },
  1570. },
  1571. condition: &ast.BinaryExpr{
  1572. LHS: &ast.Call{
  1573. Name: "lag",
  1574. FuncId: 2,
  1575. Args: []ast.Expr{&ast.FieldRef{
  1576. Name: "temp",
  1577. StreamName: "src1",
  1578. }},
  1579. CachedField: "$$a_lag_2",
  1580. Cached: true,
  1581. },
  1582. OP: ast.GT,
  1583. RHS: &ast.FieldRef{
  1584. Name: "temp",
  1585. StreamName: "src1",
  1586. },
  1587. },
  1588. }.Init(),
  1589. },
  1590. },
  1591. fields: []ast.Field{
  1592. {
  1593. Expr: &ast.Call{
  1594. Name: "latest",
  1595. FuncId: 1,
  1596. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1597. CachedField: "$$a_latest_1",
  1598. Cached: true,
  1599. Partition: &ast.PartitionExpr{
  1600. Exprs: []ast.Expr{
  1601. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1602. },
  1603. },
  1604. WhenExpr: &ast.BinaryExpr{
  1605. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1606. OP: ast.GT,
  1607. RHS: &ast.IntegerLiteral{Val: 12},
  1608. },
  1609. },
  1610. Name: "latest",
  1611. }, {
  1612. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1613. Name: "id1",
  1614. },
  1615. },
  1616. isAggregate: false,
  1617. sendMeta: false,
  1618. }.Init(),
  1619. },
  1620. { // 17. do not optimize sliding window
  1621. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1622. p: ProjectPlan{
  1623. baseLogicalPlan: baseLogicalPlan{
  1624. children: []LogicalPlan{
  1625. HavingPlan{
  1626. baseLogicalPlan: baseLogicalPlan{
  1627. children: []LogicalPlan{
  1628. FilterPlan{
  1629. baseLogicalPlan: baseLogicalPlan{
  1630. children: []LogicalPlan{
  1631. WindowPlan{
  1632. baseLogicalPlan: baseLogicalPlan{
  1633. children: []LogicalPlan{
  1634. DataSourcePlan{
  1635. name: "src1",
  1636. isWildCard: true,
  1637. streamFields: map[string]*ast.JsonStreamField{
  1638. "id1": {
  1639. Type: "bigint",
  1640. },
  1641. "temp": {
  1642. Type: "bigint",
  1643. },
  1644. "name": {
  1645. Type: "string",
  1646. },
  1647. "myarray": {
  1648. Type: "array",
  1649. Items: &ast.JsonStreamField{
  1650. Type: "string",
  1651. },
  1652. },
  1653. },
  1654. streamStmt: streams["src1"],
  1655. metaFields: []string{},
  1656. }.Init(),
  1657. },
  1658. },
  1659. condition: nil,
  1660. wtype: ast.SLIDING_WINDOW,
  1661. length: 10,
  1662. timeUnit: ast.SS,
  1663. interval: 0,
  1664. limit: 0,
  1665. }.Init(),
  1666. },
  1667. },
  1668. condition: &ast.BinaryExpr{
  1669. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1670. OP: ast.GT,
  1671. RHS: &ast.IntegerLiteral{Val: 20},
  1672. },
  1673. }.Init(),
  1674. },
  1675. },
  1676. condition: &ast.BinaryExpr{
  1677. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1678. Token: ast.ASTERISK,
  1679. }}, FuncType: ast.FuncTypeAgg},
  1680. OP: ast.GT,
  1681. RHS: &ast.IntegerLiteral{Val: 2},
  1682. },
  1683. }.Init(),
  1684. },
  1685. },
  1686. fields: []ast.Field{
  1687. {
  1688. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1689. Name: "*",
  1690. AName: "",
  1691. },
  1692. },
  1693. isAggregate: false,
  1694. sendMeta: false,
  1695. }.Init(),
  1696. },
  1697. {
  1698. // 18 analytic function over when plan
  1699. sql: `SELECT CASE WHEN lag(temp) OVER (WHEN lag(id1) > 1) BETWEEN 0 AND 10 THEN 1 ELSE 0 END FROM src1`,
  1700. p: ProjectPlan{
  1701. baseLogicalPlan: baseLogicalPlan{
  1702. children: []LogicalPlan{
  1703. AnalyticFuncsPlan{
  1704. baseLogicalPlan: baseLogicalPlan{
  1705. children: []LogicalPlan{
  1706. DataSourcePlan{
  1707. name: "src1",
  1708. streamFields: map[string]*ast.JsonStreamField{
  1709. "id1": {
  1710. Type: "bigint",
  1711. },
  1712. "temp": {
  1713. Type: "bigint",
  1714. },
  1715. },
  1716. streamStmt: &ast.StreamStmt{
  1717. Name: "src1",
  1718. StreamFields: []ast.StreamField{
  1719. {
  1720. Name: "id1",
  1721. FieldType: &ast.BasicType{
  1722. Type: ast.DataType(1),
  1723. },
  1724. },
  1725. {
  1726. Name: "temp",
  1727. FieldType: &ast.BasicType{
  1728. Type: ast.DataType(1),
  1729. },
  1730. },
  1731. {
  1732. Name: "name",
  1733. FieldType: &ast.BasicType{
  1734. Type: ast.DataType(3),
  1735. },
  1736. },
  1737. {
  1738. Name: "myarray",
  1739. FieldType: &ast.ArrayType{
  1740. Type: ast.DataType(3),
  1741. },
  1742. },
  1743. },
  1744. Options: &ast.Options{
  1745. DATASOURCE: "src1",
  1746. KEY: "ts",
  1747. FORMAT: "json",
  1748. },
  1749. StreamType: ast.StreamType(0),
  1750. },
  1751. metaFields: []string{},
  1752. }.Init(),
  1753. },
  1754. },
  1755. funcs: []*ast.Call{
  1756. {
  1757. Name: "lag",
  1758. FuncId: 0,
  1759. FuncType: ast.FuncType(0),
  1760. Args: []ast.Expr{
  1761. &ast.FieldRef{
  1762. StreamName: "src1",
  1763. Name: "temp",
  1764. },
  1765. },
  1766. CachedField: "$$a_lag_0",
  1767. WhenExpr: &ast.BinaryExpr{
  1768. OP: ast.GT,
  1769. LHS: &ast.Call{
  1770. Name: "lag",
  1771. FuncId: 1,
  1772. FuncType: ast.FuncType(0),
  1773. Args: []ast.Expr{
  1774. &ast.FieldRef{
  1775. StreamName: "src1",
  1776. Name: "id1",
  1777. },
  1778. },
  1779. CachedField: "$$a_lag_1",
  1780. Cached: true,
  1781. },
  1782. RHS: &ast.IntegerLiteral{
  1783. Val: 1,
  1784. },
  1785. },
  1786. },
  1787. {
  1788. Name: "lag",
  1789. FuncId: 1,
  1790. FuncType: ast.FuncType(0),
  1791. Args: []ast.Expr{
  1792. &ast.FieldRef{
  1793. StreamName: "src1",
  1794. Name: "id1",
  1795. },
  1796. },
  1797. CachedField: "$$a_lag_1",
  1798. },
  1799. },
  1800. }.Init(),
  1801. },
  1802. },
  1803. fields: []ast.Field{
  1804. {
  1805. Name: "kuiper_field_0",
  1806. Expr: &ast.CaseExpr{
  1807. WhenClauses: []*ast.WhenClause{
  1808. {
  1809. Expr: &ast.BinaryExpr{
  1810. OP: ast.BETWEEN,
  1811. LHS: &ast.Call{
  1812. Name: "lag",
  1813. FuncId: 0,
  1814. FuncType: ast.FuncType(0),
  1815. Args: []ast.Expr{
  1816. &ast.FieldRef{
  1817. StreamName: "src1",
  1818. Name: "temp",
  1819. },
  1820. },
  1821. CachedField: "$$a_lag_0",
  1822. Cached: true,
  1823. WhenExpr: &ast.BinaryExpr{
  1824. OP: ast.GT,
  1825. LHS: &ast.Call{
  1826. Name: "lag",
  1827. FuncId: 1,
  1828. FuncType: ast.FuncType(0),
  1829. Args: []ast.Expr{
  1830. &ast.FieldRef{
  1831. StreamName: "src1",
  1832. Name: "id1",
  1833. },
  1834. },
  1835. CachedField: "$$a_lag_1",
  1836. Cached: true,
  1837. },
  1838. RHS: &ast.IntegerLiteral{
  1839. Val: 1,
  1840. },
  1841. },
  1842. },
  1843. RHS: &ast.BetweenExpr{
  1844. Lower: &ast.IntegerLiteral{
  1845. Val: 0,
  1846. },
  1847. Higher: &ast.IntegerLiteral{
  1848. Val: 10,
  1849. },
  1850. },
  1851. },
  1852. Result: &ast.IntegerLiteral{
  1853. Val: 1,
  1854. },
  1855. },
  1856. },
  1857. ElseClause: &ast.IntegerLiteral{
  1858. Val: 0,
  1859. },
  1860. },
  1861. },
  1862. },
  1863. }.Init(),
  1864. },
  1865. }
  1866. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1867. for i, tt := range tests {
  1868. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1869. if err != nil {
  1870. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1871. continue
  1872. }
  1873. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1874. IsEventTime: false,
  1875. LateTol: 0,
  1876. Concurrency: 0,
  1877. BufferLength: 0,
  1878. SendMetaToSink: false,
  1879. Qos: 0,
  1880. CheckpointInterval: 0,
  1881. SendError: true,
  1882. }, kv)
  1883. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1884. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1885. } else if !reflect.DeepEqual(tt.p, p) {
  1886. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1887. }
  1888. }
  1889. }
  1890. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1891. kv, err := store.GetKV("stream")
  1892. if err != nil {
  1893. t.Error(err)
  1894. return
  1895. }
  1896. streamSqls := map[string]string{
  1897. "src1": `CREATE STREAM src1 (
  1898. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1899. "src2": `CREATE STREAM src2 (
  1900. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1901. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1902. id BIGINT,
  1903. name STRING,
  1904. value STRING,
  1905. hum BIGINT
  1906. ) WITH (TYPE="file");`,
  1907. }
  1908. types := map[string]ast.StreamType{
  1909. "src1": ast.TypeStream,
  1910. "src2": ast.TypeStream,
  1911. "tableInPlanner": ast.TypeTable,
  1912. }
  1913. for name, sql := range streamSqls {
  1914. s, err := json.Marshal(&xsql.StreamInfo{
  1915. StreamType: types[name],
  1916. Statement: sql,
  1917. })
  1918. if err != nil {
  1919. t.Error(err)
  1920. t.Fail()
  1921. }
  1922. err = kv.Set(name, string(s))
  1923. if err != nil {
  1924. t.Error(err)
  1925. t.Fail()
  1926. }
  1927. }
  1928. streams := make(map[string]*ast.StreamStmt)
  1929. for n := range streamSqls {
  1930. streamStmt, err := xsql.GetDataSource(kv, n)
  1931. if err != nil {
  1932. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1933. return
  1934. }
  1935. streams[n] = streamStmt
  1936. }
  1937. // boolTrue = true
  1938. boolFalse := false
  1939. tests := []struct {
  1940. sql string
  1941. p LogicalPlan
  1942. err string
  1943. }{
  1944. { // 0
  1945. sql: `SELECT name FROM src1`,
  1946. p: ProjectPlan{
  1947. baseLogicalPlan: baseLogicalPlan{
  1948. children: []LogicalPlan{
  1949. DataSourcePlan{
  1950. baseLogicalPlan: baseLogicalPlan{},
  1951. name: "src1",
  1952. streamFields: map[string]*ast.JsonStreamField{
  1953. "name": nil,
  1954. },
  1955. streamStmt: streams["src1"],
  1956. isSchemaless: true,
  1957. metaFields: []string{},
  1958. }.Init(),
  1959. },
  1960. },
  1961. fields: []ast.Field{
  1962. {
  1963. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1964. Name: "name",
  1965. AName: "",
  1966. },
  1967. },
  1968. isAggregate: false,
  1969. sendMeta: false,
  1970. }.Init(),
  1971. }, { // 1 optimize where to data source
  1972. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1973. p: ProjectPlan{
  1974. baseLogicalPlan: baseLogicalPlan{
  1975. children: []LogicalPlan{
  1976. WindowPlan{
  1977. baseLogicalPlan: baseLogicalPlan{
  1978. children: []LogicalPlan{
  1979. FilterPlan{
  1980. baseLogicalPlan: baseLogicalPlan{
  1981. children: []LogicalPlan{
  1982. DataSourcePlan{
  1983. name: "src1",
  1984. streamFields: map[string]*ast.JsonStreamField{
  1985. "name": nil,
  1986. "temp": nil,
  1987. },
  1988. streamStmt: streams["src1"],
  1989. metaFields: []string{},
  1990. isSchemaless: true,
  1991. }.Init(),
  1992. },
  1993. },
  1994. condition: &ast.BinaryExpr{
  1995. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1996. OP: ast.EQ,
  1997. RHS: &ast.StringLiteral{Val: "v1"},
  1998. },
  1999. }.Init(),
  2000. },
  2001. },
  2002. condition: nil,
  2003. wtype: ast.TUMBLING_WINDOW,
  2004. length: 10,
  2005. timeUnit: ast.SS,
  2006. interval: 0,
  2007. limit: 0,
  2008. }.Init(),
  2009. },
  2010. },
  2011. fields: []ast.Field{
  2012. {
  2013. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2014. Name: "temp",
  2015. AName: "",
  2016. },
  2017. },
  2018. isAggregate: false,
  2019. sendMeta: false,
  2020. }.Init(),
  2021. }, { // 2 condition that cannot be optimized
  2022. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2023. p: ProjectPlan{
  2024. baseLogicalPlan: baseLogicalPlan{
  2025. children: []LogicalPlan{
  2026. JoinPlan{
  2027. baseLogicalPlan: baseLogicalPlan{
  2028. children: []LogicalPlan{
  2029. WindowPlan{
  2030. baseLogicalPlan: baseLogicalPlan{
  2031. children: []LogicalPlan{
  2032. DataSourcePlan{
  2033. name: "src1",
  2034. streamFields: map[string]*ast.JsonStreamField{
  2035. "id1": nil,
  2036. "temp": nil,
  2037. },
  2038. streamStmt: streams["src1"],
  2039. metaFields: []string{},
  2040. isSchemaless: true,
  2041. }.Init(),
  2042. DataSourcePlan{
  2043. name: "src2",
  2044. streamFields: map[string]*ast.JsonStreamField{ // can't determine which stream id1 belongs to
  2045. "hum": nil,
  2046. "id1": nil,
  2047. "id2": nil,
  2048. },
  2049. isSchemaless: true,
  2050. streamStmt: streams["src2"],
  2051. metaFields: []string{},
  2052. }.Init(),
  2053. },
  2054. },
  2055. condition: nil,
  2056. wtype: ast.TUMBLING_WINDOW,
  2057. length: 10,
  2058. timeUnit: ast.SS,
  2059. interval: 0,
  2060. limit: 0,
  2061. }.Init(),
  2062. },
  2063. },
  2064. from: &ast.Table{Name: "src1"},
  2065. joins: ast.Joins{ast.Join{
  2066. Name: "src2",
  2067. JoinType: ast.INNER_JOIN,
  2068. Expr: &ast.BinaryExpr{
  2069. OP: ast.AND,
  2070. LHS: &ast.BinaryExpr{
  2071. LHS: &ast.BinaryExpr{
  2072. OP: ast.GT,
  2073. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2074. RHS: &ast.IntegerLiteral{Val: 20},
  2075. },
  2076. OP: ast.OR,
  2077. RHS: &ast.BinaryExpr{
  2078. OP: ast.GT,
  2079. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2080. RHS: &ast.IntegerLiteral{Val: 60},
  2081. },
  2082. },
  2083. RHS: &ast.BinaryExpr{
  2084. OP: ast.EQ,
  2085. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2086. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2087. },
  2088. },
  2089. }},
  2090. }.Init(),
  2091. },
  2092. },
  2093. fields: []ast.Field{
  2094. {
  2095. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2096. Name: "id1",
  2097. AName: "",
  2098. },
  2099. },
  2100. isAggregate: false,
  2101. sendMeta: false,
  2102. }.Init(),
  2103. }, { // 3 optimize window filter
  2104. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  2105. p: ProjectPlan{
  2106. baseLogicalPlan: baseLogicalPlan{
  2107. children: []LogicalPlan{
  2108. WindowPlan{
  2109. baseLogicalPlan: baseLogicalPlan{
  2110. children: []LogicalPlan{
  2111. FilterPlan{
  2112. baseLogicalPlan: baseLogicalPlan{
  2113. children: []LogicalPlan{
  2114. DataSourcePlan{
  2115. name: "src1",
  2116. streamFields: map[string]*ast.JsonStreamField{
  2117. "id1": nil,
  2118. "name": nil,
  2119. "temp": nil,
  2120. },
  2121. isSchemaless: true,
  2122. streamStmt: streams["src1"],
  2123. metaFields: []string{},
  2124. }.Init(),
  2125. },
  2126. },
  2127. condition: &ast.BinaryExpr{
  2128. OP: ast.AND,
  2129. LHS: &ast.BinaryExpr{
  2130. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2131. OP: ast.EQ,
  2132. RHS: &ast.StringLiteral{Val: "v1"},
  2133. },
  2134. RHS: &ast.BinaryExpr{
  2135. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2136. OP: ast.GT,
  2137. RHS: &ast.IntegerLiteral{Val: 2},
  2138. },
  2139. },
  2140. }.Init(),
  2141. },
  2142. },
  2143. condition: nil,
  2144. wtype: ast.TUMBLING_WINDOW,
  2145. length: 10,
  2146. timeUnit: ast.SS,
  2147. interval: 0,
  2148. limit: 0,
  2149. }.Init(),
  2150. },
  2151. },
  2152. fields: []ast.Field{
  2153. {
  2154. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2155. Name: "id1",
  2156. AName: "",
  2157. },
  2158. },
  2159. isAggregate: false,
  2160. sendMeta: false,
  2161. }.Init(),
  2162. }, { // 4. do not optimize count window
  2163. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  2164. p: ProjectPlan{
  2165. baseLogicalPlan: baseLogicalPlan{
  2166. children: []LogicalPlan{
  2167. HavingPlan{
  2168. baseLogicalPlan: baseLogicalPlan{
  2169. children: []LogicalPlan{
  2170. FilterPlan{
  2171. baseLogicalPlan: baseLogicalPlan{
  2172. children: []LogicalPlan{
  2173. WindowPlan{
  2174. baseLogicalPlan: baseLogicalPlan{
  2175. children: []LogicalPlan{
  2176. DataSourcePlan{
  2177. name: "src1",
  2178. isWildCard: true,
  2179. streamFields: map[string]*ast.JsonStreamField{},
  2180. streamStmt: streams["src1"],
  2181. metaFields: []string{},
  2182. isSchemaless: true,
  2183. }.Init(),
  2184. },
  2185. },
  2186. condition: nil,
  2187. wtype: ast.COUNT_WINDOW,
  2188. length: 5,
  2189. interval: 1,
  2190. limit: 0,
  2191. }.Init(),
  2192. },
  2193. },
  2194. condition: &ast.BinaryExpr{
  2195. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2196. OP: ast.GT,
  2197. RHS: &ast.IntegerLiteral{Val: 20},
  2198. },
  2199. }.Init(),
  2200. },
  2201. },
  2202. condition: &ast.BinaryExpr{
  2203. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  2204. Token: ast.ASTERISK,
  2205. }}, FuncType: ast.FuncTypeAgg},
  2206. OP: ast.GT,
  2207. RHS: &ast.IntegerLiteral{Val: 2},
  2208. },
  2209. }.Init(),
  2210. },
  2211. },
  2212. fields: []ast.Field{
  2213. {
  2214. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2215. Name: "*",
  2216. AName: "",
  2217. },
  2218. },
  2219. isAggregate: false,
  2220. sendMeta: false,
  2221. }.Init(),
  2222. }, { // 5. optimize join on
  2223. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2224. p: ProjectPlan{
  2225. baseLogicalPlan: baseLogicalPlan{
  2226. children: []LogicalPlan{
  2227. JoinPlan{
  2228. baseLogicalPlan: baseLogicalPlan{
  2229. children: []LogicalPlan{
  2230. WindowPlan{
  2231. baseLogicalPlan: baseLogicalPlan{
  2232. children: []LogicalPlan{
  2233. FilterPlan{
  2234. baseLogicalPlan: baseLogicalPlan{
  2235. children: []LogicalPlan{
  2236. DataSourcePlan{
  2237. name: "src1",
  2238. streamFields: map[string]*ast.JsonStreamField{
  2239. "id1": nil,
  2240. "temp": nil,
  2241. },
  2242. isSchemaless: true,
  2243. streamStmt: streams["src1"],
  2244. metaFields: []string{},
  2245. }.Init(),
  2246. },
  2247. },
  2248. condition: &ast.BinaryExpr{
  2249. RHS: &ast.BinaryExpr{
  2250. OP: ast.GT,
  2251. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2252. RHS: &ast.IntegerLiteral{Val: 20},
  2253. },
  2254. OP: ast.AND,
  2255. LHS: &ast.BinaryExpr{
  2256. OP: ast.GT,
  2257. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2258. RHS: &ast.IntegerLiteral{Val: 111},
  2259. },
  2260. },
  2261. }.Init(),
  2262. FilterPlan{
  2263. baseLogicalPlan: baseLogicalPlan{
  2264. children: []LogicalPlan{
  2265. DataSourcePlan{
  2266. name: "src2",
  2267. streamFields: map[string]*ast.JsonStreamField{
  2268. "hum": nil,
  2269. "id1": nil,
  2270. "id2": nil,
  2271. },
  2272. isSchemaless: true,
  2273. streamStmt: streams["src2"],
  2274. metaFields: []string{},
  2275. }.Init(),
  2276. },
  2277. },
  2278. condition: &ast.BinaryExpr{
  2279. OP: ast.LT,
  2280. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2281. RHS: &ast.IntegerLiteral{Val: 60},
  2282. },
  2283. }.Init(),
  2284. },
  2285. },
  2286. condition: nil,
  2287. wtype: ast.TUMBLING_WINDOW,
  2288. length: 10,
  2289. timeUnit: ast.SS,
  2290. interval: 0,
  2291. limit: 0,
  2292. }.Init(),
  2293. },
  2294. },
  2295. from: &ast.Table{
  2296. Name: "src1",
  2297. },
  2298. joins: []ast.Join{
  2299. {
  2300. Name: "src2",
  2301. Alias: "",
  2302. JoinType: ast.INNER_JOIN,
  2303. Expr: &ast.BinaryExpr{
  2304. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2305. OP: ast.EQ,
  2306. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2307. },
  2308. },
  2309. },
  2310. }.Init(),
  2311. },
  2312. },
  2313. fields: []ast.Field{
  2314. {
  2315. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2316. Name: "id1",
  2317. AName: "",
  2318. },
  2319. },
  2320. isAggregate: false,
  2321. sendMeta: false,
  2322. }.Init(),
  2323. }, { // 6. optimize outer join on
  2324. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2325. p: ProjectPlan{
  2326. baseLogicalPlan: baseLogicalPlan{
  2327. children: []LogicalPlan{
  2328. JoinPlan{
  2329. baseLogicalPlan: baseLogicalPlan{
  2330. children: []LogicalPlan{
  2331. WindowPlan{
  2332. baseLogicalPlan: baseLogicalPlan{
  2333. children: []LogicalPlan{
  2334. FilterPlan{
  2335. baseLogicalPlan: baseLogicalPlan{
  2336. children: []LogicalPlan{
  2337. DataSourcePlan{
  2338. name: "src1",
  2339. streamFields: map[string]*ast.JsonStreamField{
  2340. "id1": nil,
  2341. "temp": nil,
  2342. },
  2343. isSchemaless: true,
  2344. streamStmt: streams["src1"],
  2345. metaFields: []string{},
  2346. }.Init(),
  2347. },
  2348. },
  2349. condition: &ast.BinaryExpr{
  2350. OP: ast.GT,
  2351. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2352. RHS: &ast.IntegerLiteral{Val: 111},
  2353. },
  2354. }.Init(),
  2355. DataSourcePlan{
  2356. name: "src2",
  2357. streamFields: map[string]*ast.JsonStreamField{
  2358. "hum": nil,
  2359. "id1": nil,
  2360. "id2": nil,
  2361. },
  2362. isSchemaless: true,
  2363. streamStmt: streams["src2"],
  2364. metaFields: []string{},
  2365. }.Init(),
  2366. },
  2367. },
  2368. condition: nil,
  2369. wtype: ast.TUMBLING_WINDOW,
  2370. length: 10,
  2371. timeUnit: ast.SS,
  2372. interval: 0,
  2373. limit: 0,
  2374. }.Init(),
  2375. },
  2376. },
  2377. from: &ast.Table{
  2378. Name: "src1",
  2379. },
  2380. joins: []ast.Join{
  2381. {
  2382. Name: "src2",
  2383. Alias: "",
  2384. JoinType: ast.FULL_JOIN,
  2385. Expr: &ast.BinaryExpr{
  2386. OP: ast.AND,
  2387. LHS: &ast.BinaryExpr{
  2388. OP: ast.AND,
  2389. LHS: &ast.BinaryExpr{
  2390. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2391. OP: ast.EQ,
  2392. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2393. },
  2394. RHS: &ast.BinaryExpr{
  2395. OP: ast.GT,
  2396. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2397. RHS: &ast.IntegerLiteral{Val: 20},
  2398. },
  2399. },
  2400. RHS: &ast.BinaryExpr{
  2401. OP: ast.LT,
  2402. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2403. RHS: &ast.IntegerLiteral{Val: 60},
  2404. },
  2405. },
  2406. },
  2407. },
  2408. }.Init(),
  2409. },
  2410. },
  2411. fields: []ast.Field{
  2412. {
  2413. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2414. Name: "id1",
  2415. AName: "",
  2416. },
  2417. },
  2418. isAggregate: false,
  2419. sendMeta: false,
  2420. }.Init(),
  2421. }, { // 7 window error for table
  2422. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2423. p: nil,
  2424. err: "cannot run window for TABLE sources",
  2425. }, { // 8 join table without window
  2426. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  2427. p: ProjectPlan{
  2428. baseLogicalPlan: baseLogicalPlan{
  2429. children: []LogicalPlan{
  2430. JoinPlan{
  2431. baseLogicalPlan: baseLogicalPlan{
  2432. children: []LogicalPlan{
  2433. JoinAlignPlan{
  2434. baseLogicalPlan: baseLogicalPlan{
  2435. children: []LogicalPlan{
  2436. FilterPlan{
  2437. baseLogicalPlan: baseLogicalPlan{
  2438. children: []LogicalPlan{
  2439. DataSourcePlan{
  2440. name: "src1",
  2441. streamFields: map[string]*ast.JsonStreamField{
  2442. "hum": nil,
  2443. "id1": nil,
  2444. "temp": nil,
  2445. },
  2446. isSchemaless: true,
  2447. streamStmt: streams["src1"],
  2448. metaFields: []string{},
  2449. }.Init(),
  2450. },
  2451. },
  2452. condition: &ast.BinaryExpr{
  2453. RHS: &ast.BinaryExpr{
  2454. OP: ast.GT,
  2455. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2456. RHS: &ast.IntegerLiteral{Val: 20},
  2457. },
  2458. OP: ast.AND,
  2459. LHS: &ast.BinaryExpr{
  2460. OP: ast.GT,
  2461. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2462. RHS: &ast.IntegerLiteral{Val: 111},
  2463. },
  2464. },
  2465. }.Init(),
  2466. DataSourcePlan{
  2467. name: "tableInPlanner",
  2468. streamFields: map[string]*ast.JsonStreamField{
  2469. "hum": {
  2470. Type: "bigint",
  2471. },
  2472. "id": {
  2473. Type: "bigint",
  2474. },
  2475. },
  2476. streamStmt: streams["tableInPlanner"],
  2477. metaFields: []string{},
  2478. }.Init(),
  2479. },
  2480. },
  2481. Emitters: []string{"tableInPlanner"},
  2482. }.Init(),
  2483. },
  2484. },
  2485. from: &ast.Table{
  2486. Name: "src1",
  2487. },
  2488. joins: []ast.Join{
  2489. {
  2490. Name: "tableInPlanner",
  2491. Alias: "",
  2492. JoinType: ast.INNER_JOIN,
  2493. Expr: &ast.BinaryExpr{
  2494. OP: ast.AND,
  2495. LHS: &ast.BinaryExpr{
  2496. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2497. OP: ast.EQ,
  2498. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2499. },
  2500. RHS: &ast.BinaryExpr{
  2501. OP: ast.LT,
  2502. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  2503. RHS: &ast.IntegerLiteral{Val: 60},
  2504. },
  2505. },
  2506. },
  2507. },
  2508. }.Init(),
  2509. },
  2510. },
  2511. fields: []ast.Field{
  2512. {
  2513. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2514. Name: "id1",
  2515. AName: "",
  2516. },
  2517. },
  2518. isAggregate: false,
  2519. sendMeta: false,
  2520. }.Init(),
  2521. }, { // 9 join table with window
  2522. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2523. p: ProjectPlan{
  2524. baseLogicalPlan: baseLogicalPlan{
  2525. children: []LogicalPlan{
  2526. JoinPlan{
  2527. baseLogicalPlan: baseLogicalPlan{
  2528. children: []LogicalPlan{
  2529. JoinAlignPlan{
  2530. baseLogicalPlan: baseLogicalPlan{
  2531. children: []LogicalPlan{
  2532. WindowPlan{
  2533. baseLogicalPlan: baseLogicalPlan{
  2534. children: []LogicalPlan{
  2535. DataSourcePlan{
  2536. name: "src1",
  2537. streamFields: map[string]*ast.JsonStreamField{
  2538. "id1": nil,
  2539. "temp": nil,
  2540. },
  2541. isSchemaless: true,
  2542. streamStmt: streams["src1"],
  2543. metaFields: []string{},
  2544. }.Init(),
  2545. },
  2546. },
  2547. condition: nil,
  2548. wtype: ast.TUMBLING_WINDOW,
  2549. length: 10,
  2550. timeUnit: ast.SS,
  2551. interval: 0,
  2552. limit: 0,
  2553. }.Init(),
  2554. DataSourcePlan{
  2555. name: "tableInPlanner",
  2556. streamFields: map[string]*ast.JsonStreamField{
  2557. "hum": {
  2558. Type: "bigint",
  2559. },
  2560. "id": {
  2561. Type: "bigint",
  2562. },
  2563. },
  2564. streamStmt: streams["tableInPlanner"],
  2565. metaFields: []string{},
  2566. }.Init(),
  2567. },
  2568. },
  2569. Emitters: []string{"tableInPlanner"},
  2570. }.Init(),
  2571. },
  2572. },
  2573. from: &ast.Table{
  2574. Name: "src1",
  2575. },
  2576. joins: []ast.Join{
  2577. {
  2578. Name: "tableInPlanner",
  2579. Alias: "",
  2580. JoinType: ast.INNER_JOIN,
  2581. Expr: &ast.BinaryExpr{
  2582. OP: ast.AND,
  2583. LHS: &ast.BinaryExpr{
  2584. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2585. OP: ast.EQ,
  2586. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2587. },
  2588. RHS: &ast.BinaryExpr{
  2589. RHS: &ast.BinaryExpr{
  2590. OP: ast.AND,
  2591. LHS: &ast.BinaryExpr{
  2592. OP: ast.GT,
  2593. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2594. RHS: &ast.IntegerLiteral{Val: 20},
  2595. },
  2596. RHS: &ast.BinaryExpr{
  2597. OP: ast.LT,
  2598. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  2599. RHS: &ast.IntegerLiteral{Val: 60},
  2600. },
  2601. },
  2602. OP: ast.AND,
  2603. LHS: &ast.BinaryExpr{
  2604. OP: ast.GT,
  2605. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2606. RHS: &ast.IntegerLiteral{Val: 111},
  2607. },
  2608. },
  2609. },
  2610. },
  2611. },
  2612. }.Init(),
  2613. },
  2614. },
  2615. fields: []ast.Field{
  2616. {
  2617. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2618. Name: "id1",
  2619. AName: "",
  2620. },
  2621. },
  2622. isAggregate: false,
  2623. sendMeta: false,
  2624. }.Init(),
  2625. }, { // 10 meta
  2626. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  2627. p: ProjectPlan{
  2628. baseLogicalPlan: baseLogicalPlan{
  2629. children: []LogicalPlan{
  2630. FilterPlan{
  2631. baseLogicalPlan: baseLogicalPlan{
  2632. children: []LogicalPlan{
  2633. DataSourcePlan{
  2634. name: "src1",
  2635. streamFields: map[string]*ast.JsonStreamField{
  2636. "temp": nil,
  2637. },
  2638. isSchemaless: true,
  2639. streamStmt: streams["src1"],
  2640. metaFields: []string{"Humidity", "device", "id"},
  2641. }.Init(),
  2642. },
  2643. },
  2644. condition: &ast.BinaryExpr{
  2645. LHS: &ast.Call{
  2646. Name: "meta",
  2647. FuncId: 2,
  2648. Args: []ast.Expr{&ast.MetaRef{
  2649. Name: "device",
  2650. StreamName: ast.DefaultStream,
  2651. }},
  2652. },
  2653. OP: ast.EQ,
  2654. RHS: &ast.StringLiteral{
  2655. Val: "demo2",
  2656. },
  2657. },
  2658. }.Init(),
  2659. },
  2660. },
  2661. fields: []ast.Field{
  2662. {
  2663. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2664. Name: "temp",
  2665. AName: "",
  2666. }, {
  2667. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2668. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  2669. Name: "id",
  2670. StreamName: ast.DefaultStream,
  2671. }}},
  2672. []ast.StreamName{},
  2673. nil,
  2674. )},
  2675. Name: "meta",
  2676. AName: "eid",
  2677. }, {
  2678. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2679. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  2680. &ast.BinaryExpr{
  2681. OP: ast.ARROW,
  2682. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  2683. RHS: &ast.JsonFieldRef{Name: "Device"},
  2684. },
  2685. }},
  2686. []ast.StreamName{},
  2687. nil,
  2688. )},
  2689. Name: "meta",
  2690. AName: "hdevice",
  2691. },
  2692. },
  2693. isAggregate: false,
  2694. sendMeta: false,
  2695. }.Init(),
  2696. }, { // 11 join with same name field and aliased
  2697. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  2698. p: ProjectPlan{
  2699. baseLogicalPlan: baseLogicalPlan{
  2700. children: []LogicalPlan{
  2701. JoinPlan{
  2702. baseLogicalPlan: baseLogicalPlan{
  2703. children: []LogicalPlan{
  2704. JoinAlignPlan{
  2705. baseLogicalPlan: baseLogicalPlan{
  2706. children: []LogicalPlan{
  2707. DataSourcePlan{
  2708. name: "src2",
  2709. streamFields: map[string]*ast.JsonStreamField{
  2710. "hum": nil,
  2711. "id": nil,
  2712. "id2": nil,
  2713. },
  2714. isSchemaless: true,
  2715. streamStmt: streams["src2"],
  2716. metaFields: []string{},
  2717. }.Init(),
  2718. DataSourcePlan{
  2719. name: "tableInPlanner",
  2720. streamFields: map[string]*ast.JsonStreamField{
  2721. "hum": {
  2722. Type: "bigint",
  2723. },
  2724. "id": {
  2725. Type: "bigint",
  2726. },
  2727. },
  2728. streamStmt: streams["tableInPlanner"],
  2729. metaFields: []string{},
  2730. }.Init(),
  2731. },
  2732. },
  2733. Emitters: []string{"tableInPlanner"},
  2734. }.Init(),
  2735. },
  2736. },
  2737. from: &ast.Table{
  2738. Name: "src2",
  2739. },
  2740. joins: []ast.Join{
  2741. {
  2742. Name: "tableInPlanner",
  2743. Alias: "",
  2744. JoinType: ast.INNER_JOIN,
  2745. Expr: &ast.BinaryExpr{
  2746. RHS: &ast.BinaryExpr{
  2747. OP: ast.EQ,
  2748. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2749. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2750. },
  2751. OP: ast.AND,
  2752. LHS: &ast.BinaryExpr{
  2753. OP: ast.GT,
  2754. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2755. &ast.FieldRef{
  2756. Name: "hum",
  2757. StreamName: "src2",
  2758. },
  2759. []ast.StreamName{"src2"},
  2760. &boolFalse,
  2761. )},
  2762. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2763. &ast.FieldRef{
  2764. Name: "hum",
  2765. StreamName: "tableInPlanner",
  2766. },
  2767. []ast.StreamName{"tableInPlanner"},
  2768. &boolFalse,
  2769. )},
  2770. },
  2771. },
  2772. },
  2773. },
  2774. }.Init(),
  2775. },
  2776. },
  2777. fields: []ast.Field{
  2778. {
  2779. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2780. &ast.FieldRef{
  2781. Name: "hum",
  2782. StreamName: "src2",
  2783. },
  2784. []ast.StreamName{"src2"},
  2785. &boolFalse,
  2786. )},
  2787. Name: "hum",
  2788. AName: "hum1",
  2789. }, {
  2790. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2791. &ast.FieldRef{
  2792. Name: "hum",
  2793. StreamName: "tableInPlanner",
  2794. },
  2795. []ast.StreamName{"tableInPlanner"},
  2796. &boolFalse,
  2797. )},
  2798. Name: "hum",
  2799. AName: "hum2",
  2800. },
  2801. },
  2802. isAggregate: false,
  2803. sendMeta: false,
  2804. }.Init(),
  2805. }, { // 12
  2806. sql: `SELECT name->first, name->last FROM src1`,
  2807. p: ProjectPlan{
  2808. baseLogicalPlan: baseLogicalPlan{
  2809. children: []LogicalPlan{
  2810. DataSourcePlan{
  2811. baseLogicalPlan: baseLogicalPlan{},
  2812. name: "src1",
  2813. streamFields: map[string]*ast.JsonStreamField{
  2814. "name": nil,
  2815. },
  2816. isSchemaless: true,
  2817. streamStmt: streams["src1"],
  2818. metaFields: []string{},
  2819. }.Init(),
  2820. },
  2821. },
  2822. fields: []ast.Field{
  2823. {
  2824. Expr: &ast.BinaryExpr{
  2825. OP: ast.ARROW,
  2826. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2827. RHS: &ast.JsonFieldRef{Name: "first"},
  2828. },
  2829. Name: "kuiper_field_0",
  2830. AName: "",
  2831. }, {
  2832. Expr: &ast.BinaryExpr{
  2833. OP: ast.ARROW,
  2834. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2835. RHS: &ast.JsonFieldRef{Name: "last"},
  2836. },
  2837. Name: "kuiper_field_1",
  2838. AName: "",
  2839. },
  2840. },
  2841. isAggregate: false,
  2842. sendMeta: false,
  2843. }.Init(),
  2844. },
  2845. }
  2846. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2847. for i, tt := range tests {
  2848. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2849. if err != nil {
  2850. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2851. continue
  2852. }
  2853. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2854. IsEventTime: false,
  2855. LateTol: 0,
  2856. Concurrency: 0,
  2857. BufferLength: 0,
  2858. SendMetaToSink: false,
  2859. Qos: 0,
  2860. CheckpointInterval: 0,
  2861. SendError: true,
  2862. }, kv)
  2863. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2864. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2865. } else if !reflect.DeepEqual(tt.p, p) {
  2866. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2867. }
  2868. }
  2869. }
  2870. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2871. kv, err := store.GetKV("stream")
  2872. if err != nil {
  2873. t.Error(err)
  2874. return
  2875. }
  2876. streamSqls := map[string]string{
  2877. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2878. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2879. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2880. }
  2881. types := map[string]ast.StreamType{
  2882. "src1": ast.TypeStream,
  2883. "table1": ast.TypeTable,
  2884. "table2": ast.TypeTable,
  2885. }
  2886. for name, sql := range streamSqls {
  2887. s, err := json.Marshal(&xsql.StreamInfo{
  2888. StreamType: types[name],
  2889. Statement: sql,
  2890. })
  2891. if err != nil {
  2892. t.Error(err)
  2893. t.Fail()
  2894. }
  2895. err = kv.Set(name, string(s))
  2896. if err != nil {
  2897. t.Error(err)
  2898. t.Fail()
  2899. }
  2900. }
  2901. streams := make(map[string]*ast.StreamStmt)
  2902. for n := range streamSqls {
  2903. streamStmt, err := xsql.GetDataSource(kv, n)
  2904. if err != nil {
  2905. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2906. return
  2907. }
  2908. streams[n] = streamStmt
  2909. }
  2910. tests := []struct {
  2911. sql string
  2912. p LogicalPlan
  2913. err string
  2914. }{
  2915. { // 0
  2916. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2917. p: ProjectPlan{
  2918. baseLogicalPlan: baseLogicalPlan{
  2919. children: []LogicalPlan{
  2920. LookupPlan{
  2921. baseLogicalPlan: baseLogicalPlan{
  2922. children: []LogicalPlan{
  2923. DataSourcePlan{
  2924. baseLogicalPlan: baseLogicalPlan{},
  2925. name: "src1",
  2926. streamFields: map[string]*ast.JsonStreamField{
  2927. "a": nil,
  2928. },
  2929. isSchemaless: true,
  2930. streamStmt: streams["src1"],
  2931. metaFields: []string{},
  2932. }.Init(),
  2933. },
  2934. },
  2935. joinExpr: ast.Join{
  2936. Name: "table1",
  2937. Alias: "",
  2938. JoinType: ast.INNER_JOIN,
  2939. Expr: &ast.BinaryExpr{
  2940. OP: ast.EQ,
  2941. LHS: &ast.FieldRef{
  2942. StreamName: "src1",
  2943. Name: "id",
  2944. },
  2945. RHS: &ast.FieldRef{
  2946. StreamName: "table1",
  2947. Name: "id",
  2948. },
  2949. },
  2950. },
  2951. keys: []string{"id"},
  2952. fields: []string{"b"},
  2953. valvars: []ast.Expr{
  2954. &ast.FieldRef{
  2955. StreamName: "src1",
  2956. Name: "id",
  2957. },
  2958. },
  2959. options: &ast.Options{
  2960. DATASOURCE: "table1",
  2961. TYPE: "sql",
  2962. KIND: "lookup",
  2963. },
  2964. conditions: nil,
  2965. }.Init(),
  2966. },
  2967. },
  2968. fields: []ast.Field{
  2969. {
  2970. Expr: &ast.FieldRef{
  2971. StreamName: "src1",
  2972. Name: "a",
  2973. },
  2974. Name: "a",
  2975. AName: "",
  2976. },
  2977. {
  2978. Expr: &ast.FieldRef{
  2979. StreamName: "table1",
  2980. Name: "b",
  2981. },
  2982. Name: "b",
  2983. AName: "",
  2984. },
  2985. },
  2986. isAggregate: false,
  2987. sendMeta: false,
  2988. }.Init(),
  2989. },
  2990. { // 1
  2991. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2992. p: ProjectPlan{
  2993. baseLogicalPlan: baseLogicalPlan{
  2994. children: []LogicalPlan{
  2995. FilterPlan{
  2996. baseLogicalPlan: baseLogicalPlan{
  2997. children: []LogicalPlan{
  2998. LookupPlan{
  2999. baseLogicalPlan: baseLogicalPlan{
  3000. children: []LogicalPlan{
  3001. FilterPlan{
  3002. baseLogicalPlan: baseLogicalPlan{
  3003. children: []LogicalPlan{
  3004. DataSourcePlan{
  3005. baseLogicalPlan: baseLogicalPlan{},
  3006. name: "src1",
  3007. streamFields: map[string]*ast.JsonStreamField{
  3008. "a": nil,
  3009. },
  3010. isSchemaless: true,
  3011. streamStmt: streams["src1"],
  3012. metaFields: []string{},
  3013. }.Init(),
  3014. },
  3015. },
  3016. condition: &ast.BinaryExpr{
  3017. OP: ast.LT,
  3018. LHS: &ast.FieldRef{
  3019. StreamName: "src1",
  3020. Name: "c",
  3021. },
  3022. RHS: &ast.IntegerLiteral{Val: 40},
  3023. },
  3024. }.Init(),
  3025. },
  3026. },
  3027. joinExpr: ast.Join{
  3028. Name: "table1",
  3029. Alias: "",
  3030. JoinType: ast.INNER_JOIN,
  3031. Expr: &ast.BinaryExpr{
  3032. OP: ast.AND,
  3033. RHS: &ast.BinaryExpr{
  3034. OP: ast.EQ,
  3035. LHS: &ast.FieldRef{
  3036. StreamName: "src1",
  3037. Name: "id",
  3038. },
  3039. RHS: &ast.FieldRef{
  3040. StreamName: "table1",
  3041. Name: "id",
  3042. },
  3043. },
  3044. LHS: &ast.BinaryExpr{
  3045. OP: ast.AND,
  3046. LHS: &ast.BinaryExpr{
  3047. OP: ast.GT,
  3048. LHS: &ast.FieldRef{
  3049. StreamName: "table1",
  3050. Name: "b",
  3051. },
  3052. RHS: &ast.IntegerLiteral{Val: 20},
  3053. },
  3054. RHS: &ast.BinaryExpr{
  3055. OP: ast.LT,
  3056. LHS: &ast.FieldRef{
  3057. StreamName: "src1",
  3058. Name: "c",
  3059. },
  3060. RHS: &ast.IntegerLiteral{Val: 40},
  3061. },
  3062. },
  3063. },
  3064. },
  3065. keys: []string{"id"},
  3066. valvars: []ast.Expr{
  3067. &ast.FieldRef{
  3068. StreamName: "src1",
  3069. Name: "id",
  3070. },
  3071. },
  3072. options: &ast.Options{
  3073. DATASOURCE: "table1",
  3074. TYPE: "sql",
  3075. KIND: "lookup",
  3076. },
  3077. conditions: &ast.BinaryExpr{
  3078. OP: ast.AND,
  3079. LHS: &ast.BinaryExpr{
  3080. OP: ast.GT,
  3081. LHS: &ast.FieldRef{
  3082. StreamName: "table1",
  3083. Name: "b",
  3084. },
  3085. RHS: &ast.IntegerLiteral{Val: 20},
  3086. },
  3087. RHS: &ast.BinaryExpr{
  3088. OP: ast.LT,
  3089. LHS: &ast.FieldRef{
  3090. StreamName: "src1",
  3091. Name: "c",
  3092. },
  3093. RHS: &ast.IntegerLiteral{Val: 40},
  3094. },
  3095. },
  3096. }.Init(),
  3097. },
  3098. },
  3099. condition: &ast.BinaryExpr{
  3100. OP: ast.GT,
  3101. LHS: &ast.FieldRef{
  3102. StreamName: "table1",
  3103. Name: "b",
  3104. },
  3105. RHS: &ast.IntegerLiteral{Val: 20},
  3106. },
  3107. }.Init(),
  3108. },
  3109. },
  3110. fields: []ast.Field{
  3111. {
  3112. Expr: &ast.FieldRef{
  3113. StreamName: "src1",
  3114. Name: "a",
  3115. },
  3116. Name: "a",
  3117. AName: "",
  3118. },
  3119. {
  3120. Expr: &ast.FieldRef{
  3121. StreamName: "table1",
  3122. Name: "*",
  3123. },
  3124. Name: "*",
  3125. AName: "",
  3126. },
  3127. },
  3128. isAggregate: false,
  3129. sendMeta: false,
  3130. }.Init(),
  3131. },
  3132. { // 2
  3133. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  3134. p: ProjectPlan{
  3135. baseLogicalPlan: baseLogicalPlan{
  3136. children: []LogicalPlan{
  3137. LookupPlan{
  3138. baseLogicalPlan: baseLogicalPlan{
  3139. children: []LogicalPlan{
  3140. LookupPlan{
  3141. baseLogicalPlan: baseLogicalPlan{
  3142. children: []LogicalPlan{
  3143. DataSourcePlan{
  3144. baseLogicalPlan: baseLogicalPlan{},
  3145. name: "src1",
  3146. streamFields: map[string]*ast.JsonStreamField{
  3147. "a": nil,
  3148. },
  3149. isSchemaless: true,
  3150. streamStmt: streams["src1"],
  3151. metaFields: []string{},
  3152. }.Init(),
  3153. },
  3154. },
  3155. joinExpr: ast.Join{
  3156. Name: "table1",
  3157. Alias: "",
  3158. JoinType: ast.INNER_JOIN,
  3159. Expr: &ast.BinaryExpr{
  3160. OP: ast.EQ,
  3161. LHS: &ast.FieldRef{
  3162. StreamName: "src1",
  3163. Name: "id",
  3164. },
  3165. RHS: &ast.FieldRef{
  3166. StreamName: "table1",
  3167. Name: "id",
  3168. },
  3169. },
  3170. },
  3171. keys: []string{"id"},
  3172. fields: []string{"b"},
  3173. valvars: []ast.Expr{
  3174. &ast.FieldRef{
  3175. StreamName: "src1",
  3176. Name: "id",
  3177. },
  3178. },
  3179. options: &ast.Options{
  3180. DATASOURCE: "table1",
  3181. TYPE: "sql",
  3182. KIND: "lookup",
  3183. },
  3184. conditions: nil,
  3185. }.Init(),
  3186. },
  3187. },
  3188. joinExpr: ast.Join{
  3189. Name: "table2",
  3190. Alias: "",
  3191. JoinType: ast.INNER_JOIN,
  3192. Expr: &ast.BinaryExpr{
  3193. OP: ast.EQ,
  3194. LHS: &ast.FieldRef{
  3195. StreamName: "table1",
  3196. Name: "id",
  3197. },
  3198. RHS: &ast.FieldRef{
  3199. StreamName: "table2",
  3200. Name: "id",
  3201. },
  3202. },
  3203. },
  3204. keys: []string{"id"},
  3205. fields: []string{"c"},
  3206. valvars: []ast.Expr{
  3207. &ast.FieldRef{
  3208. StreamName: "table1",
  3209. Name: "id",
  3210. },
  3211. },
  3212. options: &ast.Options{
  3213. DATASOURCE: "table2",
  3214. TYPE: "sql",
  3215. KIND: "lookup",
  3216. },
  3217. }.Init(),
  3218. },
  3219. },
  3220. fields: []ast.Field{
  3221. {
  3222. Expr: &ast.FieldRef{
  3223. StreamName: "src1",
  3224. Name: "a",
  3225. },
  3226. Name: "a",
  3227. AName: "",
  3228. },
  3229. {
  3230. Expr: &ast.FieldRef{
  3231. StreamName: "table1",
  3232. Name: "b",
  3233. },
  3234. Name: "b",
  3235. AName: "",
  3236. },
  3237. {
  3238. Expr: &ast.FieldRef{
  3239. StreamName: "table2",
  3240. Name: "c",
  3241. },
  3242. Name: "c",
  3243. AName: "",
  3244. },
  3245. },
  3246. isAggregate: false,
  3247. sendMeta: false,
  3248. }.Init(),
  3249. },
  3250. { // 3
  3251. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3252. p: ProjectPlan{
  3253. baseLogicalPlan: baseLogicalPlan{
  3254. children: []LogicalPlan{
  3255. LookupPlan{
  3256. baseLogicalPlan: baseLogicalPlan{
  3257. children: []LogicalPlan{
  3258. WindowPlan{
  3259. baseLogicalPlan: baseLogicalPlan{
  3260. children: []LogicalPlan{
  3261. DataSourcePlan{
  3262. baseLogicalPlan: baseLogicalPlan{},
  3263. name: "src1",
  3264. streamStmt: streams["src1"],
  3265. streamFields: map[string]*ast.JsonStreamField{},
  3266. metaFields: []string{},
  3267. isWildCard: true,
  3268. isSchemaless: true,
  3269. }.Init(),
  3270. },
  3271. },
  3272. condition: nil,
  3273. wtype: ast.TUMBLING_WINDOW,
  3274. length: 10,
  3275. timeUnit: ast.SS,
  3276. interval: 0,
  3277. limit: 0,
  3278. }.Init(),
  3279. },
  3280. },
  3281. joinExpr: ast.Join{
  3282. Name: "table1",
  3283. Alias: "",
  3284. JoinType: ast.INNER_JOIN,
  3285. Expr: &ast.BinaryExpr{
  3286. OP: ast.EQ,
  3287. LHS: &ast.FieldRef{
  3288. StreamName: "src1",
  3289. Name: "id",
  3290. },
  3291. RHS: &ast.FieldRef{
  3292. StreamName: "table1",
  3293. Name: "id",
  3294. },
  3295. },
  3296. },
  3297. keys: []string{"id"},
  3298. valvars: []ast.Expr{
  3299. &ast.FieldRef{
  3300. StreamName: "src1",
  3301. Name: "id",
  3302. },
  3303. },
  3304. options: &ast.Options{
  3305. DATASOURCE: "table1",
  3306. TYPE: "sql",
  3307. KIND: "lookup",
  3308. },
  3309. conditions: nil,
  3310. }.Init(),
  3311. },
  3312. },
  3313. fields: []ast.Field{
  3314. {
  3315. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  3316. Name: "*",
  3317. AName: "",
  3318. },
  3319. },
  3320. isAggregate: false,
  3321. sendMeta: false,
  3322. }.Init(),
  3323. },
  3324. }
  3325. for i, tt := range tests {
  3326. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  3327. if err != nil {
  3328. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  3329. continue
  3330. }
  3331. p, err := createLogicalPlan(stmt, &api.RuleOption{
  3332. IsEventTime: false,
  3333. LateTol: 0,
  3334. Concurrency: 0,
  3335. BufferLength: 0,
  3336. SendMetaToSink: false,
  3337. Qos: 0,
  3338. CheckpointInterval: 0,
  3339. SendError: true,
  3340. }, kv)
  3341. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  3342. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  3343. } else if !reflect.DeepEqual(tt.p, p) {
  3344. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  3345. }
  3346. }
  3347. }
  3348. func TestTransformSourceNode(t *testing.T) {
  3349. schema := map[string]*ast.JsonStreamField{
  3350. "a": {
  3351. Type: "bigint",
  3352. },
  3353. }
  3354. testCases := []struct {
  3355. name string
  3356. plan *DataSourcePlan
  3357. node *node.SourceNode
  3358. }{
  3359. {
  3360. name: "normal source node",
  3361. plan: &DataSourcePlan{
  3362. name: "test",
  3363. streamStmt: &ast.StreamStmt{
  3364. StreamType: ast.TypeStream,
  3365. Options: &ast.Options{
  3366. TYPE: "file",
  3367. },
  3368. },
  3369. streamFields: nil,
  3370. allMeta: false,
  3371. metaFields: []string{},
  3372. iet: false,
  3373. isBinary: false,
  3374. },
  3375. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  3376. TYPE: "file",
  3377. }, false, nil),
  3378. },
  3379. {
  3380. name: "schema source node",
  3381. plan: &DataSourcePlan{
  3382. name: "test",
  3383. streamStmt: &ast.StreamStmt{
  3384. StreamType: ast.TypeStream,
  3385. Options: &ast.Options{
  3386. TYPE: "file",
  3387. },
  3388. },
  3389. streamFields: schema,
  3390. allMeta: false,
  3391. metaFields: []string{},
  3392. iet: false,
  3393. isBinary: false,
  3394. },
  3395. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  3396. TYPE: "file",
  3397. }, false, schema),
  3398. },
  3399. }
  3400. for _, tc := range testCases {
  3401. t.Run(tc.name, func(t *testing.T) {
  3402. sourceNode, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
  3403. if err != nil {
  3404. t.Errorf("unexpected error: %v", err)
  3405. return
  3406. }
  3407. if !reflect.DeepEqual(sourceNode, tc.node) {
  3408. t.Errorf("unexpected result: got %v, want %v", sourceNode, tc.node)
  3409. }
  3410. })
  3411. }
  3412. }