planner_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. package planner
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/kv"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "path"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. var (
  14. DbDir = getDbDir()
  15. )
  16. func getDbDir() string {
  17. common.InitConf()
  18. dbDir, err := common.GetDataLoc()
  19. if err != nil {
  20. common.Log.Panic(err)
  21. }
  22. common.Log.Infof("db location is %s", dbDir)
  23. return dbDir
  24. }
  25. func Test_createLogicalPlan(t *testing.T) {
  26. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  27. err := store.Open()
  28. if err != nil {
  29. t.Error(err)
  30. return
  31. }
  32. defer store.Close()
  33. streamSqls := map[string]string{
  34. "src1": `CREATE STREAM src1 (
  35. id1 BIGINT,
  36. temp BIGINT,
  37. name string
  38. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  39. "src2": `CREATE STREAM src1 (
  40. id2 BIGINT,
  41. hum BIGINT
  42. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  43. }
  44. for name, sql := range streamSqls {
  45. store.Set(name, sql)
  46. }
  47. streams := make(map[string]*xsql.StreamStmt)
  48. for n, _ := range streamSqls {
  49. streamStmt, err := getStream(store, n)
  50. if err != nil {
  51. t.Errorf("fail to get stream %s, please check if stream is created", n)
  52. return
  53. }
  54. streams[n] = streamStmt
  55. }
  56. var tests = []struct {
  57. sql string
  58. p LogicalPlan
  59. err string
  60. }{
  61. { // 0
  62. sql: `SELECT name FROM src1`,
  63. p: ProjectPlan{
  64. baseLogicalPlan: baseLogicalPlan{
  65. children: []LogicalPlan{
  66. DataSourcePlan{
  67. baseLogicalPlan: baseLogicalPlan{},
  68. name: "src1",
  69. streamFields: []interface{}{
  70. &xsql.StreamField{
  71. Name: "name",
  72. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  73. },
  74. },
  75. streamStmt: streams["src1"],
  76. metaFields: []string{},
  77. }.Init(),
  78. },
  79. },
  80. fields: []xsql.Field{
  81. {
  82. Expr: &xsql.FieldRef{Name: "name"},
  83. Name: "name",
  84. AName: ""},
  85. },
  86. isAggregate: false,
  87. sendMeta: false,
  88. }.Init(),
  89. }, { // 1 optimize where to data source
  90. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  91. p: ProjectPlan{
  92. baseLogicalPlan: baseLogicalPlan{
  93. children: []LogicalPlan{
  94. WindowPlan{
  95. baseLogicalPlan: baseLogicalPlan{
  96. children: []LogicalPlan{
  97. FilterPlan{
  98. baseLogicalPlan: baseLogicalPlan{
  99. children: []LogicalPlan{
  100. DataSourcePlan{
  101. name: "src1",
  102. streamFields: []interface{}{
  103. &xsql.StreamField{
  104. Name: "name",
  105. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  106. },
  107. &xsql.StreamField{
  108. Name: "temp",
  109. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  110. },
  111. },
  112. streamStmt: streams["src1"],
  113. metaFields: []string{},
  114. }.Init(),
  115. },
  116. },
  117. condition: &xsql.BinaryExpr{
  118. LHS: &xsql.FieldRef{Name: "name"},
  119. OP: xsql.EQ,
  120. RHS: &xsql.StringLiteral{Val: "v1"},
  121. },
  122. }.Init(),
  123. },
  124. },
  125. condition: nil,
  126. wtype: xsql.TUMBLING_WINDOW,
  127. length: 10000,
  128. interval: 0,
  129. limit: 0,
  130. }.Init(),
  131. },
  132. },
  133. fields: []xsql.Field{
  134. {
  135. Expr: &xsql.FieldRef{Name: "temp"},
  136. Name: "temp",
  137. AName: ""},
  138. },
  139. isAggregate: false,
  140. sendMeta: false,
  141. }.Init(),
  142. }, { // 2 condition that cannot be optimized
  143. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  144. p: ProjectPlan{
  145. baseLogicalPlan: baseLogicalPlan{
  146. children: []LogicalPlan{
  147. JoinPlan{
  148. baseLogicalPlan: baseLogicalPlan{
  149. children: []LogicalPlan{
  150. WindowPlan{
  151. baseLogicalPlan: baseLogicalPlan{
  152. children: []LogicalPlan{
  153. DataSourcePlan{
  154. name: "src1",
  155. streamFields: []interface{}{
  156. &xsql.StreamField{
  157. Name: "id1",
  158. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  159. },
  160. &xsql.StreamField{
  161. Name: "temp",
  162. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  163. },
  164. },
  165. streamStmt: streams["src1"],
  166. metaFields: []string{},
  167. }.Init(),
  168. DataSourcePlan{
  169. name: "src2",
  170. streamFields: []interface{}{
  171. &xsql.StreamField{
  172. Name: "hum",
  173. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  174. },
  175. &xsql.StreamField{
  176. Name: "id2",
  177. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  178. },
  179. },
  180. streamStmt: streams["src2"],
  181. metaFields: []string{},
  182. }.Init(),
  183. },
  184. },
  185. condition: nil,
  186. wtype: xsql.TUMBLING_WINDOW,
  187. length: 10000,
  188. interval: 0,
  189. limit: 0,
  190. }.Init(),
  191. },
  192. },
  193. from: &xsql.Table{Name: "src1"},
  194. joins: xsql.Joins{xsql.Join{
  195. Name: "src2",
  196. JoinType: xsql.INNER_JOIN,
  197. Expr: &xsql.BinaryExpr{
  198. OP: xsql.AND,
  199. LHS: &xsql.BinaryExpr{
  200. LHS: &xsql.BinaryExpr{
  201. OP: xsql.GT,
  202. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  203. RHS: &xsql.IntegerLiteral{Val: 20},
  204. },
  205. OP: xsql.OR,
  206. RHS: &xsql.BinaryExpr{
  207. OP: xsql.GT,
  208. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  209. RHS: &xsql.IntegerLiteral{Val: 60},
  210. },
  211. },
  212. RHS: &xsql.BinaryExpr{
  213. OP: xsql.EQ,
  214. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  215. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  216. },
  217. },
  218. }},
  219. }.Init(),
  220. },
  221. },
  222. fields: []xsql.Field{
  223. {
  224. Expr: &xsql.FieldRef{Name: "id1"},
  225. Name: "id1",
  226. AName: ""},
  227. },
  228. isAggregate: false,
  229. sendMeta: false,
  230. }.Init(),
  231. }, { // 3 optimize window filter
  232. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  233. p: ProjectPlan{
  234. baseLogicalPlan: baseLogicalPlan{
  235. children: []LogicalPlan{
  236. WindowPlan{
  237. baseLogicalPlan: baseLogicalPlan{
  238. children: []LogicalPlan{
  239. FilterPlan{
  240. baseLogicalPlan: baseLogicalPlan{
  241. children: []LogicalPlan{
  242. DataSourcePlan{
  243. name: "src1",
  244. streamFields: []interface{}{
  245. &xsql.StreamField{
  246. Name: "id1",
  247. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  248. },
  249. &xsql.StreamField{
  250. Name: "name",
  251. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  252. },
  253. &xsql.StreamField{
  254. Name: "temp",
  255. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  256. },
  257. },
  258. streamStmt: streams["src1"],
  259. metaFields: []string{},
  260. }.Init(),
  261. },
  262. },
  263. condition: &xsql.BinaryExpr{
  264. OP: xsql.AND,
  265. LHS: &xsql.BinaryExpr{
  266. LHS: &xsql.FieldRef{Name: "name"},
  267. OP: xsql.EQ,
  268. RHS: &xsql.StringLiteral{Val: "v1"},
  269. },
  270. RHS: &xsql.BinaryExpr{
  271. LHS: &xsql.FieldRef{Name: "temp"},
  272. OP: xsql.GT,
  273. RHS: &xsql.IntegerLiteral{Val: 2},
  274. },
  275. },
  276. }.Init(),
  277. },
  278. },
  279. condition: nil,
  280. wtype: xsql.TUMBLING_WINDOW,
  281. length: 10000,
  282. interval: 0,
  283. limit: 0,
  284. }.Init(),
  285. },
  286. },
  287. fields: []xsql.Field{
  288. {
  289. Expr: &xsql.FieldRef{Name: "id1"},
  290. Name: "id1",
  291. AName: ""},
  292. },
  293. isAggregate: false,
  294. sendMeta: false,
  295. }.Init(),
  296. }, { // 4. do not optimize count window
  297. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  298. p: ProjectPlan{
  299. baseLogicalPlan: baseLogicalPlan{
  300. children: []LogicalPlan{
  301. HavingPlan{
  302. baseLogicalPlan: baseLogicalPlan{
  303. children: []LogicalPlan{
  304. FilterPlan{
  305. baseLogicalPlan: baseLogicalPlan{
  306. children: []LogicalPlan{
  307. WindowPlan{
  308. baseLogicalPlan: baseLogicalPlan{
  309. children: []LogicalPlan{
  310. DataSourcePlan{
  311. name: "src1",
  312. isWildCard: true,
  313. streamFields: []interface{}{
  314. &xsql.StreamField{
  315. Name: "id1",
  316. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  317. },
  318. &xsql.StreamField{
  319. Name: "temp",
  320. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  321. },
  322. &xsql.StreamField{
  323. Name: "name",
  324. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  325. },
  326. },
  327. streamStmt: streams["src1"],
  328. metaFields: []string{},
  329. }.Init(),
  330. },
  331. },
  332. condition: nil,
  333. wtype: xsql.COUNT_WINDOW,
  334. length: 5,
  335. interval: 1,
  336. limit: 0,
  337. }.Init(),
  338. },
  339. },
  340. condition: &xsql.BinaryExpr{
  341. LHS: &xsql.FieldRef{Name: "temp"},
  342. OP: xsql.GT,
  343. RHS: &xsql.IntegerLiteral{Val: 20},
  344. },
  345. }.Init(),
  346. },
  347. },
  348. condition: &xsql.BinaryExpr{
  349. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.StringLiteral{
  350. Val: "*",
  351. }}},
  352. OP: xsql.GT,
  353. RHS: &xsql.IntegerLiteral{Val: 2},
  354. },
  355. }.Init(),
  356. },
  357. },
  358. fields: []xsql.Field{
  359. {
  360. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  361. Name: "",
  362. AName: ""},
  363. },
  364. isAggregate: false,
  365. sendMeta: false,
  366. }.Init(),
  367. }, { // 5. optimize join on
  368. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  369. p: ProjectPlan{
  370. baseLogicalPlan: baseLogicalPlan{
  371. children: []LogicalPlan{
  372. JoinPlan{
  373. baseLogicalPlan: baseLogicalPlan{
  374. children: []LogicalPlan{
  375. WindowPlan{
  376. baseLogicalPlan: baseLogicalPlan{
  377. children: []LogicalPlan{
  378. FilterPlan{
  379. baseLogicalPlan: baseLogicalPlan{
  380. children: []LogicalPlan{
  381. DataSourcePlan{
  382. name: "src1",
  383. streamFields: []interface{}{
  384. &xsql.StreamField{
  385. Name: "id1",
  386. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  387. },
  388. &xsql.StreamField{
  389. Name: "temp",
  390. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  391. },
  392. },
  393. streamStmt: streams["src1"],
  394. metaFields: []string{},
  395. }.Init(),
  396. },
  397. },
  398. condition: &xsql.BinaryExpr{
  399. RHS: &xsql.BinaryExpr{
  400. OP: xsql.GT,
  401. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  402. RHS: &xsql.IntegerLiteral{Val: 20},
  403. },
  404. OP: xsql.AND,
  405. LHS: &xsql.BinaryExpr{
  406. OP: xsql.GT,
  407. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  408. RHS: &xsql.IntegerLiteral{Val: 111},
  409. },
  410. },
  411. }.Init(),
  412. FilterPlan{
  413. baseLogicalPlan: baseLogicalPlan{
  414. children: []LogicalPlan{
  415. DataSourcePlan{
  416. name: "src2",
  417. streamFields: []interface{}{
  418. &xsql.StreamField{
  419. Name: "hum",
  420. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  421. },
  422. &xsql.StreamField{
  423. Name: "id2",
  424. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  425. },
  426. },
  427. streamStmt: streams["src2"],
  428. metaFields: []string{},
  429. }.Init(),
  430. },
  431. },
  432. condition: &xsql.BinaryExpr{
  433. OP: xsql.LT,
  434. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  435. RHS: &xsql.IntegerLiteral{Val: 60},
  436. },
  437. }.Init(),
  438. },
  439. },
  440. condition: nil,
  441. wtype: xsql.TUMBLING_WINDOW,
  442. length: 10000,
  443. interval: 0,
  444. limit: 0,
  445. }.Init(),
  446. },
  447. },
  448. from: &xsql.Table{
  449. Name: "src1",
  450. },
  451. joins: []xsql.Join{
  452. {
  453. Name: "src2",
  454. Alias: "",
  455. JoinType: xsql.INNER_JOIN,
  456. Expr: &xsql.BinaryExpr{
  457. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  458. OP: xsql.EQ,
  459. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  460. },
  461. },
  462. },
  463. }.Init(),
  464. },
  465. },
  466. fields: []xsql.Field{
  467. {
  468. Expr: &xsql.FieldRef{Name: "id1"},
  469. Name: "id1",
  470. AName: ""},
  471. },
  472. isAggregate: false,
  473. sendMeta: false,
  474. }.Init(),
  475. }, { // 6. optimize outter join on
  476. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  477. p: ProjectPlan{
  478. baseLogicalPlan: baseLogicalPlan{
  479. children: []LogicalPlan{
  480. JoinPlan{
  481. baseLogicalPlan: baseLogicalPlan{
  482. children: []LogicalPlan{
  483. WindowPlan{
  484. baseLogicalPlan: baseLogicalPlan{
  485. children: []LogicalPlan{
  486. FilterPlan{
  487. baseLogicalPlan: baseLogicalPlan{
  488. children: []LogicalPlan{
  489. DataSourcePlan{
  490. name: "src1",
  491. streamFields: []interface{}{
  492. &xsql.StreamField{
  493. Name: "id1",
  494. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  495. },
  496. &xsql.StreamField{
  497. Name: "temp",
  498. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  499. },
  500. },
  501. streamStmt: streams["src1"],
  502. metaFields: []string{},
  503. }.Init(),
  504. },
  505. },
  506. condition: &xsql.BinaryExpr{
  507. OP: xsql.GT,
  508. LHS: &xsql.FieldRef{Name: "id", StreamName: "src1"},
  509. RHS: &xsql.IntegerLiteral{Val: 111},
  510. },
  511. }.Init(),
  512. DataSourcePlan{
  513. name: "src2",
  514. streamFields: []interface{}{
  515. &xsql.StreamField{
  516. Name: "hum",
  517. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  518. },
  519. &xsql.StreamField{
  520. Name: "id2",
  521. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  522. },
  523. },
  524. streamStmt: streams["src2"],
  525. metaFields: []string{},
  526. }.Init(),
  527. },
  528. },
  529. condition: nil,
  530. wtype: xsql.TUMBLING_WINDOW,
  531. length: 10000,
  532. interval: 0,
  533. limit: 0,
  534. }.Init(),
  535. },
  536. },
  537. from: &xsql.Table{
  538. Name: "src1",
  539. },
  540. joins: []xsql.Join{
  541. {
  542. Name: "src2",
  543. Alias: "",
  544. JoinType: xsql.FULL_JOIN,
  545. Expr: &xsql.BinaryExpr{
  546. OP: xsql.AND,
  547. LHS: &xsql.BinaryExpr{
  548. OP: xsql.AND,
  549. LHS: &xsql.BinaryExpr{
  550. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  551. OP: xsql.EQ,
  552. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  553. },
  554. RHS: &xsql.BinaryExpr{
  555. OP: xsql.GT,
  556. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  557. RHS: &xsql.IntegerLiteral{Val: 20},
  558. },
  559. },
  560. RHS: &xsql.BinaryExpr{
  561. OP: xsql.LT,
  562. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  563. RHS: &xsql.IntegerLiteral{Val: 60},
  564. },
  565. },
  566. },
  567. },
  568. }.Init(),
  569. },
  570. },
  571. fields: []xsql.Field{
  572. {
  573. Expr: &xsql.FieldRef{Name: "id1"},
  574. Name: "id1",
  575. AName: ""},
  576. },
  577. isAggregate: false,
  578. sendMeta: false,
  579. }.Init(),
  580. },
  581. }
  582. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  583. for i, tt := range tests {
  584. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  585. if err != nil {
  586. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  587. continue
  588. }
  589. p, err := createLogicalPlan(stmt, &api.RuleOption{
  590. IsEventTime: false,
  591. LateTol: 0,
  592. Concurrency: 0,
  593. BufferLength: 0,
  594. SendMetaToSink: false,
  595. Qos: 0,
  596. CheckpointInterval: 0,
  597. SendError: true,
  598. }, store)
  599. if err != nil {
  600. t.Errorf("%d. %q\n\nerror:%v\n\n", i, tt.sql, err)
  601. }
  602. if !reflect.DeepEqual(tt.p, p) {
  603. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.p, p)
  604. }
  605. }
  606. }