// Source file: planner_test.go (27 KB)

  1. package planner
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "path"
  10. "reflect"
  11. "strings"
  12. "testing"
  13. )
  14. var (
  15. DbDir = common.GetDbDir()
  16. )
  17. func Test_createLogicalPlan(t *testing.T) {
  18. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  19. err := store.Open()
  20. if err != nil {
  21. t.Error(err)
  22. return
  23. }
  24. defer store.Close()
  25. streamSqls := map[string]string{
  26. "src1": `CREATE STREAM src1 (
  27. id1 BIGINT,
  28. temp BIGINT,
  29. name string
  30. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  31. "src2": `CREATE STREAM src2 (
  32. id2 BIGINT,
  33. hum BIGINT
  34. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  35. "tableInPlanner": `CREATE TABLE tableInPlanner (
  36. id BIGINT,
  37. name STRING,
  38. value STRING,
  39. hum BIGINT
  40. ) WITH (TYPE="file");`,
  41. }
  42. types := map[string]xsql.StreamType{
  43. "src1": xsql.TypeStream,
  44. "src2": xsql.TypeStream,
  45. "tableInPlanner": xsql.TypeTable,
  46. }
  47. for name, sql := range streamSqls {
  48. s, err := json.Marshal(&xsql.StreamInfo{
  49. StreamType: types[name],
  50. Statement: sql,
  51. })
  52. if err != nil {
  53. t.Error(err)
  54. t.Fail()
  55. }
  56. store.Set(name, string(s))
  57. }
  58. streams := make(map[string]*xsql.StreamStmt)
  59. for n, _ := range streamSqls {
  60. streamStmt, err := xsql.GetDataSource(store, n)
  61. if err != nil {
  62. t.Errorf("fail to get stream %s, please check if stream is created", n)
  63. return
  64. }
  65. streams[n] = streamStmt
  66. }
  67. var tests = []struct {
  68. sql string
  69. p LogicalPlan
  70. err string
  71. }{
  72. { // 0
  73. sql: `SELECT name FROM src1`,
  74. p: ProjectPlan{
  75. baseLogicalPlan: baseLogicalPlan{
  76. children: []LogicalPlan{
  77. DataSourcePlan{
  78. baseLogicalPlan: baseLogicalPlan{},
  79. name: "src1",
  80. streamFields: []interface{}{
  81. &xsql.StreamField{
  82. Name: "name",
  83. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  84. },
  85. },
  86. streamStmt: streams["src1"],
  87. metaFields: []string{},
  88. }.Init(),
  89. },
  90. },
  91. fields: []xsql.Field{
  92. {
  93. Expr: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  94. Name: "name",
  95. AName: ""},
  96. },
  97. isAggregate: false,
  98. sendMeta: false,
  99. }.Init(),
  100. }, { // 1 optimize where to data source
  101. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  102. p: ProjectPlan{
  103. baseLogicalPlan: baseLogicalPlan{
  104. children: []LogicalPlan{
  105. WindowPlan{
  106. baseLogicalPlan: baseLogicalPlan{
  107. children: []LogicalPlan{
  108. FilterPlan{
  109. baseLogicalPlan: baseLogicalPlan{
  110. children: []LogicalPlan{
  111. DataSourcePlan{
  112. name: "src1",
  113. streamFields: []interface{}{
  114. &xsql.StreamField{
  115. Name: "name",
  116. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  117. },
  118. &xsql.StreamField{
  119. Name: "temp",
  120. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  121. },
  122. },
  123. streamStmt: streams["src1"],
  124. metaFields: []string{},
  125. }.Init(),
  126. },
  127. },
  128. condition: &xsql.BinaryExpr{
  129. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  130. OP: xsql.EQ,
  131. RHS: &xsql.StringLiteral{Val: "v1"},
  132. },
  133. }.Init(),
  134. },
  135. },
  136. condition: nil,
  137. wtype: xsql.TUMBLING_WINDOW,
  138. length: 10000,
  139. interval: 0,
  140. limit: 0,
  141. }.Init(),
  142. },
  143. },
  144. fields: []xsql.Field{
  145. {
  146. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  147. Name: "temp",
  148. AName: ""},
  149. },
  150. isAggregate: false,
  151. sendMeta: false,
  152. }.Init(),
  153. }, { // 2 condition that cannot be optimized
  154. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  155. p: ProjectPlan{
  156. baseLogicalPlan: baseLogicalPlan{
  157. children: []LogicalPlan{
  158. JoinPlan{
  159. baseLogicalPlan: baseLogicalPlan{
  160. children: []LogicalPlan{
  161. WindowPlan{
  162. baseLogicalPlan: baseLogicalPlan{
  163. children: []LogicalPlan{
  164. DataSourcePlan{
  165. name: "src1",
  166. streamFields: []interface{}{
  167. &xsql.StreamField{
  168. Name: "id1",
  169. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  170. },
  171. &xsql.StreamField{
  172. Name: "temp",
  173. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  174. },
  175. },
  176. streamStmt: streams["src1"],
  177. metaFields: []string{},
  178. }.Init(),
  179. DataSourcePlan{
  180. name: "src2",
  181. streamFields: []interface{}{
  182. &xsql.StreamField{
  183. Name: "hum",
  184. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  185. },
  186. &xsql.StreamField{
  187. Name: "id2",
  188. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  189. },
  190. },
  191. streamStmt: streams["src2"],
  192. metaFields: []string{},
  193. }.Init(),
  194. },
  195. },
  196. condition: nil,
  197. wtype: xsql.TUMBLING_WINDOW,
  198. length: 10000,
  199. interval: 0,
  200. limit: 0,
  201. }.Init(),
  202. },
  203. },
  204. from: &xsql.Table{Name: "src1"},
  205. joins: xsql.Joins{xsql.Join{
  206. Name: "src2",
  207. JoinType: xsql.INNER_JOIN,
  208. Expr: &xsql.BinaryExpr{
  209. OP: xsql.AND,
  210. LHS: &xsql.BinaryExpr{
  211. LHS: &xsql.BinaryExpr{
  212. OP: xsql.GT,
  213. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  214. RHS: &xsql.IntegerLiteral{Val: 20},
  215. },
  216. OP: xsql.OR,
  217. RHS: &xsql.BinaryExpr{
  218. OP: xsql.GT,
  219. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  220. RHS: &xsql.IntegerLiteral{Val: 60},
  221. },
  222. },
  223. RHS: &xsql.BinaryExpr{
  224. OP: xsql.EQ,
  225. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  226. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  227. },
  228. },
  229. }},
  230. }.Init(),
  231. },
  232. },
  233. fields: []xsql.Field{
  234. {
  235. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  236. Name: "id1",
  237. AName: ""},
  238. },
  239. isAggregate: false,
  240. sendMeta: false,
  241. }.Init(),
  242. }, { // 3 optimize window filter
  243. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  244. p: ProjectPlan{
  245. baseLogicalPlan: baseLogicalPlan{
  246. children: []LogicalPlan{
  247. WindowPlan{
  248. baseLogicalPlan: baseLogicalPlan{
  249. children: []LogicalPlan{
  250. FilterPlan{
  251. baseLogicalPlan: baseLogicalPlan{
  252. children: []LogicalPlan{
  253. DataSourcePlan{
  254. name: "src1",
  255. streamFields: []interface{}{
  256. &xsql.StreamField{
  257. Name: "id1",
  258. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  259. },
  260. &xsql.StreamField{
  261. Name: "name",
  262. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  263. },
  264. &xsql.StreamField{
  265. Name: "temp",
  266. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  267. },
  268. },
  269. streamStmt: streams["src1"],
  270. metaFields: []string{},
  271. }.Init(),
  272. },
  273. },
  274. condition: &xsql.BinaryExpr{
  275. OP: xsql.AND,
  276. LHS: &xsql.BinaryExpr{
  277. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  278. OP: xsql.EQ,
  279. RHS: &xsql.StringLiteral{Val: "v1"},
  280. },
  281. RHS: &xsql.BinaryExpr{
  282. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  283. OP: xsql.GT,
  284. RHS: &xsql.IntegerLiteral{Val: 2},
  285. },
  286. },
  287. }.Init(),
  288. },
  289. },
  290. condition: nil,
  291. wtype: xsql.TUMBLING_WINDOW,
  292. length: 10000,
  293. interval: 0,
  294. limit: 0,
  295. }.Init(),
  296. },
  297. },
  298. fields: []xsql.Field{
  299. {
  300. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  301. Name: "id1",
  302. AName: ""},
  303. },
  304. isAggregate: false,
  305. sendMeta: false,
  306. }.Init(),
  307. }, { // 4. do not optimize count window
  308. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  309. p: ProjectPlan{
  310. baseLogicalPlan: baseLogicalPlan{
  311. children: []LogicalPlan{
  312. HavingPlan{
  313. baseLogicalPlan: baseLogicalPlan{
  314. children: []LogicalPlan{
  315. FilterPlan{
  316. baseLogicalPlan: baseLogicalPlan{
  317. children: []LogicalPlan{
  318. WindowPlan{
  319. baseLogicalPlan: baseLogicalPlan{
  320. children: []LogicalPlan{
  321. DataSourcePlan{
  322. name: "src1",
  323. isWildCard: true,
  324. streamFields: []interface{}{
  325. &xsql.StreamField{
  326. Name: "id1",
  327. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  328. },
  329. &xsql.StreamField{
  330. Name: "temp",
  331. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  332. },
  333. &xsql.StreamField{
  334. Name: "name",
  335. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  336. },
  337. },
  338. streamStmt: streams["src1"],
  339. metaFields: []string{},
  340. }.Init(),
  341. },
  342. },
  343. condition: nil,
  344. wtype: xsql.COUNT_WINDOW,
  345. length: 5,
  346. interval: 1,
  347. limit: 0,
  348. }.Init(),
  349. },
  350. },
  351. condition: &xsql.BinaryExpr{
  352. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  353. OP: xsql.GT,
  354. RHS: &xsql.IntegerLiteral{Val: 20},
  355. },
  356. }.Init(),
  357. },
  358. },
  359. condition: &xsql.BinaryExpr{
  360. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.Wildcard{
  361. Token: xsql.ASTERISK,
  362. }}},
  363. OP: xsql.GT,
  364. RHS: &xsql.IntegerLiteral{Val: 2},
  365. },
  366. }.Init(),
  367. },
  368. },
  369. fields: []xsql.Field{
  370. {
  371. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  372. Name: "",
  373. AName: ""},
  374. },
  375. isAggregate: false,
  376. sendMeta: false,
  377. }.Init(),
  378. }, { // 5. optimize join on
  379. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  380. p: ProjectPlan{
  381. baseLogicalPlan: baseLogicalPlan{
  382. children: []LogicalPlan{
  383. JoinPlan{
  384. baseLogicalPlan: baseLogicalPlan{
  385. children: []LogicalPlan{
  386. WindowPlan{
  387. baseLogicalPlan: baseLogicalPlan{
  388. children: []LogicalPlan{
  389. FilterPlan{
  390. baseLogicalPlan: baseLogicalPlan{
  391. children: []LogicalPlan{
  392. DataSourcePlan{
  393. name: "src1",
  394. streamFields: []interface{}{
  395. &xsql.StreamField{
  396. Name: "id1",
  397. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  398. },
  399. &xsql.StreamField{
  400. Name: "temp",
  401. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  402. },
  403. },
  404. streamStmt: streams["src1"],
  405. metaFields: []string{},
  406. }.Init(),
  407. },
  408. },
  409. condition: &xsql.BinaryExpr{
  410. RHS: &xsql.BinaryExpr{
  411. OP: xsql.GT,
  412. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  413. RHS: &xsql.IntegerLiteral{Val: 20},
  414. },
  415. OP: xsql.AND,
  416. LHS: &xsql.BinaryExpr{
  417. OP: xsql.GT,
  418. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  419. RHS: &xsql.IntegerLiteral{Val: 111},
  420. },
  421. },
  422. }.Init(),
  423. FilterPlan{
  424. baseLogicalPlan: baseLogicalPlan{
  425. children: []LogicalPlan{
  426. DataSourcePlan{
  427. name: "src2",
  428. streamFields: []interface{}{
  429. &xsql.StreamField{
  430. Name: "hum",
  431. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  432. },
  433. &xsql.StreamField{
  434. Name: "id2",
  435. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  436. },
  437. },
  438. streamStmt: streams["src2"],
  439. metaFields: []string{},
  440. }.Init(),
  441. },
  442. },
  443. condition: &xsql.BinaryExpr{
  444. OP: xsql.LT,
  445. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  446. RHS: &xsql.IntegerLiteral{Val: 60},
  447. },
  448. }.Init(),
  449. },
  450. },
  451. condition: nil,
  452. wtype: xsql.TUMBLING_WINDOW,
  453. length: 10000,
  454. interval: 0,
  455. limit: 0,
  456. }.Init(),
  457. },
  458. },
  459. from: &xsql.Table{
  460. Name: "src1",
  461. },
  462. joins: []xsql.Join{
  463. {
  464. Name: "src2",
  465. Alias: "",
  466. JoinType: xsql.INNER_JOIN,
  467. Expr: &xsql.BinaryExpr{
  468. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  469. OP: xsql.EQ,
  470. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  471. },
  472. },
  473. },
  474. }.Init(),
  475. },
  476. },
  477. fields: []xsql.Field{
  478. {
  479. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  480. Name: "id1",
  481. AName: ""},
  482. },
  483. isAggregate: false,
  484. sendMeta: false,
  485. }.Init(),
  486. }, { // 6. optimize outter join on
  487. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  488. p: ProjectPlan{
  489. baseLogicalPlan: baseLogicalPlan{
  490. children: []LogicalPlan{
  491. JoinPlan{
  492. baseLogicalPlan: baseLogicalPlan{
  493. children: []LogicalPlan{
  494. WindowPlan{
  495. baseLogicalPlan: baseLogicalPlan{
  496. children: []LogicalPlan{
  497. FilterPlan{
  498. baseLogicalPlan: baseLogicalPlan{
  499. children: []LogicalPlan{
  500. DataSourcePlan{
  501. name: "src1",
  502. streamFields: []interface{}{
  503. &xsql.StreamField{
  504. Name: "id1",
  505. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  506. },
  507. &xsql.StreamField{
  508. Name: "temp",
  509. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  510. },
  511. },
  512. streamStmt: streams["src1"],
  513. metaFields: []string{},
  514. }.Init(),
  515. },
  516. },
  517. condition: &xsql.BinaryExpr{
  518. OP: xsql.GT,
  519. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  520. RHS: &xsql.IntegerLiteral{Val: 111},
  521. },
  522. }.Init(),
  523. DataSourcePlan{
  524. name: "src2",
  525. streamFields: []interface{}{
  526. &xsql.StreamField{
  527. Name: "hum",
  528. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  529. },
  530. &xsql.StreamField{
  531. Name: "id2",
  532. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  533. },
  534. },
  535. streamStmt: streams["src2"],
  536. metaFields: []string{},
  537. }.Init(),
  538. },
  539. },
  540. condition: nil,
  541. wtype: xsql.TUMBLING_WINDOW,
  542. length: 10000,
  543. interval: 0,
  544. limit: 0,
  545. }.Init(),
  546. },
  547. },
  548. from: &xsql.Table{
  549. Name: "src1",
  550. },
  551. joins: []xsql.Join{
  552. {
  553. Name: "src2",
  554. Alias: "",
  555. JoinType: xsql.FULL_JOIN,
  556. Expr: &xsql.BinaryExpr{
  557. OP: xsql.AND,
  558. LHS: &xsql.BinaryExpr{
  559. OP: xsql.AND,
  560. LHS: &xsql.BinaryExpr{
  561. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  562. OP: xsql.EQ,
  563. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  564. },
  565. RHS: &xsql.BinaryExpr{
  566. OP: xsql.GT,
  567. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  568. RHS: &xsql.IntegerLiteral{Val: 20},
  569. },
  570. },
  571. RHS: &xsql.BinaryExpr{
  572. OP: xsql.LT,
  573. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  574. RHS: &xsql.IntegerLiteral{Val: 60},
  575. },
  576. },
  577. },
  578. },
  579. }.Init(),
  580. },
  581. },
  582. fields: []xsql.Field{
  583. {
  584. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  585. Name: "id1",
  586. AName: ""},
  587. },
  588. isAggregate: false,
  589. sendMeta: false,
  590. }.Init(),
  591. }, { // 7 window error for table
  592. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  593. p: nil,
  594. err: "cannot run window for TABLE sources",
  595. }, { // 8 join table without window
  596. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  597. p: ProjectPlan{
  598. baseLogicalPlan: baseLogicalPlan{
  599. children: []LogicalPlan{
  600. JoinPlan{
  601. baseLogicalPlan: baseLogicalPlan{
  602. children: []LogicalPlan{
  603. JoinAlignPlan{
  604. baseLogicalPlan: baseLogicalPlan{
  605. children: []LogicalPlan{
  606. FilterPlan{
  607. baseLogicalPlan: baseLogicalPlan{
  608. children: []LogicalPlan{
  609. DataSourcePlan{
  610. name: "src1",
  611. streamFields: []interface{}{
  612. &xsql.StreamField{
  613. Name: "id1",
  614. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  615. },
  616. &xsql.StreamField{
  617. Name: "temp",
  618. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  619. },
  620. },
  621. streamStmt: streams["src1"],
  622. metaFields: []string{},
  623. }.Init(),
  624. },
  625. },
  626. condition: &xsql.BinaryExpr{
  627. RHS: &xsql.BinaryExpr{
  628. OP: xsql.GT,
  629. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  630. RHS: &xsql.IntegerLiteral{Val: 20},
  631. },
  632. OP: xsql.AND,
  633. LHS: &xsql.BinaryExpr{
  634. OP: xsql.GT,
  635. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  636. RHS: &xsql.IntegerLiteral{Val: 111},
  637. },
  638. },
  639. }.Init(),
  640. FilterPlan{
  641. baseLogicalPlan: baseLogicalPlan{
  642. children: []LogicalPlan{
  643. DataSourcePlan{
  644. name: "tableInPlanner",
  645. streamFields: []interface{}{
  646. &xsql.StreamField{
  647. Name: "hum",
  648. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  649. },
  650. &xsql.StreamField{
  651. Name: "id",
  652. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  653. },
  654. },
  655. streamStmt: streams["tableInPlanner"],
  656. metaFields: []string{},
  657. }.Init(),
  658. },
  659. },
  660. condition: &xsql.BinaryExpr{
  661. OP: xsql.LT,
  662. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  663. RHS: &xsql.IntegerLiteral{Val: 60},
  664. },
  665. }.Init(),
  666. },
  667. },
  668. Emitters: []string{"tableInPlanner"},
  669. }.Init(),
  670. },
  671. },
  672. from: &xsql.Table{
  673. Name: "src1",
  674. },
  675. joins: []xsql.Join{
  676. {
  677. Name: "tableInPlanner",
  678. Alias: "",
  679. JoinType: xsql.INNER_JOIN,
  680. Expr: &xsql.BinaryExpr{
  681. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  682. OP: xsql.EQ,
  683. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  684. },
  685. },
  686. },
  687. }.Init(),
  688. },
  689. },
  690. fields: []xsql.Field{
  691. {
  692. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  693. Name: "id1",
  694. AName: ""},
  695. },
  696. isAggregate: false,
  697. sendMeta: false,
  698. }.Init(),
  699. }, { // 9 join table with window
  700. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  701. p: ProjectPlan{
  702. baseLogicalPlan: baseLogicalPlan{
  703. children: []LogicalPlan{
  704. JoinPlan{
  705. baseLogicalPlan: baseLogicalPlan{
  706. children: []LogicalPlan{
  707. JoinAlignPlan{
  708. baseLogicalPlan: baseLogicalPlan{
  709. children: []LogicalPlan{
  710. WindowPlan{
  711. baseLogicalPlan: baseLogicalPlan{
  712. children: []LogicalPlan{
  713. FilterPlan{
  714. baseLogicalPlan: baseLogicalPlan{
  715. children: []LogicalPlan{
  716. DataSourcePlan{
  717. name: "src1",
  718. streamFields: []interface{}{
  719. &xsql.StreamField{
  720. Name: "id1",
  721. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  722. },
  723. &xsql.StreamField{
  724. Name: "temp",
  725. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  726. },
  727. },
  728. streamStmt: streams["src1"],
  729. metaFields: []string{},
  730. }.Init(),
  731. },
  732. },
  733. condition: &xsql.BinaryExpr{
  734. RHS: &xsql.BinaryExpr{
  735. OP: xsql.GT,
  736. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  737. RHS: &xsql.IntegerLiteral{Val: 20},
  738. },
  739. OP: xsql.AND,
  740. LHS: &xsql.BinaryExpr{
  741. OP: xsql.GT,
  742. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  743. RHS: &xsql.IntegerLiteral{Val: 111},
  744. },
  745. },
  746. }.Init(),
  747. },
  748. },
  749. condition: nil,
  750. wtype: xsql.TUMBLING_WINDOW,
  751. length: 10000,
  752. interval: 0,
  753. limit: 0,
  754. }.Init(),
  755. FilterPlan{
  756. baseLogicalPlan: baseLogicalPlan{
  757. children: []LogicalPlan{
  758. DataSourcePlan{
  759. name: "tableInPlanner",
  760. streamFields: []interface{}{
  761. &xsql.StreamField{
  762. Name: "hum",
  763. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  764. },
  765. &xsql.StreamField{
  766. Name: "id",
  767. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  768. },
  769. },
  770. streamStmt: streams["tableInPlanner"],
  771. metaFields: []string{},
  772. }.Init(),
  773. },
  774. },
  775. condition: &xsql.BinaryExpr{
  776. OP: xsql.LT,
  777. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  778. RHS: &xsql.IntegerLiteral{Val: 60},
  779. },
  780. }.Init(),
  781. },
  782. },
  783. Emitters: []string{"tableInPlanner"},
  784. }.Init(),
  785. },
  786. },
  787. from: &xsql.Table{
  788. Name: "src1",
  789. },
  790. joins: []xsql.Join{
  791. {
  792. Name: "tableInPlanner",
  793. Alias: "",
  794. JoinType: xsql.INNER_JOIN,
  795. Expr: &xsql.BinaryExpr{
  796. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  797. OP: xsql.EQ,
  798. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  799. },
  800. },
  801. },
  802. }.Init(),
  803. },
  804. },
  805. fields: []xsql.Field{
  806. {
  807. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  808. Name: "id1",
  809. AName: ""},
  810. },
  811. isAggregate: false,
  812. sendMeta: false,
  813. }.Init(),
  814. }, { // 10 meta
  815. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  816. p: ProjectPlan{
  817. baseLogicalPlan: baseLogicalPlan{
  818. children: []LogicalPlan{
  819. FilterPlan{
  820. baseLogicalPlan: baseLogicalPlan{
  821. children: []LogicalPlan{
  822. DataSourcePlan{
  823. name: "src1",
  824. streamFields: []interface{}{
  825. &xsql.StreamField{
  826. Name: "temp",
  827. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  828. },
  829. },
  830. streamStmt: streams["src1"],
  831. metaFields: []string{"Humidity", "device", "id"},
  832. alias: xsql.Fields{
  833. xsql.Field{
  834. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  835. Name: "id",
  836. StreamName: xsql.DEFAULT_STREAM,
  837. }}},
  838. Name: "meta",
  839. AName: "eid",
  840. },
  841. xsql.Field{
  842. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
  843. &xsql.BinaryExpr{
  844. OP: xsql.ARROW,
  845. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
  846. RHS: &xsql.MetaRef{Name: "Device"},
  847. },
  848. }},
  849. Name: "meta",
  850. AName: "hdevice",
  851. },
  852. },
  853. }.Init(),
  854. },
  855. },
  856. condition: &xsql.BinaryExpr{
  857. LHS: &xsql.Call{
  858. Name: "meta",
  859. Args: []xsql.Expr{&xsql.MetaRef{
  860. Name: "device",
  861. StreamName: xsql.DEFAULT_STREAM,
  862. }},
  863. },
  864. OP: xsql.EQ,
  865. RHS: &xsql.StringLiteral{
  866. Val: "demo2",
  867. },
  868. },
  869. }.Init(),
  870. },
  871. },
  872. fields: []xsql.Field{
  873. {
  874. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  875. Name: "temp",
  876. AName: "",
  877. }, {
  878. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  879. Name: "id",
  880. StreamName: xsql.DEFAULT_STREAM,
  881. }}},
  882. Name: "meta",
  883. AName: "eid",
  884. }, {
  885. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
  886. &xsql.BinaryExpr{
  887. OP: xsql.ARROW,
  888. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
  889. RHS: &xsql.MetaRef{Name: "Device"},
  890. },
  891. }},
  892. Name: "meta",
  893. AName: "hdevice",
  894. },
  895. },
  896. isAggregate: false,
  897. sendMeta: false,
  898. }.Init(),
  899. },
  900. }
  901. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  902. for i, tt := range tests {
  903. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  904. if err != nil {
  905. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  906. continue
  907. }
  908. p, err := createLogicalPlan(stmt, &api.RuleOption{
  909. IsEventTime: false,
  910. LateTol: 0,
  911. Concurrency: 0,
  912. BufferLength: 0,
  913. SendMetaToSink: false,
  914. Qos: 0,
  915. CheckpointInterval: 0,
  916. SendError: true,
  917. }, store)
  918. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  919. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  920. } else if !reflect.DeepEqual(tt.p, p) {
  921. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.p, p)
  922. }
  923. }
  924. }