planner_test.go 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  // init is the package initializer for this test file: the Go runtime invokes
  // it automatically when the package is initialized, before any test function
  // runs. Its only action is to call testx.InitEnv().
  // NOTE(review): presumably InitEnv prepares the test environment that the
  // tests below rely on (e.g. the KV store opened via store.GetKV) — confirm
  // against internal/testx.
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: []interface{}{
  102. &ast.StreamField{
  103. Name: "myarray",
  104. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  105. },
  106. &ast.StreamField{
  107. Name: "temp",
  108. FieldType: &ast.BasicType{Type: ast.BIGINT},
  109. },
  110. },
  111. streamStmt: streams["src1"],
  112. metaFields: []string{},
  113. }.Init(),
  114. },
  115. },
  116. fields: []ast.Field{
  117. {
  118. Expr: &ast.BinaryExpr{
  119. OP: ast.SUBSET,
  120. LHS: &ast.FieldRef{
  121. StreamName: "src1",
  122. Name: "myarray",
  123. },
  124. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  125. StreamName: "src1",
  126. Name: "temp",
  127. }},
  128. },
  129. Name: "kuiper_field_0",
  130. AName: "",
  131. },
  132. },
  133. isAggregate: false,
  134. sendMeta: false,
  135. }.Init(),
  136. }, { // 1 optimize where to data source
  137. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  138. p: ProjectPlan{
  139. baseLogicalPlan: baseLogicalPlan{
  140. children: []LogicalPlan{
  141. WindowPlan{
  142. baseLogicalPlan: baseLogicalPlan{
  143. children: []LogicalPlan{
  144. FilterPlan{
  145. baseLogicalPlan: baseLogicalPlan{
  146. children: []LogicalPlan{
  147. DataSourcePlan{
  148. name: "src1",
  149. streamFields: []interface{}{
  150. &ast.StreamField{
  151. Name: "name",
  152. FieldType: &ast.BasicType{Type: ast.STRINGS},
  153. },
  154. &ast.StreamField{
  155. Name: "temp",
  156. FieldType: &ast.BasicType{Type: ast.BIGINT},
  157. },
  158. },
  159. streamStmt: streams["src1"],
  160. metaFields: []string{},
  161. }.Init(),
  162. },
  163. },
  164. condition: &ast.BinaryExpr{
  165. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  166. OP: ast.EQ,
  167. RHS: &ast.StringLiteral{Val: "v1"},
  168. },
  169. }.Init(),
  170. },
  171. },
  172. condition: nil,
  173. wtype: ast.TUMBLING_WINDOW,
  174. length: 10000,
  175. interval: 0,
  176. limit: 0,
  177. }.Init(),
  178. },
  179. },
  180. fields: []ast.Field{
  181. {
  182. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  183. Name: "temp",
  184. AName: ""},
  185. },
  186. isAggregate: false,
  187. sendMeta: false,
  188. }.Init(),
  189. }, { // 2 condition that cannot be optimized
  190. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  191. p: ProjectPlan{
  192. baseLogicalPlan: baseLogicalPlan{
  193. children: []LogicalPlan{
  194. JoinPlan{
  195. baseLogicalPlan: baseLogicalPlan{
  196. children: []LogicalPlan{
  197. WindowPlan{
  198. baseLogicalPlan: baseLogicalPlan{
  199. children: []LogicalPlan{
  200. DataSourcePlan{
  201. name: "src1",
  202. streamFields: []interface{}{
  203. &ast.StreamField{
  204. Name: "id1",
  205. FieldType: &ast.BasicType{Type: ast.BIGINT},
  206. },
  207. &ast.StreamField{
  208. Name: "temp",
  209. FieldType: &ast.BasicType{Type: ast.BIGINT},
  210. },
  211. },
  212. streamStmt: streams["src1"],
  213. metaFields: []string{},
  214. }.Init(),
  215. DataSourcePlan{
  216. name: "src2",
  217. streamFields: []interface{}{
  218. &ast.StreamField{
  219. Name: "hum",
  220. FieldType: &ast.BasicType{Type: ast.BIGINT},
  221. },
  222. &ast.StreamField{
  223. Name: "id2",
  224. FieldType: &ast.BasicType{Type: ast.BIGINT},
  225. },
  226. },
  227. streamStmt: streams["src2"],
  228. metaFields: []string{},
  229. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  230. }.Init(),
  231. },
  232. },
  233. condition: nil,
  234. wtype: ast.TUMBLING_WINDOW,
  235. length: 10000,
  236. interval: 0,
  237. limit: 0,
  238. }.Init(),
  239. },
  240. },
  241. from: &ast.Table{Name: "src1"},
  242. joins: ast.Joins{ast.Join{
  243. Name: "src2",
  244. JoinType: ast.INNER_JOIN,
  245. Expr: &ast.BinaryExpr{
  246. OP: ast.AND,
  247. LHS: &ast.BinaryExpr{
  248. LHS: &ast.BinaryExpr{
  249. OP: ast.GT,
  250. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  251. RHS: &ast.IntegerLiteral{Val: 20},
  252. },
  253. OP: ast.OR,
  254. RHS: &ast.BinaryExpr{
  255. OP: ast.GT,
  256. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  257. RHS: &ast.IntegerLiteral{Val: 60},
  258. },
  259. },
  260. RHS: &ast.BinaryExpr{
  261. OP: ast.EQ,
  262. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  263. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  264. },
  265. },
  266. }},
  267. }.Init(),
  268. },
  269. },
  270. fields: []ast.Field{
  271. {
  272. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  273. Name: "id1",
  274. AName: ""},
  275. },
  276. isAggregate: false,
  277. sendMeta: false,
  278. }.Init(),
  279. }, { // 3 optimize window filter
  280. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  281. p: ProjectPlan{
  282. baseLogicalPlan: baseLogicalPlan{
  283. children: []LogicalPlan{
  284. WindowPlan{
  285. baseLogicalPlan: baseLogicalPlan{
  286. children: []LogicalPlan{
  287. FilterPlan{
  288. baseLogicalPlan: baseLogicalPlan{
  289. children: []LogicalPlan{
  290. DataSourcePlan{
  291. name: "src1",
  292. streamFields: []interface{}{
  293. &ast.StreamField{
  294. Name: "id1",
  295. FieldType: &ast.BasicType{Type: ast.BIGINT},
  296. },
  297. &ast.StreamField{
  298. Name: "name",
  299. FieldType: &ast.BasicType{Type: ast.STRINGS},
  300. },
  301. &ast.StreamField{
  302. Name: "temp",
  303. FieldType: &ast.BasicType{Type: ast.BIGINT},
  304. },
  305. },
  306. streamStmt: streams["src1"],
  307. metaFields: []string{},
  308. }.Init(),
  309. },
  310. },
  311. condition: &ast.BinaryExpr{
  312. OP: ast.AND,
  313. LHS: &ast.BinaryExpr{
  314. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  315. OP: ast.EQ,
  316. RHS: &ast.StringLiteral{Val: "v1"},
  317. },
  318. RHS: &ast.BinaryExpr{
  319. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  320. OP: ast.GT,
  321. RHS: &ast.IntegerLiteral{Val: 2},
  322. },
  323. },
  324. }.Init(),
  325. },
  326. },
  327. condition: nil,
  328. wtype: ast.TUMBLING_WINDOW,
  329. length: 10000,
  330. interval: 0,
  331. limit: 0,
  332. }.Init(),
  333. },
  334. },
  335. fields: []ast.Field{
  336. {
  337. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  338. Name: "id1",
  339. AName: ""},
  340. },
  341. isAggregate: false,
  342. sendMeta: false,
  343. }.Init(),
  344. }, { // 4. do not optimize count window
  345. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  346. p: ProjectPlan{
  347. baseLogicalPlan: baseLogicalPlan{
  348. children: []LogicalPlan{
  349. HavingPlan{
  350. baseLogicalPlan: baseLogicalPlan{
  351. children: []LogicalPlan{
  352. FilterPlan{
  353. baseLogicalPlan: baseLogicalPlan{
  354. children: []LogicalPlan{
  355. WindowPlan{
  356. baseLogicalPlan: baseLogicalPlan{
  357. children: []LogicalPlan{
  358. DataSourcePlan{
  359. name: "src1",
  360. isWildCard: true,
  361. streamFields: []interface{}{
  362. &ast.StreamField{
  363. Name: "id1",
  364. FieldType: &ast.BasicType{Type: ast.BIGINT},
  365. },
  366. &ast.StreamField{
  367. Name: "temp",
  368. FieldType: &ast.BasicType{Type: ast.BIGINT},
  369. },
  370. &ast.StreamField{
  371. Name: "name",
  372. FieldType: &ast.BasicType{Type: ast.STRINGS},
  373. },
  374. &ast.StreamField{
  375. Name: "myarray",
  376. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  377. },
  378. },
  379. streamStmt: streams["src1"],
  380. metaFields: []string{},
  381. }.Init(),
  382. },
  383. },
  384. condition: nil,
  385. wtype: ast.COUNT_WINDOW,
  386. length: 5,
  387. interval: 1,
  388. limit: 0,
  389. }.Init(),
  390. },
  391. },
  392. condition: &ast.BinaryExpr{
  393. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  394. OP: ast.GT,
  395. RHS: &ast.IntegerLiteral{Val: 20},
  396. },
  397. }.Init(),
  398. },
  399. },
  400. condition: &ast.BinaryExpr{
  401. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  402. Token: ast.ASTERISK,
  403. }}},
  404. OP: ast.GT,
  405. RHS: &ast.IntegerLiteral{Val: 2},
  406. },
  407. }.Init(),
  408. },
  409. },
  410. fields: []ast.Field{
  411. {
  412. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  413. Name: "",
  414. AName: ""},
  415. },
  416. isAggregate: false,
  417. sendMeta: false,
  418. }.Init(),
  419. }, { // 5. optimize join on
  420. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  421. p: ProjectPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. JoinPlan{
  425. baseLogicalPlan: baseLogicalPlan{
  426. children: []LogicalPlan{
  427. WindowPlan{
  428. baseLogicalPlan: baseLogicalPlan{
  429. children: []LogicalPlan{
  430. FilterPlan{
  431. baseLogicalPlan: baseLogicalPlan{
  432. children: []LogicalPlan{
  433. DataSourcePlan{
  434. name: "src1",
  435. streamFields: []interface{}{
  436. &ast.StreamField{
  437. Name: "id1",
  438. FieldType: &ast.BasicType{Type: ast.BIGINT},
  439. },
  440. &ast.StreamField{
  441. Name: "temp",
  442. FieldType: &ast.BasicType{Type: ast.BIGINT},
  443. },
  444. },
  445. streamStmt: streams["src1"],
  446. metaFields: []string{},
  447. }.Init(),
  448. },
  449. },
  450. condition: &ast.BinaryExpr{
  451. RHS: &ast.BinaryExpr{
  452. OP: ast.GT,
  453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  454. RHS: &ast.IntegerLiteral{Val: 20},
  455. },
  456. OP: ast.AND,
  457. LHS: &ast.BinaryExpr{
  458. OP: ast.GT,
  459. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  460. RHS: &ast.IntegerLiteral{Val: 111},
  461. },
  462. },
  463. }.Init(),
  464. FilterPlan{
  465. baseLogicalPlan: baseLogicalPlan{
  466. children: []LogicalPlan{
  467. DataSourcePlan{
  468. name: "src2",
  469. streamFields: []interface{}{
  470. &ast.StreamField{
  471. Name: "hum",
  472. FieldType: &ast.BasicType{Type: ast.BIGINT},
  473. },
  474. &ast.StreamField{
  475. Name: "id2",
  476. FieldType: &ast.BasicType{Type: ast.BIGINT},
  477. },
  478. },
  479. streamStmt: streams["src2"],
  480. metaFields: []string{},
  481. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  482. }.Init(),
  483. },
  484. },
  485. condition: &ast.BinaryExpr{
  486. OP: ast.LT,
  487. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  488. RHS: &ast.IntegerLiteral{Val: 60},
  489. },
  490. }.Init(),
  491. },
  492. },
  493. condition: nil,
  494. wtype: ast.TUMBLING_WINDOW,
  495. length: 10000,
  496. interval: 0,
  497. limit: 0,
  498. }.Init(),
  499. },
  500. },
  501. from: &ast.Table{
  502. Name: "src1",
  503. },
  504. joins: []ast.Join{
  505. {
  506. Name: "src2",
  507. Alias: "",
  508. JoinType: ast.INNER_JOIN,
  509. Expr: &ast.BinaryExpr{
  510. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  511. OP: ast.EQ,
  512. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  513. },
  514. },
  515. },
  516. }.Init(),
  517. },
  518. },
  519. fields: []ast.Field{
  520. {
  521. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  522. Name: "id1",
  523. AName: ""},
  524. },
  525. isAggregate: false,
  526. sendMeta: false,
  527. }.Init(),
  528. }, { // 6. optimize outer join on
  529. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  530. p: ProjectPlan{
  531. baseLogicalPlan: baseLogicalPlan{
  532. children: []LogicalPlan{
  533. JoinPlan{
  534. baseLogicalPlan: baseLogicalPlan{
  535. children: []LogicalPlan{
  536. WindowPlan{
  537. baseLogicalPlan: baseLogicalPlan{
  538. children: []LogicalPlan{
  539. FilterPlan{
  540. baseLogicalPlan: baseLogicalPlan{
  541. children: []LogicalPlan{
  542. DataSourcePlan{
  543. name: "src1",
  544. streamFields: []interface{}{
  545. &ast.StreamField{
  546. Name: "id1",
  547. FieldType: &ast.BasicType{Type: ast.BIGINT},
  548. },
  549. &ast.StreamField{
  550. Name: "temp",
  551. FieldType: &ast.BasicType{Type: ast.BIGINT},
  552. },
  553. },
  554. streamStmt: streams["src1"],
  555. metaFields: []string{},
  556. }.Init(),
  557. },
  558. },
  559. condition: &ast.BinaryExpr{
  560. OP: ast.GT,
  561. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  562. RHS: &ast.IntegerLiteral{Val: 111},
  563. },
  564. }.Init(),
  565. DataSourcePlan{
  566. name: "src2",
  567. streamFields: []interface{}{
  568. &ast.StreamField{
  569. Name: "hum",
  570. FieldType: &ast.BasicType{Type: ast.BIGINT},
  571. },
  572. &ast.StreamField{
  573. Name: "id2",
  574. FieldType: &ast.BasicType{Type: ast.BIGINT},
  575. },
  576. },
  577. streamStmt: streams["src2"],
  578. metaFields: []string{},
  579. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  580. }.Init(),
  581. },
  582. },
  583. condition: nil,
  584. wtype: ast.TUMBLING_WINDOW,
  585. length: 10000,
  586. interval: 0,
  587. limit: 0,
  588. }.Init(),
  589. },
  590. },
  591. from: &ast.Table{
  592. Name: "src1",
  593. },
  594. joins: []ast.Join{
  595. {
  596. Name: "src2",
  597. Alias: "",
  598. JoinType: ast.FULL_JOIN,
  599. Expr: &ast.BinaryExpr{
  600. OP: ast.AND,
  601. LHS: &ast.BinaryExpr{
  602. OP: ast.AND,
  603. LHS: &ast.BinaryExpr{
  604. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  605. OP: ast.EQ,
  606. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  607. },
  608. RHS: &ast.BinaryExpr{
  609. OP: ast.GT,
  610. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  611. RHS: &ast.IntegerLiteral{Val: 20},
  612. },
  613. },
  614. RHS: &ast.BinaryExpr{
  615. OP: ast.LT,
  616. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  617. RHS: &ast.IntegerLiteral{Val: 60},
  618. },
  619. },
  620. },
  621. },
  622. }.Init(),
  623. },
  624. },
  625. fields: []ast.Field{
  626. {
  627. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  628. Name: "id1",
  629. AName: ""},
  630. },
  631. isAggregate: false,
  632. sendMeta: false,
  633. }.Init(),
  634. }, { // 7 window error for table
  635. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  636. p: nil,
  637. err: "cannot run window for TABLE sources",
  638. }, { // 8 join table without window
  639. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  640. p: ProjectPlan{
  641. baseLogicalPlan: baseLogicalPlan{
  642. children: []LogicalPlan{
  643. JoinPlan{
  644. baseLogicalPlan: baseLogicalPlan{
  645. children: []LogicalPlan{
  646. JoinAlignPlan{
  647. baseLogicalPlan: baseLogicalPlan{
  648. children: []LogicalPlan{
  649. FilterPlan{
  650. baseLogicalPlan: baseLogicalPlan{
  651. children: []LogicalPlan{
  652. DataSourcePlan{
  653. name: "src1",
  654. streamFields: []interface{}{
  655. &ast.StreamField{
  656. Name: "id1",
  657. FieldType: &ast.BasicType{Type: ast.BIGINT},
  658. },
  659. &ast.StreamField{
  660. Name: "temp",
  661. FieldType: &ast.BasicType{Type: ast.BIGINT},
  662. },
  663. },
  664. streamStmt: streams["src1"],
  665. metaFields: []string{},
  666. }.Init(),
  667. },
  668. },
  669. condition: &ast.BinaryExpr{
  670. RHS: &ast.BinaryExpr{
  671. OP: ast.GT,
  672. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  673. RHS: &ast.IntegerLiteral{Val: 20},
  674. },
  675. OP: ast.AND,
  676. LHS: &ast.BinaryExpr{
  677. OP: ast.GT,
  678. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  679. RHS: &ast.IntegerLiteral{Val: 111},
  680. },
  681. },
  682. }.Init(),
  683. DataSourcePlan{
  684. name: "tableInPlanner",
  685. streamFields: []interface{}{
  686. &ast.StreamField{
  687. Name: "hum",
  688. FieldType: &ast.BasicType{Type: ast.BIGINT},
  689. },
  690. &ast.StreamField{
  691. Name: "id",
  692. FieldType: &ast.BasicType{Type: ast.BIGINT},
  693. },
  694. },
  695. streamStmt: streams["tableInPlanner"],
  696. metaFields: []string{},
  697. }.Init(),
  698. },
  699. },
  700. Emitters: []string{"tableInPlanner"},
  701. }.Init(),
  702. },
  703. },
  704. from: &ast.Table{
  705. Name: "src1",
  706. },
  707. joins: []ast.Join{
  708. {
  709. Name: "tableInPlanner",
  710. Alias: "",
  711. JoinType: ast.INNER_JOIN,
  712. Expr: &ast.BinaryExpr{
  713. OP: ast.AND,
  714. LHS: &ast.BinaryExpr{
  715. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  716. OP: ast.EQ,
  717. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  718. },
  719. RHS: &ast.BinaryExpr{
  720. OP: ast.LT,
  721. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  722. RHS: &ast.IntegerLiteral{Val: 60},
  723. },
  724. },
  725. },
  726. },
  727. }.Init(),
  728. },
  729. },
  730. fields: []ast.Field{
  731. {
  732. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  733. Name: "id1",
  734. AName: ""},
  735. },
  736. isAggregate: false,
  737. sendMeta: false,
  738. }.Init(),
  739. }, { // 9 join table with window
  740. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  741. p: ProjectPlan{
  742. baseLogicalPlan: baseLogicalPlan{
  743. children: []LogicalPlan{
  744. JoinPlan{
  745. baseLogicalPlan: baseLogicalPlan{
  746. children: []LogicalPlan{
  747. JoinAlignPlan{
  748. baseLogicalPlan: baseLogicalPlan{
  749. children: []LogicalPlan{
  750. WindowPlan{
  751. baseLogicalPlan: baseLogicalPlan{
  752. children: []LogicalPlan{
  753. DataSourcePlan{
  754. name: "src1",
  755. streamFields: []interface{}{
  756. &ast.StreamField{
  757. Name: "id1",
  758. FieldType: &ast.BasicType{Type: ast.BIGINT},
  759. },
  760. &ast.StreamField{
  761. Name: "temp",
  762. FieldType: &ast.BasicType{Type: ast.BIGINT},
  763. },
  764. },
  765. streamStmt: streams["src1"],
  766. metaFields: []string{},
  767. }.Init(),
  768. },
  769. },
  770. condition: nil,
  771. wtype: ast.TUMBLING_WINDOW,
  772. length: 10000,
  773. interval: 0,
  774. limit: 0,
  775. }.Init(),
  776. DataSourcePlan{
  777. name: "tableInPlanner",
  778. streamFields: []interface{}{
  779. &ast.StreamField{
  780. Name: "hum",
  781. FieldType: &ast.BasicType{Type: ast.BIGINT},
  782. },
  783. &ast.StreamField{
  784. Name: "id",
  785. FieldType: &ast.BasicType{Type: ast.BIGINT},
  786. },
  787. },
  788. streamStmt: streams["tableInPlanner"],
  789. metaFields: []string{},
  790. }.Init(),
  791. },
  792. },
  793. Emitters: []string{"tableInPlanner"},
  794. }.Init(),
  795. },
  796. },
  797. from: &ast.Table{
  798. Name: "src1",
  799. },
  800. joins: []ast.Join{
  801. {
  802. Name: "tableInPlanner",
  803. Alias: "",
  804. JoinType: ast.INNER_JOIN,
  805. Expr: &ast.BinaryExpr{
  806. OP: ast.AND,
  807. LHS: &ast.BinaryExpr{
  808. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  809. OP: ast.EQ,
  810. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  811. },
  812. RHS: &ast.BinaryExpr{
  813. RHS: &ast.BinaryExpr{
  814. OP: ast.AND,
  815. LHS: &ast.BinaryExpr{
  816. OP: ast.GT,
  817. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  818. RHS: &ast.IntegerLiteral{Val: 20},
  819. },
  820. RHS: &ast.BinaryExpr{
  821. OP: ast.LT,
  822. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  823. RHS: &ast.IntegerLiteral{Val: 60},
  824. },
  825. },
  826. OP: ast.AND,
  827. LHS: &ast.BinaryExpr{
  828. OP: ast.GT,
  829. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  830. RHS: &ast.IntegerLiteral{Val: 111},
  831. },
  832. },
  833. },
  834. },
  835. },
  836. }.Init(),
  837. },
  838. },
  839. fields: []ast.Field{
  840. {
  841. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  842. Name: "id1",
  843. AName: ""},
  844. },
  845. isAggregate: false,
  846. sendMeta: false,
  847. }.Init(),
  848. }, { // 10 meta
  849. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  850. p: ProjectPlan{
  851. baseLogicalPlan: baseLogicalPlan{
  852. children: []LogicalPlan{
  853. FilterPlan{
  854. baseLogicalPlan: baseLogicalPlan{
  855. children: []LogicalPlan{
  856. DataSourcePlan{
  857. name: "src1",
  858. streamFields: []interface{}{
  859. &ast.StreamField{
  860. Name: "temp",
  861. FieldType: &ast.BasicType{Type: ast.BIGINT},
  862. },
  863. },
  864. streamStmt: streams["src1"],
  865. metaFields: []string{"Humidity", "device", "id"},
  866. }.Init(),
  867. },
  868. },
  869. condition: &ast.BinaryExpr{
  870. LHS: &ast.Call{
  871. Name: "meta",
  872. FuncId: 2,
  873. Args: []ast.Expr{&ast.MetaRef{
  874. Name: "device",
  875. StreamName: ast.DefaultStream,
  876. }},
  877. },
  878. OP: ast.EQ,
  879. RHS: &ast.StringLiteral{
  880. Val: "demo2",
  881. },
  882. },
  883. }.Init(),
  884. },
  885. },
  886. fields: []ast.Field{
  887. {
  888. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  889. Name: "temp",
  890. AName: "",
  891. }, {
  892. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  893. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  894. Name: "id",
  895. StreamName: ast.DefaultStream,
  896. }}},
  897. []ast.StreamName{},
  898. nil,
  899. )},
  900. Name: "meta",
  901. AName: "eid",
  902. }, {
  903. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  904. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  905. &ast.BinaryExpr{
  906. OP: ast.ARROW,
  907. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  908. RHS: &ast.JsonFieldRef{Name: "Device"},
  909. },
  910. }},
  911. []ast.StreamName{},
  912. nil,
  913. )},
  914. Name: "meta",
  915. AName: "hdevice",
  916. },
  917. },
  918. isAggregate: false,
  919. sendMeta: false,
  920. }.Init(),
  921. }, { // 11 join with same name field and aliased
  922. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  923. p: ProjectPlan{
  924. baseLogicalPlan: baseLogicalPlan{
  925. children: []LogicalPlan{
  926. JoinPlan{
  927. baseLogicalPlan: baseLogicalPlan{
  928. children: []LogicalPlan{
  929. JoinAlignPlan{
  930. baseLogicalPlan: baseLogicalPlan{
  931. children: []LogicalPlan{
  932. DataSourcePlan{
  933. name: "src2",
  934. streamFields: []interface{}{
  935. &ast.StreamField{
  936. Name: "hum",
  937. FieldType: &ast.BasicType{Type: ast.BIGINT},
  938. },
  939. &ast.StreamField{
  940. Name: "id2",
  941. FieldType: &ast.BasicType{Type: ast.BIGINT},
  942. },
  943. },
  944. streamStmt: streams["src2"],
  945. metaFields: []string{},
  946. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  947. }.Init(),
  948. DataSourcePlan{
  949. name: "tableInPlanner",
  950. streamFields: []interface{}{
  951. &ast.StreamField{
  952. Name: "hum",
  953. FieldType: &ast.BasicType{Type: ast.BIGINT},
  954. },
  955. &ast.StreamField{
  956. Name: "id",
  957. FieldType: &ast.BasicType{Type: ast.BIGINT},
  958. },
  959. },
  960. streamStmt: streams["tableInPlanner"],
  961. metaFields: []string{},
  962. }.Init(),
  963. },
  964. },
  965. Emitters: []string{"tableInPlanner"},
  966. }.Init(),
  967. },
  968. },
  969. from: &ast.Table{
  970. Name: "src2",
  971. },
  972. joins: []ast.Join{
  973. {
  974. Name: "tableInPlanner",
  975. Alias: "",
  976. JoinType: ast.INNER_JOIN,
  977. Expr: &ast.BinaryExpr{
  978. RHS: &ast.BinaryExpr{
  979. OP: ast.EQ,
  980. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  981. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  982. },
  983. OP: ast.AND,
  984. LHS: &ast.BinaryExpr{
  985. OP: ast.GT,
  986. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  987. &ast.FieldRef{
  988. Name: "hum",
  989. StreamName: "src2",
  990. },
  991. []ast.StreamName{"src2"},
  992. &boolFalse,
  993. )},
  994. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  995. &ast.FieldRef{
  996. Name: "hum",
  997. StreamName: "tableInPlanner",
  998. },
  999. []ast.StreamName{"tableInPlanner"},
  1000. &boolFalse,
  1001. )},
  1002. },
  1003. },
  1004. },
  1005. },
  1006. }.Init(),
  1007. },
  1008. },
  1009. fields: []ast.Field{
  1010. {
  1011. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1012. &ast.FieldRef{
  1013. Name: "hum",
  1014. StreamName: "src2",
  1015. },
  1016. []ast.StreamName{"src2"},
  1017. &boolFalse,
  1018. )},
  1019. Name: "hum",
  1020. AName: "hum1",
  1021. }, {
  1022. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1023. &ast.FieldRef{
  1024. Name: "hum",
  1025. StreamName: "tableInPlanner",
  1026. },
  1027. []ast.StreamName{"tableInPlanner"},
  1028. &boolFalse,
  1029. )},
  1030. Name: "hum",
  1031. AName: "hum2",
  1032. },
  1033. },
  1034. isAggregate: false,
  1035. sendMeta: false,
  1036. }.Init(),
  1037. }, { // 12 meta with more fields
  1038. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1039. p: ProjectPlan{
  1040. baseLogicalPlan: baseLogicalPlan{
  1041. children: []LogicalPlan{
  1042. FilterPlan{
  1043. baseLogicalPlan: baseLogicalPlan{
  1044. children: []LogicalPlan{
  1045. DataSourcePlan{
  1046. name: "src1",
  1047. streamFields: []interface{}{
  1048. &ast.StreamField{
  1049. Name: "temp",
  1050. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1051. },
  1052. },
  1053. streamStmt: streams["src1"],
  1054. metaFields: []string{},
  1055. allMeta: true,
  1056. }.Init(),
  1057. },
  1058. },
  1059. condition: &ast.BinaryExpr{
  1060. LHS: &ast.Call{
  1061. Name: "meta",
  1062. FuncId: 1,
  1063. Args: []ast.Expr{&ast.MetaRef{
  1064. Name: "device",
  1065. StreamName: ast.DefaultStream,
  1066. }},
  1067. },
  1068. OP: ast.EQ,
  1069. RHS: &ast.StringLiteral{
  1070. Val: "demo2",
  1071. },
  1072. },
  1073. }.Init(),
  1074. },
  1075. },
  1076. fields: []ast.Field{
  1077. {
  1078. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1079. Name: "temp",
  1080. AName: "",
  1081. }, {
  1082. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1083. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1084. Name: "*",
  1085. StreamName: ast.DefaultStream,
  1086. }}},
  1087. []ast.StreamName{},
  1088. nil,
  1089. )},
  1090. Name: "meta",
  1091. AName: "m",
  1092. },
  1093. },
  1094. isAggregate: false,
  1095. sendMeta: false,
  1096. }.Init(),
  1097. },
  1098. }
  1099. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1100. for i, tt := range tests {
  1101. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1102. if err != nil {
  1103. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1104. continue
  1105. }
  1106. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1107. IsEventTime: false,
  1108. LateTol: 0,
  1109. Concurrency: 0,
  1110. BufferLength: 0,
  1111. SendMetaToSink: false,
  1112. Qos: 0,
  1113. CheckpointInterval: 0,
  1114. SendError: true,
  1115. }, store)
  1116. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1117. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1118. } else if !reflect.DeepEqual(tt.p, p) {
  1119. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1120. }
  1121. }
  1122. }
  1123. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1124. err, store := store.GetKV("stream")
  1125. if err != nil {
  1126. t.Error(err)
  1127. return
  1128. }
  1129. streamSqls := map[string]string{
  1130. "src1": `CREATE STREAM src1 (
  1131. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1132. "src2": `CREATE STREAM src2 (
  1133. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1134. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1135. id BIGINT,
  1136. name STRING,
  1137. value STRING,
  1138. hum BIGINT
  1139. ) WITH (TYPE="file");`,
  1140. }
  1141. types := map[string]ast.StreamType{
  1142. "src1": ast.TypeStream,
  1143. "src2": ast.TypeStream,
  1144. "tableInPlanner": ast.TypeTable,
  1145. }
  1146. for name, sql := range streamSqls {
  1147. s, err := json.Marshal(&xsql.StreamInfo{
  1148. StreamType: types[name],
  1149. Statement: sql,
  1150. })
  1151. if err != nil {
  1152. t.Error(err)
  1153. t.Fail()
  1154. }
  1155. err = store.Set(name, string(s))
  1156. if err != nil {
  1157. t.Error(err)
  1158. t.Fail()
  1159. }
  1160. }
  1161. streams := make(map[string]*ast.StreamStmt)
  1162. for n := range streamSqls {
  1163. streamStmt, err := xsql.GetDataSource(store, n)
  1164. if err != nil {
  1165. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1166. return
  1167. }
  1168. streams[n] = streamStmt
  1169. }
  1170. var (
  1171. //boolTrue = true
  1172. boolFalse = false
  1173. )
  1174. var tests = []struct {
  1175. sql string
  1176. p LogicalPlan
  1177. err string
  1178. }{
  1179. { // 0
  1180. sql: `SELECT name FROM src1`,
  1181. p: ProjectPlan{
  1182. baseLogicalPlan: baseLogicalPlan{
  1183. children: []LogicalPlan{
  1184. DataSourcePlan{
  1185. baseLogicalPlan: baseLogicalPlan{},
  1186. name: "src1",
  1187. streamFields: []interface{}{
  1188. "name",
  1189. },
  1190. streamStmt: streams["src1"],
  1191. metaFields: []string{},
  1192. }.Init(),
  1193. },
  1194. },
  1195. fields: []ast.Field{
  1196. {
  1197. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1198. Name: "name",
  1199. AName: "",
  1200. },
  1201. },
  1202. isAggregate: false,
  1203. sendMeta: false,
  1204. }.Init(),
  1205. }, { // 1 optimize where to data source
  1206. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1207. p: ProjectPlan{
  1208. baseLogicalPlan: baseLogicalPlan{
  1209. children: []LogicalPlan{
  1210. WindowPlan{
  1211. baseLogicalPlan: baseLogicalPlan{
  1212. children: []LogicalPlan{
  1213. FilterPlan{
  1214. baseLogicalPlan: baseLogicalPlan{
  1215. children: []LogicalPlan{
  1216. DataSourcePlan{
  1217. name: "src1",
  1218. streamFields: []interface{}{
  1219. "name", "temp",
  1220. },
  1221. streamStmt: streams["src1"],
  1222. metaFields: []string{},
  1223. }.Init(),
  1224. },
  1225. },
  1226. condition: &ast.BinaryExpr{
  1227. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1228. OP: ast.EQ,
  1229. RHS: &ast.StringLiteral{Val: "v1"},
  1230. },
  1231. }.Init(),
  1232. },
  1233. },
  1234. condition: nil,
  1235. wtype: ast.TUMBLING_WINDOW,
  1236. length: 10000,
  1237. interval: 0,
  1238. limit: 0,
  1239. }.Init(),
  1240. },
  1241. },
  1242. fields: []ast.Field{
  1243. {
  1244. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1245. Name: "temp",
  1246. AName: ""},
  1247. },
  1248. isAggregate: false,
  1249. sendMeta: false,
  1250. }.Init(),
  1251. }, { // 2 condition that cannot be optimized
  1252. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1253. p: ProjectPlan{
  1254. baseLogicalPlan: baseLogicalPlan{
  1255. children: []LogicalPlan{
  1256. JoinPlan{
  1257. baseLogicalPlan: baseLogicalPlan{
  1258. children: []LogicalPlan{
  1259. WindowPlan{
  1260. baseLogicalPlan: baseLogicalPlan{
  1261. children: []LogicalPlan{
  1262. DataSourcePlan{
  1263. name: "src1",
  1264. streamFields: []interface{}{
  1265. "id1", "temp",
  1266. },
  1267. streamStmt: streams["src1"],
  1268. metaFields: []string{},
  1269. }.Init(),
  1270. DataSourcePlan{
  1271. name: "src2",
  1272. streamFields: []interface{}{ // can't determine where is id1 belonged to
  1273. "hum", "id1", "id2",
  1274. },
  1275. streamStmt: streams["src2"],
  1276. metaFields: []string{},
  1277. }.Init(),
  1278. },
  1279. },
  1280. condition: nil,
  1281. wtype: ast.TUMBLING_WINDOW,
  1282. length: 10000,
  1283. interval: 0,
  1284. limit: 0,
  1285. }.Init(),
  1286. },
  1287. },
  1288. from: &ast.Table{Name: "src1"},
  1289. joins: ast.Joins{ast.Join{
  1290. Name: "src2",
  1291. JoinType: ast.INNER_JOIN,
  1292. Expr: &ast.BinaryExpr{
  1293. OP: ast.AND,
  1294. LHS: &ast.BinaryExpr{
  1295. LHS: &ast.BinaryExpr{
  1296. OP: ast.GT,
  1297. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1298. RHS: &ast.IntegerLiteral{Val: 20},
  1299. },
  1300. OP: ast.OR,
  1301. RHS: &ast.BinaryExpr{
  1302. OP: ast.GT,
  1303. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1304. RHS: &ast.IntegerLiteral{Val: 60},
  1305. },
  1306. },
  1307. RHS: &ast.BinaryExpr{
  1308. OP: ast.EQ,
  1309. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1310. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1311. },
  1312. },
  1313. }},
  1314. }.Init(),
  1315. },
  1316. },
  1317. fields: []ast.Field{
  1318. {
  1319. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1320. Name: "id1",
  1321. AName: ""},
  1322. },
  1323. isAggregate: false,
  1324. sendMeta: false,
  1325. }.Init(),
  1326. }, { // 3 optimize window filter
  1327. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1328. p: ProjectPlan{
  1329. baseLogicalPlan: baseLogicalPlan{
  1330. children: []LogicalPlan{
  1331. WindowPlan{
  1332. baseLogicalPlan: baseLogicalPlan{
  1333. children: []LogicalPlan{
  1334. FilterPlan{
  1335. baseLogicalPlan: baseLogicalPlan{
  1336. children: []LogicalPlan{
  1337. DataSourcePlan{
  1338. name: "src1",
  1339. streamFields: []interface{}{
  1340. "id1", "name", "temp",
  1341. },
  1342. streamStmt: streams["src1"],
  1343. metaFields: []string{},
  1344. }.Init(),
  1345. },
  1346. },
  1347. condition: &ast.BinaryExpr{
  1348. OP: ast.AND,
  1349. LHS: &ast.BinaryExpr{
  1350. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1351. OP: ast.EQ,
  1352. RHS: &ast.StringLiteral{Val: "v1"},
  1353. },
  1354. RHS: &ast.BinaryExpr{
  1355. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1356. OP: ast.GT,
  1357. RHS: &ast.IntegerLiteral{Val: 2},
  1358. },
  1359. },
  1360. }.Init(),
  1361. },
  1362. },
  1363. condition: nil,
  1364. wtype: ast.TUMBLING_WINDOW,
  1365. length: 10000,
  1366. interval: 0,
  1367. limit: 0,
  1368. }.Init(),
  1369. },
  1370. },
  1371. fields: []ast.Field{
  1372. {
  1373. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1374. Name: "id1",
  1375. AName: ""},
  1376. },
  1377. isAggregate: false,
  1378. sendMeta: false,
  1379. }.Init(),
  1380. }, { // 4. do not optimize count window
  1381. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1382. p: ProjectPlan{
  1383. baseLogicalPlan: baseLogicalPlan{
  1384. children: []LogicalPlan{
  1385. HavingPlan{
  1386. baseLogicalPlan: baseLogicalPlan{
  1387. children: []LogicalPlan{
  1388. FilterPlan{
  1389. baseLogicalPlan: baseLogicalPlan{
  1390. children: []LogicalPlan{
  1391. WindowPlan{
  1392. baseLogicalPlan: baseLogicalPlan{
  1393. children: []LogicalPlan{
  1394. DataSourcePlan{
  1395. name: "src1",
  1396. isWildCard: true,
  1397. streamFields: nil,
  1398. streamStmt: streams["src1"],
  1399. metaFields: []string{},
  1400. }.Init(),
  1401. },
  1402. },
  1403. condition: nil,
  1404. wtype: ast.COUNT_WINDOW,
  1405. length: 5,
  1406. interval: 1,
  1407. limit: 0,
  1408. }.Init(),
  1409. },
  1410. },
  1411. condition: &ast.BinaryExpr{
  1412. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1413. OP: ast.GT,
  1414. RHS: &ast.IntegerLiteral{Val: 20},
  1415. },
  1416. }.Init(),
  1417. },
  1418. },
  1419. condition: &ast.BinaryExpr{
  1420. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1421. Token: ast.ASTERISK,
  1422. }}},
  1423. OP: ast.GT,
  1424. RHS: &ast.IntegerLiteral{Val: 2},
  1425. },
  1426. }.Init(),
  1427. },
  1428. },
  1429. fields: []ast.Field{
  1430. {
  1431. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1432. Name: "",
  1433. AName: ""},
  1434. },
  1435. isAggregate: false,
  1436. sendMeta: false,
  1437. }.Init(),
  1438. }, { // 5. optimize join on
  1439. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1440. p: ProjectPlan{
  1441. baseLogicalPlan: baseLogicalPlan{
  1442. children: []LogicalPlan{
  1443. JoinPlan{
  1444. baseLogicalPlan: baseLogicalPlan{
  1445. children: []LogicalPlan{
  1446. WindowPlan{
  1447. baseLogicalPlan: baseLogicalPlan{
  1448. children: []LogicalPlan{
  1449. FilterPlan{
  1450. baseLogicalPlan: baseLogicalPlan{
  1451. children: []LogicalPlan{
  1452. DataSourcePlan{
  1453. name: "src1",
  1454. streamFields: []interface{}{
  1455. "id1", "temp",
  1456. },
  1457. streamStmt: streams["src1"],
  1458. metaFields: []string{},
  1459. }.Init(),
  1460. },
  1461. },
  1462. condition: &ast.BinaryExpr{
  1463. RHS: &ast.BinaryExpr{
  1464. OP: ast.GT,
  1465. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1466. RHS: &ast.IntegerLiteral{Val: 20},
  1467. },
  1468. OP: ast.AND,
  1469. LHS: &ast.BinaryExpr{
  1470. OP: ast.GT,
  1471. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1472. RHS: &ast.IntegerLiteral{Val: 111},
  1473. },
  1474. },
  1475. }.Init(),
  1476. FilterPlan{
  1477. baseLogicalPlan: baseLogicalPlan{
  1478. children: []LogicalPlan{
  1479. DataSourcePlan{
  1480. name: "src2",
  1481. streamFields: []interface{}{
  1482. "hum", "id1", "id2",
  1483. },
  1484. streamStmt: streams["src2"],
  1485. metaFields: []string{},
  1486. }.Init(),
  1487. },
  1488. },
  1489. condition: &ast.BinaryExpr{
  1490. OP: ast.LT,
  1491. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1492. RHS: &ast.IntegerLiteral{Val: 60},
  1493. },
  1494. }.Init(),
  1495. },
  1496. },
  1497. condition: nil,
  1498. wtype: ast.TUMBLING_WINDOW,
  1499. length: 10000,
  1500. interval: 0,
  1501. limit: 0,
  1502. }.Init(),
  1503. },
  1504. },
  1505. from: &ast.Table{
  1506. Name: "src1",
  1507. },
  1508. joins: []ast.Join{
  1509. {
  1510. Name: "src2",
  1511. Alias: "",
  1512. JoinType: ast.INNER_JOIN,
  1513. Expr: &ast.BinaryExpr{
  1514. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1515. OP: ast.EQ,
  1516. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1517. },
  1518. },
  1519. },
  1520. }.Init(),
  1521. },
  1522. },
  1523. fields: []ast.Field{
  1524. {
  1525. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1526. Name: "id1",
  1527. AName: ""},
  1528. },
  1529. isAggregate: false,
  1530. sendMeta: false,
  1531. }.Init(),
  1532. }, { // 6. optimize outter join on
  1533. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1534. p: ProjectPlan{
  1535. baseLogicalPlan: baseLogicalPlan{
  1536. children: []LogicalPlan{
  1537. JoinPlan{
  1538. baseLogicalPlan: baseLogicalPlan{
  1539. children: []LogicalPlan{
  1540. WindowPlan{
  1541. baseLogicalPlan: baseLogicalPlan{
  1542. children: []LogicalPlan{
  1543. FilterPlan{
  1544. baseLogicalPlan: baseLogicalPlan{
  1545. children: []LogicalPlan{
  1546. DataSourcePlan{
  1547. name: "src1",
  1548. streamFields: []interface{}{
  1549. "id1", "temp",
  1550. },
  1551. streamStmt: streams["src1"],
  1552. metaFields: []string{},
  1553. }.Init(),
  1554. },
  1555. },
  1556. condition: &ast.BinaryExpr{
  1557. OP: ast.GT,
  1558. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1559. RHS: &ast.IntegerLiteral{Val: 111},
  1560. },
  1561. }.Init(),
  1562. DataSourcePlan{
  1563. name: "src2",
  1564. streamFields: []interface{}{
  1565. "hum", "id1", "id2",
  1566. },
  1567. streamStmt: streams["src2"],
  1568. metaFields: []string{},
  1569. }.Init(),
  1570. },
  1571. },
  1572. condition: nil,
  1573. wtype: ast.TUMBLING_WINDOW,
  1574. length: 10000,
  1575. interval: 0,
  1576. limit: 0,
  1577. }.Init(),
  1578. },
  1579. },
  1580. from: &ast.Table{
  1581. Name: "src1",
  1582. },
  1583. joins: []ast.Join{
  1584. {
  1585. Name: "src2",
  1586. Alias: "",
  1587. JoinType: ast.FULL_JOIN,
  1588. Expr: &ast.BinaryExpr{
  1589. OP: ast.AND,
  1590. LHS: &ast.BinaryExpr{
  1591. OP: ast.AND,
  1592. LHS: &ast.BinaryExpr{
  1593. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1594. OP: ast.EQ,
  1595. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1596. },
  1597. RHS: &ast.BinaryExpr{
  1598. OP: ast.GT,
  1599. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1600. RHS: &ast.IntegerLiteral{Val: 20},
  1601. },
  1602. },
  1603. RHS: &ast.BinaryExpr{
  1604. OP: ast.LT,
  1605. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1606. RHS: &ast.IntegerLiteral{Val: 60},
  1607. },
  1608. },
  1609. },
  1610. },
  1611. }.Init(),
  1612. },
  1613. },
  1614. fields: []ast.Field{
  1615. {
  1616. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1617. Name: "id1",
  1618. AName: ""},
  1619. },
  1620. isAggregate: false,
  1621. sendMeta: false,
  1622. }.Init(),
  1623. }, { // 7 window error for table
  1624. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1625. p: nil,
  1626. err: "cannot run window for TABLE sources",
  1627. }, { // 8 join table without window
  1628. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1629. p: ProjectPlan{
  1630. baseLogicalPlan: baseLogicalPlan{
  1631. children: []LogicalPlan{
  1632. JoinPlan{
  1633. baseLogicalPlan: baseLogicalPlan{
  1634. children: []LogicalPlan{
  1635. JoinAlignPlan{
  1636. baseLogicalPlan: baseLogicalPlan{
  1637. children: []LogicalPlan{
  1638. FilterPlan{
  1639. baseLogicalPlan: baseLogicalPlan{
  1640. children: []LogicalPlan{
  1641. DataSourcePlan{
  1642. name: "src1",
  1643. streamFields: []interface{}{
  1644. "hum", "id1", "temp",
  1645. },
  1646. streamStmt: streams["src1"],
  1647. metaFields: []string{},
  1648. }.Init(),
  1649. },
  1650. },
  1651. condition: &ast.BinaryExpr{
  1652. RHS: &ast.BinaryExpr{
  1653. OP: ast.GT,
  1654. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1655. RHS: &ast.IntegerLiteral{Val: 20},
  1656. },
  1657. OP: ast.AND,
  1658. LHS: &ast.BinaryExpr{
  1659. OP: ast.GT,
  1660. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1661. RHS: &ast.IntegerLiteral{Val: 111},
  1662. },
  1663. },
  1664. }.Init(),
  1665. DataSourcePlan{
  1666. name: "tableInPlanner",
  1667. streamFields: []interface{}{
  1668. &ast.StreamField{
  1669. Name: "hum",
  1670. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1671. },
  1672. &ast.StreamField{
  1673. Name: "id",
  1674. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1675. },
  1676. },
  1677. streamStmt: streams["tableInPlanner"],
  1678. metaFields: []string{},
  1679. }.Init(),
  1680. },
  1681. },
  1682. Emitters: []string{"tableInPlanner"},
  1683. }.Init(),
  1684. },
  1685. },
  1686. from: &ast.Table{
  1687. Name: "src1",
  1688. },
  1689. joins: []ast.Join{
  1690. {
  1691. Name: "tableInPlanner",
  1692. Alias: "",
  1693. JoinType: ast.INNER_JOIN,
  1694. Expr: &ast.BinaryExpr{
  1695. OP: ast.AND,
  1696. LHS: &ast.BinaryExpr{
  1697. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1698. OP: ast.EQ,
  1699. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1700. },
  1701. RHS: &ast.BinaryExpr{
  1702. OP: ast.LT,
  1703. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1704. RHS: &ast.IntegerLiteral{Val: 60},
  1705. },
  1706. },
  1707. },
  1708. },
  1709. }.Init(),
  1710. },
  1711. },
  1712. fields: []ast.Field{
  1713. {
  1714. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1715. Name: "id1",
  1716. AName: ""},
  1717. },
  1718. isAggregate: false,
  1719. sendMeta: false,
  1720. }.Init(),
  1721. }, { // 9 join table with window
  1722. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1723. p: ProjectPlan{
  1724. baseLogicalPlan: baseLogicalPlan{
  1725. children: []LogicalPlan{
  1726. JoinPlan{
  1727. baseLogicalPlan: baseLogicalPlan{
  1728. children: []LogicalPlan{
  1729. JoinAlignPlan{
  1730. baseLogicalPlan: baseLogicalPlan{
  1731. children: []LogicalPlan{
  1732. WindowPlan{
  1733. baseLogicalPlan: baseLogicalPlan{
  1734. children: []LogicalPlan{
  1735. DataSourcePlan{
  1736. name: "src1",
  1737. streamFields: []interface{}{
  1738. "id1", "temp",
  1739. },
  1740. streamStmt: streams["src1"],
  1741. metaFields: []string{},
  1742. }.Init(),
  1743. },
  1744. },
  1745. condition: nil,
  1746. wtype: ast.TUMBLING_WINDOW,
  1747. length: 10000,
  1748. interval: 0,
  1749. limit: 0,
  1750. }.Init(),
  1751. DataSourcePlan{
  1752. name: "tableInPlanner",
  1753. streamFields: []interface{}{
  1754. &ast.StreamField{
  1755. Name: "hum",
  1756. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1757. },
  1758. &ast.StreamField{
  1759. Name: "id",
  1760. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1761. },
  1762. },
  1763. streamStmt: streams["tableInPlanner"],
  1764. metaFields: []string{},
  1765. }.Init(),
  1766. },
  1767. },
  1768. Emitters: []string{"tableInPlanner"},
  1769. }.Init(),
  1770. },
  1771. },
  1772. from: &ast.Table{
  1773. Name: "src1",
  1774. },
  1775. joins: []ast.Join{
  1776. {
  1777. Name: "tableInPlanner",
  1778. Alias: "",
  1779. JoinType: ast.INNER_JOIN,
  1780. Expr: &ast.BinaryExpr{
  1781. OP: ast.AND,
  1782. LHS: &ast.BinaryExpr{
  1783. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1784. OP: ast.EQ,
  1785. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1786. },
  1787. RHS: &ast.BinaryExpr{
  1788. RHS: &ast.BinaryExpr{
  1789. OP: ast.AND,
  1790. LHS: &ast.BinaryExpr{
  1791. OP: ast.GT,
  1792. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1793. RHS: &ast.IntegerLiteral{Val: 20},
  1794. },
  1795. RHS: &ast.BinaryExpr{
  1796. OP: ast.LT,
  1797. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1798. RHS: &ast.IntegerLiteral{Val: 60},
  1799. },
  1800. },
  1801. OP: ast.AND,
  1802. LHS: &ast.BinaryExpr{
  1803. OP: ast.GT,
  1804. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1805. RHS: &ast.IntegerLiteral{Val: 111},
  1806. },
  1807. },
  1808. },
  1809. },
  1810. },
  1811. }.Init(),
  1812. },
  1813. },
  1814. fields: []ast.Field{
  1815. {
  1816. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1817. Name: "id1",
  1818. AName: ""},
  1819. },
  1820. isAggregate: false,
  1821. sendMeta: false,
  1822. }.Init(),
  1823. }, { // 10 meta
  1824. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1825. p: ProjectPlan{
  1826. baseLogicalPlan: baseLogicalPlan{
  1827. children: []LogicalPlan{
  1828. FilterPlan{
  1829. baseLogicalPlan: baseLogicalPlan{
  1830. children: []LogicalPlan{
  1831. DataSourcePlan{
  1832. name: "src1",
  1833. streamFields: []interface{}{
  1834. "temp",
  1835. },
  1836. streamStmt: streams["src1"],
  1837. metaFields: []string{"Humidity", "device", "id"},
  1838. }.Init(),
  1839. },
  1840. },
  1841. condition: &ast.BinaryExpr{
  1842. LHS: &ast.Call{
  1843. Name: "meta",
  1844. FuncId: 2,
  1845. Args: []ast.Expr{&ast.MetaRef{
  1846. Name: "device",
  1847. StreamName: ast.DefaultStream,
  1848. }},
  1849. },
  1850. OP: ast.EQ,
  1851. RHS: &ast.StringLiteral{
  1852. Val: "demo2",
  1853. },
  1854. },
  1855. }.Init(),
  1856. },
  1857. },
  1858. fields: []ast.Field{
  1859. {
  1860. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1861. Name: "temp",
  1862. AName: "",
  1863. }, {
  1864. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1865. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1866. Name: "id",
  1867. StreamName: ast.DefaultStream,
  1868. }}},
  1869. []ast.StreamName{},
  1870. nil,
  1871. )},
  1872. Name: "meta",
  1873. AName: "eid",
  1874. }, {
  1875. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1876. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1877. &ast.BinaryExpr{
  1878. OP: ast.ARROW,
  1879. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1880. RHS: &ast.JsonFieldRef{Name: "Device"},
  1881. },
  1882. }},
  1883. []ast.StreamName{},
  1884. nil,
  1885. )},
  1886. Name: "meta",
  1887. AName: "hdevice",
  1888. },
  1889. },
  1890. isAggregate: false,
  1891. sendMeta: false,
  1892. }.Init(),
  1893. }, { // 11 join with same name field and aliased
  1894. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1895. p: ProjectPlan{
  1896. baseLogicalPlan: baseLogicalPlan{
  1897. children: []LogicalPlan{
  1898. JoinPlan{
  1899. baseLogicalPlan: baseLogicalPlan{
  1900. children: []LogicalPlan{
  1901. JoinAlignPlan{
  1902. baseLogicalPlan: baseLogicalPlan{
  1903. children: []LogicalPlan{
  1904. DataSourcePlan{
  1905. name: "src2",
  1906. streamFields: []interface{}{
  1907. "hum", "id", "id2",
  1908. },
  1909. streamStmt: streams["src2"],
  1910. metaFields: []string{},
  1911. }.Init(),
  1912. DataSourcePlan{
  1913. name: "tableInPlanner",
  1914. streamFields: []interface{}{
  1915. &ast.StreamField{
  1916. Name: "hum",
  1917. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1918. },
  1919. &ast.StreamField{
  1920. Name: "id",
  1921. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1922. },
  1923. },
  1924. streamStmt: streams["tableInPlanner"],
  1925. metaFields: []string{},
  1926. }.Init(),
  1927. },
  1928. },
  1929. Emitters: []string{"tableInPlanner"},
  1930. }.Init(),
  1931. },
  1932. },
  1933. from: &ast.Table{
  1934. Name: "src2",
  1935. },
  1936. joins: []ast.Join{
  1937. {
  1938. Name: "tableInPlanner",
  1939. Alias: "",
  1940. JoinType: ast.INNER_JOIN,
  1941. Expr: &ast.BinaryExpr{
  1942. RHS: &ast.BinaryExpr{
  1943. OP: ast.EQ,
  1944. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  1945. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  1946. },
  1947. OP: ast.AND,
  1948. LHS: &ast.BinaryExpr{
  1949. OP: ast.GT,
  1950. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1951. &ast.FieldRef{
  1952. Name: "hum",
  1953. StreamName: "src2",
  1954. },
  1955. []ast.StreamName{"src2"},
  1956. &boolFalse,
  1957. )},
  1958. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1959. &ast.FieldRef{
  1960. Name: "hum",
  1961. StreamName: "tableInPlanner",
  1962. },
  1963. []ast.StreamName{"tableInPlanner"},
  1964. &boolFalse,
  1965. )},
  1966. },
  1967. },
  1968. },
  1969. },
  1970. }.Init(),
  1971. },
  1972. },
  1973. fields: []ast.Field{
  1974. {
  1975. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1976. &ast.FieldRef{
  1977. Name: "hum",
  1978. StreamName: "src2",
  1979. },
  1980. []ast.StreamName{"src2"},
  1981. &boolFalse,
  1982. )},
  1983. Name: "hum",
  1984. AName: "hum1",
  1985. }, {
  1986. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1987. &ast.FieldRef{
  1988. Name: "hum",
  1989. StreamName: "tableInPlanner",
  1990. },
  1991. []ast.StreamName{"tableInPlanner"},
  1992. &boolFalse,
  1993. )},
  1994. Name: "hum",
  1995. AName: "hum2",
  1996. },
  1997. },
  1998. isAggregate: false,
  1999. sendMeta: false,
  2000. }.Init(),
  2001. }, { // 12
  2002. sql: `SELECT name->first, name->last FROM src1`,
  2003. p: ProjectPlan{
  2004. baseLogicalPlan: baseLogicalPlan{
  2005. children: []LogicalPlan{
  2006. DataSourcePlan{
  2007. baseLogicalPlan: baseLogicalPlan{},
  2008. name: "src1",
  2009. streamFields: []interface{}{
  2010. "name",
  2011. },
  2012. streamStmt: streams["src1"],
  2013. metaFields: []string{},
  2014. }.Init(),
  2015. },
  2016. },
  2017. fields: []ast.Field{
  2018. {
  2019. Expr: &ast.BinaryExpr{
  2020. OP: ast.ARROW,
  2021. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2022. RHS: &ast.JsonFieldRef{Name: "first"},
  2023. },
  2024. Name: "kuiper_field_0",
  2025. AName: "",
  2026. }, {
  2027. Expr: &ast.BinaryExpr{
  2028. OP: ast.ARROW,
  2029. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2030. RHS: &ast.JsonFieldRef{Name: "last"},
  2031. },
  2032. Name: "kuiper_field_1",
  2033. AName: "",
  2034. },
  2035. },
  2036. isAggregate: false,
  2037. sendMeta: false,
  2038. }.Init(),
  2039. },
  2040. }
  2041. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2042. for i, tt := range tests {
  2043. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2044. if err != nil {
  2045. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2046. continue
  2047. }
  2048. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2049. IsEventTime: false,
  2050. LateTol: 0,
  2051. Concurrency: 0,
  2052. BufferLength: 0,
  2053. SendMetaToSink: false,
  2054. Qos: 0,
  2055. CheckpointInterval: 0,
  2056. SendError: true,
  2057. }, store)
  2058. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2059. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2060. } else if !reflect.DeepEqual(tt.p, p) {
  2061. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2062. }
  2063. }
  2064. }