planner_test.go 57 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950
  1. package planner
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gdexlab/go-render/render"
  6. "github.com/lf-edge/ekuiper/internal/testx"
  7. "github.com/lf-edge/ekuiper/internal/xsql"
  8. "github.com/lf-edge/ekuiper/pkg/api"
  9. "github.com/lf-edge/ekuiper/pkg/ast"
  10. "github.com/lf-edge/ekuiper/pkg/kv"
  11. "path"
  12. "reflect"
  13. "strings"
  14. "testing"
  15. )
  16. var (
  17. DbDir = testx.GetDbDir()
  18. )
  19. func Test_createLogicalPlan(t *testing.T) {
  20. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  21. err := store.Open()
  22. if err != nil {
  23. t.Error(err)
  24. return
  25. }
  26. defer store.Close()
  27. streamSqls := map[string]string{
  28. "src1": `CREATE STREAM src1 (
  29. id1 BIGINT,
  30. temp BIGINT,
  31. name string
  32. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  33. "src2": `CREATE STREAM src2 (
  34. id2 BIGINT,
  35. hum BIGINT
  36. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  37. "tableInPlanner": `CREATE TABLE tableInPlanner (
  38. id BIGINT,
  39. name STRING,
  40. value STRING,
  41. hum BIGINT
  42. ) WITH (TYPE="file");`,
  43. }
  44. types := map[string]ast.StreamType{
  45. "src1": ast.TypeStream,
  46. "src2": ast.TypeStream,
  47. "tableInPlanner": ast.TypeTable,
  48. }
  49. for name, sql := range streamSqls {
  50. s, err := json.Marshal(&xsql.StreamInfo{
  51. StreamType: types[name],
  52. Statement: sql,
  53. })
  54. if err != nil {
  55. t.Error(err)
  56. t.Fail()
  57. }
  58. store.Set(name, string(s))
  59. }
  60. streams := make(map[string]*ast.StreamStmt)
  61. for n := range streamSqls {
  62. streamStmt, err := xsql.GetDataSource(store, n)
  63. if err != nil {
  64. t.Errorf("fail to get stream %s, please check if stream is created", n)
  65. return
  66. }
  67. streams[n] = streamStmt
  68. }
  69. var (
  70. //boolTrue = true
  71. boolFalse = false
  72. )
  73. var tests = []struct {
  74. sql string
  75. p LogicalPlan
  76. err string
  77. }{
  78. { // 0
  79. sql: `SELECT name FROM src1`,
  80. p: ProjectPlan{
  81. baseLogicalPlan: baseLogicalPlan{
  82. children: []LogicalPlan{
  83. DataSourcePlan{
  84. baseLogicalPlan: baseLogicalPlan{},
  85. name: "src1",
  86. streamFields: []interface{}{
  87. &ast.StreamField{
  88. Name: "name",
  89. FieldType: &ast.BasicType{Type: ast.STRINGS},
  90. },
  91. },
  92. streamStmt: streams["src1"],
  93. metaFields: []string{},
  94. }.Init(),
  95. },
  96. },
  97. fields: []ast.Field{
  98. {
  99. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  100. Name: "name",
  101. AName: "",
  102. },
  103. },
  104. isAggregate: false,
  105. sendMeta: false,
  106. }.Init(),
  107. }, { // 1 optimize where to data source
  108. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  109. p: ProjectPlan{
  110. baseLogicalPlan: baseLogicalPlan{
  111. children: []LogicalPlan{
  112. WindowPlan{
  113. baseLogicalPlan: baseLogicalPlan{
  114. children: []LogicalPlan{
  115. FilterPlan{
  116. baseLogicalPlan: baseLogicalPlan{
  117. children: []LogicalPlan{
  118. DataSourcePlan{
  119. name: "src1",
  120. streamFields: []interface{}{
  121. &ast.StreamField{
  122. Name: "name",
  123. FieldType: &ast.BasicType{Type: ast.STRINGS},
  124. },
  125. &ast.StreamField{
  126. Name: "temp",
  127. FieldType: &ast.BasicType{Type: ast.BIGINT},
  128. },
  129. },
  130. streamStmt: streams["src1"],
  131. metaFields: []string{},
  132. }.Init(),
  133. },
  134. },
  135. condition: &ast.BinaryExpr{
  136. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  137. OP: ast.EQ,
  138. RHS: &ast.StringLiteral{Val: "v1"},
  139. },
  140. }.Init(),
  141. },
  142. },
  143. condition: nil,
  144. wtype: ast.TUMBLING_WINDOW,
  145. length: 10000,
  146. interval: 0,
  147. limit: 0,
  148. }.Init(),
  149. },
  150. },
  151. fields: []ast.Field{
  152. {
  153. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  154. Name: "temp",
  155. AName: ""},
  156. },
  157. isAggregate: false,
  158. sendMeta: false,
  159. }.Init(),
  160. }, { // 2 condition that cannot be optimized
  161. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  162. p: ProjectPlan{
  163. baseLogicalPlan: baseLogicalPlan{
  164. children: []LogicalPlan{
  165. JoinPlan{
  166. baseLogicalPlan: baseLogicalPlan{
  167. children: []LogicalPlan{
  168. WindowPlan{
  169. baseLogicalPlan: baseLogicalPlan{
  170. children: []LogicalPlan{
  171. DataSourcePlan{
  172. name: "src1",
  173. streamFields: []interface{}{
  174. &ast.StreamField{
  175. Name: "id1",
  176. FieldType: &ast.BasicType{Type: ast.BIGINT},
  177. },
  178. &ast.StreamField{
  179. Name: "temp",
  180. FieldType: &ast.BasicType{Type: ast.BIGINT},
  181. },
  182. },
  183. streamStmt: streams["src1"],
  184. metaFields: []string{},
  185. }.Init(),
  186. DataSourcePlan{
  187. name: "src2",
  188. streamFields: []interface{}{
  189. &ast.StreamField{
  190. Name: "hum",
  191. FieldType: &ast.BasicType{Type: ast.BIGINT},
  192. },
  193. &ast.StreamField{
  194. Name: "id2",
  195. FieldType: &ast.BasicType{Type: ast.BIGINT},
  196. },
  197. },
  198. streamStmt: streams["src2"],
  199. metaFields: []string{},
  200. }.Init(),
  201. },
  202. },
  203. condition: nil,
  204. wtype: ast.TUMBLING_WINDOW,
  205. length: 10000,
  206. interval: 0,
  207. limit: 0,
  208. }.Init(),
  209. },
  210. },
  211. from: &ast.Table{Name: "src1"},
  212. joins: ast.Joins{ast.Join{
  213. Name: "src2",
  214. JoinType: ast.INNER_JOIN,
  215. Expr: &ast.BinaryExpr{
  216. OP: ast.AND,
  217. LHS: &ast.BinaryExpr{
  218. LHS: &ast.BinaryExpr{
  219. OP: ast.GT,
  220. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  221. RHS: &ast.IntegerLiteral{Val: 20},
  222. },
  223. OP: ast.OR,
  224. RHS: &ast.BinaryExpr{
  225. OP: ast.GT,
  226. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  227. RHS: &ast.IntegerLiteral{Val: 60},
  228. },
  229. },
  230. RHS: &ast.BinaryExpr{
  231. OP: ast.EQ,
  232. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  233. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  234. },
  235. },
  236. }},
  237. }.Init(),
  238. },
  239. },
  240. fields: []ast.Field{
  241. {
  242. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  243. Name: "id1",
  244. AName: ""},
  245. },
  246. isAggregate: false,
  247. sendMeta: false,
  248. }.Init(),
  249. }, { // 3 optimize window filter
  250. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  251. p: ProjectPlan{
  252. baseLogicalPlan: baseLogicalPlan{
  253. children: []LogicalPlan{
  254. WindowPlan{
  255. baseLogicalPlan: baseLogicalPlan{
  256. children: []LogicalPlan{
  257. FilterPlan{
  258. baseLogicalPlan: baseLogicalPlan{
  259. children: []LogicalPlan{
  260. DataSourcePlan{
  261. name: "src1",
  262. streamFields: []interface{}{
  263. &ast.StreamField{
  264. Name: "id1",
  265. FieldType: &ast.BasicType{Type: ast.BIGINT},
  266. },
  267. &ast.StreamField{
  268. Name: "name",
  269. FieldType: &ast.BasicType{Type: ast.STRINGS},
  270. },
  271. &ast.StreamField{
  272. Name: "temp",
  273. FieldType: &ast.BasicType{Type: ast.BIGINT},
  274. },
  275. },
  276. streamStmt: streams["src1"],
  277. metaFields: []string{},
  278. }.Init(),
  279. },
  280. },
  281. condition: &ast.BinaryExpr{
  282. OP: ast.AND,
  283. LHS: &ast.BinaryExpr{
  284. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  285. OP: ast.EQ,
  286. RHS: &ast.StringLiteral{Val: "v1"},
  287. },
  288. RHS: &ast.BinaryExpr{
  289. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  290. OP: ast.GT,
  291. RHS: &ast.IntegerLiteral{Val: 2},
  292. },
  293. },
  294. }.Init(),
  295. },
  296. },
  297. condition: nil,
  298. wtype: ast.TUMBLING_WINDOW,
  299. length: 10000,
  300. interval: 0,
  301. limit: 0,
  302. }.Init(),
  303. },
  304. },
  305. fields: []ast.Field{
  306. {
  307. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  308. Name: "id1",
  309. AName: ""},
  310. },
  311. isAggregate: false,
  312. sendMeta: false,
  313. }.Init(),
  314. }, { // 4. do not optimize count window
  315. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  316. p: ProjectPlan{
  317. baseLogicalPlan: baseLogicalPlan{
  318. children: []LogicalPlan{
  319. HavingPlan{
  320. baseLogicalPlan: baseLogicalPlan{
  321. children: []LogicalPlan{
  322. FilterPlan{
  323. baseLogicalPlan: baseLogicalPlan{
  324. children: []LogicalPlan{
  325. WindowPlan{
  326. baseLogicalPlan: baseLogicalPlan{
  327. children: []LogicalPlan{
  328. DataSourcePlan{
  329. name: "src1",
  330. isWildCard: true,
  331. streamFields: []interface{}{
  332. &ast.StreamField{
  333. Name: "id1",
  334. FieldType: &ast.BasicType{Type: ast.BIGINT},
  335. },
  336. &ast.StreamField{
  337. Name: "temp",
  338. FieldType: &ast.BasicType{Type: ast.BIGINT},
  339. },
  340. &ast.StreamField{
  341. Name: "name",
  342. FieldType: &ast.BasicType{Type: ast.STRINGS},
  343. },
  344. },
  345. streamStmt: streams["src1"],
  346. metaFields: []string{},
  347. }.Init(),
  348. },
  349. },
  350. condition: nil,
  351. wtype: ast.COUNT_WINDOW,
  352. length: 5,
  353. interval: 1,
  354. limit: 0,
  355. }.Init(),
  356. },
  357. },
  358. condition: &ast.BinaryExpr{
  359. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  360. OP: ast.GT,
  361. RHS: &ast.IntegerLiteral{Val: 20},
  362. },
  363. }.Init(),
  364. },
  365. },
  366. condition: &ast.BinaryExpr{
  367. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  368. Token: ast.ASTERISK,
  369. }}},
  370. OP: ast.GT,
  371. RHS: &ast.IntegerLiteral{Val: 2},
  372. },
  373. }.Init(),
  374. },
  375. },
  376. fields: []ast.Field{
  377. {
  378. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  379. Name: "kuiper_field_0",
  380. AName: ""},
  381. },
  382. isAggregate: false,
  383. sendMeta: false,
  384. }.Init(),
  385. }, { // 5. optimize join on
  386. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  387. p: ProjectPlan{
  388. baseLogicalPlan: baseLogicalPlan{
  389. children: []LogicalPlan{
  390. JoinPlan{
  391. baseLogicalPlan: baseLogicalPlan{
  392. children: []LogicalPlan{
  393. WindowPlan{
  394. baseLogicalPlan: baseLogicalPlan{
  395. children: []LogicalPlan{
  396. FilterPlan{
  397. baseLogicalPlan: baseLogicalPlan{
  398. children: []LogicalPlan{
  399. DataSourcePlan{
  400. name: "src1",
  401. streamFields: []interface{}{
  402. &ast.StreamField{
  403. Name: "id1",
  404. FieldType: &ast.BasicType{Type: ast.BIGINT},
  405. },
  406. &ast.StreamField{
  407. Name: "temp",
  408. FieldType: &ast.BasicType{Type: ast.BIGINT},
  409. },
  410. },
  411. streamStmt: streams["src1"],
  412. metaFields: []string{},
  413. }.Init(),
  414. },
  415. },
  416. condition: &ast.BinaryExpr{
  417. RHS: &ast.BinaryExpr{
  418. OP: ast.GT,
  419. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  420. RHS: &ast.IntegerLiteral{Val: 20},
  421. },
  422. OP: ast.AND,
  423. LHS: &ast.BinaryExpr{
  424. OP: ast.GT,
  425. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  426. RHS: &ast.IntegerLiteral{Val: 111},
  427. },
  428. },
  429. }.Init(),
  430. FilterPlan{
  431. baseLogicalPlan: baseLogicalPlan{
  432. children: []LogicalPlan{
  433. DataSourcePlan{
  434. name: "src2",
  435. streamFields: []interface{}{
  436. &ast.StreamField{
  437. Name: "hum",
  438. FieldType: &ast.BasicType{Type: ast.BIGINT},
  439. },
  440. &ast.StreamField{
  441. Name: "id2",
  442. FieldType: &ast.BasicType{Type: ast.BIGINT},
  443. },
  444. },
  445. streamStmt: streams["src2"],
  446. metaFields: []string{},
  447. }.Init(),
  448. },
  449. },
  450. condition: &ast.BinaryExpr{
  451. OP: ast.LT,
  452. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  453. RHS: &ast.IntegerLiteral{Val: 60},
  454. },
  455. }.Init(),
  456. },
  457. },
  458. condition: nil,
  459. wtype: ast.TUMBLING_WINDOW,
  460. length: 10000,
  461. interval: 0,
  462. limit: 0,
  463. }.Init(),
  464. },
  465. },
  466. from: &ast.Table{
  467. Name: "src1",
  468. },
  469. joins: []ast.Join{
  470. {
  471. Name: "src2",
  472. Alias: "",
  473. JoinType: ast.INNER_JOIN,
  474. Expr: &ast.BinaryExpr{
  475. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  476. OP: ast.EQ,
  477. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  478. },
  479. },
  480. },
  481. }.Init(),
  482. },
  483. },
  484. fields: []ast.Field{
  485. {
  486. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  487. Name: "id1",
  488. AName: ""},
  489. },
  490. isAggregate: false,
  491. sendMeta: false,
  492. }.Init(),
  493. }, { // 6. optimize outter join on
  494. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  495. p: ProjectPlan{
  496. baseLogicalPlan: baseLogicalPlan{
  497. children: []LogicalPlan{
  498. JoinPlan{
  499. baseLogicalPlan: baseLogicalPlan{
  500. children: []LogicalPlan{
  501. WindowPlan{
  502. baseLogicalPlan: baseLogicalPlan{
  503. children: []LogicalPlan{
  504. FilterPlan{
  505. baseLogicalPlan: baseLogicalPlan{
  506. children: []LogicalPlan{
  507. DataSourcePlan{
  508. name: "src1",
  509. streamFields: []interface{}{
  510. &ast.StreamField{
  511. Name: "id1",
  512. FieldType: &ast.BasicType{Type: ast.BIGINT},
  513. },
  514. &ast.StreamField{
  515. Name: "temp",
  516. FieldType: &ast.BasicType{Type: ast.BIGINT},
  517. },
  518. },
  519. streamStmt: streams["src1"],
  520. metaFields: []string{},
  521. }.Init(),
  522. },
  523. },
  524. condition: &ast.BinaryExpr{
  525. OP: ast.GT,
  526. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  527. RHS: &ast.IntegerLiteral{Val: 111},
  528. },
  529. }.Init(),
  530. DataSourcePlan{
  531. name: "src2",
  532. streamFields: []interface{}{
  533. &ast.StreamField{
  534. Name: "hum",
  535. FieldType: &ast.BasicType{Type: ast.BIGINT},
  536. },
  537. &ast.StreamField{
  538. Name: "id2",
  539. FieldType: &ast.BasicType{Type: ast.BIGINT},
  540. },
  541. },
  542. streamStmt: streams["src2"],
  543. metaFields: []string{},
  544. }.Init(),
  545. },
  546. },
  547. condition: nil,
  548. wtype: ast.TUMBLING_WINDOW,
  549. length: 10000,
  550. interval: 0,
  551. limit: 0,
  552. }.Init(),
  553. },
  554. },
  555. from: &ast.Table{
  556. Name: "src1",
  557. },
  558. joins: []ast.Join{
  559. {
  560. Name: "src2",
  561. Alias: "",
  562. JoinType: ast.FULL_JOIN,
  563. Expr: &ast.BinaryExpr{
  564. OP: ast.AND,
  565. LHS: &ast.BinaryExpr{
  566. OP: ast.AND,
  567. LHS: &ast.BinaryExpr{
  568. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  569. OP: ast.EQ,
  570. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  571. },
  572. RHS: &ast.BinaryExpr{
  573. OP: ast.GT,
  574. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  575. RHS: &ast.IntegerLiteral{Val: 20},
  576. },
  577. },
  578. RHS: &ast.BinaryExpr{
  579. OP: ast.LT,
  580. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  581. RHS: &ast.IntegerLiteral{Val: 60},
  582. },
  583. },
  584. },
  585. },
  586. }.Init(),
  587. },
  588. },
  589. fields: []ast.Field{
  590. {
  591. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  592. Name: "id1",
  593. AName: ""},
  594. },
  595. isAggregate: false,
  596. sendMeta: false,
  597. }.Init(),
  598. }, { // 7 window error for table
  599. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  600. p: nil,
  601. err: "cannot run window for TABLE sources",
  602. }, { // 8 join table without window
  603. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  604. p: ProjectPlan{
  605. baseLogicalPlan: baseLogicalPlan{
  606. children: []LogicalPlan{
  607. JoinPlan{
  608. baseLogicalPlan: baseLogicalPlan{
  609. children: []LogicalPlan{
  610. JoinAlignPlan{
  611. baseLogicalPlan: baseLogicalPlan{
  612. children: []LogicalPlan{
  613. FilterPlan{
  614. baseLogicalPlan: baseLogicalPlan{
  615. children: []LogicalPlan{
  616. DataSourcePlan{
  617. name: "src1",
  618. streamFields: []interface{}{
  619. &ast.StreamField{
  620. Name: "id1",
  621. FieldType: &ast.BasicType{Type: ast.BIGINT},
  622. },
  623. &ast.StreamField{
  624. Name: "temp",
  625. FieldType: &ast.BasicType{Type: ast.BIGINT},
  626. },
  627. },
  628. streamStmt: streams["src1"],
  629. metaFields: []string{},
  630. }.Init(),
  631. },
  632. },
  633. condition: &ast.BinaryExpr{
  634. RHS: &ast.BinaryExpr{
  635. OP: ast.GT,
  636. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  637. RHS: &ast.IntegerLiteral{Val: 20},
  638. },
  639. OP: ast.AND,
  640. LHS: &ast.BinaryExpr{
  641. OP: ast.GT,
  642. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  643. RHS: &ast.IntegerLiteral{Val: 111},
  644. },
  645. },
  646. }.Init(),
  647. FilterPlan{
  648. baseLogicalPlan: baseLogicalPlan{
  649. children: []LogicalPlan{
  650. DataSourcePlan{
  651. name: "tableInPlanner",
  652. streamFields: []interface{}{
  653. &ast.StreamField{
  654. Name: "hum",
  655. FieldType: &ast.BasicType{Type: ast.BIGINT},
  656. },
  657. &ast.StreamField{
  658. Name: "id",
  659. FieldType: &ast.BasicType{Type: ast.BIGINT},
  660. },
  661. },
  662. streamStmt: streams["tableInPlanner"],
  663. metaFields: []string{},
  664. }.Init(),
  665. },
  666. },
  667. condition: &ast.BinaryExpr{
  668. OP: ast.LT,
  669. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  670. RHS: &ast.IntegerLiteral{Val: 60},
  671. },
  672. }.Init(),
  673. },
  674. },
  675. Emitters: []string{"tableInPlanner"},
  676. }.Init(),
  677. },
  678. },
  679. from: &ast.Table{
  680. Name: "src1",
  681. },
  682. joins: []ast.Join{
  683. {
  684. Name: "tableInPlanner",
  685. Alias: "",
  686. JoinType: ast.INNER_JOIN,
  687. Expr: &ast.BinaryExpr{
  688. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  689. OP: ast.EQ,
  690. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  691. },
  692. },
  693. },
  694. }.Init(),
  695. },
  696. },
  697. fields: []ast.Field{
  698. {
  699. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  700. Name: "id1",
  701. AName: ""},
  702. },
  703. isAggregate: false,
  704. sendMeta: false,
  705. }.Init(),
  706. }, { // 9 join table with window
  707. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  708. p: ProjectPlan{
  709. baseLogicalPlan: baseLogicalPlan{
  710. children: []LogicalPlan{
  711. JoinPlan{
  712. baseLogicalPlan: baseLogicalPlan{
  713. children: []LogicalPlan{
  714. JoinAlignPlan{
  715. baseLogicalPlan: baseLogicalPlan{
  716. children: []LogicalPlan{
  717. WindowPlan{
  718. baseLogicalPlan: baseLogicalPlan{
  719. children: []LogicalPlan{
  720. FilterPlan{
  721. baseLogicalPlan: baseLogicalPlan{
  722. children: []LogicalPlan{
  723. DataSourcePlan{
  724. name: "src1",
  725. streamFields: []interface{}{
  726. &ast.StreamField{
  727. Name: "id1",
  728. FieldType: &ast.BasicType{Type: ast.BIGINT},
  729. },
  730. &ast.StreamField{
  731. Name: "temp",
  732. FieldType: &ast.BasicType{Type: ast.BIGINT},
  733. },
  734. },
  735. streamStmt: streams["src1"],
  736. metaFields: []string{},
  737. }.Init(),
  738. },
  739. },
  740. condition: &ast.BinaryExpr{
  741. RHS: &ast.BinaryExpr{
  742. OP: ast.GT,
  743. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  744. RHS: &ast.IntegerLiteral{Val: 20},
  745. },
  746. OP: ast.AND,
  747. LHS: &ast.BinaryExpr{
  748. OP: ast.GT,
  749. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  750. RHS: &ast.IntegerLiteral{Val: 111},
  751. },
  752. },
  753. }.Init(),
  754. },
  755. },
  756. condition: nil,
  757. wtype: ast.TUMBLING_WINDOW,
  758. length: 10000,
  759. interval: 0,
  760. limit: 0,
  761. }.Init(),
  762. FilterPlan{
  763. baseLogicalPlan: baseLogicalPlan{
  764. children: []LogicalPlan{
  765. DataSourcePlan{
  766. name: "tableInPlanner",
  767. streamFields: []interface{}{
  768. &ast.StreamField{
  769. Name: "hum",
  770. FieldType: &ast.BasicType{Type: ast.BIGINT},
  771. },
  772. &ast.StreamField{
  773. Name: "id",
  774. FieldType: &ast.BasicType{Type: ast.BIGINT},
  775. },
  776. },
  777. streamStmt: streams["tableInPlanner"],
  778. metaFields: []string{},
  779. }.Init(),
  780. },
  781. },
  782. condition: &ast.BinaryExpr{
  783. OP: ast.LT,
  784. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  785. RHS: &ast.IntegerLiteral{Val: 60},
  786. },
  787. }.Init(),
  788. },
  789. },
  790. Emitters: []string{"tableInPlanner"},
  791. }.Init(),
  792. },
  793. },
  794. from: &ast.Table{
  795. Name: "src1",
  796. },
  797. joins: []ast.Join{
  798. {
  799. Name: "tableInPlanner",
  800. Alias: "",
  801. JoinType: ast.INNER_JOIN,
  802. Expr: &ast.BinaryExpr{
  803. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  804. OP: ast.EQ,
  805. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  806. },
  807. },
  808. },
  809. }.Init(),
  810. },
  811. },
  812. fields: []ast.Field{
  813. {
  814. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  815. Name: "id1",
  816. AName: ""},
  817. },
  818. isAggregate: false,
  819. sendMeta: false,
  820. }.Init(),
  821. }, { // 10 meta
  822. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  823. p: ProjectPlan{
  824. baseLogicalPlan: baseLogicalPlan{
  825. children: []LogicalPlan{
  826. FilterPlan{
  827. baseLogicalPlan: baseLogicalPlan{
  828. children: []LogicalPlan{
  829. DataSourcePlan{
  830. name: "src1",
  831. streamFields: []interface{}{
  832. &ast.StreamField{
  833. Name: "temp",
  834. FieldType: &ast.BasicType{Type: ast.BIGINT},
  835. },
  836. },
  837. streamStmt: streams["src1"],
  838. metaFields: []string{"Humidity", "device", "id"},
  839. }.Init(),
  840. },
  841. },
  842. condition: &ast.BinaryExpr{
  843. LHS: &ast.Call{
  844. Name: "meta",
  845. Args: []ast.Expr{&ast.MetaRef{
  846. Name: "device",
  847. StreamName: ast.DefaultStream,
  848. }},
  849. },
  850. OP: ast.EQ,
  851. RHS: &ast.StringLiteral{
  852. Val: "demo2",
  853. },
  854. },
  855. }.Init(),
  856. },
  857. },
  858. fields: []ast.Field{
  859. {
  860. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  861. Name: "temp",
  862. AName: "",
  863. }, {
  864. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  865. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  866. Name: "id",
  867. StreamName: ast.DefaultStream,
  868. }}},
  869. []ast.StreamName{},
  870. nil,
  871. )},
  872. Name: "meta",
  873. AName: "eid",
  874. }, {
  875. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  876. &ast.Call{Name: "meta", Args: []ast.Expr{
  877. &ast.BinaryExpr{
  878. OP: ast.ARROW,
  879. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  880. RHS: &ast.MetaRef{Name: "Device"},
  881. },
  882. }},
  883. []ast.StreamName{},
  884. nil,
  885. )},
  886. Name: "meta",
  887. AName: "hdevice",
  888. },
  889. },
  890. isAggregate: false,
  891. sendMeta: false,
  892. }.Init(),
  893. }, { // 11 join with same name field and aliased
  894. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  895. p: ProjectPlan{
  896. baseLogicalPlan: baseLogicalPlan{
  897. children: []LogicalPlan{
  898. JoinPlan{
  899. baseLogicalPlan: baseLogicalPlan{
  900. children: []LogicalPlan{
  901. JoinAlignPlan{
  902. baseLogicalPlan: baseLogicalPlan{
  903. children: []LogicalPlan{
  904. DataSourcePlan{
  905. name: "src2",
  906. streamFields: []interface{}{
  907. &ast.StreamField{
  908. Name: "hum",
  909. FieldType: &ast.BasicType{Type: ast.BIGINT},
  910. },
  911. &ast.StreamField{
  912. Name: "id2",
  913. FieldType: &ast.BasicType{Type: ast.BIGINT},
  914. },
  915. },
  916. streamStmt: streams["src2"],
  917. metaFields: []string{},
  918. }.Init(),
  919. DataSourcePlan{
  920. name: "tableInPlanner",
  921. streamFields: []interface{}{
  922. &ast.StreamField{
  923. Name: "hum",
  924. FieldType: &ast.BasicType{Type: ast.BIGINT},
  925. },
  926. &ast.StreamField{
  927. Name: "id",
  928. FieldType: &ast.BasicType{Type: ast.BIGINT},
  929. },
  930. },
  931. streamStmt: streams["tableInPlanner"],
  932. metaFields: []string{},
  933. }.Init(),
  934. },
  935. },
  936. Emitters: []string{"tableInPlanner"},
  937. }.Init(),
  938. },
  939. },
  940. from: &ast.Table{
  941. Name: "src2",
  942. },
  943. joins: []ast.Join{
  944. {
  945. Name: "tableInPlanner",
  946. Alias: "",
  947. JoinType: ast.INNER_JOIN,
  948. Expr: &ast.BinaryExpr{
  949. RHS: &ast.BinaryExpr{
  950. OP: ast.EQ,
  951. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  952. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  953. },
  954. OP: ast.AND,
  955. LHS: &ast.BinaryExpr{
  956. OP: ast.GT,
  957. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  958. &ast.FieldRef{
  959. Name: "hum",
  960. StreamName: "src2",
  961. },
  962. []ast.StreamName{"src2"},
  963. &boolFalse,
  964. )},
  965. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  966. &ast.FieldRef{
  967. Name: "hum",
  968. StreamName: "tableInPlanner",
  969. },
  970. []ast.StreamName{"tableInPlanner"},
  971. &boolFalse,
  972. )},
  973. },
  974. },
  975. },
  976. },
  977. }.Init(),
  978. },
  979. },
  980. fields: []ast.Field{
  981. {
  982. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  983. &ast.FieldRef{
  984. Name: "hum",
  985. StreamName: "src2",
  986. },
  987. []ast.StreamName{"src2"},
  988. &boolFalse,
  989. )},
  990. Name: "hum",
  991. AName: "hum1",
  992. }, {
  993. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  994. &ast.FieldRef{
  995. Name: "hum",
  996. StreamName: "tableInPlanner",
  997. },
  998. []ast.StreamName{"tableInPlanner"},
  999. &boolFalse,
  1000. )},
  1001. Name: "hum",
  1002. AName: "hum2",
  1003. },
  1004. },
  1005. isAggregate: false,
  1006. sendMeta: false,
  1007. }.Init(),
  1008. },
  1009. }
  1010. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1011. for i, tt := range tests {
  1012. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1013. if err != nil {
  1014. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1015. continue
  1016. }
  1017. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1018. IsEventTime: false,
  1019. LateTol: 0,
  1020. Concurrency: 0,
  1021. BufferLength: 0,
  1022. SendMetaToSink: false,
  1023. Qos: 0,
  1024. CheckpointInterval: 0,
  1025. SendError: true,
  1026. }, store)
  1027. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1028. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1029. } else if !reflect.DeepEqual(tt.p, p) {
  1030. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1031. }
  1032. }
  1033. }
  1034. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1035. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  1036. err := store.Open()
  1037. if err != nil {
  1038. t.Error(err)
  1039. return
  1040. }
  1041. defer store.Close()
  1042. streamSqls := map[string]string{
  1043. "src1": `CREATE STREAM src1 (
  1044. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1045. "src2": `CREATE STREAM src2 (
  1046. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1047. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1048. id BIGINT,
  1049. name STRING,
  1050. value STRING,
  1051. hum BIGINT
  1052. ) WITH (TYPE="file");`,
  1053. }
  1054. types := map[string]ast.StreamType{
  1055. "src1": ast.TypeStream,
  1056. "src2": ast.TypeStream,
  1057. "tableInPlanner": ast.TypeTable,
  1058. }
  1059. for name, sql := range streamSqls {
  1060. s, err := json.Marshal(&xsql.StreamInfo{
  1061. StreamType: types[name],
  1062. Statement: sql,
  1063. })
  1064. if err != nil {
  1065. t.Error(err)
  1066. t.Fail()
  1067. }
  1068. store.Set(name, string(s))
  1069. }
  1070. streams := make(map[string]*ast.StreamStmt)
  1071. for n := range streamSqls {
  1072. streamStmt, err := xsql.GetDataSource(store, n)
  1073. if err != nil {
  1074. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1075. return
  1076. }
  1077. streams[n] = streamStmt
  1078. }
  1079. var (
  1080. //boolTrue = true
  1081. boolFalse = false
  1082. )
  1083. var tests = []struct {
  1084. sql string
  1085. p LogicalPlan
  1086. err string
  1087. }{
  1088. { // 0
  1089. sql: `SELECT name FROM src1`,
  1090. p: ProjectPlan{
  1091. baseLogicalPlan: baseLogicalPlan{
  1092. children: []LogicalPlan{
  1093. DataSourcePlan{
  1094. baseLogicalPlan: baseLogicalPlan{},
  1095. name: "src1",
  1096. streamFields: []interface{}{
  1097. "name",
  1098. },
  1099. streamStmt: streams["src1"],
  1100. metaFields: []string{},
  1101. }.Init(),
  1102. },
  1103. },
  1104. fields: []ast.Field{
  1105. {
  1106. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1107. Name: "name",
  1108. AName: "",
  1109. },
  1110. },
  1111. isAggregate: false,
  1112. sendMeta: false,
  1113. }.Init(),
  1114. }, { // 1 optimize where to data source
  1115. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1116. p: ProjectPlan{
  1117. baseLogicalPlan: baseLogicalPlan{
  1118. children: []LogicalPlan{
  1119. WindowPlan{
  1120. baseLogicalPlan: baseLogicalPlan{
  1121. children: []LogicalPlan{
  1122. FilterPlan{
  1123. baseLogicalPlan: baseLogicalPlan{
  1124. children: []LogicalPlan{
  1125. DataSourcePlan{
  1126. name: "src1",
  1127. streamFields: []interface{}{
  1128. "name", "temp",
  1129. },
  1130. streamStmt: streams["src1"],
  1131. metaFields: []string{},
  1132. }.Init(),
  1133. },
  1134. },
  1135. condition: &ast.BinaryExpr{
  1136. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1137. OP: ast.EQ,
  1138. RHS: &ast.StringLiteral{Val: "v1"},
  1139. },
  1140. }.Init(),
  1141. },
  1142. },
  1143. condition: nil,
  1144. wtype: ast.TUMBLING_WINDOW,
  1145. length: 10000,
  1146. interval: 0,
  1147. limit: 0,
  1148. }.Init(),
  1149. },
  1150. },
  1151. fields: []ast.Field{
  1152. {
  1153. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1154. Name: "temp",
  1155. AName: ""},
  1156. },
  1157. isAggregate: false,
  1158. sendMeta: false,
  1159. }.Init(),
  1160. }, { // 2 condition that cannot be optimized
  1161. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1162. p: ProjectPlan{
  1163. baseLogicalPlan: baseLogicalPlan{
  1164. children: []LogicalPlan{
  1165. JoinPlan{
  1166. baseLogicalPlan: baseLogicalPlan{
  1167. children: []LogicalPlan{
  1168. WindowPlan{
  1169. baseLogicalPlan: baseLogicalPlan{
  1170. children: []LogicalPlan{
  1171. DataSourcePlan{
  1172. name: "src1",
  1173. streamFields: []interface{}{
  1174. "id1", "temp",
  1175. },
  1176. streamStmt: streams["src1"],
  1177. metaFields: []string{},
  1178. }.Init(),
  1179. DataSourcePlan{
  1180. name: "src2",
  1181. streamFields: []interface{}{ // can't determine where is id1 belonged to
  1182. "hum", "id1", "id2",
  1183. },
  1184. streamStmt: streams["src2"],
  1185. metaFields: []string{},
  1186. }.Init(),
  1187. },
  1188. },
  1189. condition: nil,
  1190. wtype: ast.TUMBLING_WINDOW,
  1191. length: 10000,
  1192. interval: 0,
  1193. limit: 0,
  1194. }.Init(),
  1195. },
  1196. },
  1197. from: &ast.Table{Name: "src1"},
  1198. joins: ast.Joins{ast.Join{
  1199. Name: "src2",
  1200. JoinType: ast.INNER_JOIN,
  1201. Expr: &ast.BinaryExpr{
  1202. OP: ast.AND,
  1203. LHS: &ast.BinaryExpr{
  1204. LHS: &ast.BinaryExpr{
  1205. OP: ast.GT,
  1206. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1207. RHS: &ast.IntegerLiteral{Val: 20},
  1208. },
  1209. OP: ast.OR,
  1210. RHS: &ast.BinaryExpr{
  1211. OP: ast.GT,
  1212. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1213. RHS: &ast.IntegerLiteral{Val: 60},
  1214. },
  1215. },
  1216. RHS: &ast.BinaryExpr{
  1217. OP: ast.EQ,
  1218. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1219. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1220. },
  1221. },
  1222. }},
  1223. }.Init(),
  1224. },
  1225. },
  1226. fields: []ast.Field{
  1227. {
  1228. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1229. Name: "id1",
  1230. AName: ""},
  1231. },
  1232. isAggregate: false,
  1233. sendMeta: false,
  1234. }.Init(),
  1235. }, { // 3 optimize window filter
  1236. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1237. p: ProjectPlan{
  1238. baseLogicalPlan: baseLogicalPlan{
  1239. children: []LogicalPlan{
  1240. WindowPlan{
  1241. baseLogicalPlan: baseLogicalPlan{
  1242. children: []LogicalPlan{
  1243. FilterPlan{
  1244. baseLogicalPlan: baseLogicalPlan{
  1245. children: []LogicalPlan{
  1246. DataSourcePlan{
  1247. name: "src1",
  1248. streamFields: []interface{}{
  1249. "id1", "name", "temp",
  1250. },
  1251. streamStmt: streams["src1"],
  1252. metaFields: []string{},
  1253. }.Init(),
  1254. },
  1255. },
  1256. condition: &ast.BinaryExpr{
  1257. OP: ast.AND,
  1258. LHS: &ast.BinaryExpr{
  1259. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1260. OP: ast.EQ,
  1261. RHS: &ast.StringLiteral{Val: "v1"},
  1262. },
  1263. RHS: &ast.BinaryExpr{
  1264. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1265. OP: ast.GT,
  1266. RHS: &ast.IntegerLiteral{Val: 2},
  1267. },
  1268. },
  1269. }.Init(),
  1270. },
  1271. },
  1272. condition: nil,
  1273. wtype: ast.TUMBLING_WINDOW,
  1274. length: 10000,
  1275. interval: 0,
  1276. limit: 0,
  1277. }.Init(),
  1278. },
  1279. },
  1280. fields: []ast.Field{
  1281. {
  1282. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1283. Name: "id1",
  1284. AName: ""},
  1285. },
  1286. isAggregate: false,
  1287. sendMeta: false,
  1288. }.Init(),
  1289. }, { // 4. do not optimize count window
  1290. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1291. p: ProjectPlan{
  1292. baseLogicalPlan: baseLogicalPlan{
  1293. children: []LogicalPlan{
  1294. HavingPlan{
  1295. baseLogicalPlan: baseLogicalPlan{
  1296. children: []LogicalPlan{
  1297. FilterPlan{
  1298. baseLogicalPlan: baseLogicalPlan{
  1299. children: []LogicalPlan{
  1300. WindowPlan{
  1301. baseLogicalPlan: baseLogicalPlan{
  1302. children: []LogicalPlan{
  1303. DataSourcePlan{
  1304. name: "src1",
  1305. isWildCard: true,
  1306. streamFields: nil,
  1307. streamStmt: streams["src1"],
  1308. metaFields: []string{},
  1309. }.Init(),
  1310. },
  1311. },
  1312. condition: nil,
  1313. wtype: ast.COUNT_WINDOW,
  1314. length: 5,
  1315. interval: 1,
  1316. limit: 0,
  1317. }.Init(),
  1318. },
  1319. },
  1320. condition: &ast.BinaryExpr{
  1321. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1322. OP: ast.GT,
  1323. RHS: &ast.IntegerLiteral{Val: 20},
  1324. },
  1325. }.Init(),
  1326. },
  1327. },
  1328. condition: &ast.BinaryExpr{
  1329. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  1330. Token: ast.ASTERISK,
  1331. }}},
  1332. OP: ast.GT,
  1333. RHS: &ast.IntegerLiteral{Val: 2},
  1334. },
  1335. }.Init(),
  1336. },
  1337. },
  1338. fields: []ast.Field{
  1339. {
  1340. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1341. Name: "kuiper_field_0",
  1342. AName: ""},
  1343. },
  1344. isAggregate: false,
  1345. sendMeta: false,
  1346. }.Init(),
  1347. }, { // 5. optimize join on
  1348. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1349. p: ProjectPlan{
  1350. baseLogicalPlan: baseLogicalPlan{
  1351. children: []LogicalPlan{
  1352. JoinPlan{
  1353. baseLogicalPlan: baseLogicalPlan{
  1354. children: []LogicalPlan{
  1355. WindowPlan{
  1356. baseLogicalPlan: baseLogicalPlan{
  1357. children: []LogicalPlan{
  1358. FilterPlan{
  1359. baseLogicalPlan: baseLogicalPlan{
  1360. children: []LogicalPlan{
  1361. DataSourcePlan{
  1362. name: "src1",
  1363. streamFields: []interface{}{
  1364. "id1", "temp",
  1365. },
  1366. streamStmt: streams["src1"],
  1367. metaFields: []string{},
  1368. }.Init(),
  1369. },
  1370. },
  1371. condition: &ast.BinaryExpr{
  1372. RHS: &ast.BinaryExpr{
  1373. OP: ast.GT,
  1374. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1375. RHS: &ast.IntegerLiteral{Val: 20},
  1376. },
  1377. OP: ast.AND,
  1378. LHS: &ast.BinaryExpr{
  1379. OP: ast.GT,
  1380. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1381. RHS: &ast.IntegerLiteral{Val: 111},
  1382. },
  1383. },
  1384. }.Init(),
  1385. FilterPlan{
  1386. baseLogicalPlan: baseLogicalPlan{
  1387. children: []LogicalPlan{
  1388. DataSourcePlan{
  1389. name: "src2",
  1390. streamFields: []interface{}{
  1391. "hum", "id1", "id2",
  1392. },
  1393. streamStmt: streams["src2"],
  1394. metaFields: []string{},
  1395. }.Init(),
  1396. },
  1397. },
  1398. condition: &ast.BinaryExpr{
  1399. OP: ast.LT,
  1400. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1401. RHS: &ast.IntegerLiteral{Val: 60},
  1402. },
  1403. }.Init(),
  1404. },
  1405. },
  1406. condition: nil,
  1407. wtype: ast.TUMBLING_WINDOW,
  1408. length: 10000,
  1409. interval: 0,
  1410. limit: 0,
  1411. }.Init(),
  1412. },
  1413. },
  1414. from: &ast.Table{
  1415. Name: "src1",
  1416. },
  1417. joins: []ast.Join{
  1418. {
  1419. Name: "src2",
  1420. Alias: "",
  1421. JoinType: ast.INNER_JOIN,
  1422. Expr: &ast.BinaryExpr{
  1423. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1424. OP: ast.EQ,
  1425. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1426. },
  1427. },
  1428. },
  1429. }.Init(),
  1430. },
  1431. },
  1432. fields: []ast.Field{
  1433. {
  1434. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1435. Name: "id1",
  1436. AName: ""},
  1437. },
  1438. isAggregate: false,
  1439. sendMeta: false,
  1440. }.Init(),
  1441. }, { // 6. optimize outter join on
  1442. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1443. p: ProjectPlan{
  1444. baseLogicalPlan: baseLogicalPlan{
  1445. children: []LogicalPlan{
  1446. JoinPlan{
  1447. baseLogicalPlan: baseLogicalPlan{
  1448. children: []LogicalPlan{
  1449. WindowPlan{
  1450. baseLogicalPlan: baseLogicalPlan{
  1451. children: []LogicalPlan{
  1452. FilterPlan{
  1453. baseLogicalPlan: baseLogicalPlan{
  1454. children: []LogicalPlan{
  1455. DataSourcePlan{
  1456. name: "src1",
  1457. streamFields: []interface{}{
  1458. "id1", "temp",
  1459. },
  1460. streamStmt: streams["src1"],
  1461. metaFields: []string{},
  1462. }.Init(),
  1463. },
  1464. },
  1465. condition: &ast.BinaryExpr{
  1466. OP: ast.GT,
  1467. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1468. RHS: &ast.IntegerLiteral{Val: 111},
  1469. },
  1470. }.Init(),
  1471. DataSourcePlan{
  1472. name: "src2",
  1473. streamFields: []interface{}{
  1474. "hum", "id1", "id2",
  1475. },
  1476. streamStmt: streams["src2"],
  1477. metaFields: []string{},
  1478. }.Init(),
  1479. },
  1480. },
  1481. condition: nil,
  1482. wtype: ast.TUMBLING_WINDOW,
  1483. length: 10000,
  1484. interval: 0,
  1485. limit: 0,
  1486. }.Init(),
  1487. },
  1488. },
  1489. from: &ast.Table{
  1490. Name: "src1",
  1491. },
  1492. joins: []ast.Join{
  1493. {
  1494. Name: "src2",
  1495. Alias: "",
  1496. JoinType: ast.FULL_JOIN,
  1497. Expr: &ast.BinaryExpr{
  1498. OP: ast.AND,
  1499. LHS: &ast.BinaryExpr{
  1500. OP: ast.AND,
  1501. LHS: &ast.BinaryExpr{
  1502. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1503. OP: ast.EQ,
  1504. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1505. },
  1506. RHS: &ast.BinaryExpr{
  1507. OP: ast.GT,
  1508. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1509. RHS: &ast.IntegerLiteral{Val: 20},
  1510. },
  1511. },
  1512. RHS: &ast.BinaryExpr{
  1513. OP: ast.LT,
  1514. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1515. RHS: &ast.IntegerLiteral{Val: 60},
  1516. },
  1517. },
  1518. },
  1519. },
  1520. }.Init(),
  1521. },
  1522. },
  1523. fields: []ast.Field{
  1524. {
  1525. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1526. Name: "id1",
  1527. AName: ""},
  1528. },
  1529. isAggregate: false,
  1530. sendMeta: false,
  1531. }.Init(),
  1532. }, { // 7 window error for table
  1533. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1534. p: nil,
  1535. err: "cannot run window for TABLE sources",
  1536. }, { // 8 join table without window
  1537. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1538. p: ProjectPlan{
  1539. baseLogicalPlan: baseLogicalPlan{
  1540. children: []LogicalPlan{
  1541. JoinPlan{
  1542. baseLogicalPlan: baseLogicalPlan{
  1543. children: []LogicalPlan{
  1544. JoinAlignPlan{
  1545. baseLogicalPlan: baseLogicalPlan{
  1546. children: []LogicalPlan{
  1547. FilterPlan{
  1548. baseLogicalPlan: baseLogicalPlan{
  1549. children: []LogicalPlan{
  1550. DataSourcePlan{
  1551. name: "src1",
  1552. streamFields: []interface{}{
  1553. "hum", "id1", "temp",
  1554. },
  1555. streamStmt: streams["src1"],
  1556. metaFields: []string{},
  1557. }.Init(),
  1558. },
  1559. },
  1560. condition: &ast.BinaryExpr{
  1561. RHS: &ast.BinaryExpr{
  1562. OP: ast.GT,
  1563. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1564. RHS: &ast.IntegerLiteral{Val: 20},
  1565. },
  1566. OP: ast.AND,
  1567. LHS: &ast.BinaryExpr{
  1568. OP: ast.GT,
  1569. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1570. RHS: &ast.IntegerLiteral{Val: 111},
  1571. },
  1572. },
  1573. }.Init(),
  1574. DataSourcePlan{
  1575. name: "tableInPlanner",
  1576. streamFields: []interface{}{
  1577. &ast.StreamField{
  1578. Name: "hum",
  1579. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1580. },
  1581. &ast.StreamField{
  1582. Name: "id",
  1583. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1584. },
  1585. },
  1586. streamStmt: streams["tableInPlanner"],
  1587. metaFields: []string{},
  1588. }.Init(),
  1589. },
  1590. },
  1591. Emitters: []string{"tableInPlanner"},
  1592. }.Init(),
  1593. },
  1594. },
  1595. from: &ast.Table{
  1596. Name: "src1",
  1597. },
  1598. joins: []ast.Join{
  1599. {
  1600. Name: "tableInPlanner",
  1601. Alias: "",
  1602. JoinType: ast.INNER_JOIN,
  1603. Expr: &ast.BinaryExpr{
  1604. OP: ast.AND,
  1605. LHS: &ast.BinaryExpr{
  1606. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1607. OP: ast.EQ,
  1608. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1609. },
  1610. RHS: &ast.BinaryExpr{
  1611. OP: ast.LT,
  1612. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1613. RHS: &ast.IntegerLiteral{Val: 60},
  1614. },
  1615. },
  1616. },
  1617. },
  1618. }.Init(),
  1619. },
  1620. },
  1621. fields: []ast.Field{
  1622. {
  1623. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1624. Name: "id1",
  1625. AName: ""},
  1626. },
  1627. isAggregate: false,
  1628. sendMeta: false,
  1629. }.Init(),
  1630. }, { // 9 join table with window
  1631. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1632. p: ProjectPlan{
  1633. baseLogicalPlan: baseLogicalPlan{
  1634. children: []LogicalPlan{
  1635. JoinPlan{
  1636. baseLogicalPlan: baseLogicalPlan{
  1637. children: []LogicalPlan{
  1638. JoinAlignPlan{
  1639. baseLogicalPlan: baseLogicalPlan{
  1640. children: []LogicalPlan{
  1641. WindowPlan{
  1642. baseLogicalPlan: baseLogicalPlan{
  1643. children: []LogicalPlan{
  1644. FilterPlan{
  1645. baseLogicalPlan: baseLogicalPlan{
  1646. children: []LogicalPlan{
  1647. DataSourcePlan{
  1648. name: "src1",
  1649. streamFields: []interface{}{
  1650. "id1", "temp",
  1651. },
  1652. streamStmt: streams["src1"],
  1653. metaFields: []string{},
  1654. }.Init(),
  1655. },
  1656. },
  1657. condition: &ast.BinaryExpr{
  1658. RHS: &ast.BinaryExpr{
  1659. OP: ast.GT,
  1660. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1661. RHS: &ast.IntegerLiteral{Val: 20},
  1662. },
  1663. OP: ast.AND,
  1664. LHS: &ast.BinaryExpr{
  1665. OP: ast.GT,
  1666. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1667. RHS: &ast.IntegerLiteral{Val: 111},
  1668. },
  1669. },
  1670. }.Init(),
  1671. },
  1672. },
  1673. condition: nil,
  1674. wtype: ast.TUMBLING_WINDOW,
  1675. length: 10000,
  1676. interval: 0,
  1677. limit: 0,
  1678. }.Init(),
  1679. FilterPlan{
  1680. baseLogicalPlan: baseLogicalPlan{
  1681. children: []LogicalPlan{
  1682. DataSourcePlan{
  1683. name: "tableInPlanner",
  1684. streamFields: []interface{}{
  1685. &ast.StreamField{
  1686. Name: "hum",
  1687. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1688. },
  1689. &ast.StreamField{
  1690. Name: "id",
  1691. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1692. },
  1693. },
  1694. streamStmt: streams["tableInPlanner"],
  1695. metaFields: []string{},
  1696. }.Init(),
  1697. },
  1698. },
  1699. condition: &ast.BinaryExpr{
  1700. OP: ast.LT,
  1701. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1702. RHS: &ast.IntegerLiteral{Val: 60},
  1703. },
  1704. }.Init(),
  1705. },
  1706. },
  1707. Emitters: []string{"tableInPlanner"},
  1708. }.Init(),
  1709. },
  1710. },
  1711. from: &ast.Table{
  1712. Name: "src1",
  1713. },
  1714. joins: []ast.Join{
  1715. {
  1716. Name: "tableInPlanner",
  1717. Alias: "",
  1718. JoinType: ast.INNER_JOIN,
  1719. Expr: &ast.BinaryExpr{
  1720. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1721. OP: ast.EQ,
  1722. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1723. },
  1724. },
  1725. },
  1726. }.Init(),
  1727. },
  1728. },
  1729. fields: []ast.Field{
  1730. {
  1731. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1732. Name: "id1",
  1733. AName: ""},
  1734. },
  1735. isAggregate: false,
  1736. sendMeta: false,
  1737. }.Init(),
  1738. }, { // 10 meta
  1739. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1740. p: ProjectPlan{
  1741. baseLogicalPlan: baseLogicalPlan{
  1742. children: []LogicalPlan{
  1743. FilterPlan{
  1744. baseLogicalPlan: baseLogicalPlan{
  1745. children: []LogicalPlan{
  1746. DataSourcePlan{
  1747. name: "src1",
  1748. streamFields: []interface{}{
  1749. "temp",
  1750. },
  1751. streamStmt: streams["src1"],
  1752. metaFields: []string{"Humidity", "device", "id"},
  1753. }.Init(),
  1754. },
  1755. },
  1756. condition: &ast.BinaryExpr{
  1757. LHS: &ast.Call{
  1758. Name: "meta",
  1759. Args: []ast.Expr{&ast.MetaRef{
  1760. Name: "device",
  1761. StreamName: ast.DefaultStream,
  1762. }},
  1763. },
  1764. OP: ast.EQ,
  1765. RHS: &ast.StringLiteral{
  1766. Val: "demo2",
  1767. },
  1768. },
  1769. }.Init(),
  1770. },
  1771. },
  1772. fields: []ast.Field{
  1773. {
  1774. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1775. Name: "temp",
  1776. AName: "",
  1777. }, {
  1778. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1779. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  1780. Name: "id",
  1781. StreamName: ast.DefaultStream,
  1782. }}},
  1783. []ast.StreamName{},
  1784. nil,
  1785. )},
  1786. Name: "meta",
  1787. AName: "eid",
  1788. }, {
  1789. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1790. &ast.Call{Name: "meta", Args: []ast.Expr{
  1791. &ast.BinaryExpr{
  1792. OP: ast.ARROW,
  1793. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1794. RHS: &ast.MetaRef{Name: "Device"},
  1795. },
  1796. }},
  1797. []ast.StreamName{},
  1798. nil,
  1799. )},
  1800. Name: "meta",
  1801. AName: "hdevice",
  1802. },
  1803. },
  1804. isAggregate: false,
  1805. sendMeta: false,
  1806. }.Init(),
  1807. }, { // 11 join with same name field and aliased
  1808. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1809. p: ProjectPlan{
  1810. baseLogicalPlan: baseLogicalPlan{
  1811. children: []LogicalPlan{
  1812. JoinPlan{
  1813. baseLogicalPlan: baseLogicalPlan{
  1814. children: []LogicalPlan{
  1815. JoinAlignPlan{
  1816. baseLogicalPlan: baseLogicalPlan{
  1817. children: []LogicalPlan{
  1818. DataSourcePlan{
  1819. name: "src2",
  1820. streamFields: []interface{}{
  1821. "hum", "id", "id2",
  1822. },
  1823. streamStmt: streams["src2"],
  1824. metaFields: []string{},
  1825. }.Init(),
  1826. DataSourcePlan{
  1827. name: "tableInPlanner",
  1828. streamFields: []interface{}{
  1829. &ast.StreamField{
  1830. Name: "hum",
  1831. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1832. },
  1833. &ast.StreamField{
  1834. Name: "id",
  1835. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1836. },
  1837. },
  1838. streamStmt: streams["tableInPlanner"],
  1839. metaFields: []string{},
  1840. }.Init(),
  1841. },
  1842. },
  1843. Emitters: []string{"tableInPlanner"},
  1844. }.Init(),
  1845. },
  1846. },
  1847. from: &ast.Table{
  1848. Name: "src2",
  1849. },
  1850. joins: []ast.Join{
  1851. {
  1852. Name: "tableInPlanner",
  1853. Alias: "",
  1854. JoinType: ast.INNER_JOIN,
  1855. Expr: &ast.BinaryExpr{
  1856. RHS: &ast.BinaryExpr{
  1857. OP: ast.EQ,
  1858. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  1859. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  1860. },
  1861. OP: ast.AND,
  1862. LHS: &ast.BinaryExpr{
  1863. OP: ast.GT,
  1864. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1865. &ast.FieldRef{
  1866. Name: "hum",
  1867. StreamName: "src2",
  1868. },
  1869. []ast.StreamName{"src2"},
  1870. &boolFalse,
  1871. )},
  1872. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1873. &ast.FieldRef{
  1874. Name: "hum",
  1875. StreamName: "tableInPlanner",
  1876. },
  1877. []ast.StreamName{"tableInPlanner"},
  1878. &boolFalse,
  1879. )},
  1880. },
  1881. },
  1882. },
  1883. },
  1884. }.Init(),
  1885. },
  1886. },
  1887. fields: []ast.Field{
  1888. {
  1889. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1890. &ast.FieldRef{
  1891. Name: "hum",
  1892. StreamName: "src2",
  1893. },
  1894. []ast.StreamName{"src2"},
  1895. &boolFalse,
  1896. )},
  1897. Name: "hum",
  1898. AName: "hum1",
  1899. }, {
  1900. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1901. &ast.FieldRef{
  1902. Name: "hum",
  1903. StreamName: "tableInPlanner",
  1904. },
  1905. []ast.StreamName{"tableInPlanner"},
  1906. &boolFalse,
  1907. )},
  1908. Name: "hum",
  1909. AName: "hum2",
  1910. },
  1911. },
  1912. isAggregate: false,
  1913. sendMeta: false,
  1914. }.Init(),
  1915. },
  1916. }
  1917. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1918. for i, tt := range tests {
  1919. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1920. if err != nil {
  1921. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1922. continue
  1923. }
  1924. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1925. IsEventTime: false,
  1926. LateTol: 0,
  1927. Concurrency: 0,
  1928. BufferLength: 0,
  1929. SendMetaToSink: false,
  1930. Qos: 0,
  1931. CheckpointInterval: 0,
  1932. SendError: true,
  1933. }, store)
  1934. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1935. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1936. } else if !reflect.DeepEqual(tt.p, p) {
  1937. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1938. }
  1939. }
  1940. }