// planner_test.go
  1. package planner
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/gdexlab/go-render/render"
  10. "path"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. )
  15. var (
  16. DbDir = common.GetDbDir()
  17. )
  18. func Test_createLogicalPlan(t *testing.T) {
  19. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  20. err := store.Open()
  21. if err != nil {
  22. t.Error(err)
  23. return
  24. }
  25. defer store.Close()
  26. streamSqls := map[string]string{
  27. "src1": `CREATE STREAM src1 (
  28. id1 BIGINT,
  29. temp BIGINT,
  30. name string
  31. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  32. "src2": `CREATE STREAM src2 (
  33. id2 BIGINT,
  34. hum BIGINT
  35. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  36. "tableInPlanner": `CREATE TABLE tableInPlanner (
  37. id BIGINT,
  38. name STRING,
  39. value STRING,
  40. hum BIGINT
  41. ) WITH (TYPE="file");`,
  42. }
  43. types := map[string]xsql.StreamType{
  44. "src1": xsql.TypeStream,
  45. "src2": xsql.TypeStream,
  46. "tableInPlanner": xsql.TypeTable,
  47. }
  48. for name, sql := range streamSqls {
  49. s, err := json.Marshal(&xsql.StreamInfo{
  50. StreamType: types[name],
  51. Statement: sql,
  52. })
  53. if err != nil {
  54. t.Error(err)
  55. t.Fail()
  56. }
  57. store.Set(name, string(s))
  58. }
  59. streams := make(map[string]*xsql.StreamStmt)
  60. for n, _ := range streamSqls {
  61. streamStmt, err := xsql.GetDataSource(store, n)
  62. if err != nil {
  63. t.Errorf("fail to get stream %s, please check if stream is created", n)
  64. return
  65. }
  66. streams[n] = streamStmt
  67. }
  68. var tests = []struct {
  69. sql string
  70. p LogicalPlan
  71. err string
  72. }{
  73. { // 0
  74. sql: `SELECT name FROM src1`,
  75. p: ProjectPlan{
  76. baseLogicalPlan: baseLogicalPlan{
  77. children: []LogicalPlan{
  78. DataSourcePlan{
  79. baseLogicalPlan: baseLogicalPlan{},
  80. name: "src1",
  81. streamFields: []interface{}{
  82. &xsql.StreamField{
  83. Name: "name",
  84. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  85. },
  86. },
  87. streamStmt: streams["src1"],
  88. metaFields: []string{},
  89. }.Init(),
  90. },
  91. },
  92. fields: []xsql.Field{
  93. {
  94. Expr: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  95. Name: "name",
  96. AName: ""},
  97. },
  98. isAggregate: false,
  99. sendMeta: false,
  100. }.Init(),
  101. }, { // 1 optimize where to data source
  102. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  103. p: ProjectPlan{
  104. baseLogicalPlan: baseLogicalPlan{
  105. children: []LogicalPlan{
  106. WindowPlan{
  107. baseLogicalPlan: baseLogicalPlan{
  108. children: []LogicalPlan{
  109. FilterPlan{
  110. baseLogicalPlan: baseLogicalPlan{
  111. children: []LogicalPlan{
  112. DataSourcePlan{
  113. name: "src1",
  114. streamFields: []interface{}{
  115. &xsql.StreamField{
  116. Name: "name",
  117. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  118. },
  119. &xsql.StreamField{
  120. Name: "temp",
  121. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  122. },
  123. },
  124. streamStmt: streams["src1"],
  125. metaFields: []string{},
  126. }.Init(),
  127. },
  128. },
  129. condition: &xsql.BinaryExpr{
  130. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  131. OP: xsql.EQ,
  132. RHS: &xsql.StringLiteral{Val: "v1"},
  133. },
  134. }.Init(),
  135. },
  136. },
  137. condition: nil,
  138. wtype: xsql.TUMBLING_WINDOW,
  139. length: 10000,
  140. interval: 0,
  141. limit: 0,
  142. }.Init(),
  143. },
  144. },
  145. fields: []xsql.Field{
  146. {
  147. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  148. Name: "temp",
  149. AName: ""},
  150. },
  151. isAggregate: false,
  152. sendMeta: false,
  153. }.Init(),
  154. }, { // 2 condition that cannot be optimized
  155. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  156. p: ProjectPlan{
  157. baseLogicalPlan: baseLogicalPlan{
  158. children: []LogicalPlan{
  159. JoinPlan{
  160. baseLogicalPlan: baseLogicalPlan{
  161. children: []LogicalPlan{
  162. WindowPlan{
  163. baseLogicalPlan: baseLogicalPlan{
  164. children: []LogicalPlan{
  165. DataSourcePlan{
  166. name: "src1",
  167. streamFields: []interface{}{
  168. &xsql.StreamField{
  169. Name: "id1",
  170. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  171. },
  172. &xsql.StreamField{
  173. Name: "temp",
  174. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  175. },
  176. },
  177. streamStmt: streams["src1"],
  178. metaFields: []string{},
  179. }.Init(),
  180. DataSourcePlan{
  181. name: "src2",
  182. streamFields: []interface{}{
  183. &xsql.StreamField{
  184. Name: "hum",
  185. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  186. },
  187. &xsql.StreamField{
  188. Name: "id2",
  189. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  190. },
  191. },
  192. streamStmt: streams["src2"],
  193. metaFields: []string{},
  194. }.Init(),
  195. },
  196. },
  197. condition: nil,
  198. wtype: xsql.TUMBLING_WINDOW,
  199. length: 10000,
  200. interval: 0,
  201. limit: 0,
  202. }.Init(),
  203. },
  204. },
  205. from: &xsql.Table{Name: "src1"},
  206. joins: xsql.Joins{xsql.Join{
  207. Name: "src2",
  208. JoinType: xsql.INNER_JOIN,
  209. Expr: &xsql.BinaryExpr{
  210. OP: xsql.AND,
  211. LHS: &xsql.BinaryExpr{
  212. LHS: &xsql.BinaryExpr{
  213. OP: xsql.GT,
  214. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  215. RHS: &xsql.IntegerLiteral{Val: 20},
  216. },
  217. OP: xsql.OR,
  218. RHS: &xsql.BinaryExpr{
  219. OP: xsql.GT,
  220. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  221. RHS: &xsql.IntegerLiteral{Val: 60},
  222. },
  223. },
  224. RHS: &xsql.BinaryExpr{
  225. OP: xsql.EQ,
  226. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  227. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  228. },
  229. },
  230. }},
  231. }.Init(),
  232. },
  233. },
  234. fields: []xsql.Field{
  235. {
  236. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  237. Name: "id1",
  238. AName: ""},
  239. },
  240. isAggregate: false,
  241. sendMeta: false,
  242. }.Init(),
  243. }, { // 3 optimize window filter
  244. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  245. p: ProjectPlan{
  246. baseLogicalPlan: baseLogicalPlan{
  247. children: []LogicalPlan{
  248. WindowPlan{
  249. baseLogicalPlan: baseLogicalPlan{
  250. children: []LogicalPlan{
  251. FilterPlan{
  252. baseLogicalPlan: baseLogicalPlan{
  253. children: []LogicalPlan{
  254. DataSourcePlan{
  255. name: "src1",
  256. streamFields: []interface{}{
  257. &xsql.StreamField{
  258. Name: "id1",
  259. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  260. },
  261. &xsql.StreamField{
  262. Name: "name",
  263. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  264. },
  265. &xsql.StreamField{
  266. Name: "temp",
  267. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  268. },
  269. },
  270. streamStmt: streams["src1"],
  271. metaFields: []string{},
  272. }.Init(),
  273. },
  274. },
  275. condition: &xsql.BinaryExpr{
  276. OP: xsql.AND,
  277. LHS: &xsql.BinaryExpr{
  278. LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
  279. OP: xsql.EQ,
  280. RHS: &xsql.StringLiteral{Val: "v1"},
  281. },
  282. RHS: &xsql.BinaryExpr{
  283. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  284. OP: xsql.GT,
  285. RHS: &xsql.IntegerLiteral{Val: 2},
  286. },
  287. },
  288. }.Init(),
  289. },
  290. },
  291. condition: nil,
  292. wtype: xsql.TUMBLING_WINDOW,
  293. length: 10000,
  294. interval: 0,
  295. limit: 0,
  296. }.Init(),
  297. },
  298. },
  299. fields: []xsql.Field{
  300. {
  301. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  302. Name: "id1",
  303. AName: ""},
  304. },
  305. isAggregate: false,
  306. sendMeta: false,
  307. }.Init(),
  308. }, { // 4. do not optimize count window
  309. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  310. p: ProjectPlan{
  311. baseLogicalPlan: baseLogicalPlan{
  312. children: []LogicalPlan{
  313. HavingPlan{
  314. baseLogicalPlan: baseLogicalPlan{
  315. children: []LogicalPlan{
  316. FilterPlan{
  317. baseLogicalPlan: baseLogicalPlan{
  318. children: []LogicalPlan{
  319. WindowPlan{
  320. baseLogicalPlan: baseLogicalPlan{
  321. children: []LogicalPlan{
  322. DataSourcePlan{
  323. name: "src1",
  324. isWildCard: true,
  325. streamFields: []interface{}{
  326. &xsql.StreamField{
  327. Name: "id1",
  328. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  329. },
  330. &xsql.StreamField{
  331. Name: "temp",
  332. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  333. },
  334. &xsql.StreamField{
  335. Name: "name",
  336. FieldType: &xsql.BasicType{Type: xsql.STRINGS},
  337. },
  338. },
  339. streamStmt: streams["src1"],
  340. metaFields: []string{},
  341. }.Init(),
  342. },
  343. },
  344. condition: nil,
  345. wtype: xsql.COUNT_WINDOW,
  346. length: 5,
  347. interval: 1,
  348. limit: 0,
  349. }.Init(),
  350. },
  351. },
  352. condition: &xsql.BinaryExpr{
  353. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  354. OP: xsql.GT,
  355. RHS: &xsql.IntegerLiteral{Val: 20},
  356. },
  357. }.Init(),
  358. },
  359. },
  360. condition: &xsql.BinaryExpr{
  361. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.Wildcard{
  362. Token: xsql.ASTERISK,
  363. }}},
  364. OP: xsql.GT,
  365. RHS: &xsql.IntegerLiteral{Val: 2},
  366. },
  367. }.Init(),
  368. },
  369. },
  370. fields: []xsql.Field{
  371. {
  372. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  373. Name: "",
  374. AName: ""},
  375. },
  376. isAggregate: false,
  377. sendMeta: false,
  378. }.Init(),
  379. }, { // 5. optimize join on
  380. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  381. p: ProjectPlan{
  382. baseLogicalPlan: baseLogicalPlan{
  383. children: []LogicalPlan{
  384. JoinPlan{
  385. baseLogicalPlan: baseLogicalPlan{
  386. children: []LogicalPlan{
  387. WindowPlan{
  388. baseLogicalPlan: baseLogicalPlan{
  389. children: []LogicalPlan{
  390. FilterPlan{
  391. baseLogicalPlan: baseLogicalPlan{
  392. children: []LogicalPlan{
  393. DataSourcePlan{
  394. name: "src1",
  395. streamFields: []interface{}{
  396. &xsql.StreamField{
  397. Name: "id1",
  398. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  399. },
  400. &xsql.StreamField{
  401. Name: "temp",
  402. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  403. },
  404. },
  405. streamStmt: streams["src1"],
  406. metaFields: []string{},
  407. }.Init(),
  408. },
  409. },
  410. condition: &xsql.BinaryExpr{
  411. RHS: &xsql.BinaryExpr{
  412. OP: xsql.GT,
  413. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  414. RHS: &xsql.IntegerLiteral{Val: 20},
  415. },
  416. OP: xsql.AND,
  417. LHS: &xsql.BinaryExpr{
  418. OP: xsql.GT,
  419. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  420. RHS: &xsql.IntegerLiteral{Val: 111},
  421. },
  422. },
  423. }.Init(),
  424. FilterPlan{
  425. baseLogicalPlan: baseLogicalPlan{
  426. children: []LogicalPlan{
  427. DataSourcePlan{
  428. name: "src2",
  429. streamFields: []interface{}{
  430. &xsql.StreamField{
  431. Name: "hum",
  432. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  433. },
  434. &xsql.StreamField{
  435. Name: "id2",
  436. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  437. },
  438. },
  439. streamStmt: streams["src2"],
  440. metaFields: []string{},
  441. }.Init(),
  442. },
  443. },
  444. condition: &xsql.BinaryExpr{
  445. OP: xsql.LT,
  446. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  447. RHS: &xsql.IntegerLiteral{Val: 60},
  448. },
  449. }.Init(),
  450. },
  451. },
  452. condition: nil,
  453. wtype: xsql.TUMBLING_WINDOW,
  454. length: 10000,
  455. interval: 0,
  456. limit: 0,
  457. }.Init(),
  458. },
  459. },
  460. from: &xsql.Table{
  461. Name: "src1",
  462. },
  463. joins: []xsql.Join{
  464. {
  465. Name: "src2",
  466. Alias: "",
  467. JoinType: xsql.INNER_JOIN,
  468. Expr: &xsql.BinaryExpr{
  469. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  470. OP: xsql.EQ,
  471. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  472. },
  473. },
  474. },
  475. }.Init(),
  476. },
  477. },
  478. fields: []xsql.Field{
  479. {
  480. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  481. Name: "id1",
  482. AName: ""},
  483. },
  484. isAggregate: false,
  485. sendMeta: false,
  486. }.Init(),
  487. }, { // 6. optimize outter join on
  488. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  489. p: ProjectPlan{
  490. baseLogicalPlan: baseLogicalPlan{
  491. children: []LogicalPlan{
  492. JoinPlan{
  493. baseLogicalPlan: baseLogicalPlan{
  494. children: []LogicalPlan{
  495. WindowPlan{
  496. baseLogicalPlan: baseLogicalPlan{
  497. children: []LogicalPlan{
  498. FilterPlan{
  499. baseLogicalPlan: baseLogicalPlan{
  500. children: []LogicalPlan{
  501. DataSourcePlan{
  502. name: "src1",
  503. streamFields: []interface{}{
  504. &xsql.StreamField{
  505. Name: "id1",
  506. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  507. },
  508. &xsql.StreamField{
  509. Name: "temp",
  510. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  511. },
  512. },
  513. streamStmt: streams["src1"],
  514. metaFields: []string{},
  515. }.Init(),
  516. },
  517. },
  518. condition: &xsql.BinaryExpr{
  519. OP: xsql.GT,
  520. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  521. RHS: &xsql.IntegerLiteral{Val: 111},
  522. },
  523. }.Init(),
  524. DataSourcePlan{
  525. name: "src2",
  526. streamFields: []interface{}{
  527. &xsql.StreamField{
  528. Name: "hum",
  529. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  530. },
  531. &xsql.StreamField{
  532. Name: "id2",
  533. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  534. },
  535. },
  536. streamStmt: streams["src2"],
  537. metaFields: []string{},
  538. }.Init(),
  539. },
  540. },
  541. condition: nil,
  542. wtype: xsql.TUMBLING_WINDOW,
  543. length: 10000,
  544. interval: 0,
  545. limit: 0,
  546. }.Init(),
  547. },
  548. },
  549. from: &xsql.Table{
  550. Name: "src1",
  551. },
  552. joins: []xsql.Join{
  553. {
  554. Name: "src2",
  555. Alias: "",
  556. JoinType: xsql.FULL_JOIN,
  557. Expr: &xsql.BinaryExpr{
  558. OP: xsql.AND,
  559. LHS: &xsql.BinaryExpr{
  560. OP: xsql.AND,
  561. LHS: &xsql.BinaryExpr{
  562. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  563. OP: xsql.EQ,
  564. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  565. },
  566. RHS: &xsql.BinaryExpr{
  567. OP: xsql.GT,
  568. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  569. RHS: &xsql.IntegerLiteral{Val: 20},
  570. },
  571. },
  572. RHS: &xsql.BinaryExpr{
  573. OP: xsql.LT,
  574. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  575. RHS: &xsql.IntegerLiteral{Val: 60},
  576. },
  577. },
  578. },
  579. },
  580. }.Init(),
  581. },
  582. },
  583. fields: []xsql.Field{
  584. {
  585. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  586. Name: "id1",
  587. AName: ""},
  588. },
  589. isAggregate: false,
  590. sendMeta: false,
  591. }.Init(),
  592. }, { // 7 window error for table
  593. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  594. p: nil,
  595. err: "cannot run window for TABLE sources",
  596. }, { // 8 join table without window
  597. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  598. p: ProjectPlan{
  599. baseLogicalPlan: baseLogicalPlan{
  600. children: []LogicalPlan{
  601. JoinPlan{
  602. baseLogicalPlan: baseLogicalPlan{
  603. children: []LogicalPlan{
  604. JoinAlignPlan{
  605. baseLogicalPlan: baseLogicalPlan{
  606. children: []LogicalPlan{
  607. FilterPlan{
  608. baseLogicalPlan: baseLogicalPlan{
  609. children: []LogicalPlan{
  610. DataSourcePlan{
  611. name: "src1",
  612. streamFields: []interface{}{
  613. &xsql.StreamField{
  614. Name: "id1",
  615. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  616. },
  617. &xsql.StreamField{
  618. Name: "temp",
  619. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  620. },
  621. },
  622. streamStmt: streams["src1"],
  623. metaFields: []string{},
  624. }.Init(),
  625. },
  626. },
  627. condition: &xsql.BinaryExpr{
  628. RHS: &xsql.BinaryExpr{
  629. OP: xsql.GT,
  630. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  631. RHS: &xsql.IntegerLiteral{Val: 20},
  632. },
  633. OP: xsql.AND,
  634. LHS: &xsql.BinaryExpr{
  635. OP: xsql.GT,
  636. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  637. RHS: &xsql.IntegerLiteral{Val: 111},
  638. },
  639. },
  640. }.Init(),
  641. FilterPlan{
  642. baseLogicalPlan: baseLogicalPlan{
  643. children: []LogicalPlan{
  644. DataSourcePlan{
  645. name: "tableInPlanner",
  646. streamFields: []interface{}{
  647. &xsql.StreamField{
  648. Name: "hum",
  649. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  650. },
  651. &xsql.StreamField{
  652. Name: "id",
  653. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  654. },
  655. },
  656. streamStmt: streams["tableInPlanner"],
  657. metaFields: []string{},
  658. }.Init(),
  659. },
  660. },
  661. condition: &xsql.BinaryExpr{
  662. OP: xsql.LT,
  663. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  664. RHS: &xsql.IntegerLiteral{Val: 60},
  665. },
  666. }.Init(),
  667. },
  668. },
  669. Emitters: []string{"tableInPlanner"},
  670. }.Init(),
  671. },
  672. },
  673. from: &xsql.Table{
  674. Name: "src1",
  675. },
  676. joins: []xsql.Join{
  677. {
  678. Name: "tableInPlanner",
  679. Alias: "",
  680. JoinType: xsql.INNER_JOIN,
  681. Expr: &xsql.BinaryExpr{
  682. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  683. OP: xsql.EQ,
  684. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  685. },
  686. },
  687. },
  688. }.Init(),
  689. },
  690. },
  691. fields: []xsql.Field{
  692. {
  693. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  694. Name: "id1",
  695. AName: ""},
  696. },
  697. isAggregate: false,
  698. sendMeta: false,
  699. }.Init(),
  700. }, { // 9 join table with window
  701. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  702. p: ProjectPlan{
  703. baseLogicalPlan: baseLogicalPlan{
  704. children: []LogicalPlan{
  705. JoinPlan{
  706. baseLogicalPlan: baseLogicalPlan{
  707. children: []LogicalPlan{
  708. JoinAlignPlan{
  709. baseLogicalPlan: baseLogicalPlan{
  710. children: []LogicalPlan{
  711. WindowPlan{
  712. baseLogicalPlan: baseLogicalPlan{
  713. children: []LogicalPlan{
  714. FilterPlan{
  715. baseLogicalPlan: baseLogicalPlan{
  716. children: []LogicalPlan{
  717. DataSourcePlan{
  718. name: "src1",
  719. streamFields: []interface{}{
  720. &xsql.StreamField{
  721. Name: "id1",
  722. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  723. },
  724. &xsql.StreamField{
  725. Name: "temp",
  726. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  727. },
  728. },
  729. streamStmt: streams["src1"],
  730. metaFields: []string{},
  731. }.Init(),
  732. },
  733. },
  734. condition: &xsql.BinaryExpr{
  735. RHS: &xsql.BinaryExpr{
  736. OP: xsql.GT,
  737. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  738. RHS: &xsql.IntegerLiteral{Val: 20},
  739. },
  740. OP: xsql.AND,
  741. LHS: &xsql.BinaryExpr{
  742. OP: xsql.GT,
  743. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  744. RHS: &xsql.IntegerLiteral{Val: 111},
  745. },
  746. },
  747. }.Init(),
  748. },
  749. },
  750. condition: nil,
  751. wtype: xsql.TUMBLING_WINDOW,
  752. length: 10000,
  753. interval: 0,
  754. limit: 0,
  755. }.Init(),
  756. FilterPlan{
  757. baseLogicalPlan: baseLogicalPlan{
  758. children: []LogicalPlan{
  759. DataSourcePlan{
  760. name: "tableInPlanner",
  761. streamFields: []interface{}{
  762. &xsql.StreamField{
  763. Name: "hum",
  764. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  765. },
  766. &xsql.StreamField{
  767. Name: "id",
  768. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  769. },
  770. },
  771. streamStmt: streams["tableInPlanner"],
  772. metaFields: []string{},
  773. }.Init(),
  774. },
  775. },
  776. condition: &xsql.BinaryExpr{
  777. OP: xsql.LT,
  778. LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  779. RHS: &xsql.IntegerLiteral{Val: 60},
  780. },
  781. }.Init(),
  782. },
  783. },
  784. Emitters: []string{"tableInPlanner"},
  785. }.Init(),
  786. },
  787. },
  788. from: &xsql.Table{
  789. Name: "src1",
  790. },
  791. joins: []xsql.Join{
  792. {
  793. Name: "tableInPlanner",
  794. Alias: "",
  795. JoinType: xsql.INNER_JOIN,
  796. Expr: &xsql.BinaryExpr{
  797. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  798. OP: xsql.EQ,
  799. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  800. },
  801. },
  802. },
  803. }.Init(),
  804. },
  805. },
  806. fields: []xsql.Field{
  807. {
  808. Expr: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  809. Name: "id1",
  810. AName: ""},
  811. },
  812. isAggregate: false,
  813. sendMeta: false,
  814. }.Init(),
  815. }, { // 10 meta
  816. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  817. p: ProjectPlan{
  818. baseLogicalPlan: baseLogicalPlan{
  819. children: []LogicalPlan{
  820. FilterPlan{
  821. baseLogicalPlan: baseLogicalPlan{
  822. children: []LogicalPlan{
  823. DataSourcePlan{
  824. name: "src1",
  825. streamFields: []interface{}{
  826. &xsql.StreamField{
  827. Name: "temp",
  828. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  829. },
  830. },
  831. streamStmt: streams["src1"],
  832. metaFields: []string{"Humidity", "device", "id"},
  833. alias: xsql.Fields{
  834. xsql.Field{
  835. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  836. Name: "id",
  837. StreamName: xsql.DEFAULT_STREAM,
  838. }}},
  839. Name: "meta",
  840. AName: "eid",
  841. },
  842. xsql.Field{
  843. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
  844. &xsql.BinaryExpr{
  845. OP: xsql.ARROW,
  846. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
  847. RHS: &xsql.MetaRef{Name: "Device"},
  848. },
  849. }},
  850. Name: "meta",
  851. AName: "hdevice",
  852. },
  853. },
  854. }.Init(),
  855. },
  856. },
  857. condition: &xsql.BinaryExpr{
  858. LHS: &xsql.Call{
  859. Name: "meta",
  860. Args: []xsql.Expr{&xsql.MetaRef{
  861. Name: "device",
  862. StreamName: xsql.DEFAULT_STREAM,
  863. }},
  864. },
  865. OP: xsql.EQ,
  866. RHS: &xsql.StringLiteral{
  867. Val: "demo2",
  868. },
  869. },
  870. }.Init(),
  871. },
  872. },
  873. fields: []xsql.Field{
  874. {
  875. Expr: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  876. Name: "temp",
  877. AName: "",
  878. }, {
  879. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
  880. Name: "id",
  881. StreamName: xsql.DEFAULT_STREAM,
  882. }}},
  883. Name: "meta",
  884. AName: "eid",
  885. }, {
  886. Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
  887. &xsql.BinaryExpr{
  888. OP: xsql.ARROW,
  889. LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
  890. RHS: &xsql.MetaRef{Name: "Device"},
  891. },
  892. }},
  893. Name: "meta",
  894. AName: "hdevice",
  895. },
  896. },
  897. isAggregate: false,
  898. sendMeta: false,
  899. }.Init(),
  900. }, { // 11 join with same name field and aliased
  901. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  902. p: ProjectPlan{
  903. baseLogicalPlan: baseLogicalPlan{
  904. children: []LogicalPlan{
  905. JoinPlan{
  906. baseLogicalPlan: baseLogicalPlan{
  907. children: []LogicalPlan{
  908. JoinAlignPlan{
  909. baseLogicalPlan: baseLogicalPlan{
  910. children: []LogicalPlan{
  911. DataSourcePlan{
  912. name: "src2",
  913. streamFields: []interface{}{
  914. &xsql.StreamField{
  915. Name: "hum",
  916. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  917. },
  918. &xsql.StreamField{
  919. Name: "id2",
  920. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  921. },
  922. },
  923. streamStmt: streams["src2"],
  924. alias: xsql.Fields{
  925. xsql.Field{
  926. Expr: &xsql.FieldRef{
  927. Name: "hum",
  928. StreamName: "src2",
  929. },
  930. Name: "hum",
  931. AName: "hum1",
  932. },
  933. },
  934. metaFields: []string{},
  935. }.Init(),
  936. DataSourcePlan{
  937. name: "tableInPlanner",
  938. streamFields: []interface{}{
  939. &xsql.StreamField{
  940. Name: "hum",
  941. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  942. },
  943. &xsql.StreamField{
  944. Name: "id",
  945. FieldType: &xsql.BasicType{Type: xsql.BIGINT},
  946. },
  947. },
  948. streamStmt: streams["tableInPlanner"],
  949. alias: xsql.Fields{
  950. xsql.Field{
  951. Expr: &xsql.FieldRef{
  952. Name: "hum",
  953. StreamName: "tableInPlanner",
  954. },
  955. Name: "hum",
  956. AName: "hum2",
  957. },
  958. },
  959. metaFields: []string{},
  960. }.Init(),
  961. },
  962. },
  963. Emitters: []string{"tableInPlanner"},
  964. }.Init(),
  965. },
  966. },
  967. from: &xsql.Table{
  968. Name: "src2",
  969. },
  970. joins: []xsql.Join{
  971. {
  972. Name: "tableInPlanner",
  973. Alias: "",
  974. JoinType: xsql.INNER_JOIN,
  975. Expr: &xsql.BinaryExpr{
  976. RHS: &xsql.BinaryExpr{
  977. OP: xsql.EQ,
  978. LHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  979. RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  980. },
  981. OP: xsql.AND,
  982. LHS: &xsql.BinaryExpr{
  983. OP: xsql.GT,
  984. LHS: &xsql.FieldRef{Name: "hum1", StreamName: "src2"},
  985. RHS: &xsql.FieldRef{Name: "hum2", StreamName: "tableInPlanner"},
  986. },
  987. },
  988. },
  989. },
  990. }.Init(),
  991. },
  992. },
  993. fields: []xsql.Field{
  994. {
  995. Expr: &xsql.FieldRef{
  996. Name: "hum",
  997. StreamName: "src2",
  998. },
  999. Name: "hum",
  1000. AName: "hum1",
  1001. }, {
  1002. Expr: &xsql.FieldRef{
  1003. Name: "hum",
  1004. StreamName: "tableInPlanner",
  1005. },
  1006. Name: "hum",
  1007. AName: "hum2",
  1008. },
  1009. },
  1010. isAggregate: false,
  1011. sendMeta: false,
  1012. }.Init(),
  1013. },
  1014. }
  1015. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1016. for i, tt := range tests {
  1017. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1018. if err != nil {
  1019. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1020. continue
  1021. }
  1022. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1023. IsEventTime: false,
  1024. LateTol: 0,
  1025. Concurrency: 0,
  1026. BufferLength: 0,
  1027. SendMetaToSink: false,
  1028. Qos: 0,
  1029. CheckpointInterval: 0,
  1030. SendError: true,
  1031. }, store)
  1032. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  1033. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1034. } else if !reflect.DeepEqual(tt.p, p) {
  1035. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1036. }
  1037. }
  1038. }