planner_test.go 57 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/testx"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/kv"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. var (
  29. DbDir = testx.GetDbDir()
  30. )
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := kv.GetKVStore("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string
  42. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  43. "src2": `CREATE STREAM src2 (
  44. id2 BIGINT,
  45. hum BIGINT
  46. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  47. "tableInPlanner": `CREATE TABLE tableInPlanner (
  48. id BIGINT,
  49. name STRING,
  50. value STRING,
  51. hum BIGINT
  52. ) WITH (TYPE="file");`,
  53. }
  54. types := map[string]ast.StreamType{
  55. "src1": ast.TypeStream,
  56. "src2": ast.TypeStream,
  57. "tableInPlanner": ast.TypeTable,
  58. }
  59. for name, sql := range streamSqls {
  60. s, err := json.Marshal(&xsql.StreamInfo{
  61. StreamType: types[name],
  62. Statement: sql,
  63. })
  64. if err != nil {
  65. t.Error(err)
  66. t.Fail()
  67. }
  68. store.Set(name, string(s))
  69. }
  70. streams := make(map[string]*ast.StreamStmt)
  71. for n := range streamSqls {
  72. streamStmt, err := xsql.GetDataSource(store, n)
  73. if err != nil {
  74. t.Errorf("fail to get stream %s, please check if stream is created", n)
  75. return
  76. }
  77. streams[n] = streamStmt
  78. }
  79. var (
  80. //boolTrue = true
  81. boolFalse = false
  82. )
  83. var tests = []struct {
  84. sql string
  85. p LogicalPlan
  86. err string
  87. }{
  88. { // 0
  89. sql: `SELECT name FROM src1`,
  90. p: ProjectPlan{
  91. baseLogicalPlan: baseLogicalPlan{
  92. children: []LogicalPlan{
  93. DataSourcePlan{
  94. baseLogicalPlan: baseLogicalPlan{},
  95. name: "src1",
  96. streamFields: []interface{}{
  97. &ast.StreamField{
  98. Name: "name",
  99. FieldType: &ast.BasicType{Type: ast.STRINGS},
  100. },
  101. },
  102. streamStmt: streams["src1"],
  103. metaFields: []string{},
  104. }.Init(),
  105. },
  106. },
  107. fields: []ast.Field{
  108. {
  109. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  110. Name: "name",
  111. AName: "",
  112. },
  113. },
  114. isAggregate: false,
  115. sendMeta: false,
  116. }.Init(),
  117. }, { // 1 optimize where to data source
  118. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  119. p: ProjectPlan{
  120. baseLogicalPlan: baseLogicalPlan{
  121. children: []LogicalPlan{
  122. WindowPlan{
  123. baseLogicalPlan: baseLogicalPlan{
  124. children: []LogicalPlan{
  125. FilterPlan{
  126. baseLogicalPlan: baseLogicalPlan{
  127. children: []LogicalPlan{
  128. DataSourcePlan{
  129. name: "src1",
  130. streamFields: []interface{}{
  131. &ast.StreamField{
  132. Name: "name",
  133. FieldType: &ast.BasicType{Type: ast.STRINGS},
  134. },
  135. &ast.StreamField{
  136. Name: "temp",
  137. FieldType: &ast.BasicType{Type: ast.BIGINT},
  138. },
  139. },
  140. streamStmt: streams["src1"],
  141. metaFields: []string{},
  142. }.Init(),
  143. },
  144. },
  145. condition: &ast.BinaryExpr{
  146. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  147. OP: ast.EQ,
  148. RHS: &ast.StringLiteral{Val: "v1"},
  149. },
  150. }.Init(),
  151. },
  152. },
  153. condition: nil,
  154. wtype: ast.TUMBLING_WINDOW,
  155. length: 10000,
  156. interval: 0,
  157. limit: 0,
  158. }.Init(),
  159. },
  160. },
  161. fields: []ast.Field{
  162. {
  163. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  164. Name: "temp",
  165. AName: ""},
  166. },
  167. isAggregate: false,
  168. sendMeta: false,
  169. }.Init(),
  170. }, { // 2 condition that cannot be optimized
  171. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  172. p: ProjectPlan{
  173. baseLogicalPlan: baseLogicalPlan{
  174. children: []LogicalPlan{
  175. JoinPlan{
  176. baseLogicalPlan: baseLogicalPlan{
  177. children: []LogicalPlan{
  178. WindowPlan{
  179. baseLogicalPlan: baseLogicalPlan{
  180. children: []LogicalPlan{
  181. DataSourcePlan{
  182. name: "src1",
  183. streamFields: []interface{}{
  184. &ast.StreamField{
  185. Name: "id1",
  186. FieldType: &ast.BasicType{Type: ast.BIGINT},
  187. },
  188. &ast.StreamField{
  189. Name: "temp",
  190. FieldType: &ast.BasicType{Type: ast.BIGINT},
  191. },
  192. },
  193. streamStmt: streams["src1"],
  194. metaFields: []string{},
  195. }.Init(),
  196. DataSourcePlan{
  197. name: "src2",
  198. streamFields: []interface{}{
  199. &ast.StreamField{
  200. Name: "hum",
  201. FieldType: &ast.BasicType{Type: ast.BIGINT},
  202. },
  203. &ast.StreamField{
  204. Name: "id2",
  205. FieldType: &ast.BasicType{Type: ast.BIGINT},
  206. },
  207. },
  208. streamStmt: streams["src2"],
  209. metaFields: []string{},
  210. }.Init(),
  211. },
  212. },
  213. condition: nil,
  214. wtype: ast.TUMBLING_WINDOW,
  215. length: 10000,
  216. interval: 0,
  217. limit: 0,
  218. }.Init(),
  219. },
  220. },
  221. from: &ast.Table{Name: "src1"},
  222. joins: ast.Joins{ast.Join{
  223. Name: "src2",
  224. JoinType: ast.INNER_JOIN,
  225. Expr: &ast.BinaryExpr{
  226. OP: ast.AND,
  227. LHS: &ast.BinaryExpr{
  228. LHS: &ast.BinaryExpr{
  229. OP: ast.GT,
  230. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  231. RHS: &ast.IntegerLiteral{Val: 20},
  232. },
  233. OP: ast.OR,
  234. RHS: &ast.BinaryExpr{
  235. OP: ast.GT,
  236. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  237. RHS: &ast.IntegerLiteral{Val: 60},
  238. },
  239. },
  240. RHS: &ast.BinaryExpr{
  241. OP: ast.EQ,
  242. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  243. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  244. },
  245. },
  246. }},
  247. }.Init(),
  248. },
  249. },
  250. fields: []ast.Field{
  251. {
  252. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  253. Name: "id1",
  254. AName: ""},
  255. },
  256. isAggregate: false,
  257. sendMeta: false,
  258. }.Init(),
  259. }, { // 3 optimize window filter
  260. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  261. p: ProjectPlan{
  262. baseLogicalPlan: baseLogicalPlan{
  263. children: []LogicalPlan{
  264. WindowPlan{
  265. baseLogicalPlan: baseLogicalPlan{
  266. children: []LogicalPlan{
  267. FilterPlan{
  268. baseLogicalPlan: baseLogicalPlan{
  269. children: []LogicalPlan{
  270. DataSourcePlan{
  271. name: "src1",
  272. streamFields: []interface{}{
  273. &ast.StreamField{
  274. Name: "id1",
  275. FieldType: &ast.BasicType{Type: ast.BIGINT},
  276. },
  277. &ast.StreamField{
  278. Name: "name",
  279. FieldType: &ast.BasicType{Type: ast.STRINGS},
  280. },
  281. &ast.StreamField{
  282. Name: "temp",
  283. FieldType: &ast.BasicType{Type: ast.BIGINT},
  284. },
  285. },
  286. streamStmt: streams["src1"],
  287. metaFields: []string{},
  288. }.Init(),
  289. },
  290. },
  291. condition: &ast.BinaryExpr{
  292. OP: ast.AND,
  293. LHS: &ast.BinaryExpr{
  294. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  295. OP: ast.EQ,
  296. RHS: &ast.StringLiteral{Val: "v1"},
  297. },
  298. RHS: &ast.BinaryExpr{
  299. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  300. OP: ast.GT,
  301. RHS: &ast.IntegerLiteral{Val: 2},
  302. },
  303. },
  304. }.Init(),
  305. },
  306. },
  307. condition: nil,
  308. wtype: ast.TUMBLING_WINDOW,
  309. length: 10000,
  310. interval: 0,
  311. limit: 0,
  312. }.Init(),
  313. },
  314. },
  315. fields: []ast.Field{
  316. {
  317. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  318. Name: "id1",
  319. AName: ""},
  320. },
  321. isAggregate: false,
  322. sendMeta: false,
  323. }.Init(),
  324. }, { // 4. do not optimize count window
  325. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  326. p: ProjectPlan{
  327. baseLogicalPlan: baseLogicalPlan{
  328. children: []LogicalPlan{
  329. HavingPlan{
  330. baseLogicalPlan: baseLogicalPlan{
  331. children: []LogicalPlan{
  332. FilterPlan{
  333. baseLogicalPlan: baseLogicalPlan{
  334. children: []LogicalPlan{
  335. WindowPlan{
  336. baseLogicalPlan: baseLogicalPlan{
  337. children: []LogicalPlan{
  338. DataSourcePlan{
  339. name: "src1",
  340. isWildCard: true,
  341. streamFields: []interface{}{
  342. &ast.StreamField{
  343. Name: "id1",
  344. FieldType: &ast.BasicType{Type: ast.BIGINT},
  345. },
  346. &ast.StreamField{
  347. Name: "temp",
  348. FieldType: &ast.BasicType{Type: ast.BIGINT},
  349. },
  350. &ast.StreamField{
  351. Name: "name",
  352. FieldType: &ast.BasicType{Type: ast.STRINGS},
  353. },
  354. },
  355. streamStmt: streams["src1"],
  356. metaFields: []string{},
  357. }.Init(),
  358. },
  359. },
  360. condition: nil,
  361. wtype: ast.COUNT_WINDOW,
  362. length: 5,
  363. interval: 1,
  364. limit: 0,
  365. }.Init(),
  366. },
  367. },
  368. condition: &ast.BinaryExpr{
  369. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  370. OP: ast.GT,
  371. RHS: &ast.IntegerLiteral{Val: 20},
  372. },
  373. }.Init(),
  374. },
  375. },
  376. condition: &ast.BinaryExpr{
  377. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  378. Token: ast.ASTERISK,
  379. }}},
  380. OP: ast.GT,
  381. RHS: &ast.IntegerLiteral{Val: 2},
  382. },
  383. }.Init(),
  384. },
  385. },
  386. fields: []ast.Field{
  387. {
  388. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  389. Name: "kuiper_field_0",
  390. AName: ""},
  391. },
  392. isAggregate: false,
  393. sendMeta: false,
  394. }.Init(),
  395. }, { // 5. optimize join on
  396. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  397. p: ProjectPlan{
  398. baseLogicalPlan: baseLogicalPlan{
  399. children: []LogicalPlan{
  400. JoinPlan{
  401. baseLogicalPlan: baseLogicalPlan{
  402. children: []LogicalPlan{
  403. WindowPlan{
  404. baseLogicalPlan: baseLogicalPlan{
  405. children: []LogicalPlan{
  406. FilterPlan{
  407. baseLogicalPlan: baseLogicalPlan{
  408. children: []LogicalPlan{
  409. DataSourcePlan{
  410. name: "src1",
  411. streamFields: []interface{}{
  412. &ast.StreamField{
  413. Name: "id1",
  414. FieldType: &ast.BasicType{Type: ast.BIGINT},
  415. },
  416. &ast.StreamField{
  417. Name: "temp",
  418. FieldType: &ast.BasicType{Type: ast.BIGINT},
  419. },
  420. },
  421. streamStmt: streams["src1"],
  422. metaFields: []string{},
  423. }.Init(),
  424. },
  425. },
  426. condition: &ast.BinaryExpr{
  427. RHS: &ast.BinaryExpr{
  428. OP: ast.GT,
  429. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  430. RHS: &ast.IntegerLiteral{Val: 20},
  431. },
  432. OP: ast.AND,
  433. LHS: &ast.BinaryExpr{
  434. OP: ast.GT,
  435. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  436. RHS: &ast.IntegerLiteral{Val: 111},
  437. },
  438. },
  439. }.Init(),
  440. FilterPlan{
  441. baseLogicalPlan: baseLogicalPlan{
  442. children: []LogicalPlan{
  443. DataSourcePlan{
  444. name: "src2",
  445. streamFields: []interface{}{
  446. &ast.StreamField{
  447. Name: "hum",
  448. FieldType: &ast.BasicType{Type: ast.BIGINT},
  449. },
  450. &ast.StreamField{
  451. Name: "id2",
  452. FieldType: &ast.BasicType{Type: ast.BIGINT},
  453. },
  454. },
  455. streamStmt: streams["src2"],
  456. metaFields: []string{},
  457. }.Init(),
  458. },
  459. },
  460. condition: &ast.BinaryExpr{
  461. OP: ast.LT,
  462. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  463. RHS: &ast.IntegerLiteral{Val: 60},
  464. },
  465. }.Init(),
  466. },
  467. },
  468. condition: nil,
  469. wtype: ast.TUMBLING_WINDOW,
  470. length: 10000,
  471. interval: 0,
  472. limit: 0,
  473. }.Init(),
  474. },
  475. },
  476. from: &ast.Table{
  477. Name: "src1",
  478. },
  479. joins: []ast.Join{
  480. {
  481. Name: "src2",
  482. Alias: "",
  483. JoinType: ast.INNER_JOIN,
  484. Expr: &ast.BinaryExpr{
  485. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  486. OP: ast.EQ,
  487. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  488. },
  489. },
  490. },
  491. }.Init(),
  492. },
  493. },
  494. fields: []ast.Field{
  495. {
  496. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  497. Name: "id1",
  498. AName: ""},
  499. },
  500. isAggregate: false,
  501. sendMeta: false,
  502. }.Init(),
  503. }, { // 6. optimize outter join on
  504. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  505. p: ProjectPlan{
  506. baseLogicalPlan: baseLogicalPlan{
  507. children: []LogicalPlan{
  508. JoinPlan{
  509. baseLogicalPlan: baseLogicalPlan{
  510. children: []LogicalPlan{
  511. WindowPlan{
  512. baseLogicalPlan: baseLogicalPlan{
  513. children: []LogicalPlan{
  514. FilterPlan{
  515. baseLogicalPlan: baseLogicalPlan{
  516. children: []LogicalPlan{
  517. DataSourcePlan{
  518. name: "src1",
  519. streamFields: []interface{}{
  520. &ast.StreamField{
  521. Name: "id1",
  522. FieldType: &ast.BasicType{Type: ast.BIGINT},
  523. },
  524. &ast.StreamField{
  525. Name: "temp",
  526. FieldType: &ast.BasicType{Type: ast.BIGINT},
  527. },
  528. },
  529. streamStmt: streams["src1"],
  530. metaFields: []string{},
  531. }.Init(),
  532. },
  533. },
  534. condition: &ast.BinaryExpr{
  535. OP: ast.GT,
  536. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  537. RHS: &ast.IntegerLiteral{Val: 111},
  538. },
  539. }.Init(),
  540. DataSourcePlan{
  541. name: "src2",
  542. streamFields: []interface{}{
  543. &ast.StreamField{
  544. Name: "hum",
  545. FieldType: &ast.BasicType{Type: ast.BIGINT},
  546. },
  547. &ast.StreamField{
  548. Name: "id2",
  549. FieldType: &ast.BasicType{Type: ast.BIGINT},
  550. },
  551. },
  552. streamStmt: streams["src2"],
  553. metaFields: []string{},
  554. }.Init(),
  555. },
  556. },
  557. condition: nil,
  558. wtype: ast.TUMBLING_WINDOW,
  559. length: 10000,
  560. interval: 0,
  561. limit: 0,
  562. }.Init(),
  563. },
  564. },
  565. from: &ast.Table{
  566. Name: "src1",
  567. },
  568. joins: []ast.Join{
  569. {
  570. Name: "src2",
  571. Alias: "",
  572. JoinType: ast.FULL_JOIN,
  573. Expr: &ast.BinaryExpr{
  574. OP: ast.AND,
  575. LHS: &ast.BinaryExpr{
  576. OP: ast.AND,
  577. LHS: &ast.BinaryExpr{
  578. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  579. OP: ast.EQ,
  580. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  581. },
  582. RHS: &ast.BinaryExpr{
  583. OP: ast.GT,
  584. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  585. RHS: &ast.IntegerLiteral{Val: 20},
  586. },
  587. },
  588. RHS: &ast.BinaryExpr{
  589. OP: ast.LT,
  590. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  591. RHS: &ast.IntegerLiteral{Val: 60},
  592. },
  593. },
  594. },
  595. },
  596. }.Init(),
  597. },
  598. },
  599. fields: []ast.Field{
  600. {
  601. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  602. Name: "id1",
  603. AName: ""},
  604. },
  605. isAggregate: false,
  606. sendMeta: false,
  607. }.Init(),
  608. }, { // 7 window error for table
  609. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  610. p: nil,
  611. err: "cannot run window for TABLE sources",
  612. }, { // 8 join table without window
  613. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  614. p: ProjectPlan{
  615. baseLogicalPlan: baseLogicalPlan{
  616. children: []LogicalPlan{
  617. JoinPlan{
  618. baseLogicalPlan: baseLogicalPlan{
  619. children: []LogicalPlan{
  620. JoinAlignPlan{
  621. baseLogicalPlan: baseLogicalPlan{
  622. children: []LogicalPlan{
  623. FilterPlan{
  624. baseLogicalPlan: baseLogicalPlan{
  625. children: []LogicalPlan{
  626. DataSourcePlan{
  627. name: "src1",
  628. streamFields: []interface{}{
  629. &ast.StreamField{
  630. Name: "id1",
  631. FieldType: &ast.BasicType{Type: ast.BIGINT},
  632. },
  633. &ast.StreamField{
  634. Name: "temp",
  635. FieldType: &ast.BasicType{Type: ast.BIGINT},
  636. },
  637. },
  638. streamStmt: streams["src1"],
  639. metaFields: []string{},
  640. }.Init(),
  641. },
  642. },
  643. condition: &ast.BinaryExpr{
  644. RHS: &ast.BinaryExpr{
  645. OP: ast.GT,
  646. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  647. RHS: &ast.IntegerLiteral{Val: 20},
  648. },
  649. OP: ast.AND,
  650. LHS: &ast.BinaryExpr{
  651. OP: ast.GT,
  652. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  653. RHS: &ast.IntegerLiteral{Val: 111},
  654. },
  655. },
  656. }.Init(),
  657. FilterPlan{
  658. baseLogicalPlan: baseLogicalPlan{
  659. children: []LogicalPlan{
  660. DataSourcePlan{
  661. name: "tableInPlanner",
  662. streamFields: []interface{}{
  663. &ast.StreamField{
  664. Name: "hum",
  665. FieldType: &ast.BasicType{Type: ast.BIGINT},
  666. },
  667. &ast.StreamField{
  668. Name: "id",
  669. FieldType: &ast.BasicType{Type: ast.BIGINT},
  670. },
  671. },
  672. streamStmt: streams["tableInPlanner"],
  673. metaFields: []string{},
  674. }.Init(),
  675. },
  676. },
  677. condition: &ast.BinaryExpr{
  678. OP: ast.LT,
  679. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  680. RHS: &ast.IntegerLiteral{Val: 60},
  681. },
  682. }.Init(),
  683. },
  684. },
  685. Emitters: []string{"tableInPlanner"},
  686. }.Init(),
  687. },
  688. },
  689. from: &ast.Table{
  690. Name: "src1",
  691. },
  692. joins: []ast.Join{
  693. {
  694. Name: "tableInPlanner",
  695. Alias: "",
  696. JoinType: ast.INNER_JOIN,
  697. Expr: &ast.BinaryExpr{
  698. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  699. OP: ast.EQ,
  700. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  701. },
  702. },
  703. },
  704. }.Init(),
  705. },
  706. },
  707. fields: []ast.Field{
  708. {
  709. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  710. Name: "id1",
  711. AName: ""},
  712. },
  713. isAggregate: false,
  714. sendMeta: false,
  715. }.Init(),
  716. }, { // 9 join table with window
  717. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  718. p: ProjectPlan{
  719. baseLogicalPlan: baseLogicalPlan{
  720. children: []LogicalPlan{
  721. JoinPlan{
  722. baseLogicalPlan: baseLogicalPlan{
  723. children: []LogicalPlan{
  724. JoinAlignPlan{
  725. baseLogicalPlan: baseLogicalPlan{
  726. children: []LogicalPlan{
  727. WindowPlan{
  728. baseLogicalPlan: baseLogicalPlan{
  729. children: []LogicalPlan{
  730. FilterPlan{
  731. baseLogicalPlan: baseLogicalPlan{
  732. children: []LogicalPlan{
  733. DataSourcePlan{
  734. name: "src1",
  735. streamFields: []interface{}{
  736. &ast.StreamField{
  737. Name: "id1",
  738. FieldType: &ast.BasicType{Type: ast.BIGINT},
  739. },
  740. &ast.StreamField{
  741. Name: "temp",
  742. FieldType: &ast.BasicType{Type: ast.BIGINT},
  743. },
  744. },
  745. streamStmt: streams["src1"],
  746. metaFields: []string{},
  747. }.Init(),
  748. },
  749. },
  750. condition: &ast.BinaryExpr{
  751. RHS: &ast.BinaryExpr{
  752. OP: ast.GT,
  753. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  754. RHS: &ast.IntegerLiteral{Val: 20},
  755. },
  756. OP: ast.AND,
  757. LHS: &ast.BinaryExpr{
  758. OP: ast.GT,
  759. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  760. RHS: &ast.IntegerLiteral{Val: 111},
  761. },
  762. },
  763. }.Init(),
  764. },
  765. },
  766. condition: nil,
  767. wtype: ast.TUMBLING_WINDOW,
  768. length: 10000,
  769. interval: 0,
  770. limit: 0,
  771. }.Init(),
  772. FilterPlan{
  773. baseLogicalPlan: baseLogicalPlan{
  774. children: []LogicalPlan{
  775. DataSourcePlan{
  776. name: "tableInPlanner",
  777. streamFields: []interface{}{
  778. &ast.StreamField{
  779. Name: "hum",
  780. FieldType: &ast.BasicType{Type: ast.BIGINT},
  781. },
  782. &ast.StreamField{
  783. Name: "id",
  784. FieldType: &ast.BasicType{Type: ast.BIGINT},
  785. },
  786. },
  787. streamStmt: streams["tableInPlanner"],
  788. metaFields: []string{},
  789. }.Init(),
  790. },
  791. },
  792. condition: &ast.BinaryExpr{
  793. OP: ast.LT,
  794. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  795. RHS: &ast.IntegerLiteral{Val: 60},
  796. },
  797. }.Init(),
  798. },
  799. },
  800. Emitters: []string{"tableInPlanner"},
  801. }.Init(),
  802. },
  803. },
  804. from: &ast.Table{
  805. Name: "src1",
  806. },
  807. joins: []ast.Join{
  808. {
  809. Name: "tableInPlanner",
  810. Alias: "",
  811. JoinType: ast.INNER_JOIN,
  812. Expr: &ast.BinaryExpr{
  813. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  814. OP: ast.EQ,
  815. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  816. },
  817. },
  818. },
  819. }.Init(),
  820. },
  821. },
  822. fields: []ast.Field{
  823. {
  824. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  825. Name: "id1",
  826. AName: ""},
  827. },
  828. isAggregate: false,
  829. sendMeta: false,
  830. }.Init(),
  831. }, { // 10 meta
  832. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  833. p: ProjectPlan{
  834. baseLogicalPlan: baseLogicalPlan{
  835. children: []LogicalPlan{
  836. FilterPlan{
  837. baseLogicalPlan: baseLogicalPlan{
  838. children: []LogicalPlan{
  839. DataSourcePlan{
  840. name: "src1",
  841. streamFields: []interface{}{
  842. &ast.StreamField{
  843. Name: "temp",
  844. FieldType: &ast.BasicType{Type: ast.BIGINT},
  845. },
  846. },
  847. streamStmt: streams["src1"],
  848. metaFields: []string{"Humidity", "device", "id"},
  849. }.Init(),
  850. },
  851. },
  852. condition: &ast.BinaryExpr{
  853. LHS: &ast.Call{
  854. Name: "meta",
  855. Args: []ast.Expr{&ast.MetaRef{
  856. Name: "device",
  857. StreamName: ast.DefaultStream,
  858. }},
  859. },
  860. OP: ast.EQ,
  861. RHS: &ast.StringLiteral{
  862. Val: "demo2",
  863. },
  864. },
  865. }.Init(),
  866. },
  867. },
  868. fields: []ast.Field{
  869. {
  870. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  871. Name: "temp",
  872. AName: "",
  873. }, {
  874. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  875. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  876. Name: "id",
  877. StreamName: ast.DefaultStream,
  878. }}},
  879. []ast.StreamName{},
  880. nil,
  881. )},
  882. Name: "meta",
  883. AName: "eid",
  884. }, {
  885. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  886. &ast.Call{Name: "meta", Args: []ast.Expr{
  887. &ast.BinaryExpr{
  888. OP: ast.ARROW,
  889. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  890. RHS: &ast.MetaRef{Name: "Device"},
  891. },
  892. }},
  893. []ast.StreamName{},
  894. nil,
  895. )},
  896. Name: "meta",
  897. AName: "hdevice",
  898. },
  899. },
  900. isAggregate: false,
  901. sendMeta: false,
  902. }.Init(),
  903. }, { // 11 join with same name field and aliased
  904. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  905. p: ProjectPlan{
  906. baseLogicalPlan: baseLogicalPlan{
  907. children: []LogicalPlan{
  908. JoinPlan{
  909. baseLogicalPlan: baseLogicalPlan{
  910. children: []LogicalPlan{
  911. JoinAlignPlan{
  912. baseLogicalPlan: baseLogicalPlan{
  913. children: []LogicalPlan{
  914. DataSourcePlan{
  915. name: "src2",
  916. streamFields: []interface{}{
  917. &ast.StreamField{
  918. Name: "hum",
  919. FieldType: &ast.BasicType{Type: ast.BIGINT},
  920. },
  921. &ast.StreamField{
  922. Name: "id2",
  923. FieldType: &ast.BasicType{Type: ast.BIGINT},
  924. },
  925. },
  926. streamStmt: streams["src2"],
  927. metaFields: []string{},
  928. }.Init(),
  929. DataSourcePlan{
  930. name: "tableInPlanner",
  931. streamFields: []interface{}{
  932. &ast.StreamField{
  933. Name: "hum",
  934. FieldType: &ast.BasicType{Type: ast.BIGINT},
  935. },
  936. &ast.StreamField{
  937. Name: "id",
  938. FieldType: &ast.BasicType{Type: ast.BIGINT},
  939. },
  940. },
  941. streamStmt: streams["tableInPlanner"],
  942. metaFields: []string{},
  943. }.Init(),
  944. },
  945. },
  946. Emitters: []string{"tableInPlanner"},
  947. }.Init(),
  948. },
  949. },
  950. from: &ast.Table{
  951. Name: "src2",
  952. },
  953. joins: []ast.Join{
  954. {
  955. Name: "tableInPlanner",
  956. Alias: "",
  957. JoinType: ast.INNER_JOIN,
  958. Expr: &ast.BinaryExpr{
  959. RHS: &ast.BinaryExpr{
  960. OP: ast.EQ,
  961. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  962. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  963. },
  964. OP: ast.AND,
  965. LHS: &ast.BinaryExpr{
  966. OP: ast.GT,
  967. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  968. &ast.FieldRef{
  969. Name: "hum",
  970. StreamName: "src2",
  971. },
  972. []ast.StreamName{"src2"},
  973. &boolFalse,
  974. )},
  975. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  976. &ast.FieldRef{
  977. Name: "hum",
  978. StreamName: "tableInPlanner",
  979. },
  980. []ast.StreamName{"tableInPlanner"},
  981. &boolFalse,
  982. )},
  983. },
  984. },
  985. },
  986. },
  987. }.Init(),
  988. },
  989. },
  990. fields: []ast.Field{
  991. {
  992. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  993. &ast.FieldRef{
  994. Name: "hum",
  995. StreamName: "src2",
  996. },
  997. []ast.StreamName{"src2"},
  998. &boolFalse,
  999. )},
  1000. Name: "hum",
  1001. AName: "hum1",
  1002. }, {
  1003. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1004. &ast.FieldRef{
  1005. Name: "hum",
  1006. StreamName: "tableInPlanner",
  1007. },
  1008. []ast.StreamName{"tableInPlanner"},
  1009. &boolFalse,
  1010. )},
  1011. Name: "hum",
  1012. AName: "hum2",
  1013. },
  1014. },
  1015. isAggregate: false,
  1016. sendMeta: false,
  1017. }.Init(),
  1018. },
  1019. }
  1020. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1021. for i, tt := range tests {
  1022. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1023. if err != nil {
  1024. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1025. continue
  1026. }
  1027. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1028. IsEventTime: false,
  1029. LateTol: 0,
  1030. Concurrency: 0,
  1031. BufferLength: 0,
  1032. SendMetaToSink: false,
  1033. Qos: 0,
  1034. CheckpointInterval: 0,
  1035. SendError: true,
  1036. }, store)
  1037. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1038. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1039. } else if !reflect.DeepEqual(tt.p, p) {
  1040. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1041. }
  1042. }
  1043. }
  1044. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1045. err, store := kv.GetKVStore("stream")
  1046. if err != nil {
  1047. t.Error(err)
  1048. return
  1049. }
  1050. streamSqls := map[string]string{
  1051. "src1": `CREATE STREAM src1 (
  1052. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1053. "src2": `CREATE STREAM src2 (
  1054. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1055. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1056. id BIGINT,
  1057. name STRING,
  1058. value STRING,
  1059. hum BIGINT
  1060. ) WITH (TYPE="file");`,
  1061. }
  1062. types := map[string]ast.StreamType{
  1063. "src1": ast.TypeStream,
  1064. "src2": ast.TypeStream,
  1065. "tableInPlanner": ast.TypeTable,
  1066. }
  1067. for name, sql := range streamSqls {
  1068. s, err := json.Marshal(&xsql.StreamInfo{
  1069. StreamType: types[name],
  1070. Statement: sql,
  1071. })
  1072. if err != nil {
  1073. t.Error(err)
  1074. t.Fail()
  1075. }
  1076. store.Set(name, string(s))
  1077. }
  1078. streams := make(map[string]*ast.StreamStmt)
  1079. for n := range streamSqls {
  1080. streamStmt, err := xsql.GetDataSource(store, n)
  1081. if err != nil {
  1082. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1083. return
  1084. }
  1085. streams[n] = streamStmt
  1086. }
  1087. var (
  1088. //boolTrue = true
  1089. boolFalse = false
  1090. )
  1091. var tests = []struct {
  1092. sql string
  1093. p LogicalPlan
  1094. err string
  1095. }{
  1096. { // 0
  1097. sql: `SELECT name FROM src1`,
  1098. p: ProjectPlan{
  1099. baseLogicalPlan: baseLogicalPlan{
  1100. children: []LogicalPlan{
  1101. DataSourcePlan{
  1102. baseLogicalPlan: baseLogicalPlan{},
  1103. name: "src1",
  1104. streamFields: []interface{}{
  1105. "name",
  1106. },
  1107. streamStmt: streams["src1"],
  1108. metaFields: []string{},
  1109. }.Init(),
  1110. },
  1111. },
  1112. fields: []ast.Field{
  1113. {
  1114. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1115. Name: "name",
  1116. AName: "",
  1117. },
  1118. },
  1119. isAggregate: false,
  1120. sendMeta: false,
  1121. }.Init(),
  1122. }, { // 1 optimize where to data source
  1123. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1124. p: ProjectPlan{
  1125. baseLogicalPlan: baseLogicalPlan{
  1126. children: []LogicalPlan{
  1127. WindowPlan{
  1128. baseLogicalPlan: baseLogicalPlan{
  1129. children: []LogicalPlan{
  1130. FilterPlan{
  1131. baseLogicalPlan: baseLogicalPlan{
  1132. children: []LogicalPlan{
  1133. DataSourcePlan{
  1134. name: "src1",
  1135. streamFields: []interface{}{
  1136. "name", "temp",
  1137. },
  1138. streamStmt: streams["src1"],
  1139. metaFields: []string{},
  1140. }.Init(),
  1141. },
  1142. },
  1143. condition: &ast.BinaryExpr{
  1144. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1145. OP: ast.EQ,
  1146. RHS: &ast.StringLiteral{Val: "v1"},
  1147. },
  1148. }.Init(),
  1149. },
  1150. },
  1151. condition: nil,
  1152. wtype: ast.TUMBLING_WINDOW,
  1153. length: 10000,
  1154. interval: 0,
  1155. limit: 0,
  1156. }.Init(),
  1157. },
  1158. },
  1159. fields: []ast.Field{
  1160. {
  1161. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1162. Name: "temp",
  1163. AName: ""},
  1164. },
  1165. isAggregate: false,
  1166. sendMeta: false,
  1167. }.Init(),
  1168. }, { // 2 condition that cannot be optimized
  1169. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1170. p: ProjectPlan{
  1171. baseLogicalPlan: baseLogicalPlan{
  1172. children: []LogicalPlan{
  1173. JoinPlan{
  1174. baseLogicalPlan: baseLogicalPlan{
  1175. children: []LogicalPlan{
  1176. WindowPlan{
  1177. baseLogicalPlan: baseLogicalPlan{
  1178. children: []LogicalPlan{
  1179. DataSourcePlan{
  1180. name: "src1",
  1181. streamFields: []interface{}{
  1182. "id1", "temp",
  1183. },
  1184. streamStmt: streams["src1"],
  1185. metaFields: []string{},
  1186. }.Init(),
  1187. DataSourcePlan{
  1188. name: "src2",
  1189. streamFields: []interface{}{ // can't determine where is id1 belonged to
  1190. "hum", "id1", "id2",
  1191. },
  1192. streamStmt: streams["src2"],
  1193. metaFields: []string{},
  1194. }.Init(),
  1195. },
  1196. },
  1197. condition: nil,
  1198. wtype: ast.TUMBLING_WINDOW,
  1199. length: 10000,
  1200. interval: 0,
  1201. limit: 0,
  1202. }.Init(),
  1203. },
  1204. },
  1205. from: &ast.Table{Name: "src1"},
  1206. joins: ast.Joins{ast.Join{
  1207. Name: "src2",
  1208. JoinType: ast.INNER_JOIN,
  1209. Expr: &ast.BinaryExpr{
  1210. OP: ast.AND,
  1211. LHS: &ast.BinaryExpr{
  1212. LHS: &ast.BinaryExpr{
  1213. OP: ast.GT,
  1214. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1215. RHS: &ast.IntegerLiteral{Val: 20},
  1216. },
  1217. OP: ast.OR,
  1218. RHS: &ast.BinaryExpr{
  1219. OP: ast.GT,
  1220. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1221. RHS: &ast.IntegerLiteral{Val: 60},
  1222. },
  1223. },
  1224. RHS: &ast.BinaryExpr{
  1225. OP: ast.EQ,
  1226. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1227. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1228. },
  1229. },
  1230. }},
  1231. }.Init(),
  1232. },
  1233. },
  1234. fields: []ast.Field{
  1235. {
  1236. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1237. Name: "id1",
  1238. AName: ""},
  1239. },
  1240. isAggregate: false,
  1241. sendMeta: false,
  1242. }.Init(),
  1243. }, { // 3 optimize window filter
  1244. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1245. p: ProjectPlan{
  1246. baseLogicalPlan: baseLogicalPlan{
  1247. children: []LogicalPlan{
  1248. WindowPlan{
  1249. baseLogicalPlan: baseLogicalPlan{
  1250. children: []LogicalPlan{
  1251. FilterPlan{
  1252. baseLogicalPlan: baseLogicalPlan{
  1253. children: []LogicalPlan{
  1254. DataSourcePlan{
  1255. name: "src1",
  1256. streamFields: []interface{}{
  1257. "id1", "name", "temp",
  1258. },
  1259. streamStmt: streams["src1"],
  1260. metaFields: []string{},
  1261. }.Init(),
  1262. },
  1263. },
  1264. condition: &ast.BinaryExpr{
  1265. OP: ast.AND,
  1266. LHS: &ast.BinaryExpr{
  1267. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1268. OP: ast.EQ,
  1269. RHS: &ast.StringLiteral{Val: "v1"},
  1270. },
  1271. RHS: &ast.BinaryExpr{
  1272. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1273. OP: ast.GT,
  1274. RHS: &ast.IntegerLiteral{Val: 2},
  1275. },
  1276. },
  1277. }.Init(),
  1278. },
  1279. },
  1280. condition: nil,
  1281. wtype: ast.TUMBLING_WINDOW,
  1282. length: 10000,
  1283. interval: 0,
  1284. limit: 0,
  1285. }.Init(),
  1286. },
  1287. },
  1288. fields: []ast.Field{
  1289. {
  1290. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1291. Name: "id1",
  1292. AName: ""},
  1293. },
  1294. isAggregate: false,
  1295. sendMeta: false,
  1296. }.Init(),
  1297. }, { // 4. do not optimize count window
  1298. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1299. p: ProjectPlan{
  1300. baseLogicalPlan: baseLogicalPlan{
  1301. children: []LogicalPlan{
  1302. HavingPlan{
  1303. baseLogicalPlan: baseLogicalPlan{
  1304. children: []LogicalPlan{
  1305. FilterPlan{
  1306. baseLogicalPlan: baseLogicalPlan{
  1307. children: []LogicalPlan{
  1308. WindowPlan{
  1309. baseLogicalPlan: baseLogicalPlan{
  1310. children: []LogicalPlan{
  1311. DataSourcePlan{
  1312. name: "src1",
  1313. isWildCard: true,
  1314. streamFields: nil,
  1315. streamStmt: streams["src1"],
  1316. metaFields: []string{},
  1317. }.Init(),
  1318. },
  1319. },
  1320. condition: nil,
  1321. wtype: ast.COUNT_WINDOW,
  1322. length: 5,
  1323. interval: 1,
  1324. limit: 0,
  1325. }.Init(),
  1326. },
  1327. },
  1328. condition: &ast.BinaryExpr{
  1329. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1330. OP: ast.GT,
  1331. RHS: &ast.IntegerLiteral{Val: 20},
  1332. },
  1333. }.Init(),
  1334. },
  1335. },
  1336. condition: &ast.BinaryExpr{
  1337. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  1338. Token: ast.ASTERISK,
  1339. }}},
  1340. OP: ast.GT,
  1341. RHS: &ast.IntegerLiteral{Val: 2},
  1342. },
  1343. }.Init(),
  1344. },
  1345. },
  1346. fields: []ast.Field{
  1347. {
  1348. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1349. Name: "kuiper_field_0",
  1350. AName: ""},
  1351. },
  1352. isAggregate: false,
  1353. sendMeta: false,
  1354. }.Init(),
  1355. }, { // 5. optimize join on
  1356. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1357. p: ProjectPlan{
  1358. baseLogicalPlan: baseLogicalPlan{
  1359. children: []LogicalPlan{
  1360. JoinPlan{
  1361. baseLogicalPlan: baseLogicalPlan{
  1362. children: []LogicalPlan{
  1363. WindowPlan{
  1364. baseLogicalPlan: baseLogicalPlan{
  1365. children: []LogicalPlan{
  1366. FilterPlan{
  1367. baseLogicalPlan: baseLogicalPlan{
  1368. children: []LogicalPlan{
  1369. DataSourcePlan{
  1370. name: "src1",
  1371. streamFields: []interface{}{
  1372. "id1", "temp",
  1373. },
  1374. streamStmt: streams["src1"],
  1375. metaFields: []string{},
  1376. }.Init(),
  1377. },
  1378. },
  1379. condition: &ast.BinaryExpr{
  1380. RHS: &ast.BinaryExpr{
  1381. OP: ast.GT,
  1382. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1383. RHS: &ast.IntegerLiteral{Val: 20},
  1384. },
  1385. OP: ast.AND,
  1386. LHS: &ast.BinaryExpr{
  1387. OP: ast.GT,
  1388. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1389. RHS: &ast.IntegerLiteral{Val: 111},
  1390. },
  1391. },
  1392. }.Init(),
  1393. FilterPlan{
  1394. baseLogicalPlan: baseLogicalPlan{
  1395. children: []LogicalPlan{
  1396. DataSourcePlan{
  1397. name: "src2",
  1398. streamFields: []interface{}{
  1399. "hum", "id1", "id2",
  1400. },
  1401. streamStmt: streams["src2"],
  1402. metaFields: []string{},
  1403. }.Init(),
  1404. },
  1405. },
  1406. condition: &ast.BinaryExpr{
  1407. OP: ast.LT,
  1408. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1409. RHS: &ast.IntegerLiteral{Val: 60},
  1410. },
  1411. }.Init(),
  1412. },
  1413. },
  1414. condition: nil,
  1415. wtype: ast.TUMBLING_WINDOW,
  1416. length: 10000,
  1417. interval: 0,
  1418. limit: 0,
  1419. }.Init(),
  1420. },
  1421. },
  1422. from: &ast.Table{
  1423. Name: "src1",
  1424. },
  1425. joins: []ast.Join{
  1426. {
  1427. Name: "src2",
  1428. Alias: "",
  1429. JoinType: ast.INNER_JOIN,
  1430. Expr: &ast.BinaryExpr{
  1431. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1432. OP: ast.EQ,
  1433. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1434. },
  1435. },
  1436. },
  1437. }.Init(),
  1438. },
  1439. },
  1440. fields: []ast.Field{
  1441. {
  1442. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1443. Name: "id1",
  1444. AName: ""},
  1445. },
  1446. isAggregate: false,
  1447. sendMeta: false,
  1448. }.Init(),
  1449. }, { // 6. optimize outter join on
  1450. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1451. p: ProjectPlan{
  1452. baseLogicalPlan: baseLogicalPlan{
  1453. children: []LogicalPlan{
  1454. JoinPlan{
  1455. baseLogicalPlan: baseLogicalPlan{
  1456. children: []LogicalPlan{
  1457. WindowPlan{
  1458. baseLogicalPlan: baseLogicalPlan{
  1459. children: []LogicalPlan{
  1460. FilterPlan{
  1461. baseLogicalPlan: baseLogicalPlan{
  1462. children: []LogicalPlan{
  1463. DataSourcePlan{
  1464. name: "src1",
  1465. streamFields: []interface{}{
  1466. "id1", "temp",
  1467. },
  1468. streamStmt: streams["src1"],
  1469. metaFields: []string{},
  1470. }.Init(),
  1471. },
  1472. },
  1473. condition: &ast.BinaryExpr{
  1474. OP: ast.GT,
  1475. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1476. RHS: &ast.IntegerLiteral{Val: 111},
  1477. },
  1478. }.Init(),
  1479. DataSourcePlan{
  1480. name: "src2",
  1481. streamFields: []interface{}{
  1482. "hum", "id1", "id2",
  1483. },
  1484. streamStmt: streams["src2"],
  1485. metaFields: []string{},
  1486. }.Init(),
  1487. },
  1488. },
  1489. condition: nil,
  1490. wtype: ast.TUMBLING_WINDOW,
  1491. length: 10000,
  1492. interval: 0,
  1493. limit: 0,
  1494. }.Init(),
  1495. },
  1496. },
  1497. from: &ast.Table{
  1498. Name: "src1",
  1499. },
  1500. joins: []ast.Join{
  1501. {
  1502. Name: "src2",
  1503. Alias: "",
  1504. JoinType: ast.FULL_JOIN,
  1505. Expr: &ast.BinaryExpr{
  1506. OP: ast.AND,
  1507. LHS: &ast.BinaryExpr{
  1508. OP: ast.AND,
  1509. LHS: &ast.BinaryExpr{
  1510. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1511. OP: ast.EQ,
  1512. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1513. },
  1514. RHS: &ast.BinaryExpr{
  1515. OP: ast.GT,
  1516. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1517. RHS: &ast.IntegerLiteral{Val: 20},
  1518. },
  1519. },
  1520. RHS: &ast.BinaryExpr{
  1521. OP: ast.LT,
  1522. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1523. RHS: &ast.IntegerLiteral{Val: 60},
  1524. },
  1525. },
  1526. },
  1527. },
  1528. }.Init(),
  1529. },
  1530. },
  1531. fields: []ast.Field{
  1532. {
  1533. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1534. Name: "id1",
  1535. AName: ""},
  1536. },
  1537. isAggregate: false,
  1538. sendMeta: false,
  1539. }.Init(),
  1540. }, { // 7 window error for table
  1541. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1542. p: nil,
  1543. err: "cannot run window for TABLE sources",
  1544. }, { // 8 join table without window
  1545. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1546. p: ProjectPlan{
  1547. baseLogicalPlan: baseLogicalPlan{
  1548. children: []LogicalPlan{
  1549. JoinPlan{
  1550. baseLogicalPlan: baseLogicalPlan{
  1551. children: []LogicalPlan{
  1552. JoinAlignPlan{
  1553. baseLogicalPlan: baseLogicalPlan{
  1554. children: []LogicalPlan{
  1555. FilterPlan{
  1556. baseLogicalPlan: baseLogicalPlan{
  1557. children: []LogicalPlan{
  1558. DataSourcePlan{
  1559. name: "src1",
  1560. streamFields: []interface{}{
  1561. "hum", "id1", "temp",
  1562. },
  1563. streamStmt: streams["src1"],
  1564. metaFields: []string{},
  1565. }.Init(),
  1566. },
  1567. },
  1568. condition: &ast.BinaryExpr{
  1569. RHS: &ast.BinaryExpr{
  1570. OP: ast.GT,
  1571. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1572. RHS: &ast.IntegerLiteral{Val: 20},
  1573. },
  1574. OP: ast.AND,
  1575. LHS: &ast.BinaryExpr{
  1576. OP: ast.GT,
  1577. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1578. RHS: &ast.IntegerLiteral{Val: 111},
  1579. },
  1580. },
  1581. }.Init(),
  1582. DataSourcePlan{
  1583. name: "tableInPlanner",
  1584. streamFields: []interface{}{
  1585. &ast.StreamField{
  1586. Name: "hum",
  1587. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1588. },
  1589. &ast.StreamField{
  1590. Name: "id",
  1591. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1592. },
  1593. },
  1594. streamStmt: streams["tableInPlanner"],
  1595. metaFields: []string{},
  1596. }.Init(),
  1597. },
  1598. },
  1599. Emitters: []string{"tableInPlanner"},
  1600. }.Init(),
  1601. },
  1602. },
  1603. from: &ast.Table{
  1604. Name: "src1",
  1605. },
  1606. joins: []ast.Join{
  1607. {
  1608. Name: "tableInPlanner",
  1609. Alias: "",
  1610. JoinType: ast.INNER_JOIN,
  1611. Expr: &ast.BinaryExpr{
  1612. OP: ast.AND,
  1613. LHS: &ast.BinaryExpr{
  1614. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1615. OP: ast.EQ,
  1616. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1617. },
  1618. RHS: &ast.BinaryExpr{
  1619. OP: ast.LT,
  1620. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1621. RHS: &ast.IntegerLiteral{Val: 60},
  1622. },
  1623. },
  1624. },
  1625. },
  1626. }.Init(),
  1627. },
  1628. },
  1629. fields: []ast.Field{
  1630. {
  1631. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1632. Name: "id1",
  1633. AName: ""},
  1634. },
  1635. isAggregate: false,
  1636. sendMeta: false,
  1637. }.Init(),
  1638. }, { // 9 join table with window
  1639. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1640. p: ProjectPlan{
  1641. baseLogicalPlan: baseLogicalPlan{
  1642. children: []LogicalPlan{
  1643. JoinPlan{
  1644. baseLogicalPlan: baseLogicalPlan{
  1645. children: []LogicalPlan{
  1646. JoinAlignPlan{
  1647. baseLogicalPlan: baseLogicalPlan{
  1648. children: []LogicalPlan{
  1649. WindowPlan{
  1650. baseLogicalPlan: baseLogicalPlan{
  1651. children: []LogicalPlan{
  1652. FilterPlan{
  1653. baseLogicalPlan: baseLogicalPlan{
  1654. children: []LogicalPlan{
  1655. DataSourcePlan{
  1656. name: "src1",
  1657. streamFields: []interface{}{
  1658. "id1", "temp",
  1659. },
  1660. streamStmt: streams["src1"],
  1661. metaFields: []string{},
  1662. }.Init(),
  1663. },
  1664. },
  1665. condition: &ast.BinaryExpr{
  1666. RHS: &ast.BinaryExpr{
  1667. OP: ast.GT,
  1668. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1669. RHS: &ast.IntegerLiteral{Val: 20},
  1670. },
  1671. OP: ast.AND,
  1672. LHS: &ast.BinaryExpr{
  1673. OP: ast.GT,
  1674. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1675. RHS: &ast.IntegerLiteral{Val: 111},
  1676. },
  1677. },
  1678. }.Init(),
  1679. },
  1680. },
  1681. condition: nil,
  1682. wtype: ast.TUMBLING_WINDOW,
  1683. length: 10000,
  1684. interval: 0,
  1685. limit: 0,
  1686. }.Init(),
  1687. FilterPlan{
  1688. baseLogicalPlan: baseLogicalPlan{
  1689. children: []LogicalPlan{
  1690. DataSourcePlan{
  1691. name: "tableInPlanner",
  1692. streamFields: []interface{}{
  1693. &ast.StreamField{
  1694. Name: "hum",
  1695. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1696. },
  1697. &ast.StreamField{
  1698. Name: "id",
  1699. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1700. },
  1701. },
  1702. streamStmt: streams["tableInPlanner"],
  1703. metaFields: []string{},
  1704. }.Init(),
  1705. },
  1706. },
  1707. condition: &ast.BinaryExpr{
  1708. OP: ast.LT,
  1709. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1710. RHS: &ast.IntegerLiteral{Val: 60},
  1711. },
  1712. }.Init(),
  1713. },
  1714. },
  1715. Emitters: []string{"tableInPlanner"},
  1716. }.Init(),
  1717. },
  1718. },
  1719. from: &ast.Table{
  1720. Name: "src1",
  1721. },
  1722. joins: []ast.Join{
  1723. {
  1724. Name: "tableInPlanner",
  1725. Alias: "",
  1726. JoinType: ast.INNER_JOIN,
  1727. Expr: &ast.BinaryExpr{
  1728. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1729. OP: ast.EQ,
  1730. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1731. },
  1732. },
  1733. },
  1734. }.Init(),
  1735. },
  1736. },
  1737. fields: []ast.Field{
  1738. {
  1739. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1740. Name: "id1",
  1741. AName: ""},
  1742. },
  1743. isAggregate: false,
  1744. sendMeta: false,
  1745. }.Init(),
  1746. }, { // 10 meta
  1747. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1748. p: ProjectPlan{
  1749. baseLogicalPlan: baseLogicalPlan{
  1750. children: []LogicalPlan{
  1751. FilterPlan{
  1752. baseLogicalPlan: baseLogicalPlan{
  1753. children: []LogicalPlan{
  1754. DataSourcePlan{
  1755. name: "src1",
  1756. streamFields: []interface{}{
  1757. "temp",
  1758. },
  1759. streamStmt: streams["src1"],
  1760. metaFields: []string{"Humidity", "device", "id"},
  1761. }.Init(),
  1762. },
  1763. },
  1764. condition: &ast.BinaryExpr{
  1765. LHS: &ast.Call{
  1766. Name: "meta",
  1767. Args: []ast.Expr{&ast.MetaRef{
  1768. Name: "device",
  1769. StreamName: ast.DefaultStream,
  1770. }},
  1771. },
  1772. OP: ast.EQ,
  1773. RHS: &ast.StringLiteral{
  1774. Val: "demo2",
  1775. },
  1776. },
  1777. }.Init(),
  1778. },
  1779. },
  1780. fields: []ast.Field{
  1781. {
  1782. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1783. Name: "temp",
  1784. AName: "",
  1785. }, {
  1786. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1787. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  1788. Name: "id",
  1789. StreamName: ast.DefaultStream,
  1790. }}},
  1791. []ast.StreamName{},
  1792. nil,
  1793. )},
  1794. Name: "meta",
  1795. AName: "eid",
  1796. }, {
  1797. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1798. &ast.Call{Name: "meta", Args: []ast.Expr{
  1799. &ast.BinaryExpr{
  1800. OP: ast.ARROW,
  1801. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1802. RHS: &ast.MetaRef{Name: "Device"},
  1803. },
  1804. }},
  1805. []ast.StreamName{},
  1806. nil,
  1807. )},
  1808. Name: "meta",
  1809. AName: "hdevice",
  1810. },
  1811. },
  1812. isAggregate: false,
  1813. sendMeta: false,
  1814. }.Init(),
  1815. }, { // 11 join with same name field and aliased
  1816. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1817. p: ProjectPlan{
  1818. baseLogicalPlan: baseLogicalPlan{
  1819. children: []LogicalPlan{
  1820. JoinPlan{
  1821. baseLogicalPlan: baseLogicalPlan{
  1822. children: []LogicalPlan{
  1823. JoinAlignPlan{
  1824. baseLogicalPlan: baseLogicalPlan{
  1825. children: []LogicalPlan{
  1826. DataSourcePlan{
  1827. name: "src2",
  1828. streamFields: []interface{}{
  1829. "hum", "id", "id2",
  1830. },
  1831. streamStmt: streams["src2"],
  1832. metaFields: []string{},
  1833. }.Init(),
  1834. DataSourcePlan{
  1835. name: "tableInPlanner",
  1836. streamFields: []interface{}{
  1837. &ast.StreamField{
  1838. Name: "hum",
  1839. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1840. },
  1841. &ast.StreamField{
  1842. Name: "id",
  1843. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1844. },
  1845. },
  1846. streamStmt: streams["tableInPlanner"],
  1847. metaFields: []string{},
  1848. }.Init(),
  1849. },
  1850. },
  1851. Emitters: []string{"tableInPlanner"},
  1852. }.Init(),
  1853. },
  1854. },
  1855. from: &ast.Table{
  1856. Name: "src2",
  1857. },
  1858. joins: []ast.Join{
  1859. {
  1860. Name: "tableInPlanner",
  1861. Alias: "",
  1862. JoinType: ast.INNER_JOIN,
  1863. Expr: &ast.BinaryExpr{
  1864. RHS: &ast.BinaryExpr{
  1865. OP: ast.EQ,
  1866. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  1867. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  1868. },
  1869. OP: ast.AND,
  1870. LHS: &ast.BinaryExpr{
  1871. OP: ast.GT,
  1872. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1873. &ast.FieldRef{
  1874. Name: "hum",
  1875. StreamName: "src2",
  1876. },
  1877. []ast.StreamName{"src2"},
  1878. &boolFalse,
  1879. )},
  1880. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1881. &ast.FieldRef{
  1882. Name: "hum",
  1883. StreamName: "tableInPlanner",
  1884. },
  1885. []ast.StreamName{"tableInPlanner"},
  1886. &boolFalse,
  1887. )},
  1888. },
  1889. },
  1890. },
  1891. },
  1892. }.Init(),
  1893. },
  1894. },
  1895. fields: []ast.Field{
  1896. {
  1897. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1898. &ast.FieldRef{
  1899. Name: "hum",
  1900. StreamName: "src2",
  1901. },
  1902. []ast.StreamName{"src2"},
  1903. &boolFalse,
  1904. )},
  1905. Name: "hum",
  1906. AName: "hum1",
  1907. }, {
  1908. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1909. &ast.FieldRef{
  1910. Name: "hum",
  1911. StreamName: "tableInPlanner",
  1912. },
  1913. []ast.StreamName{"tableInPlanner"},
  1914. &boolFalse,
  1915. )},
  1916. Name: "hum",
  1917. AName: "hum2",
  1918. },
  1919. },
  1920. isAggregate: false,
  1921. sendMeta: false,
  1922. }.Init(),
  1923. },
  1924. }
  1925. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1926. for i, tt := range tests {
  1927. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1928. if err != nil {
  1929. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1930. continue
  1931. }
  1932. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1933. IsEventTime: false,
  1934. LateTol: 0,
  1935. Concurrency: 0,
  1936. BufferLength: 0,
  1937. SendMetaToSink: false,
  1938. Qos: 0,
  1939. CheckpointInterval: 0,
  1940. SendError: true,
  1941. }, store)
  1942. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1943. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1944. } else if !reflect.DeepEqual(tt.p, p) {
  1945. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1946. }
  1947. }
  1948. }