planner_test.go 57 KB


  1. package planner
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/gdexlab/go-render/render"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. )
  15. var (
  16. DbDir = common.GetDbDir()
  17. )
  18. func Test_createLogicalPlan(t *testing.T) {
  19. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  20. err := store.Open()
  21. if err != nil {
  22. t.Error(err)
  23. return
  24. }
  25. defer store.Close()
  26. streamSqls := map[string]string{
  27. "src1": `CREATE STREAM src1 (
  28. id1 BIGINT,
  29. temp BIGINT,
  30. name string
  31. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  32. "src2": `CREATE STREAM src2 (
  33. id2 BIGINT,
  34. hum BIGINT
  35. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  36. "tableInPlanner": `CREATE TABLE tableInPlanner (
  37. id BIGINT,
  38. name STRING,
  39. value STRING,
  40. hum BIGINT
  41. ) WITH (TYPE="file");`,
  42. }
  43. types := map[string]xsql.StreamType{
  44. "src1": xsql.TypeStream,
  45. "src2": xsql.TypeStream,
  46. "tableInPlanner": xsql.TypeTable,
  47. }
  48. for name, sql := range streamSqls {
  49. s, err := json.Marshal(&xsql.StreamInfo{
  50. StreamType: types[name],
  51. Statement: sql,
  52. })
  53. if err != nil {
  54. t.Error(err)
  55. t.Fail()
  56. }
  57. store.Set(name, string(s))
  58. }
  59. streams := make(map[string]*xsql.StreamStmt)
  60. for n := range streamSqls {
  61. streamStmt, err := xsql.GetDataSource(store, n)
  62. if err != nil {
  63. t.Errorf("fail to get stream %s, please check if stream is created", n)
  64. return
  65. }
  66. streams[n] = streamStmt
  67. }
  68. var (
  69. //boolTrue = true
  70. boolFalse = false
  71. )
  72. var tests = []struct {
  73. sql string
  74. p LogicalPlan
  75. err string
  76. }{
  77. { // 0
  78. sql: `SELECT name FROM src1`,
  79. p: ProjectPlan{
  80. baseLogicalPlan: baseLogicalPlan{
  81. children: []LogicalPlan{
  82. DataSourcePlan{
  83. baseLogicalPlan: baseLogicalPlan{},
  84. name: "src1",
  85. streamFields: []interface{}{
  86. &xsql.StreamField{
  87. Name: "name",
  88. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  89. },
  90. },
  91. streamStmt: streams["src1"],
  92. metaFields: []string{},
  93. }.Init(),
  94. },
  95. },
  96. fields: []xsql.Field{
  97. {
  98. Expr: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  99. Name: "name",
  100. AName: "",
  101. },
  102. },
  103. isAggregate: false,
  104. sendMeta: false,
  105. }.Init(),
  106. }, { // 1 optimize where to data source
  107. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  108. p: ProjectPlan{
  109. baseLogicalPlan: baseLogicalPlan{
  110. children: []LogicalPlan{
  111. WindowPlan{
  112. baseLogicalPlan: baseLogicalPlan{
  113. children: []LogicalPlan{
  114. FilterPlan{
  115. baseLogicalPlan: baseLogicalPlan{
  116. children: []LogicalPlan{
  117. DataSourcePlan{
  118. name: "src1",
  119. streamFields: []interface{}{
  120. &xsql.StreamField{
  121. Name: "name",
  122. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  123. },
  124. &xsql.StreamField{
  125. Name: "temp",
  126. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  127. },
  128. },
  129. streamStmt: streams["src1"],
  130. metaFields: []string{},
  131. }.Init(),
  132. },
  133. },
  134. condition: &xsql.BinaryExpr{
  135. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  136. OP: xsql.EQ,
  137. RHS: &xsql.StringLiteral{Val: "v1"},
  138. },
  139. }.Init(),
  140. },
  141. },
  142. condition: nil,
  143. wtype: xsql.TUMBLING_WINDOW,
  144. length: 10000,
  145. interval: 0,
  146. limit: 0,
  147. }.Init(),
  148. },
  149. },
  150. fields: []xsql.Field{
  151. {
  152. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  153. Name: "temp",
  154. AName: ""},
  155. },
  156. isAggregate: false,
  157. sendMeta: false,
  158. }.Init(),
  159. }, { // 2 condition that cannot be optimized
  160. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  161. p: ProjectPlan{
  162. baseLogicalPlan: baseLogicalPlan{
  163. children: []LogicalPlan{
  164. JoinPlan{
  165. baseLogicalPlan: baseLogicalPlan{
  166. children: []LogicalPlan{
  167. WindowPlan{
  168. baseLogicalPlan: baseLogicalPlan{
  169. children: []LogicalPlan{
  170. DataSourcePlan{
  171. name: "src1",
  172. streamFields: []interface{}{
  173. &xsql.StreamField{
  174. Name: "id1",
  175. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  176. },
  177. &xsql.StreamField{
  178. Name: "temp",
  179. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  180. },
  181. },
  182. streamStmt: streams["src1"],
  183. metaFields: []string{},
  184. }.Init(),
  185. DataSourcePlan{
  186. name: "src2",
  187. streamFields: []interface{}{
  188. &xsql.StreamField{
  189. Name: "hum",
  190. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  191. },
  192. &xsql.StreamField{
  193. Name: "id2",
  194. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  195. },
  196. },
  197. streamStmt: streams["src2"],
  198. metaFields: []string{},
  199. }.Init(),
  200. },
  201. },
  202. condition: nil,
  203. wtype: xsql.TUMBLING_WINDOW,
  204. length: 10000,
  205. interval: 0,
  206. limit: 0,
  207. }.Init(),
  208. },
  209. },
  210. from: &xsql.Table{Name: "src1"},
  211. joins: xsql.Joins{xsql.Join{
  212. Name: "src2",
  213. JoinType: xsql.INNER_JOIN,
  214. Expr: &xsql.BinaryExpr{
  215. OP: xsql.AND,
  216. LHS: &xsql.BinaryExpr{
  217. LHS: &xsql.BinaryExpr{
  218. OP: xsql.GT,
  219. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  220. RHS: &xsql.IntegerLiteral{Val: 20},
  221. },
  222. OP: xsql.OR,
  223. RHS: &xsql.BinaryExpr{
  224. OP: xsql.GT,
  225. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  226. RHS: &xsql.IntegerLiteral{Val: 60},
  227. },
  228. },
  229. RHS: &xsql.BinaryExpr{
  230. OP: xsql.EQ,
  231. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  232. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  233. },
  234. },
  235. }},
  236. }.Init(),
  237. },
  238. },
  239. fields: []xsql.Field{
  240. {
  241. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  242. Name: "id1",
  243. AName: ""},
  244. },
  245. isAggregate: false,
  246. sendMeta: false,
  247. }.Init(),
  248. }, { // 3 optimize window filter
  249. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  250. p: ProjectPlan{
  251. baseLogicalPlan: baseLogicalPlan{
  252. children: []LogicalPlan{
  253. WindowPlan{
  254. baseLogicalPlan: baseLogicalPlan{
  255. children: []LogicalPlan{
  256. FilterPlan{
  257. baseLogicalPlan: baseLogicalPlan{
  258. children: []LogicalPlan{
  259. DataSourcePlan{
  260. name: "src1",
  261. streamFields: []interface{}{
  262. &xsql.StreamField{
  263. Name: "id1",
  264. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  265. },
  266. &xsql.StreamField{
  267. Name: "name",
  268. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  269. },
  270. &xsql.StreamField{
  271. Name: "temp",
  272. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  273. },
  274. },
  275. streamStmt: streams["src1"],
  276. metaFields: []string{},
  277. }.Init(),
  278. },
  279. },
  280. condition: &xsql.BinaryExpr{
  281. OP: xsql.AND,
  282. LHS: &xsql.BinaryExpr{
  283. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  284. OP: xsql.EQ,
  285. RHS: &xsql.StringLiteral{Val: "v1"},
  286. },
  287. RHS: &xsql.BinaryExpr{
  288. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  289. OP: xsql.GT,
  290. RHS: &xsql.IntegerLiteral{Val: 2},
  291. },
  292. },
  293. }.Init(),
  294. },
  295. },
  296. condition: nil,
  297. wtype: xsql.TUMBLING_WINDOW,
  298. length: 10000,
  299. interval: 0,
  300. limit: 0,
  301. }.Init(),
  302. },
  303. },
  304. fields: []xsql.Field{
  305. {
  306. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  307. Name: "id1",
  308. AName: ""},
  309. },
  310. isAggregate: false,
  311. sendMeta: false,
  312. }.Init(),
  313. }, { // 4. do not optimize count window
  314. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  315. p: ProjectPlan{
  316. baseLogicalPlan: baseLogicalPlan{
  317. children: []LogicalPlan{
  318. HavingPlan{
  319. baseLogicalPlan: baseLogicalPlan{
  320. children: []LogicalPlan{
  321. FilterPlan{
  322. baseLogicalPlan: baseLogicalPlan{
  323. children: []LogicalPlan{
  324. WindowPlan{
  325. baseLogicalPlan: baseLogicalPlan{
  326. children: []LogicalPlan{
  327. DataSourcePlan{
  328. name: "src1",
  329. isWildCard: true,
  330. streamFields: []interface{}{
  331. &xsql.StreamField{
  332. Name: "id1",
  333. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  334. },
  335. &xsql.StreamField{
  336. Name: "temp",
  337. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  338. },
  339. &xsql.StreamField{
  340. Name: "name",
  341. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  342. },
  343. },
  344. streamStmt: streams["src1"],
  345. metaFields: []string{},
  346. }.Init(),
  347. },
  348. },
  349. condition: nil,
  350. wtype: xsql.COUNT_WINDOW,
  351. length: 5,
  352. interval: 1,
  353. limit: 0,
  354. }.Init(),
  355. },
  356. },
  357. condition: &xsql.BinaryExpr{
  358. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  359. OP: xsql.GT,
  360. RHS: &xsql.IntegerLiteral{Val: 20},
  361. },
  362. }.Init(),
  363. },
  364. },
  365. condition: &xsql.BinaryExpr{
  366. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.Wildcard{
  367. Token: xsql.ASTERISK,
  368. }}},
  369. OP: xsql.GT,
  370. RHS: &xsql.IntegerLiteral{Val: 2},
  371. },
  372. }.Init(),
  373. },
  374. },
  375. fields: []xsql.Field{
  376. {
  377. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  378. Name: "kuiper_field_0",
  379. AName: ""},
  380. },
  381. isAggregate: false,
  382. sendMeta: false,
  383. }.Init(),
  384. }, { // 5. optimize join on
  385. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  386. p: ProjectPlan{
  387. baseLogicalPlan: baseLogicalPlan{
  388. children: []LogicalPlan{
  389. JoinPlan{
  390. baseLogicalPlan: baseLogicalPlan{
  391. children: []LogicalPlan{
  392. WindowPlan{
  393. baseLogicalPlan: baseLogicalPlan{
  394. children: []LogicalPlan{
  395. FilterPlan{
  396. baseLogicalPlan: baseLogicalPlan{
  397. children: []LogicalPlan{
  398. DataSourcePlan{
  399. name: "src1",
  400. streamFields: []interface{}{
  401. &xsql.StreamField{
  402. Name: "id1",
  403. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  404. },
  405. &xsql.StreamField{
  406. Name: "temp",
  407. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  408. },
  409. },
  410. streamStmt: streams["src1"],
  411. metaFields: []string{},
  412. }.Init(),
  413. },
  414. },
  415. condition: &xsql.BinaryExpr{
  416. RHS: &xsql.BinaryExpr{
  417. OP: xsql.GT,
  418. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  419. RHS: &xsql.IntegerLiteral{Val: 20},
  420. },
  421. OP: xsql.AND,
  422. LHS: &xsql.BinaryExpr{
  423. OP: xsql.GT,
  424. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  425. RHS: &xsql.IntegerLiteral{Val: 111},
  426. },
  427. },
  428. }.Init(),
  429. FilterPlan{
  430. baseLogicalPlan: baseLogicalPlan{
  431. children: []LogicalPlan{
  432. DataSourcePlan{
  433. name: "src2",
  434. streamFields: []interface{}{
  435. &xsql.StreamField{
  436. Name: "hum",
  437. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  438. },
  439. &xsql.StreamField{
  440. Name: "id2",
  441. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  442. },
  443. },
  444. streamStmt: streams["src2"],
  445. metaFields: []string{},
  446. }.Init(),
  447. },
  448. },
  449. condition: &xsql.BinaryExpr{
  450. OP: xsql.LT,
  451. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  452. RHS: &xsql.IntegerLiteral{Val: 60},
  453. },
  454. }.Init(),
  455. },
  456. },
  457. condition: nil,
  458. wtype: xsql.TUMBLING_WINDOW,
  459. length: 10000,
  460. interval: 0,
  461. limit: 0,
  462. }.Init(),
  463. },
  464. },
  465. from: &xsql.Table{
  466. Name: "src1",
  467. },
  468. joins: []xsql.Join{
  469. {
  470. Name: "src2",
  471. Alias: "",
  472. JoinType: xsql.INNER_JOIN,
  473. Expr: &xsql.BinaryExpr{
  474. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  475. OP: xsql.EQ,
  476. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  477. },
  478. },
  479. },
  480. }.Init(),
  481. },
  482. },
  483. fields: []xsql.Field{
  484. {
  485. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  486. Name: "id1",
  487. AName: ""},
  488. },
  489. isAggregate: false,
  490. sendMeta: false,
  491. }.Init(),
  492. }, { // 6. optimize outter join on
  493. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  494. p: ProjectPlan{
  495. baseLogicalPlan: baseLogicalPlan{
  496. children: []LogicalPlan{
  497. JoinPlan{
  498. baseLogicalPlan: baseLogicalPlan{
  499. children: []LogicalPlan{
  500. WindowPlan{
  501. baseLogicalPlan: baseLogicalPlan{
  502. children: []LogicalPlan{
  503. FilterPlan{
  504. baseLogicalPlan: baseLogicalPlan{
  505. children: []LogicalPlan{
  506. DataSourcePlan{
  507. name: "src1",
  508. streamFields: []interface{}{
  509. &xsql.StreamField{
  510. Name: "id1",
  511. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  512. },
  513. &xsql.StreamField{
  514. Name: "temp",
  515. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  516. },
  517. },
  518. streamStmt: streams["src1"],
  519. metaFields: []string{},
  520. }.Init(),
  521. },
  522. },
  523. condition: &xsql.BinaryExpr{
  524. OP: xsql.GT,
  525. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  526. RHS: &xsql.IntegerLiteral{Val: 111},
  527. },
  528. }.Init(),
  529. DataSourcePlan{
  530. name: "src2",
  531. streamFields: []interface{}{
  532. &xsql.StreamField{
  533. Name: "hum",
  534. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  535. },
  536. &xsql.StreamField{
  537. Name: "id2",
  538. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  539. },
  540. },
  541. streamStmt: streams["src2"],
  542. metaFields: []string{},
  543. }.Init(),
  544. },
  545. },
  546. condition: nil,
  547. wtype: xsql.TUMBLING_WINDOW,
  548. length: 10000,
  549. interval: 0,
  550. limit: 0,
  551. }.Init(),
  552. },
  553. },
  554. from: &xsql.Table{
  555. Name: "src1",
  556. },
  557. joins: []xsql.Join{
  558. {
  559. Name: "src2",
  560. Alias: "",
  561. JoinType: xsql.FULL_JOIN,
  562. Expr: &xsql.BinaryExpr{
  563. OP: xsql.AND,
  564. LHS: &xsql.BinaryExpr{
  565. OP: xsql.AND,
  566. LHS: &xsql.BinaryExpr{
  567. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  568. OP: xsql.EQ,
  569. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  570. },
  571. RHS: &xsql.BinaryExpr{
  572. OP: xsql.GT,
  573. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  574. RHS: &xsql.IntegerLiteral{Val: 20},
  575. },
  576. },
  577. RHS: &xsql.BinaryExpr{
  578. OP: xsql.LT,
  579. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  580. RHS: &xsql.IntegerLiteral{Val: 60},
  581. },
  582. },
  583. },
  584. },
  585. }.Init(),
  586. },
  587. },
  588. fields: []xsql.Field{
  589. {
  590. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  591. Name: "id1",
  592. AName: ""},
  593. },
  594. isAggregate: false,
  595. sendMeta: false,
  596. }.Init(),
  597. }, { // 7 window error for table
  598. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  599. p: nil,
  600. err: "cannot run window for TABLE sources",
  601. }, { // 8 join table without window
  602. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  603. p: ProjectPlan{
  604. baseLogicalPlan: baseLogicalPlan{
  605. children: []LogicalPlan{
  606. JoinPlan{
  607. baseLogicalPlan: baseLogicalPlan{
  608. children: []LogicalPlan{
  609. JoinAlignPlan{
  610. baseLogicalPlan: baseLogicalPlan{
  611. children: []LogicalPlan{
  612. FilterPlan{
  613. baseLogicalPlan: baseLogicalPlan{
  614. children: []LogicalPlan{
  615. DataSourcePlan{
  616. name: "src1",
  617. streamFields: []interface{}{
  618. &xsql.StreamField{
  619. Name: "id1",
  620. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  621. },
  622. &xsql.StreamField{
  623. Name: "temp",
  624. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  625. },
  626. },
  627. streamStmt: streams["src1"],
  628. metaFields: []string{},
  629. }.Init(),
  630. },
  631. },
  632. condition: &xsql.BinaryExpr{
  633. RHS: &xsql.BinaryExpr{
  634. OP: xsql.GT,
  635. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  636. RHS: &xsql.IntegerLiteral{Val: 20},
  637. },
  638. OP: xsql.AND,
  639. LHS: &xsql.BinaryExpr{
  640. OP: xsql.GT,
  641. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  642. RHS: &xsql.IntegerLiteral{Val: 111},
  643. },
  644. },
  645. }.Init(),
  646. FilterPlan{
  647. baseLogicalPlan: baseLogicalPlan{
  648. children: []LogicalPlan{
  649. DataSourcePlan{
  650. name: "tableInPlanner",
  651. streamFields: []interface{}{
  652. &xsql.StreamField{
  653. Name: "hum",
  654. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  655. },
  656. &xsql.StreamField{
  657. Name: "id",
  658. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  659. },
  660. },
  661. streamStmt: streams["tableInPlanner"],
  662. metaFields: []string{},
  663. }.Init(),
  664. },
  665. },
  666. condition: &xsql.BinaryExpr{
  667. OP: xsql.LT,
  668. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  669. RHS: &xsql.IntegerLiteral{Val: 60},
  670. },
  671. }.Init(),
  672. },
  673. },
  674. Emitters: []string{"tableInPlanner"},
  675. }.Init(),
  676. },
  677. },
  678. from: &xsql.Table{
  679. Name: "src1",
  680. },
  681. joins: []xsql.Join{
  682. {
  683. Name: "tableInPlanner",
  684. Alias: "",
  685. JoinType: xsql.INNER_JOIN,
  686. Expr: &xsql.BinaryExpr{
  687. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  688. OP: xsql.EQ,
  689. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  690. },
  691. },
  692. },
  693. }.Init(),
  694. },
  695. },
  696. fields: []xsql.Field{
  697. {
  698. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  699. Name: "id1",
  700. AName: ""},
  701. },
  702. isAggregate: false,
  703. sendMeta: false,
  704. }.Init(),
  705. }, { // 9 join table with window
  706. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  707. p: ProjectPlan{
  708. baseLogicalPlan: baseLogicalPlan{
  709. children: []LogicalPlan{
  710. JoinPlan{
  711. baseLogicalPlan: baseLogicalPlan{
  712. children: []LogicalPlan{
  713. JoinAlignPlan{
  714. baseLogicalPlan: baseLogicalPlan{
  715. children: []LogicalPlan{
  716. WindowPlan{
  717. baseLogicalPlan: baseLogicalPlan{
  718. children: []LogicalPlan{
  719. FilterPlan{
  720. baseLogicalPlan: baseLogicalPlan{
  721. children: []LogicalPlan{
  722. DataSourcePlan{
  723. name: "src1",
  724. streamFields: []interface{}{
  725. &xsql.StreamField{
  726. Name: "id1",
  727. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  728. },
  729. &xsql.StreamField{
  730. Name: "temp",
  731. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  732. },
  733. },
  734. streamStmt: streams["src1"],
  735. metaFields: []string{},
  736. }.Init(),
  737. },
  738. },
  739. condition: &xsql.BinaryExpr{
  740. RHS: &xsql.BinaryExpr{
  741. OP: xsql.GT,
  742. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  743. RHS: &xsql.IntegerLiteral{Val: 20},
  744. },
  745. OP: xsql.AND,
  746. LHS: &xsql.BinaryExpr{
  747. OP: xsql.GT,
  748. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  749. RHS: &xsql.IntegerLiteral{Val: 111},
  750. },
  751. },
  752. }.Init(),
  753. },
  754. },
  755. condition: nil,
  756. wtype: xsql.TUMBLING_WINDOW,
  757. length: 10000,
  758. interval: 0,
  759. limit: 0,
  760. }.Init(),
  761. FilterPlan{
  762. baseLogicalPlan: baseLogicalPlan{
  763. children: []LogicalPlan{
  764. DataSourcePlan{
  765. name: "tableInPlanner",
  766. streamFields: []interface{}{
  767. &xsql.StreamField{
  768. Name: "hum",
  769. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  770. },
  771. &xsql.StreamField{
  772. Name: "id",
  773. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  774. },
  775. },
  776. streamStmt: streams["tableInPlanner"],
  777. metaFields: []string{},
  778. }.Init(),
  779. },
  780. },
  781. condition: &xsql.BinaryExpr{
  782. OP: xsql.LT,
  783. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  784. RHS: &xsql.IntegerLiteral{Val: 60},
  785. },
  786. }.Init(),
  787. },
  788. },
  789. Emitters: []string{"tableInPlanner"},
  790. }.Init(),
  791. },
  792. },
  793. from: &xsql.Table{
  794. Name: "src1",
  795. },
  796. joins: []xsql.Join{
  797. {
  798. Name: "tableInPlanner",
  799. Alias: "",
  800. JoinType: xsql.INNER_JOIN,
  801. Expr: &xsql.BinaryExpr{
  802. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  803. OP: xsql.EQ,
  804. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  805. },
  806. },
  807. },
  808. }.Init(),
  809. },
  810. },
  811. fields: []xsql.Field{
  812. {
  813. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  814. Name: "id1",
  815. AName: ""},
  816. },
  817. isAggregate: false,
  818. sendMeta: false,
  819. }.Init(),
  820. }, { // 10 meta
  821. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  822. p: ProjectPlan{
  823. baseLogicalPlan: baseLogicalPlan{
  824. children: []LogicalPlan{
  825. FilterPlan{
  826. baseLogicalPlan: baseLogicalPlan{
  827. children: []LogicalPlan{
  828. DataSourcePlan{
  829. name: "src1",
  830. streamFields: []interface{}{
  831. &xsql.StreamField{
  832. Name: "temp",
  833. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  834. },
  835. },
  836. streamStmt: streams["src1"],
  837. metaFields: []string{"Humidity", "device", "id"},
  838. }.Init(),
  839. },
  840. },
  841. condition: &xsql.BinaryExpr{
  842. LHS: &xsql.Call{
  843. Name: "meta",
  844. Args: []xsql.Expr{&xsql.MetaRef{
  845. Name: "device",
  846. StreamName: xsql.DefaultStream,
  847. }},
  848. },
  849. OP: xsql.EQ,
  850. RHS: &xsql.StringLiteral{
  851. Val: "demo2",
  852. },
  853. },
  854. }.Init(),
  855. },
  856. },
  857. fields: []xsql.Field{
  858. {
  859. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  860. Name: "temp",
  861. AName: "",
  862. }, {
  863. Expr: &xsql.FieldRef{Name: "eid", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  864. &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  865. Name: "id",
  866. StreamName: xsql.DefaultStream,
  867. }}},
  868. []xsql.StreamName{},
  869. nil,
  870. )},
  871. Name: "meta",
  872. AName: "eid",
  873. }, {
  874. Expr: &xsql.FieldRef{Name: "hdevice", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  875. &xsql.Call{Name: "meta", Args: []xsql.Expr{
  876. &xsql.BinaryExpr{
  877. OP: xsql.ARROW,
  878. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DefaultStream},
  879. RHS: &xsql.MetaRef{Name: "Device"},
  880. },
  881. }},
  882. []xsql.StreamName{},
  883. nil,
  884. )},
  885. Name: "meta",
  886. AName: "hdevice",
  887. },
  888. },
  889. isAggregate: false,
  890. sendMeta: false,
  891. }.Init(),
  892. }, { // 11 join with same name field and aliased
  893. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  894. p: ProjectPlan{
  895. baseLogicalPlan: baseLogicalPlan{
  896. children: []LogicalPlan{
  897. JoinPlan{
  898. baseLogicalPlan: baseLogicalPlan{
  899. children: []LogicalPlan{
  900. JoinAlignPlan{
  901. baseLogicalPlan: baseLogicalPlan{
  902. children: []LogicalPlan{
  903. DataSourcePlan{
  904. name: "src2",
  905. streamFields: []interface{}{
  906. &xsql.StreamField{
  907. Name: "hum",
  908. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  909. },
  910. &xsql.StreamField{
  911. Name: "id2",
  912. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  913. },
  914. },
  915. streamStmt: streams["src2"],
  916. metaFields: []string{},
  917. }.Init(),
  918. DataSourcePlan{
  919. name: "tableInPlanner",
  920. streamFields: []interface{}{
  921. &xsql.StreamField{
  922. Name: "hum",
  923. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  924. },
  925. &xsql.StreamField{
  926. Name: "id",
  927. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  928. },
  929. },
  930. streamStmt: streams["tableInPlanner"],
  931. metaFields: []string{},
  932. }.Init(),
  933. },
  934. },
  935. Emitters: []string{"tableInPlanner"},
  936. }.Init(),
  937. },
  938. },
  939. from: &xsql.Table{
  940. Name: "src2",
  941. },
  942. joins: []xsql.Join{
  943. {
  944. Name: "tableInPlanner",
  945. Alias: "",
  946. JoinType: xsql.INNER_JOIN,
  947. Expr: &xsql.BinaryExpr{
  948. RHS: &xsql.BinaryExpr{
  949. OP: xsql.EQ,
  950. LHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  951. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  952. },
  953. OP: xsql.AND,
  954. LHS: &xsql.BinaryExpr{
  955. OP: xsql.GT,
  956. LHS: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  957. &xsql.FieldRef{
  958. Name: "hum",
  959. StreamName: "src2",
  960. },
  961. []xsql.StreamName{"src2"},
  962. &boolFalse,
  963. )},
  964. RHS: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  965. &xsql.FieldRef{
  966. Name: "hum",
  967. StreamName: "tableInPlanner",
  968. },
  969. []xsql.StreamName{"tableInPlanner"},
  970. &boolFalse,
  971. )},
  972. },
  973. },
  974. },
  975. },
  976. }.Init(),
  977. },
  978. },
  979. fields: []xsql.Field{
  980. {
  981. Expr: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  982. &xsql.FieldRef{
  983. Name: "hum",
  984. StreamName: "src2",
  985. },
  986. []xsql.StreamName{"src2"},
  987. &boolFalse,
  988. )},
  989. Name: "hum",
  990. AName: "hum1",
  991. }, {
  992. Expr: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  993. &xsql.FieldRef{
  994. Name: "hum",
  995. StreamName: "tableInPlanner",
  996. },
  997. []xsql.StreamName{"tableInPlanner"},
  998. &boolFalse,
  999. )},
  1000. Name: "hum",
  1001. AName: "hum2",
  1002. },
  1003. },
  1004. isAggregate: false,
  1005. sendMeta: false,
  1006. }.Init(),
  1007. },
  1008. }
  1009. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1010. for i, tt := range tests {
  1011. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1012. if err != nil {
  1013. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1014. continue
  1015. }
  1016. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1017. IsEventTime: false,
  1018. LateTol: 0,
  1019. Concurrency: 0,
  1020. BufferLength: 0,
  1021. SendMetaToSink: false,
  1022. Qos: 0,
  1023. CheckpointInterval: 0,
  1024. SendError: true,
  1025. }, store)
  1026. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  1027. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1028. } else if !reflect.DeepEqual(tt.p, p) {
  1029. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1030. }
  1031. }
  1032. }
  1033. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1034. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  1035. err := store.Open()
  1036. if err != nil {
  1037. t.Error(err)
  1038. return
  1039. }
  1040. defer store.Close()
  1041. streamSqls := map[string]string{
  1042. "src1": `CREATE STREAM src1 (
  1043. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1044. "src2": `CREATE STREAM src2 (
  1045. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1046. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1047. id BIGINT,
  1048. name STRING,
  1049. value STRING,
  1050. hum BIGINT
  1051. ) WITH (TYPE="file");`,
  1052. }
  1053. types := map[string]xsql.StreamType{
  1054. "src1": xsql.TypeStream,
  1055. "src2": xsql.TypeStream,
  1056. "tableInPlanner": xsql.TypeTable,
  1057. }
  1058. for name, sql := range streamSqls {
  1059. s, err := json.Marshal(&xsql.StreamInfo{
  1060. StreamType: types[name],
  1061. Statement: sql,
  1062. })
  1063. if err != nil {
  1064. t.Error(err)
  1065. t.Fail()
  1066. }
  1067. store.Set(name, string(s))
  1068. }
  1069. streams := make(map[string]*xsql.StreamStmt)
  1070. for n := range streamSqls {
  1071. streamStmt, err := xsql.GetDataSource(store, n)
  1072. if err != nil {
  1073. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1074. return
  1075. }
  1076. streams[n] = streamStmt
  1077. }
  1078. var (
  1079. //boolTrue = true
  1080. boolFalse = false
  1081. )
  1082. var tests = []struct {
  1083. sql string
  1084. p LogicalPlan
  1085. err string
  1086. }{
  1087. { // 0
  1088. sql: `SELECT name FROM src1`,
  1089. p: ProjectPlan{
  1090. baseLogicalPlan: baseLogicalPlan{
  1091. children: []LogicalPlan{
  1092. DataSourcePlan{
  1093. baseLogicalPlan: baseLogicalPlan{},
  1094. name: "src1",
  1095. streamFields: []interface{}{
  1096. "name",
  1097. },
  1098. streamStmt: streams["src1"],
  1099. metaFields: []string{},
  1100. }.Init(),
  1101. },
  1102. },
  1103. fields: []xsql.Field{
  1104. {
  1105. Expr: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  1106. Name: "name",
  1107. AName: "",
  1108. },
  1109. },
  1110. isAggregate: false,
  1111. sendMeta: false,
  1112. }.Init(),
  1113. }, { // 1 optimize where to data source
  1114. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1115. p: ProjectPlan{
  1116. baseLogicalPlan: baseLogicalPlan{
  1117. children: []LogicalPlan{
  1118. WindowPlan{
  1119. baseLogicalPlan: baseLogicalPlan{
  1120. children: []LogicalPlan{
  1121. FilterPlan{
  1122. baseLogicalPlan: baseLogicalPlan{
  1123. children: []LogicalPlan{
  1124. DataSourcePlan{
  1125. name: "src1",
  1126. streamFields: []interface{}{
  1127. "name", "temp",
  1128. },
  1129. streamStmt: streams["src1"],
  1130. metaFields: []string{},
  1131. }.Init(),
  1132. },
  1133. },
  1134. condition: &xsql.BinaryExpr{
  1135. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  1136. OP: xsql.EQ,
  1137. RHS: &xsql.StringLiteral{Val: "v1"},
  1138. },
  1139. }.Init(),
  1140. },
  1141. },
  1142. condition: nil,
  1143. wtype: xsql.TUMBLING_WINDOW,
  1144. length: 10000,
  1145. interval: 0,
  1146. limit: 0,
  1147. }.Init(),
  1148. },
  1149. },
  1150. fields: []xsql.Field{
  1151. {
  1152. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1153. Name: "temp",
  1154. AName: ""},
  1155. },
  1156. isAggregate: false,
  1157. sendMeta: false,
  1158. }.Init(),
  1159. }, { // 2 condition that cannot be optimized
  1160. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1161. p: ProjectPlan{
  1162. baseLogicalPlan: baseLogicalPlan{
  1163. children: []LogicalPlan{
  1164. JoinPlan{
  1165. baseLogicalPlan: baseLogicalPlan{
  1166. children: []LogicalPlan{
  1167. WindowPlan{
  1168. baseLogicalPlan: baseLogicalPlan{
  1169. children: []LogicalPlan{
  1170. DataSourcePlan{
  1171. name: "src1",
  1172. streamFields: []interface{}{
  1173. "id1", "temp",
  1174. },
  1175. streamStmt: streams["src1"],
  1176. metaFields: []string{},
  1177. }.Init(),
  1178. DataSourcePlan{
  1179. name: "src2",
  1180. streamFields: []interface{}{ // cannot determine which stream the unqualified id1 belongs to, so it is listed here too
  1181. "hum", "id1", "id2",
  1182. },
  1183. streamStmt: streams["src2"],
  1184. metaFields: []string{},
  1185. }.Init(),
  1186. },
  1187. },
  1188. condition: nil,
  1189. wtype: xsql.TUMBLING_WINDOW,
  1190. length: 10000,
  1191. interval: 0,
  1192. limit: 0,
  1193. }.Init(),
  1194. },
  1195. },
  1196. from: &xsql.Table{Name: "src1"},
  1197. joins: xsql.Joins{xsql.Join{
  1198. Name: "src2",
  1199. JoinType: xsql.INNER_JOIN,
  1200. Expr: &xsql.BinaryExpr{
  1201. OP: xsql.AND,
  1202. LHS: &xsql.BinaryExpr{
  1203. LHS: &xsql.BinaryExpr{
  1204. OP: xsql.GT,
  1205. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1206. RHS: &xsql.IntegerLiteral{Val: 20},
  1207. },
  1208. OP: xsql.OR,
  1209. RHS: &xsql.BinaryExpr{
  1210. OP: xsql.GT,
  1211. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  1212. RHS: &xsql.IntegerLiteral{Val: 60},
  1213. },
  1214. },
  1215. RHS: &xsql.BinaryExpr{
  1216. OP: xsql.EQ,
  1217. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1218. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  1219. },
  1220. },
  1221. }},
  1222. }.Init(),
  1223. },
  1224. },
  1225. fields: []xsql.Field{
  1226. {
  1227. Expr: &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
  1228. Name: "id1",
  1229. AName: ""},
  1230. },
  1231. isAggregate: false,
  1232. sendMeta: false,
  1233. }.Init(),
  1234. }, { // 3 optimize window filter
  1235. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1236. p: ProjectPlan{
  1237. baseLogicalPlan: baseLogicalPlan{
  1238. children: []LogicalPlan{
  1239. WindowPlan{
  1240. baseLogicalPlan: baseLogicalPlan{
  1241. children: []LogicalPlan{
  1242. FilterPlan{
  1243. baseLogicalPlan: baseLogicalPlan{
  1244. children: []LogicalPlan{
  1245. DataSourcePlan{
  1246. name: "src1",
  1247. streamFields: []interface{}{
  1248. "id1", "name", "temp",
  1249. },
  1250. streamStmt: streams["src1"],
  1251. metaFields: []string{},
  1252. }.Init(),
  1253. },
  1254. },
  1255. condition: &xsql.BinaryExpr{
  1256. OP: xsql.AND,
  1257. LHS: &xsql.BinaryExpr{
  1258. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  1259. OP: xsql.EQ,
  1260. RHS: &xsql.StringLiteral{Val: "v1"},
  1261. },
  1262. RHS: &xsql.BinaryExpr{
  1263. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1264. OP: xsql.GT,
  1265. RHS: &xsql.IntegerLiteral{Val: 2},
  1266. },
  1267. },
  1268. }.Init(),
  1269. },
  1270. },
  1271. condition: nil,
  1272. wtype: xsql.TUMBLING_WINDOW,
  1273. length: 10000,
  1274. interval: 0,
  1275. limit: 0,
  1276. }.Init(),
  1277. },
  1278. },
  1279. fields: []xsql.Field{
  1280. {
  1281. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1282. Name: "id1",
  1283. AName: ""},
  1284. },
  1285. isAggregate: false,
  1286. sendMeta: false,
  1287. }.Init(),
  1288. }, { // 4. do not optimize count window
  1289. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1290. p: ProjectPlan{
  1291. baseLogicalPlan: baseLogicalPlan{
  1292. children: []LogicalPlan{
  1293. HavingPlan{
  1294. baseLogicalPlan: baseLogicalPlan{
  1295. children: []LogicalPlan{
  1296. FilterPlan{
  1297. baseLogicalPlan: baseLogicalPlan{
  1298. children: []LogicalPlan{
  1299. WindowPlan{
  1300. baseLogicalPlan: baseLogicalPlan{
  1301. children: []LogicalPlan{
  1302. DataSourcePlan{
  1303. name: "src1",
  1304. isWildCard: true,
  1305. streamFields: nil,
  1306. streamStmt: streams["src1"],
  1307. metaFields: []string{},
  1308. }.Init(),
  1309. },
  1310. },
  1311. condition: nil,
  1312. wtype: xsql.COUNT_WINDOW,
  1313. length: 5,
  1314. interval: 1,
  1315. limit: 0,
  1316. }.Init(),
  1317. },
  1318. },
  1319. condition: &xsql.BinaryExpr{
  1320. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1321. OP: xsql.GT,
  1322. RHS: &xsql.IntegerLiteral{Val: 20},
  1323. },
  1324. }.Init(),
  1325. },
  1326. },
  1327. condition: &xsql.BinaryExpr{
  1328. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.Wildcard{
  1329. Token: xsql.ASTERISK,
  1330. }}},
  1331. OP: xsql.GT,
  1332. RHS: &xsql.IntegerLiteral{Val: 2},
  1333. },
  1334. }.Init(),
  1335. },
  1336. },
  1337. fields: []xsql.Field{
  1338. {
  1339. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  1340. Name: "kuiper_field_0",
  1341. AName: ""},
  1342. },
  1343. isAggregate: false,
  1344. sendMeta: false,
  1345. }.Init(),
  1346. }, { // 5. optimize join on
  1347. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1348. p: ProjectPlan{
  1349. baseLogicalPlan: baseLogicalPlan{
  1350. children: []LogicalPlan{
  1351. JoinPlan{
  1352. baseLogicalPlan: baseLogicalPlan{
  1353. children: []LogicalPlan{
  1354. WindowPlan{
  1355. baseLogicalPlan: baseLogicalPlan{
  1356. children: []LogicalPlan{
  1357. FilterPlan{
  1358. baseLogicalPlan: baseLogicalPlan{
  1359. children: []LogicalPlan{
  1360. DataSourcePlan{
  1361. name: "src1",
  1362. streamFields: []interface{}{
  1363. "id1", "temp",
  1364. },
  1365. streamStmt: streams["src1"],
  1366. metaFields: []string{},
  1367. }.Init(),
  1368. },
  1369. },
  1370. condition: &xsql.BinaryExpr{
  1371. RHS: &xsql.BinaryExpr{
  1372. OP: xsql.GT,
  1373. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1374. RHS: &xsql.IntegerLiteral{Val: 20},
  1375. },
  1376. OP: xsql.AND,
  1377. LHS: &xsql.BinaryExpr{
  1378. OP: xsql.GT,
  1379. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1380. RHS: &xsql.IntegerLiteral{Val: 111},
  1381. },
  1382. },
  1383. }.Init(),
  1384. FilterPlan{
  1385. baseLogicalPlan: baseLogicalPlan{
  1386. children: []LogicalPlan{
  1387. DataSourcePlan{
  1388. name: "src2",
  1389. streamFields: []interface{}{
  1390. "hum", "id1", "id2",
  1391. },
  1392. streamStmt: streams["src2"],
  1393. metaFields: []string{},
  1394. }.Init(),
  1395. },
  1396. },
  1397. condition: &xsql.BinaryExpr{
  1398. OP: xsql.LT,
  1399. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  1400. RHS: &xsql.IntegerLiteral{Val: 60},
  1401. },
  1402. }.Init(),
  1403. },
  1404. },
  1405. condition: nil,
  1406. wtype: xsql.TUMBLING_WINDOW,
  1407. length: 10000,
  1408. interval: 0,
  1409. limit: 0,
  1410. }.Init(),
  1411. },
  1412. },
  1413. from: &xsql.Table{
  1414. Name: "src1",
  1415. },
  1416. joins: []xsql.Join{
  1417. {
  1418. Name: "src2",
  1419. Alias: "",
  1420. JoinType: xsql.INNER_JOIN,
  1421. Expr: &xsql.BinaryExpr{
  1422. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1423. OP: xsql.EQ,
  1424. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  1425. },
  1426. },
  1427. },
  1428. }.Init(),
  1429. },
  1430. },
  1431. fields: []xsql.Field{
  1432. {
  1433. Expr: &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
  1434. Name: "id1",
  1435. AName: ""},
  1436. },
  1437. isAggregate: false,
  1438. sendMeta: false,
  1439. }.Init(),
  1440. }, { // 6. optimize outer join on
  1441. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1442. p: ProjectPlan{
  1443. baseLogicalPlan: baseLogicalPlan{
  1444. children: []LogicalPlan{
  1445. JoinPlan{
  1446. baseLogicalPlan: baseLogicalPlan{
  1447. children: []LogicalPlan{
  1448. WindowPlan{
  1449. baseLogicalPlan: baseLogicalPlan{
  1450. children: []LogicalPlan{
  1451. FilterPlan{
  1452. baseLogicalPlan: baseLogicalPlan{
  1453. children: []LogicalPlan{
  1454. DataSourcePlan{
  1455. name: "src1",
  1456. streamFields: []interface{}{
  1457. "id1", "temp",
  1458. },
  1459. streamStmt: streams["src1"],
  1460. metaFields: []string{},
  1461. }.Init(),
  1462. },
  1463. },
  1464. condition: &xsql.BinaryExpr{
  1465. OP: xsql.GT,
  1466. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1467. RHS: &xsql.IntegerLiteral{Val: 111},
  1468. },
  1469. }.Init(),
  1470. DataSourcePlan{
  1471. name: "src2",
  1472. streamFields: []interface{}{
  1473. "hum", "id1", "id2",
  1474. },
  1475. streamStmt: streams["src2"],
  1476. metaFields: []string{},
  1477. }.Init(),
  1478. },
  1479. },
  1480. condition: nil,
  1481. wtype: xsql.TUMBLING_WINDOW,
  1482. length: 10000,
  1483. interval: 0,
  1484. limit: 0,
  1485. }.Init(),
  1486. },
  1487. },
  1488. from: &xsql.Table{
  1489. Name: "src1",
  1490. },
  1491. joins: []xsql.Join{
  1492. {
  1493. Name: "src2",
  1494. Alias: "",
  1495. JoinType: xsql.FULL_JOIN,
  1496. Expr: &xsql.BinaryExpr{
  1497. OP: xsql.AND,
  1498. LHS: &xsql.BinaryExpr{
  1499. OP: xsql.AND,
  1500. LHS: &xsql.BinaryExpr{
  1501. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1502. OP: xsql.EQ,
  1503. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  1504. },
  1505. RHS: &xsql.BinaryExpr{
  1506. OP: xsql.GT,
  1507. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1508. RHS: &xsql.IntegerLiteral{Val: 20},
  1509. },
  1510. },
  1511. RHS: &xsql.BinaryExpr{
  1512. OP: xsql.LT,
  1513. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  1514. RHS: &xsql.IntegerLiteral{Val: 60},
  1515. },
  1516. },
  1517. },
  1518. },
  1519. }.Init(),
  1520. },
  1521. },
  1522. fields: []xsql.Field{
  1523. {
  1524. Expr: &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
  1525. Name: "id1",
  1526. AName: ""},
  1527. },
  1528. isAggregate: false,
  1529. sendMeta: false,
  1530. }.Init(),
  1531. }, { // 7 window error for table
  1532. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1533. p: nil,
  1534. err: "cannot run window for TABLE sources",
  1535. }, { // 8 join table without window
  1536. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1537. p: ProjectPlan{
  1538. baseLogicalPlan: baseLogicalPlan{
  1539. children: []LogicalPlan{
  1540. JoinPlan{
  1541. baseLogicalPlan: baseLogicalPlan{
  1542. children: []LogicalPlan{
  1543. JoinAlignPlan{
  1544. baseLogicalPlan: baseLogicalPlan{
  1545. children: []LogicalPlan{
  1546. FilterPlan{
  1547. baseLogicalPlan: baseLogicalPlan{
  1548. children: []LogicalPlan{
  1549. DataSourcePlan{
  1550. name: "src1",
  1551. streamFields: []interface{}{
  1552. "hum", "id1", "temp",
  1553. },
  1554. streamStmt: streams["src1"],
  1555. metaFields: []string{},
  1556. }.Init(),
  1557. },
  1558. },
  1559. condition: &xsql.BinaryExpr{
  1560. RHS: &xsql.BinaryExpr{
  1561. OP: xsql.GT,
  1562. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1563. RHS: &xsql.IntegerLiteral{Val: 20},
  1564. },
  1565. OP: xsql.AND,
  1566. LHS: &xsql.BinaryExpr{
  1567. OP: xsql.GT,
  1568. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1569. RHS: &xsql.IntegerLiteral{Val: 111},
  1570. },
  1571. },
  1572. }.Init(),
  1573. DataSourcePlan{
  1574. name: "tableInPlanner",
  1575. streamFields: []interface{}{
  1576. &xsql.StreamField{
  1577. Name: "hum",
  1578. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1579. },
  1580. &xsql.StreamField{
  1581. Name: "id",
  1582. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1583. },
  1584. },
  1585. streamStmt: streams["tableInPlanner"],
  1586. metaFields: []string{},
  1587. }.Init(),
  1588. },
  1589. },
  1590. Emitters: []string{"tableInPlanner"},
  1591. }.Init(),
  1592. },
  1593. },
  1594. from: &xsql.Table{
  1595. Name: "src1",
  1596. },
  1597. joins: []xsql.Join{
  1598. {
  1599. Name: "tableInPlanner",
  1600. Alias: "",
  1601. JoinType: xsql.INNER_JOIN,
  1602. Expr: &xsql.BinaryExpr{
  1603. OP: xsql.AND,
  1604. LHS: &xsql.BinaryExpr{
  1605. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1606. OP: xsql.EQ,
  1607. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1608. },
  1609. RHS: &xsql.BinaryExpr{
  1610. OP: xsql.LT,
  1611. LHS: &xsql.FieldRef{Name: "hum", StreamName: xsql.DefaultStream},
  1612. RHS: &xsql.IntegerLiteral{Val: 60},
  1613. },
  1614. },
  1615. },
  1616. },
  1617. }.Init(),
  1618. },
  1619. },
  1620. fields: []xsql.Field{
  1621. {
  1622. Expr: &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
  1623. Name: "id1",
  1624. AName: ""},
  1625. },
  1626. isAggregate: false,
  1627. sendMeta: false,
  1628. }.Init(),
  1629. }, { // 9 join table with window
  1630. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1631. p: ProjectPlan{
  1632. baseLogicalPlan: baseLogicalPlan{
  1633. children: []LogicalPlan{
  1634. JoinPlan{
  1635. baseLogicalPlan: baseLogicalPlan{
  1636. children: []LogicalPlan{
  1637. JoinAlignPlan{
  1638. baseLogicalPlan: baseLogicalPlan{
  1639. children: []LogicalPlan{
  1640. WindowPlan{
  1641. baseLogicalPlan: baseLogicalPlan{
  1642. children: []LogicalPlan{
  1643. FilterPlan{
  1644. baseLogicalPlan: baseLogicalPlan{
  1645. children: []LogicalPlan{
  1646. DataSourcePlan{
  1647. name: "src1",
  1648. streamFields: []interface{}{
  1649. "id1", "temp",
  1650. },
  1651. streamStmt: streams["src1"],
  1652. metaFields: []string{},
  1653. }.Init(),
  1654. },
  1655. },
  1656. condition: &xsql.BinaryExpr{
  1657. RHS: &xsql.BinaryExpr{
  1658. OP: xsql.GT,
  1659. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1660. RHS: &xsql.IntegerLiteral{Val: 20},
  1661. },
  1662. OP: xsql.AND,
  1663. LHS: &xsql.BinaryExpr{
  1664. OP: xsql.GT,
  1665. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1666. RHS: &xsql.IntegerLiteral{Val: 111},
  1667. },
  1668. },
  1669. }.Init(),
  1670. },
  1671. },
  1672. condition: nil,
  1673. wtype: xsql.TUMBLING_WINDOW,
  1674. length: 10000,
  1675. interval: 0,
  1676. limit: 0,
  1677. }.Init(),
  1678. FilterPlan{
  1679. baseLogicalPlan: baseLogicalPlan{
  1680. children: []LogicalPlan{
  1681. DataSourcePlan{
  1682. name: "tableInPlanner",
  1683. streamFields: []interface{}{
  1684. &xsql.StreamField{
  1685. Name: "hum",
  1686. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1687. },
  1688. &xsql.StreamField{
  1689. Name: "id",
  1690. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1691. },
  1692. },
  1693. streamStmt: streams["tableInPlanner"],
  1694. metaFields: []string{},
  1695. }.Init(),
  1696. },
  1697. },
  1698. condition: &xsql.BinaryExpr{
  1699. OP: xsql.LT,
  1700. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1701. RHS: &xsql.IntegerLiteral{Val: 60},
  1702. },
  1703. }.Init(),
  1704. },
  1705. },
  1706. Emitters: []string{"tableInPlanner"},
  1707. }.Init(),
  1708. },
  1709. },
  1710. from: &xsql.Table{
  1711. Name: "src1",
  1712. },
  1713. joins: []xsql.Join{
  1714. {
  1715. Name: "tableInPlanner",
  1716. Alias: "",
  1717. JoinType: xsql.INNER_JOIN,
  1718. Expr: &xsql.BinaryExpr{
  1719. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  1720. OP: xsql.EQ,
  1721. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1722. },
  1723. },
  1724. },
  1725. }.Init(),
  1726. },
  1727. },
  1728. fields: []xsql.Field{
  1729. {
  1730. Expr: &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
  1731. Name: "id1",
  1732. AName: ""},
  1733. },
  1734. isAggregate: false,
  1735. sendMeta: false,
  1736. }.Init(),
  1737. }, { // 10 meta
  1738. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1739. p: ProjectPlan{
  1740. baseLogicalPlan: baseLogicalPlan{
  1741. children: []LogicalPlan{
  1742. FilterPlan{
  1743. baseLogicalPlan: baseLogicalPlan{
  1744. children: []LogicalPlan{
  1745. DataSourcePlan{
  1746. name: "src1",
  1747. streamFields: []interface{}{
  1748. "temp",
  1749. },
  1750. streamStmt: streams["src1"],
  1751. metaFields: []string{"Humidity", "device", "id"},
  1752. }.Init(),
  1753. },
  1754. },
  1755. condition: &xsql.BinaryExpr{
  1756. LHS: &xsql.Call{
  1757. Name: "meta",
  1758. Args: []xsql.Expr{&xsql.MetaRef{
  1759. Name: "device",
  1760. StreamName: xsql.DefaultStream,
  1761. }},
  1762. },
  1763. OP: xsql.EQ,
  1764. RHS: &xsql.StringLiteral{
  1765. Val: "demo2",
  1766. },
  1767. },
  1768. }.Init(),
  1769. },
  1770. },
  1771. fields: []xsql.Field{
  1772. {
  1773. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  1774. Name: "temp",
  1775. AName: "",
  1776. }, {
  1777. Expr: &xsql.FieldRef{Name: "eid", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1778. &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  1779. Name: "id",
  1780. StreamName: xsql.DefaultStream,
  1781. }}},
  1782. []xsql.StreamName{},
  1783. nil,
  1784. )},
  1785. Name: "meta",
  1786. AName: "eid",
  1787. }, {
  1788. Expr: &xsql.FieldRef{Name: "hdevice", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1789. &xsql.Call{Name: "meta", Args: []xsql.Expr{
  1790. &xsql.BinaryExpr{
  1791. OP: xsql.ARROW,
  1792. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DefaultStream},
  1793. RHS: &xsql.MetaRef{Name: "Device"},
  1794. },
  1795. }},
  1796. []xsql.StreamName{},
  1797. nil,
  1798. )},
  1799. Name: "meta",
  1800. AName: "hdevice",
  1801. },
  1802. },
  1803. isAggregate: false,
  1804. sendMeta: false,
  1805. }.Init(),
  1806. }, { // 11 join with same name field and aliased
  1807. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1808. p: ProjectPlan{
  1809. baseLogicalPlan: baseLogicalPlan{
  1810. children: []LogicalPlan{
  1811. JoinPlan{
  1812. baseLogicalPlan: baseLogicalPlan{
  1813. children: []LogicalPlan{
  1814. JoinAlignPlan{
  1815. baseLogicalPlan: baseLogicalPlan{
  1816. children: []LogicalPlan{
  1817. DataSourcePlan{
  1818. name: "src2",
  1819. streamFields: []interface{}{
  1820. "hum", "id", "id2",
  1821. },
  1822. streamStmt: streams["src2"],
  1823. metaFields: []string{},
  1824. }.Init(),
  1825. DataSourcePlan{
  1826. name: "tableInPlanner",
  1827. streamFields: []interface{}{
  1828. &xsql.StreamField{
  1829. Name: "hum",
  1830. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1831. },
  1832. &xsql.StreamField{
  1833. Name: "id",
  1834. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  1835. },
  1836. },
  1837. streamStmt: streams["tableInPlanner"],
  1838. metaFields: []string{},
  1839. }.Init(),
  1840. },
  1841. },
  1842. Emitters: []string{"tableInPlanner"},
  1843. }.Init(),
  1844. },
  1845. },
  1846. from: &xsql.Table{
  1847. Name: "src2",
  1848. },
  1849. joins: []xsql.Join{
  1850. {
  1851. Name: "tableInPlanner",
  1852. Alias: "",
  1853. JoinType: xsql.INNER_JOIN,
  1854. Expr: &xsql.BinaryExpr{
  1855. RHS: &xsql.BinaryExpr{
  1856. OP: xsql.EQ,
  1857. LHS: &xsql.FieldRef{Name: "id2", StreamName: xsql.DefaultStream},
  1858. RHS: &xsql.FieldRef{Name: "id", StreamName: xsql.DefaultStream},
  1859. },
  1860. OP: xsql.AND,
  1861. LHS: &xsql.BinaryExpr{
  1862. OP: xsql.GT,
  1863. LHS: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1864. &xsql.FieldRef{
  1865. Name: "hum",
  1866. StreamName: "src2",
  1867. },
  1868. []xsql.StreamName{"src2"},
  1869. &boolFalse,
  1870. )},
  1871. RHS: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1872. &xsql.FieldRef{
  1873. Name: "hum",
  1874. StreamName: "tableInPlanner",
  1875. },
  1876. []xsql.StreamName{"tableInPlanner"},
  1877. &boolFalse,
  1878. )},
  1879. },
  1880. },
  1881. },
  1882. },
  1883. }.Init(),
  1884. },
  1885. },
  1886. fields: []xsql.Field{
  1887. {
  1888. Expr: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1889. &xsql.FieldRef{
  1890. Name: "hum",
  1891. StreamName: "src2",
  1892. },
  1893. []xsql.StreamName{"src2"},
  1894. &boolFalse,
  1895. )},
  1896. Name: "hum",
  1897. AName: "hum1",
  1898. }, {
  1899. Expr: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
  1900. &xsql.FieldRef{
  1901. Name: "hum",
  1902. StreamName: "tableInPlanner",
  1903. },
  1904. []xsql.StreamName{"tableInPlanner"},
  1905. &boolFalse,
  1906. )},
  1907. Name: "hum",
  1908. AName: "hum2",
  1909. },
  1910. },
  1911. isAggregate: false,
  1912. sendMeta: false,
  1913. }.Init(),
  1914. },
  1915. }
  1916. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1917. for i, tt := range tests {
  1918. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1919. if err != nil {
  1920. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1921. continue
  1922. }
  1923. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1924. IsEventTime: false,
  1925. LateTol: 0,
  1926. Concurrency: 0,
  1927. BufferLength: 0,
  1928. SendMetaToSink: false,
  1929. Qos: 0,
  1930. CheckpointInterval: 0,
  1931. SendError: true,
  1932. }, store)
  1933. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  1934. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1935. } else if !reflect.DeepEqual(tt.p, p) {
  1936. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1937. }
  1938. }
  1939. }