planner_test.go 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: map[string]*ast.JsonStreamField{
  102. "myarray": {
  103. Type: "array",
  104. Items: &ast.JsonStreamField{
  105. Type: "string",
  106. },
  107. },
  108. "temp": {
  109. Type: "bigint",
  110. },
  111. },
  112. streamStmt: streams["src1"],
  113. metaFields: []string{},
  114. }.Init(),
  115. },
  116. },
  117. fields: []ast.Field{
  118. {
  119. Expr: &ast.BinaryExpr{
  120. OP: ast.SUBSET,
  121. LHS: &ast.FieldRef{
  122. StreamName: "src1",
  123. Name: "myarray",
  124. },
  125. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  126. StreamName: "src1",
  127. Name: "temp",
  128. }},
  129. },
  130. Name: "kuiper_field_0",
  131. AName: "",
  132. },
  133. },
  134. isAggregate: false,
  135. sendMeta: false,
  136. }.Init(),
  137. }, { // 1 optimize where to data source
  138. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  139. p: ProjectPlan{
  140. baseLogicalPlan: baseLogicalPlan{
  141. children: []LogicalPlan{
  142. WindowPlan{
  143. baseLogicalPlan: baseLogicalPlan{
  144. children: []LogicalPlan{
  145. FilterPlan{
  146. baseLogicalPlan: baseLogicalPlan{
  147. children: []LogicalPlan{
  148. DataSourcePlan{
  149. name: "src1",
  150. streamFields: map[string]*ast.JsonStreamField{
  151. "name": {
  152. Type: "string",
  153. },
  154. "temp": {
  155. Type: "bigint",
  156. },
  157. },
  158. streamStmt: streams["src1"],
  159. metaFields: []string{},
  160. }.Init(),
  161. },
  162. },
  163. condition: &ast.BinaryExpr{
  164. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  165. OP: ast.EQ,
  166. RHS: &ast.StringLiteral{Val: "v1"},
  167. },
  168. }.Init(),
  169. },
  170. },
  171. condition: nil,
  172. wtype: ast.TUMBLING_WINDOW,
  173. length: 10000,
  174. interval: 0,
  175. limit: 0,
  176. }.Init(),
  177. },
  178. },
  179. fields: []ast.Field{
  180. {
  181. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  182. Name: "temp",
  183. AName: ""},
  184. },
  185. isAggregate: false,
  186. sendMeta: false,
  187. }.Init(),
  188. }, { // 2 condition that cannot be optimized
  189. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  190. p: ProjectPlan{
  191. baseLogicalPlan: baseLogicalPlan{
  192. children: []LogicalPlan{
  193. JoinPlan{
  194. baseLogicalPlan: baseLogicalPlan{
  195. children: []LogicalPlan{
  196. WindowPlan{
  197. baseLogicalPlan: baseLogicalPlan{
  198. children: []LogicalPlan{
  199. DataSourcePlan{
  200. name: "src1",
  201. streamFields: map[string]*ast.JsonStreamField{
  202. "id1": {
  203. Type: "bigint",
  204. },
  205. "temp": {
  206. Type: "bigint",
  207. },
  208. },
  209. streamStmt: streams["src1"],
  210. metaFields: []string{},
  211. }.Init(),
  212. DataSourcePlan{
  213. name: "src2",
  214. streamFields: map[string]*ast.JsonStreamField{
  215. "hum": {
  216. Type: "bigint",
  217. },
  218. "id2": {
  219. Type: "bigint",
  220. },
  221. },
  222. streamStmt: streams["src2"],
  223. metaFields: []string{},
  224. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  225. }.Init(),
  226. },
  227. },
  228. condition: nil,
  229. wtype: ast.TUMBLING_WINDOW,
  230. length: 10000,
  231. interval: 0,
  232. limit: 0,
  233. }.Init(),
  234. },
  235. },
  236. from: &ast.Table{Name: "src1"},
  237. joins: ast.Joins{ast.Join{
  238. Name: "src2",
  239. JoinType: ast.INNER_JOIN,
  240. Expr: &ast.BinaryExpr{
  241. OP: ast.AND,
  242. LHS: &ast.BinaryExpr{
  243. LHS: &ast.BinaryExpr{
  244. OP: ast.GT,
  245. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  246. RHS: &ast.IntegerLiteral{Val: 20},
  247. },
  248. OP: ast.OR,
  249. RHS: &ast.BinaryExpr{
  250. OP: ast.GT,
  251. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  252. RHS: &ast.IntegerLiteral{Val: 60},
  253. },
  254. },
  255. RHS: &ast.BinaryExpr{
  256. OP: ast.EQ,
  257. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  258. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  259. },
  260. },
  261. }},
  262. }.Init(),
  263. },
  264. },
  265. fields: []ast.Field{
  266. {
  267. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  268. Name: "id1",
  269. AName: ""},
  270. },
  271. isAggregate: false,
  272. sendMeta: false,
  273. }.Init(),
  274. }, { // 3 optimize window filter
  275. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  276. p: ProjectPlan{
  277. baseLogicalPlan: baseLogicalPlan{
  278. children: []LogicalPlan{
  279. WindowPlan{
  280. baseLogicalPlan: baseLogicalPlan{
  281. children: []LogicalPlan{
  282. FilterPlan{
  283. baseLogicalPlan: baseLogicalPlan{
  284. children: []LogicalPlan{
  285. DataSourcePlan{
  286. name: "src1",
  287. streamFields: map[string]*ast.JsonStreamField{
  288. "id1": {
  289. Type: "bigint",
  290. },
  291. "name": {
  292. Type: "string",
  293. },
  294. "temp": {
  295. Type: "bigint",
  296. },
  297. },
  298. streamStmt: streams["src1"],
  299. metaFields: []string{},
  300. }.Init(),
  301. },
  302. },
  303. condition: &ast.BinaryExpr{
  304. OP: ast.AND,
  305. LHS: &ast.BinaryExpr{
  306. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  307. OP: ast.EQ,
  308. RHS: &ast.StringLiteral{Val: "v1"},
  309. },
  310. RHS: &ast.BinaryExpr{
  311. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  312. OP: ast.GT,
  313. RHS: &ast.IntegerLiteral{Val: 2},
  314. },
  315. },
  316. }.Init(),
  317. },
  318. },
  319. condition: nil,
  320. wtype: ast.TUMBLING_WINDOW,
  321. length: 10000,
  322. interval: 0,
  323. limit: 0,
  324. }.Init(),
  325. },
  326. },
  327. fields: []ast.Field{
  328. {
  329. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  330. Name: "id1",
  331. AName: ""},
  332. },
  333. isAggregate: false,
  334. sendMeta: false,
  335. }.Init(),
  336. }, { // 4. do not optimize count window
  337. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  338. p: ProjectPlan{
  339. baseLogicalPlan: baseLogicalPlan{
  340. children: []LogicalPlan{
  341. HavingPlan{
  342. baseLogicalPlan: baseLogicalPlan{
  343. children: []LogicalPlan{
  344. FilterPlan{
  345. baseLogicalPlan: baseLogicalPlan{
  346. children: []LogicalPlan{
  347. WindowPlan{
  348. baseLogicalPlan: baseLogicalPlan{
  349. children: []LogicalPlan{
  350. DataSourcePlan{
  351. name: "src1",
  352. isWildCard: true,
  353. streamFields: map[string]*ast.JsonStreamField{
  354. "id1": {
  355. Type: "bigint",
  356. },
  357. "temp": {
  358. Type: "bigint",
  359. },
  360. "name": {
  361. Type: "string",
  362. },
  363. "myarray": {
  364. Type: "array",
  365. Items: &ast.JsonStreamField{
  366. Type: "string",
  367. },
  368. },
  369. },
  370. streamStmt: streams["src1"],
  371. metaFields: []string{},
  372. }.Init(),
  373. },
  374. },
  375. condition: nil,
  376. wtype: ast.COUNT_WINDOW,
  377. length: 5,
  378. interval: 1,
  379. limit: 0,
  380. }.Init(),
  381. },
  382. },
  383. condition: &ast.BinaryExpr{
  384. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  385. OP: ast.GT,
  386. RHS: &ast.IntegerLiteral{Val: 20},
  387. },
  388. }.Init(),
  389. },
  390. },
  391. condition: &ast.BinaryExpr{
  392. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  393. Token: ast.ASTERISK,
  394. }}, FuncType: ast.FuncTypeAgg},
  395. OP: ast.GT,
  396. RHS: &ast.IntegerLiteral{Val: 2},
  397. },
  398. }.Init(),
  399. },
  400. },
  401. fields: []ast.Field{
  402. {
  403. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  404. Name: "*",
  405. AName: ""},
  406. },
  407. isAggregate: false,
  408. sendMeta: false,
  409. }.Init(),
  410. }, { // 5. optimize join on
  411. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  412. p: ProjectPlan{
  413. baseLogicalPlan: baseLogicalPlan{
  414. children: []LogicalPlan{
  415. JoinPlan{
  416. baseLogicalPlan: baseLogicalPlan{
  417. children: []LogicalPlan{
  418. WindowPlan{
  419. baseLogicalPlan: baseLogicalPlan{
  420. children: []LogicalPlan{
  421. FilterPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. DataSourcePlan{
  425. name: "src1",
  426. streamFields: map[string]*ast.JsonStreamField{
  427. "id1": {
  428. Type: "bigint",
  429. },
  430. "temp": {
  431. Type: "bigint",
  432. },
  433. },
  434. streamStmt: streams["src1"],
  435. metaFields: []string{},
  436. }.Init(),
  437. },
  438. },
  439. condition: &ast.BinaryExpr{
  440. RHS: &ast.BinaryExpr{
  441. OP: ast.GT,
  442. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  443. RHS: &ast.IntegerLiteral{Val: 20},
  444. },
  445. OP: ast.AND,
  446. LHS: &ast.BinaryExpr{
  447. OP: ast.GT,
  448. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  449. RHS: &ast.IntegerLiteral{Val: 111},
  450. },
  451. },
  452. }.Init(),
  453. FilterPlan{
  454. baseLogicalPlan: baseLogicalPlan{
  455. children: []LogicalPlan{
  456. DataSourcePlan{
  457. name: "src2",
  458. streamFields: map[string]*ast.JsonStreamField{
  459. "hum": {
  460. Type: "bigint",
  461. },
  462. "id2": {
  463. Type: "bigint",
  464. },
  465. },
  466. streamStmt: streams["src2"],
  467. metaFields: []string{},
  468. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  469. }.Init(),
  470. },
  471. },
  472. condition: &ast.BinaryExpr{
  473. OP: ast.LT,
  474. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  475. RHS: &ast.IntegerLiteral{Val: 60},
  476. },
  477. }.Init(),
  478. },
  479. },
  480. condition: nil,
  481. wtype: ast.TUMBLING_WINDOW,
  482. length: 10000,
  483. interval: 0,
  484. limit: 0,
  485. }.Init(),
  486. },
  487. },
  488. from: &ast.Table{
  489. Name: "src1",
  490. },
  491. joins: []ast.Join{
  492. {
  493. Name: "src2",
  494. Alias: "",
  495. JoinType: ast.INNER_JOIN,
  496. Expr: &ast.BinaryExpr{
  497. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  498. OP: ast.EQ,
  499. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  500. },
  501. },
  502. },
  503. }.Init(),
  504. },
  505. },
  506. fields: []ast.Field{
  507. {
  508. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  509. Name: "id1",
  510. AName: ""},
  511. },
  512. isAggregate: false,
  513. sendMeta: false,
  514. }.Init(),
  515. }, { // 6. optimize outter join on
  516. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  517. p: ProjectPlan{
  518. baseLogicalPlan: baseLogicalPlan{
  519. children: []LogicalPlan{
  520. JoinPlan{
  521. baseLogicalPlan: baseLogicalPlan{
  522. children: []LogicalPlan{
  523. WindowPlan{
  524. baseLogicalPlan: baseLogicalPlan{
  525. children: []LogicalPlan{
  526. FilterPlan{
  527. baseLogicalPlan: baseLogicalPlan{
  528. children: []LogicalPlan{
  529. DataSourcePlan{
  530. name: "src1",
  531. streamFields: map[string]*ast.JsonStreamField{
  532. "id1": {
  533. Type: "bigint",
  534. },
  535. "temp": {
  536. Type: "bigint",
  537. },
  538. },
  539. streamStmt: streams["src1"],
  540. metaFields: []string{},
  541. }.Init(),
  542. },
  543. },
  544. condition: &ast.BinaryExpr{
  545. OP: ast.GT,
  546. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  547. RHS: &ast.IntegerLiteral{Val: 111},
  548. },
  549. }.Init(),
  550. DataSourcePlan{
  551. name: "src2",
  552. streamFields: map[string]*ast.JsonStreamField{
  553. "hum": {
  554. Type: "bigint",
  555. },
  556. "id2": {
  557. Type: "bigint",
  558. },
  559. },
  560. streamStmt: streams["src2"],
  561. metaFields: []string{},
  562. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  563. }.Init(),
  564. },
  565. },
  566. condition: nil,
  567. wtype: ast.TUMBLING_WINDOW,
  568. length: 10000,
  569. interval: 0,
  570. limit: 0,
  571. }.Init(),
  572. },
  573. },
  574. from: &ast.Table{
  575. Name: "src1",
  576. },
  577. joins: []ast.Join{
  578. {
  579. Name: "src2",
  580. Alias: "",
  581. JoinType: ast.FULL_JOIN,
  582. Expr: &ast.BinaryExpr{
  583. OP: ast.AND,
  584. LHS: &ast.BinaryExpr{
  585. OP: ast.AND,
  586. LHS: &ast.BinaryExpr{
  587. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  588. OP: ast.EQ,
  589. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  590. },
  591. RHS: &ast.BinaryExpr{
  592. OP: ast.GT,
  593. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  594. RHS: &ast.IntegerLiteral{Val: 20},
  595. },
  596. },
  597. RHS: &ast.BinaryExpr{
  598. OP: ast.LT,
  599. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  600. RHS: &ast.IntegerLiteral{Val: 60},
  601. },
  602. },
  603. },
  604. },
  605. }.Init(),
  606. },
  607. },
  608. fields: []ast.Field{
  609. {
  610. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  611. Name: "id1",
  612. AName: ""},
  613. },
  614. isAggregate: false,
  615. sendMeta: false,
  616. }.Init(),
  617. }, { // 7 window error for table
  618. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  619. p: nil,
  620. err: "cannot run window for TABLE sources",
  621. }, { // 8 join table without window
  622. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  623. p: ProjectPlan{
  624. baseLogicalPlan: baseLogicalPlan{
  625. children: []LogicalPlan{
  626. JoinPlan{
  627. baseLogicalPlan: baseLogicalPlan{
  628. children: []LogicalPlan{
  629. JoinAlignPlan{
  630. baseLogicalPlan: baseLogicalPlan{
  631. children: []LogicalPlan{
  632. FilterPlan{
  633. baseLogicalPlan: baseLogicalPlan{
  634. children: []LogicalPlan{
  635. DataSourcePlan{
  636. name: "src1",
  637. streamFields: map[string]*ast.JsonStreamField{
  638. "id1": {
  639. Type: "bigint",
  640. },
  641. "temp": {
  642. Type: "bigint",
  643. },
  644. },
  645. streamStmt: streams["src1"],
  646. metaFields: []string{},
  647. }.Init(),
  648. },
  649. },
  650. condition: &ast.BinaryExpr{
  651. RHS: &ast.BinaryExpr{
  652. OP: ast.GT,
  653. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  654. RHS: &ast.IntegerLiteral{Val: 20},
  655. },
  656. OP: ast.AND,
  657. LHS: &ast.BinaryExpr{
  658. OP: ast.GT,
  659. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  660. RHS: &ast.IntegerLiteral{Val: 111},
  661. },
  662. },
  663. }.Init(),
  664. DataSourcePlan{
  665. name: "tableInPlanner",
  666. streamFields: map[string]*ast.JsonStreamField{
  667. "hum": {
  668. Type: "bigint",
  669. },
  670. "id": {
  671. Type: "bigint",
  672. },
  673. },
  674. streamStmt: streams["tableInPlanner"],
  675. metaFields: []string{},
  676. }.Init(),
  677. },
  678. },
  679. Emitters: []string{"tableInPlanner"},
  680. }.Init(),
  681. },
  682. },
  683. from: &ast.Table{
  684. Name: "src1",
  685. },
  686. joins: []ast.Join{
  687. {
  688. Name: "tableInPlanner",
  689. Alias: "",
  690. JoinType: ast.INNER_JOIN,
  691. Expr: &ast.BinaryExpr{
  692. OP: ast.AND,
  693. LHS: &ast.BinaryExpr{
  694. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  695. OP: ast.EQ,
  696. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  697. },
  698. RHS: &ast.BinaryExpr{
  699. OP: ast.LT,
  700. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  701. RHS: &ast.IntegerLiteral{Val: 60},
  702. },
  703. },
  704. },
  705. },
  706. }.Init(),
  707. },
  708. },
  709. fields: []ast.Field{
  710. {
  711. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  712. Name: "id1",
  713. AName: ""},
  714. },
  715. isAggregate: false,
  716. sendMeta: false,
  717. }.Init(),
  718. }, { // 9 join table with window
  719. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  720. p: ProjectPlan{
  721. baseLogicalPlan: baseLogicalPlan{
  722. children: []LogicalPlan{
  723. JoinPlan{
  724. baseLogicalPlan: baseLogicalPlan{
  725. children: []LogicalPlan{
  726. JoinAlignPlan{
  727. baseLogicalPlan: baseLogicalPlan{
  728. children: []LogicalPlan{
  729. WindowPlan{
  730. baseLogicalPlan: baseLogicalPlan{
  731. children: []LogicalPlan{
  732. DataSourcePlan{
  733. name: "src1",
  734. streamFields: map[string]*ast.JsonStreamField{
  735. "id1": {
  736. Type: "bigint",
  737. },
  738. "temp": {
  739. Type: "bigint",
  740. },
  741. },
  742. streamStmt: streams["src1"],
  743. metaFields: []string{},
  744. }.Init(),
  745. },
  746. },
  747. condition: nil,
  748. wtype: ast.TUMBLING_WINDOW,
  749. length: 10000,
  750. interval: 0,
  751. limit: 0,
  752. }.Init(),
  753. DataSourcePlan{
  754. name: "tableInPlanner",
  755. streamFields: map[string]*ast.JsonStreamField{
  756. "hum": {
  757. Type: "bigint",
  758. },
  759. "id": {
  760. Type: "bigint",
  761. },
  762. },
  763. streamStmt: streams["tableInPlanner"],
  764. metaFields: []string{},
  765. }.Init(),
  766. },
  767. },
  768. Emitters: []string{"tableInPlanner"},
  769. }.Init(),
  770. },
  771. },
  772. from: &ast.Table{
  773. Name: "src1",
  774. },
  775. joins: []ast.Join{
  776. {
  777. Name: "tableInPlanner",
  778. Alias: "",
  779. JoinType: ast.INNER_JOIN,
  780. Expr: &ast.BinaryExpr{
  781. OP: ast.AND,
  782. LHS: &ast.BinaryExpr{
  783. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  784. OP: ast.EQ,
  785. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  786. },
  787. RHS: &ast.BinaryExpr{
  788. RHS: &ast.BinaryExpr{
  789. OP: ast.AND,
  790. LHS: &ast.BinaryExpr{
  791. OP: ast.GT,
  792. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  793. RHS: &ast.IntegerLiteral{Val: 20},
  794. },
  795. RHS: &ast.BinaryExpr{
  796. OP: ast.LT,
  797. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  798. RHS: &ast.IntegerLiteral{Val: 60},
  799. },
  800. },
  801. OP: ast.AND,
  802. LHS: &ast.BinaryExpr{
  803. OP: ast.GT,
  804. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  805. RHS: &ast.IntegerLiteral{Val: 111},
  806. },
  807. },
  808. },
  809. },
  810. },
  811. }.Init(),
  812. },
  813. },
  814. fields: []ast.Field{
  815. {
  816. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  817. Name: "id1",
  818. AName: ""},
  819. },
  820. isAggregate: false,
  821. sendMeta: false,
  822. }.Init(),
  823. }, { // 10 meta
  824. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  825. p: ProjectPlan{
  826. baseLogicalPlan: baseLogicalPlan{
  827. children: []LogicalPlan{
  828. FilterPlan{
  829. baseLogicalPlan: baseLogicalPlan{
  830. children: []LogicalPlan{
  831. DataSourcePlan{
  832. name: "src1",
  833. streamFields: map[string]*ast.JsonStreamField{
  834. "temp": {
  835. Type: "bigint",
  836. },
  837. },
  838. streamStmt: streams["src1"],
  839. metaFields: []string{"Humidity", "device", "id"},
  840. }.Init(),
  841. },
  842. },
  843. condition: &ast.BinaryExpr{
  844. LHS: &ast.Call{
  845. Name: "meta",
  846. FuncId: 2,
  847. Args: []ast.Expr{&ast.MetaRef{
  848. Name: "device",
  849. StreamName: ast.DefaultStream,
  850. }},
  851. },
  852. OP: ast.EQ,
  853. RHS: &ast.StringLiteral{
  854. Val: "demo2",
  855. },
  856. },
  857. }.Init(),
  858. },
  859. },
  860. fields: []ast.Field{
  861. {
  862. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  863. Name: "temp",
  864. AName: "",
  865. }, {
  866. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  867. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  868. Name: "id",
  869. StreamName: ast.DefaultStream,
  870. }}},
  871. []ast.StreamName{},
  872. nil,
  873. )},
  874. Name: "meta",
  875. AName: "eid",
  876. }, {
  877. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  878. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  879. &ast.BinaryExpr{
  880. OP: ast.ARROW,
  881. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  882. RHS: &ast.JsonFieldRef{Name: "Device"},
  883. },
  884. }},
  885. []ast.StreamName{},
  886. nil,
  887. )},
  888. Name: "meta",
  889. AName: "hdevice",
  890. },
  891. },
  892. isAggregate: false,
  893. sendMeta: false,
  894. }.Init(),
  895. }, { // 11 join with same name field and aliased
  896. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  897. p: ProjectPlan{
  898. baseLogicalPlan: baseLogicalPlan{
  899. children: []LogicalPlan{
  900. JoinPlan{
  901. baseLogicalPlan: baseLogicalPlan{
  902. children: []LogicalPlan{
  903. JoinAlignPlan{
  904. baseLogicalPlan: baseLogicalPlan{
  905. children: []LogicalPlan{
  906. DataSourcePlan{
  907. name: "src2",
  908. streamFields: map[string]*ast.JsonStreamField{
  909. "hum": {
  910. Type: "bigint",
  911. },
  912. "id2": {
  913. Type: "bigint",
  914. },
  915. },
  916. streamStmt: streams["src2"],
  917. metaFields: []string{},
  918. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  919. }.Init(),
  920. DataSourcePlan{
  921. name: "tableInPlanner",
  922. streamFields: map[string]*ast.JsonStreamField{
  923. "hum": {
  924. Type: "bigint",
  925. },
  926. "id": {
  927. Type: "bigint",
  928. },
  929. },
  930. streamStmt: streams["tableInPlanner"],
  931. metaFields: []string{},
  932. }.Init(),
  933. },
  934. },
  935. Emitters: []string{"tableInPlanner"},
  936. }.Init(),
  937. },
  938. },
  939. from: &ast.Table{
  940. Name: "src2",
  941. },
  942. joins: []ast.Join{
  943. {
  944. Name: "tableInPlanner",
  945. Alias: "",
  946. JoinType: ast.INNER_JOIN,
  947. Expr: &ast.BinaryExpr{
  948. RHS: &ast.BinaryExpr{
  949. OP: ast.EQ,
  950. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  951. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  952. },
  953. OP: ast.AND,
  954. LHS: &ast.BinaryExpr{
  955. OP: ast.GT,
  956. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  957. &ast.FieldRef{
  958. Name: "hum",
  959. StreamName: "src2",
  960. },
  961. []ast.StreamName{"src2"},
  962. &boolFalse,
  963. )},
  964. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  965. &ast.FieldRef{
  966. Name: "hum",
  967. StreamName: "tableInPlanner",
  968. },
  969. []ast.StreamName{"tableInPlanner"},
  970. &boolFalse,
  971. )},
  972. },
  973. },
  974. },
  975. },
  976. }.Init(),
  977. },
  978. },
  979. fields: []ast.Field{
  980. {
  981. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  982. &ast.FieldRef{
  983. Name: "hum",
  984. StreamName: "src2",
  985. },
  986. []ast.StreamName{"src2"},
  987. &boolFalse,
  988. )},
  989. Name: "hum",
  990. AName: "hum1",
  991. }, {
  992. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  993. &ast.FieldRef{
  994. Name: "hum",
  995. StreamName: "tableInPlanner",
  996. },
  997. []ast.StreamName{"tableInPlanner"},
  998. &boolFalse,
  999. )},
  1000. Name: "hum",
  1001. AName: "hum2",
  1002. },
  1003. },
  1004. isAggregate: false,
  1005. sendMeta: false,
  1006. }.Init(),
  1007. }, { // 12 meta with more fields
  1008. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1009. p: ProjectPlan{
  1010. baseLogicalPlan: baseLogicalPlan{
  1011. children: []LogicalPlan{
  1012. FilterPlan{
  1013. baseLogicalPlan: baseLogicalPlan{
  1014. children: []LogicalPlan{
  1015. DataSourcePlan{
  1016. name: "src1",
  1017. streamFields: map[string]*ast.JsonStreamField{
  1018. "temp": {
  1019. Type: "bigint",
  1020. },
  1021. },
  1022. streamStmt: streams["src1"],
  1023. metaFields: []string{},
  1024. allMeta: true,
  1025. }.Init(),
  1026. },
  1027. },
  1028. condition: &ast.BinaryExpr{
  1029. LHS: &ast.Call{
  1030. Name: "meta",
  1031. FuncId: 1,
  1032. Args: []ast.Expr{&ast.MetaRef{
  1033. Name: "device",
  1034. StreamName: ast.DefaultStream,
  1035. }},
  1036. },
  1037. OP: ast.EQ,
  1038. RHS: &ast.StringLiteral{
  1039. Val: "demo2",
  1040. },
  1041. },
  1042. }.Init(),
  1043. },
  1044. },
  1045. fields: []ast.Field{
  1046. {
  1047. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1048. Name: "temp",
  1049. AName: "",
  1050. }, {
  1051. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1052. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1053. Name: "*",
  1054. StreamName: ast.DefaultStream,
  1055. }}},
  1056. []ast.StreamName{},
  1057. nil,
  1058. )},
  1059. Name: "meta",
  1060. AName: "m",
  1061. },
  1062. },
  1063. isAggregate: false,
  1064. sendMeta: false,
  1065. }.Init(),
  1066. }, { // 13 analytic function plan
  1067. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1068. p: ProjectPlan{
  1069. baseLogicalPlan: baseLogicalPlan{
  1070. children: []LogicalPlan{
  1071. FilterPlan{
  1072. baseLogicalPlan: baseLogicalPlan{
  1073. children: []LogicalPlan{
  1074. AnalyticFuncsPlan{
  1075. baseLogicalPlan: baseLogicalPlan{
  1076. children: []LogicalPlan{
  1077. DataSourcePlan{
  1078. name: "src1",
  1079. streamFields: map[string]*ast.JsonStreamField{
  1080. "id1": {
  1081. Type: "bigint",
  1082. },
  1083. "name": {
  1084. Type: "string",
  1085. },
  1086. "temp": {
  1087. Type: "bigint",
  1088. },
  1089. },
  1090. streamStmt: streams["src1"],
  1091. metaFields: []string{},
  1092. }.Init(),
  1093. },
  1094. },
  1095. funcs: []*ast.Call{
  1096. {
  1097. Name: "lag",
  1098. FuncId: 2,
  1099. CachedField: "$$a_lag_2",
  1100. Args: []ast.Expr{&ast.FieldRef{
  1101. Name: "temp",
  1102. StreamName: "src1",
  1103. }},
  1104. },
  1105. {
  1106. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1107. },
  1108. {
  1109. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1110. },
  1111. },
  1112. }.Init(),
  1113. },
  1114. },
  1115. condition: &ast.BinaryExpr{
  1116. LHS: &ast.Call{
  1117. Name: "lag",
  1118. FuncId: 2,
  1119. Args: []ast.Expr{&ast.FieldRef{
  1120. Name: "temp",
  1121. StreamName: "src1",
  1122. }},
  1123. CachedField: "$$a_lag_2",
  1124. Cached: true,
  1125. },
  1126. OP: ast.GT,
  1127. RHS: &ast.FieldRef{
  1128. Name: "temp",
  1129. StreamName: "src1",
  1130. },
  1131. },
  1132. }.Init(),
  1133. },
  1134. },
  1135. fields: []ast.Field{
  1136. {
  1137. Expr: &ast.Call{
  1138. Name: "latest",
  1139. FuncId: 1,
  1140. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1141. CachedField: "$$a_latest_1",
  1142. Cached: true,
  1143. },
  1144. Name: "latest",
  1145. }, {
  1146. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1147. Name: "id1",
  1148. },
  1149. },
  1150. isAggregate: false,
  1151. sendMeta: false,
  1152. }.Init(),
  1153. },
  1154. { // 14
  1155. sql: `SELECT name, *, meta(device) FROM src1`,
  1156. p: ProjectPlan{
  1157. baseLogicalPlan: baseLogicalPlan{
  1158. children: []LogicalPlan{
  1159. DataSourcePlan{
  1160. baseLogicalPlan: baseLogicalPlan{},
  1161. name: "src1",
  1162. streamFields: map[string]*ast.JsonStreamField{
  1163. "id1": {
  1164. Type: "bigint",
  1165. },
  1166. "temp": {
  1167. Type: "bigint",
  1168. },
  1169. "name": {
  1170. Type: "string",
  1171. },
  1172. "myarray": {
  1173. Type: "array",
  1174. Items: &ast.JsonStreamField{
  1175. Type: "string",
  1176. },
  1177. },
  1178. },
  1179. streamStmt: streams["src1"],
  1180. metaFields: []string{"device"},
  1181. isWildCard: true,
  1182. }.Init(),
  1183. },
  1184. },
  1185. fields: []ast.Field{
  1186. {
  1187. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1188. Name: "name",
  1189. AName: "",
  1190. },
  1191. {
  1192. Name: "*",
  1193. Expr: &ast.Wildcard{
  1194. Token: ast.ASTERISK,
  1195. },
  1196. },
  1197. {
  1198. Name: "meta",
  1199. Expr: &ast.Call{
  1200. Name: "meta",
  1201. Args: []ast.Expr{
  1202. &ast.MetaRef{
  1203. StreamName: ast.DefaultStream,
  1204. Name: "device",
  1205. },
  1206. },
  1207. },
  1208. },
  1209. },
  1210. isAggregate: false,
  1211. allWildcard: true,
  1212. sendMeta: false,
  1213. }.Init(),
  1214. },
  1215. { // 15 analytic function over partition plan
  1216. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1217. p: ProjectPlan{
  1218. baseLogicalPlan: baseLogicalPlan{
  1219. children: []LogicalPlan{
  1220. FilterPlan{
  1221. baseLogicalPlan: baseLogicalPlan{
  1222. children: []LogicalPlan{
  1223. AnalyticFuncsPlan{
  1224. baseLogicalPlan: baseLogicalPlan{
  1225. children: []LogicalPlan{
  1226. DataSourcePlan{
  1227. name: "src1",
  1228. streamFields: map[string]*ast.JsonStreamField{
  1229. "id1": {
  1230. Type: "bigint",
  1231. },
  1232. "name": {
  1233. Type: "string",
  1234. },
  1235. "temp": {
  1236. Type: "bigint",
  1237. },
  1238. },
  1239. streamStmt: streams["src1"],
  1240. metaFields: []string{},
  1241. }.Init(),
  1242. },
  1243. },
  1244. funcs: []*ast.Call{
  1245. {
  1246. Name: "lag",
  1247. FuncId: 2,
  1248. CachedField: "$$a_lag_2",
  1249. Args: []ast.Expr{&ast.FieldRef{
  1250. Name: "temp",
  1251. StreamName: "src1",
  1252. }},
  1253. },
  1254. {
  1255. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1256. },
  1257. {
  1258. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1259. },
  1260. },
  1261. }.Init(),
  1262. },
  1263. },
  1264. condition: &ast.BinaryExpr{
  1265. LHS: &ast.Call{
  1266. Name: "lag",
  1267. FuncId: 2,
  1268. Args: []ast.Expr{&ast.FieldRef{
  1269. Name: "temp",
  1270. StreamName: "src1",
  1271. }},
  1272. CachedField: "$$a_lag_2",
  1273. Cached: true,
  1274. },
  1275. OP: ast.GT,
  1276. RHS: &ast.FieldRef{
  1277. Name: "temp",
  1278. StreamName: "src1",
  1279. },
  1280. },
  1281. }.Init(),
  1282. },
  1283. },
  1284. fields: []ast.Field{
  1285. {
  1286. Expr: &ast.Call{
  1287. Name: "latest",
  1288. FuncId: 1,
  1289. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1290. CachedField: "$$a_latest_1",
  1291. Cached: true,
  1292. Partition: &ast.PartitionExpr{
  1293. Exprs: []ast.Expr{
  1294. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1295. },
  1296. },
  1297. },
  1298. Name: "latest",
  1299. }, {
  1300. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1301. Name: "id1",
  1302. },
  1303. },
  1304. isAggregate: false,
  1305. sendMeta: false,
  1306. }.Init(),
  1307. },
  1308. { // 16 analytic function over partition when plan
  1309. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1310. p: ProjectPlan{
  1311. baseLogicalPlan: baseLogicalPlan{
  1312. children: []LogicalPlan{
  1313. FilterPlan{
  1314. baseLogicalPlan: baseLogicalPlan{
  1315. children: []LogicalPlan{
  1316. AnalyticFuncsPlan{
  1317. baseLogicalPlan: baseLogicalPlan{
  1318. children: []LogicalPlan{
  1319. DataSourcePlan{
  1320. name: "src1",
  1321. streamFields: map[string]*ast.JsonStreamField{
  1322. "id1": {
  1323. Type: "bigint",
  1324. },
  1325. "name": {
  1326. Type: "string",
  1327. },
  1328. "temp": {
  1329. Type: "bigint",
  1330. },
  1331. },
  1332. streamStmt: streams["src1"],
  1333. metaFields: []string{},
  1334. }.Init(),
  1335. },
  1336. },
  1337. funcs: []*ast.Call{
  1338. {
  1339. Name: "lag",
  1340. FuncId: 2,
  1341. CachedField: "$$a_lag_2",
  1342. Args: []ast.Expr{&ast.FieldRef{
  1343. Name: "temp",
  1344. StreamName: "src1",
  1345. }},
  1346. },
  1347. {
  1348. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1349. },
  1350. {
  1351. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1352. },
  1353. },
  1354. }.Init(),
  1355. },
  1356. },
  1357. condition: &ast.BinaryExpr{
  1358. LHS: &ast.Call{
  1359. Name: "lag",
  1360. FuncId: 2,
  1361. Args: []ast.Expr{&ast.FieldRef{
  1362. Name: "temp",
  1363. StreamName: "src1",
  1364. }},
  1365. CachedField: "$$a_lag_2",
  1366. Cached: true,
  1367. },
  1368. OP: ast.GT,
  1369. RHS: &ast.FieldRef{
  1370. Name: "temp",
  1371. StreamName: "src1",
  1372. },
  1373. },
  1374. }.Init(),
  1375. },
  1376. },
  1377. fields: []ast.Field{
  1378. {
  1379. Expr: &ast.Call{
  1380. Name: "latest",
  1381. FuncId: 1,
  1382. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1383. CachedField: "$$a_latest_1",
  1384. Cached: true,
  1385. Partition: &ast.PartitionExpr{
  1386. Exprs: []ast.Expr{
  1387. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1388. },
  1389. },
  1390. WhenExpr: &ast.BinaryExpr{
  1391. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1392. OP: ast.GT,
  1393. RHS: &ast.IntegerLiteral{Val: 12},
  1394. },
  1395. },
  1396. Name: "latest",
  1397. }, {
  1398. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1399. Name: "id1",
  1400. },
  1401. },
  1402. isAggregate: false,
  1403. sendMeta: false,
  1404. }.Init(),
  1405. }, { // 17. do not optimize sliding window
  1406. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1407. p: ProjectPlan{
  1408. baseLogicalPlan: baseLogicalPlan{
  1409. children: []LogicalPlan{
  1410. HavingPlan{
  1411. baseLogicalPlan: baseLogicalPlan{
  1412. children: []LogicalPlan{
  1413. FilterPlan{
  1414. baseLogicalPlan: baseLogicalPlan{
  1415. children: []LogicalPlan{
  1416. WindowPlan{
  1417. baseLogicalPlan: baseLogicalPlan{
  1418. children: []LogicalPlan{
  1419. DataSourcePlan{
  1420. name: "src1",
  1421. isWildCard: true,
  1422. streamFields: map[string]*ast.JsonStreamField{
  1423. "id1": {
  1424. Type: "bigint",
  1425. },
  1426. "temp": {
  1427. Type: "bigint",
  1428. },
  1429. "name": {
  1430. Type: "string",
  1431. },
  1432. "myarray": {
  1433. Type: "array",
  1434. Items: &ast.JsonStreamField{
  1435. Type: "string",
  1436. },
  1437. },
  1438. },
  1439. streamStmt: streams["src1"],
  1440. metaFields: []string{},
  1441. }.Init(),
  1442. },
  1443. },
  1444. condition: nil,
  1445. wtype: ast.SLIDING_WINDOW,
  1446. length: 10000,
  1447. interval: 0,
  1448. limit: 0,
  1449. }.Init(),
  1450. },
  1451. },
  1452. condition: &ast.BinaryExpr{
  1453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1454. OP: ast.GT,
  1455. RHS: &ast.IntegerLiteral{Val: 20},
  1456. },
  1457. }.Init(),
  1458. },
  1459. },
  1460. condition: &ast.BinaryExpr{
  1461. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1462. Token: ast.ASTERISK,
  1463. }}, FuncType: ast.FuncTypeAgg},
  1464. OP: ast.GT,
  1465. RHS: &ast.IntegerLiteral{Val: 2},
  1466. },
  1467. }.Init(),
  1468. },
  1469. },
  1470. fields: []ast.Field{
  1471. {
  1472. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1473. Name: "*",
  1474. AName: ""},
  1475. },
  1476. isAggregate: false,
  1477. sendMeta: false,
  1478. }.Init(),
  1479. },
  1480. }
  1481. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1482. for i, tt := range tests {
  1483. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1484. if err != nil {
  1485. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1486. continue
  1487. }
  1488. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1489. IsEventTime: false,
  1490. LateTol: 0,
  1491. Concurrency: 0,
  1492. BufferLength: 0,
  1493. SendMetaToSink: false,
  1494. Qos: 0,
  1495. CheckpointInterval: 0,
  1496. SendError: true,
  1497. }, store)
  1498. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1499. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1500. } else if !reflect.DeepEqual(tt.p, p) {
  1501. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1502. }
  1503. }
  1504. }
  1505. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1506. err, store := store.GetKV("stream")
  1507. if err != nil {
  1508. t.Error(err)
  1509. return
  1510. }
  1511. streamSqls := map[string]string{
  1512. "src1": `CREATE STREAM src1 (
  1513. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1514. "src2": `CREATE STREAM src2 (
  1515. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1516. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1517. id BIGINT,
  1518. name STRING,
  1519. value STRING,
  1520. hum BIGINT
  1521. ) WITH (TYPE="file");`,
  1522. }
  1523. types := map[string]ast.StreamType{
  1524. "src1": ast.TypeStream,
  1525. "src2": ast.TypeStream,
  1526. "tableInPlanner": ast.TypeTable,
  1527. }
  1528. for name, sql := range streamSqls {
  1529. s, err := json.Marshal(&xsql.StreamInfo{
  1530. StreamType: types[name],
  1531. Statement: sql,
  1532. })
  1533. if err != nil {
  1534. t.Error(err)
  1535. t.Fail()
  1536. }
  1537. err = store.Set(name, string(s))
  1538. if err != nil {
  1539. t.Error(err)
  1540. t.Fail()
  1541. }
  1542. }
  1543. streams := make(map[string]*ast.StreamStmt)
  1544. for n := range streamSqls {
  1545. streamStmt, err := xsql.GetDataSource(store, n)
  1546. if err != nil {
  1547. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1548. return
  1549. }
  1550. streams[n] = streamStmt
  1551. }
  1552. var (
  1553. //boolTrue = true
  1554. boolFalse = false
  1555. )
  1556. var tests = []struct {
  1557. sql string
  1558. p LogicalPlan
  1559. err string
  1560. }{
  1561. { // 0
  1562. sql: `SELECT name FROM src1`,
  1563. p: ProjectPlan{
  1564. baseLogicalPlan: baseLogicalPlan{
  1565. children: []LogicalPlan{
  1566. DataSourcePlan{
  1567. baseLogicalPlan: baseLogicalPlan{},
  1568. name: "src1",
  1569. streamFields: map[string]*ast.JsonStreamField{
  1570. "name": nil,
  1571. },
  1572. streamStmt: streams["src1"],
  1573. isSchemaless: true,
  1574. metaFields: []string{},
  1575. }.Init(),
  1576. },
  1577. },
  1578. fields: []ast.Field{
  1579. {
  1580. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1581. Name: "name",
  1582. AName: "",
  1583. },
  1584. },
  1585. isAggregate: false,
  1586. sendMeta: false,
  1587. }.Init(),
  1588. }, { // 1 optimize where to data source
  1589. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1590. p: ProjectPlan{
  1591. baseLogicalPlan: baseLogicalPlan{
  1592. children: []LogicalPlan{
  1593. WindowPlan{
  1594. baseLogicalPlan: baseLogicalPlan{
  1595. children: []LogicalPlan{
  1596. FilterPlan{
  1597. baseLogicalPlan: baseLogicalPlan{
  1598. children: []LogicalPlan{
  1599. DataSourcePlan{
  1600. name: "src1",
  1601. streamFields: map[string]*ast.JsonStreamField{
  1602. "name": nil,
  1603. "temp": nil,
  1604. },
  1605. streamStmt: streams["src1"],
  1606. metaFields: []string{},
  1607. isSchemaless: true,
  1608. }.Init(),
  1609. },
  1610. },
  1611. condition: &ast.BinaryExpr{
  1612. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1613. OP: ast.EQ,
  1614. RHS: &ast.StringLiteral{Val: "v1"},
  1615. },
  1616. }.Init(),
  1617. },
  1618. },
  1619. condition: nil,
  1620. wtype: ast.TUMBLING_WINDOW,
  1621. length: 10000,
  1622. interval: 0,
  1623. limit: 0,
  1624. }.Init(),
  1625. },
  1626. },
  1627. fields: []ast.Field{
  1628. {
  1629. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1630. Name: "temp",
  1631. AName: ""},
  1632. },
  1633. isAggregate: false,
  1634. sendMeta: false,
  1635. }.Init(),
  1636. }, { // 2 condition that cannot be optimized
  1637. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1638. p: ProjectPlan{
  1639. baseLogicalPlan: baseLogicalPlan{
  1640. children: []LogicalPlan{
  1641. JoinPlan{
  1642. baseLogicalPlan: baseLogicalPlan{
  1643. children: []LogicalPlan{
  1644. WindowPlan{
  1645. baseLogicalPlan: baseLogicalPlan{
  1646. children: []LogicalPlan{
  1647. DataSourcePlan{
  1648. name: "src1",
  1649. streamFields: map[string]*ast.JsonStreamField{
  1650. "id1": nil,
  1651. "temp": nil,
  1652. },
  1653. streamStmt: streams["src1"],
  1654. metaFields: []string{},
  1655. isSchemaless: true,
  1656. }.Init(),
  1657. DataSourcePlan{
  1658. name: "src2",
  1659. streamFields: map[string]*ast.JsonStreamField{ // can't determine where is id1 belonged to
  1660. "hum": nil,
  1661. "id1": nil,
  1662. "id2": nil,
  1663. },
  1664. isSchemaless: true,
  1665. streamStmt: streams["src2"],
  1666. metaFields: []string{},
  1667. }.Init(),
  1668. },
  1669. },
  1670. condition: nil,
  1671. wtype: ast.TUMBLING_WINDOW,
  1672. length: 10000,
  1673. interval: 0,
  1674. limit: 0,
  1675. }.Init(),
  1676. },
  1677. },
  1678. from: &ast.Table{Name: "src1"},
  1679. joins: ast.Joins{ast.Join{
  1680. Name: "src2",
  1681. JoinType: ast.INNER_JOIN,
  1682. Expr: &ast.BinaryExpr{
  1683. OP: ast.AND,
  1684. LHS: &ast.BinaryExpr{
  1685. LHS: &ast.BinaryExpr{
  1686. OP: ast.GT,
  1687. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1688. RHS: &ast.IntegerLiteral{Val: 20},
  1689. },
  1690. OP: ast.OR,
  1691. RHS: &ast.BinaryExpr{
  1692. OP: ast.GT,
  1693. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1694. RHS: &ast.IntegerLiteral{Val: 60},
  1695. },
  1696. },
  1697. RHS: &ast.BinaryExpr{
  1698. OP: ast.EQ,
  1699. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1700. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1701. },
  1702. },
  1703. }},
  1704. }.Init(),
  1705. },
  1706. },
  1707. fields: []ast.Field{
  1708. {
  1709. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1710. Name: "id1",
  1711. AName: ""},
  1712. },
  1713. isAggregate: false,
  1714. sendMeta: false,
  1715. }.Init(),
  1716. }, { // 3 optimize window filter
  1717. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1718. p: ProjectPlan{
  1719. baseLogicalPlan: baseLogicalPlan{
  1720. children: []LogicalPlan{
  1721. WindowPlan{
  1722. baseLogicalPlan: baseLogicalPlan{
  1723. children: []LogicalPlan{
  1724. FilterPlan{
  1725. baseLogicalPlan: baseLogicalPlan{
  1726. children: []LogicalPlan{
  1727. DataSourcePlan{
  1728. name: "src1",
  1729. streamFields: map[string]*ast.JsonStreamField{
  1730. "id1": nil,
  1731. "name": nil,
  1732. "temp": nil,
  1733. },
  1734. isSchemaless: true,
  1735. streamStmt: streams["src1"],
  1736. metaFields: []string{},
  1737. }.Init(),
  1738. },
  1739. },
  1740. condition: &ast.BinaryExpr{
  1741. OP: ast.AND,
  1742. LHS: &ast.BinaryExpr{
  1743. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1744. OP: ast.EQ,
  1745. RHS: &ast.StringLiteral{Val: "v1"},
  1746. },
  1747. RHS: &ast.BinaryExpr{
  1748. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1749. OP: ast.GT,
  1750. RHS: &ast.IntegerLiteral{Val: 2},
  1751. },
  1752. },
  1753. }.Init(),
  1754. },
  1755. },
  1756. condition: nil,
  1757. wtype: ast.TUMBLING_WINDOW,
  1758. length: 10000,
  1759. interval: 0,
  1760. limit: 0,
  1761. }.Init(),
  1762. },
  1763. },
  1764. fields: []ast.Field{
  1765. {
  1766. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1767. Name: "id1",
  1768. AName: ""},
  1769. },
  1770. isAggregate: false,
  1771. sendMeta: false,
  1772. }.Init(),
  1773. }, { // 4. do not optimize count window
  1774. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1775. p: ProjectPlan{
  1776. baseLogicalPlan: baseLogicalPlan{
  1777. children: []LogicalPlan{
  1778. HavingPlan{
  1779. baseLogicalPlan: baseLogicalPlan{
  1780. children: []LogicalPlan{
  1781. FilterPlan{
  1782. baseLogicalPlan: baseLogicalPlan{
  1783. children: []LogicalPlan{
  1784. WindowPlan{
  1785. baseLogicalPlan: baseLogicalPlan{
  1786. children: []LogicalPlan{
  1787. DataSourcePlan{
  1788. name: "src1",
  1789. isWildCard: true,
  1790. streamFields: map[string]*ast.JsonStreamField{},
  1791. streamStmt: streams["src1"],
  1792. metaFields: []string{},
  1793. isSchemaless: true,
  1794. }.Init(),
  1795. },
  1796. },
  1797. condition: nil,
  1798. wtype: ast.COUNT_WINDOW,
  1799. length: 5,
  1800. interval: 1,
  1801. limit: 0,
  1802. }.Init(),
  1803. },
  1804. },
  1805. condition: &ast.BinaryExpr{
  1806. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1807. OP: ast.GT,
  1808. RHS: &ast.IntegerLiteral{Val: 20},
  1809. },
  1810. }.Init(),
  1811. },
  1812. },
  1813. condition: &ast.BinaryExpr{
  1814. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1815. Token: ast.ASTERISK,
  1816. }}, FuncType: ast.FuncTypeAgg},
  1817. OP: ast.GT,
  1818. RHS: &ast.IntegerLiteral{Val: 2},
  1819. },
  1820. }.Init(),
  1821. },
  1822. },
  1823. fields: []ast.Field{
  1824. {
  1825. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1826. Name: "*",
  1827. AName: ""},
  1828. },
  1829. isAggregate: false,
  1830. sendMeta: false,
  1831. }.Init(),
  1832. }, { // 5. optimize join on
  1833. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1834. p: ProjectPlan{
  1835. baseLogicalPlan: baseLogicalPlan{
  1836. children: []LogicalPlan{
  1837. JoinPlan{
  1838. baseLogicalPlan: baseLogicalPlan{
  1839. children: []LogicalPlan{
  1840. WindowPlan{
  1841. baseLogicalPlan: baseLogicalPlan{
  1842. children: []LogicalPlan{
  1843. FilterPlan{
  1844. baseLogicalPlan: baseLogicalPlan{
  1845. children: []LogicalPlan{
  1846. DataSourcePlan{
  1847. name: "src1",
  1848. streamFields: map[string]*ast.JsonStreamField{
  1849. "id1": nil,
  1850. "temp": nil,
  1851. },
  1852. isSchemaless: true,
  1853. streamStmt: streams["src1"],
  1854. metaFields: []string{},
  1855. }.Init(),
  1856. },
  1857. },
  1858. condition: &ast.BinaryExpr{
  1859. RHS: &ast.BinaryExpr{
  1860. OP: ast.GT,
  1861. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1862. RHS: &ast.IntegerLiteral{Val: 20},
  1863. },
  1864. OP: ast.AND,
  1865. LHS: &ast.BinaryExpr{
  1866. OP: ast.GT,
  1867. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1868. RHS: &ast.IntegerLiteral{Val: 111},
  1869. },
  1870. },
  1871. }.Init(),
  1872. FilterPlan{
  1873. baseLogicalPlan: baseLogicalPlan{
  1874. children: []LogicalPlan{
  1875. DataSourcePlan{
  1876. name: "src2",
  1877. streamFields: map[string]*ast.JsonStreamField{
  1878. "hum": nil,
  1879. "id1": nil,
  1880. "id2": nil,
  1881. },
  1882. isSchemaless: true,
  1883. streamStmt: streams["src2"],
  1884. metaFields: []string{},
  1885. }.Init(),
  1886. },
  1887. },
  1888. condition: &ast.BinaryExpr{
  1889. OP: ast.LT,
  1890. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1891. RHS: &ast.IntegerLiteral{Val: 60},
  1892. },
  1893. }.Init(),
  1894. },
  1895. },
  1896. condition: nil,
  1897. wtype: ast.TUMBLING_WINDOW,
  1898. length: 10000,
  1899. interval: 0,
  1900. limit: 0,
  1901. }.Init(),
  1902. },
  1903. },
  1904. from: &ast.Table{
  1905. Name: "src1",
  1906. },
  1907. joins: []ast.Join{
  1908. {
  1909. Name: "src2",
  1910. Alias: "",
  1911. JoinType: ast.INNER_JOIN,
  1912. Expr: &ast.BinaryExpr{
  1913. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1914. OP: ast.EQ,
  1915. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1916. },
  1917. },
  1918. },
  1919. }.Init(),
  1920. },
  1921. },
  1922. fields: []ast.Field{
  1923. {
  1924. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1925. Name: "id1",
  1926. AName: ""},
  1927. },
  1928. isAggregate: false,
  1929. sendMeta: false,
  1930. }.Init(),
  1931. }, { // 6. optimize outter join on
  1932. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1933. p: ProjectPlan{
  1934. baseLogicalPlan: baseLogicalPlan{
  1935. children: []LogicalPlan{
  1936. JoinPlan{
  1937. baseLogicalPlan: baseLogicalPlan{
  1938. children: []LogicalPlan{
  1939. WindowPlan{
  1940. baseLogicalPlan: baseLogicalPlan{
  1941. children: []LogicalPlan{
  1942. FilterPlan{
  1943. baseLogicalPlan: baseLogicalPlan{
  1944. children: []LogicalPlan{
  1945. DataSourcePlan{
  1946. name: "src1",
  1947. streamFields: map[string]*ast.JsonStreamField{
  1948. "id1": nil,
  1949. "temp": nil,
  1950. },
  1951. isSchemaless: true,
  1952. streamStmt: streams["src1"],
  1953. metaFields: []string{},
  1954. }.Init(),
  1955. },
  1956. },
  1957. condition: &ast.BinaryExpr{
  1958. OP: ast.GT,
  1959. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1960. RHS: &ast.IntegerLiteral{Val: 111},
  1961. },
  1962. }.Init(),
  1963. DataSourcePlan{
  1964. name: "src2",
  1965. streamFields: map[string]*ast.JsonStreamField{
  1966. "hum": nil,
  1967. "id1": nil,
  1968. "id2": nil,
  1969. },
  1970. isSchemaless: true,
  1971. streamStmt: streams["src2"],
  1972. metaFields: []string{},
  1973. }.Init(),
  1974. },
  1975. },
  1976. condition: nil,
  1977. wtype: ast.TUMBLING_WINDOW,
  1978. length: 10000,
  1979. interval: 0,
  1980. limit: 0,
  1981. }.Init(),
  1982. },
  1983. },
  1984. from: &ast.Table{
  1985. Name: "src1",
  1986. },
  1987. joins: []ast.Join{
  1988. {
  1989. Name: "src2",
  1990. Alias: "",
  1991. JoinType: ast.FULL_JOIN,
  1992. Expr: &ast.BinaryExpr{
  1993. OP: ast.AND,
  1994. LHS: &ast.BinaryExpr{
  1995. OP: ast.AND,
  1996. LHS: &ast.BinaryExpr{
  1997. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1998. OP: ast.EQ,
  1999. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2000. },
  2001. RHS: &ast.BinaryExpr{
  2002. OP: ast.GT,
  2003. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2004. RHS: &ast.IntegerLiteral{Val: 20},
  2005. },
  2006. },
  2007. RHS: &ast.BinaryExpr{
  2008. OP: ast.LT,
  2009. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2010. RHS: &ast.IntegerLiteral{Val: 60},
  2011. },
  2012. },
  2013. },
  2014. },
  2015. }.Init(),
  2016. },
  2017. },
  2018. fields: []ast.Field{
  2019. {
  2020. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2021. Name: "id1",
  2022. AName: ""},
  2023. },
  2024. isAggregate: false,
  2025. sendMeta: false,
  2026. }.Init(),
  2027. }, { // 7 window error for table
  2028. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2029. p: nil,
  2030. err: "cannot run window for TABLE sources",
  2031. }, { // 8 join table without window
  2032. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  2033. p: ProjectPlan{
  2034. baseLogicalPlan: baseLogicalPlan{
  2035. children: []LogicalPlan{
  2036. JoinPlan{
  2037. baseLogicalPlan: baseLogicalPlan{
  2038. children: []LogicalPlan{
  2039. JoinAlignPlan{
  2040. baseLogicalPlan: baseLogicalPlan{
  2041. children: []LogicalPlan{
  2042. FilterPlan{
  2043. baseLogicalPlan: baseLogicalPlan{
  2044. children: []LogicalPlan{
  2045. DataSourcePlan{
  2046. name: "src1",
  2047. streamFields: map[string]*ast.JsonStreamField{
  2048. "hum": nil,
  2049. "id1": nil,
  2050. "temp": nil,
  2051. },
  2052. isSchemaless: true,
  2053. streamStmt: streams["src1"],
  2054. metaFields: []string{},
  2055. }.Init(),
  2056. },
  2057. },
  2058. condition: &ast.BinaryExpr{
  2059. RHS: &ast.BinaryExpr{
  2060. OP: ast.GT,
  2061. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2062. RHS: &ast.IntegerLiteral{Val: 20},
  2063. },
  2064. OP: ast.AND,
  2065. LHS: &ast.BinaryExpr{
  2066. OP: ast.GT,
  2067. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2068. RHS: &ast.IntegerLiteral{Val: 111},
  2069. },
  2070. },
  2071. }.Init(),
  2072. DataSourcePlan{
  2073. name: "tableInPlanner",
  2074. streamFields: map[string]*ast.JsonStreamField{
  2075. "hum": {
  2076. Type: "bigint",
  2077. },
  2078. "id": {
  2079. Type: "bigint",
  2080. },
  2081. },
  2082. streamStmt: streams["tableInPlanner"],
  2083. metaFields: []string{},
  2084. }.Init(),
  2085. },
  2086. },
  2087. Emitters: []string{"tableInPlanner"},
  2088. }.Init(),
  2089. },
  2090. },
  2091. from: &ast.Table{
  2092. Name: "src1",
  2093. },
  2094. joins: []ast.Join{
  2095. {
  2096. Name: "tableInPlanner",
  2097. Alias: "",
  2098. JoinType: ast.INNER_JOIN,
  2099. Expr: &ast.BinaryExpr{
  2100. OP: ast.AND,
  2101. LHS: &ast.BinaryExpr{
  2102. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2103. OP: ast.EQ,
  2104. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2105. },
  2106. RHS: &ast.BinaryExpr{
  2107. OP: ast.LT,
  2108. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  2109. RHS: &ast.IntegerLiteral{Val: 60},
  2110. },
  2111. },
  2112. },
  2113. },
  2114. }.Init(),
  2115. },
  2116. },
  2117. fields: []ast.Field{
  2118. {
  2119. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2120. Name: "id1",
  2121. AName: ""},
  2122. },
  2123. isAggregate: false,
  2124. sendMeta: false,
  2125. }.Init(),
  2126. }, { // 9 join table with window
  2127. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2128. p: ProjectPlan{
  2129. baseLogicalPlan: baseLogicalPlan{
  2130. children: []LogicalPlan{
  2131. JoinPlan{
  2132. baseLogicalPlan: baseLogicalPlan{
  2133. children: []LogicalPlan{
  2134. JoinAlignPlan{
  2135. baseLogicalPlan: baseLogicalPlan{
  2136. children: []LogicalPlan{
  2137. WindowPlan{
  2138. baseLogicalPlan: baseLogicalPlan{
  2139. children: []LogicalPlan{
  2140. DataSourcePlan{
  2141. name: "src1",
  2142. streamFields: map[string]*ast.JsonStreamField{
  2143. "id1": nil,
  2144. "temp": nil,
  2145. },
  2146. isSchemaless: true,
  2147. streamStmt: streams["src1"],
  2148. metaFields: []string{},
  2149. }.Init(),
  2150. },
  2151. },
  2152. condition: nil,
  2153. wtype: ast.TUMBLING_WINDOW,
  2154. length: 10000,
  2155. interval: 0,
  2156. limit: 0,
  2157. }.Init(),
  2158. DataSourcePlan{
  2159. name: "tableInPlanner",
  2160. streamFields: map[string]*ast.JsonStreamField{
  2161. "hum": {
  2162. Type: "bigint",
  2163. },
  2164. "id": {
  2165. Type: "bigint",
  2166. },
  2167. },
  2168. streamStmt: streams["tableInPlanner"],
  2169. metaFields: []string{},
  2170. }.Init(),
  2171. },
  2172. },
  2173. Emitters: []string{"tableInPlanner"},
  2174. }.Init(),
  2175. },
  2176. },
  2177. from: &ast.Table{
  2178. Name: "src1",
  2179. },
  2180. joins: []ast.Join{
  2181. {
  2182. Name: "tableInPlanner",
  2183. Alias: "",
  2184. JoinType: ast.INNER_JOIN,
  2185. Expr: &ast.BinaryExpr{
  2186. OP: ast.AND,
  2187. LHS: &ast.BinaryExpr{
  2188. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2189. OP: ast.EQ,
  2190. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2191. },
  2192. RHS: &ast.BinaryExpr{
  2193. RHS: &ast.BinaryExpr{
  2194. OP: ast.AND,
  2195. LHS: &ast.BinaryExpr{
  2196. OP: ast.GT,
  2197. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2198. RHS: &ast.IntegerLiteral{Val: 20},
  2199. },
  2200. RHS: &ast.BinaryExpr{
  2201. OP: ast.LT,
  2202. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  2203. RHS: &ast.IntegerLiteral{Val: 60},
  2204. },
  2205. },
  2206. OP: ast.AND,
  2207. LHS: &ast.BinaryExpr{
  2208. OP: ast.GT,
  2209. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2210. RHS: &ast.IntegerLiteral{Val: 111},
  2211. },
  2212. },
  2213. },
  2214. },
  2215. },
  2216. }.Init(),
  2217. },
  2218. },
  2219. fields: []ast.Field{
  2220. {
  2221. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2222. Name: "id1",
  2223. AName: ""},
  2224. },
  2225. isAggregate: false,
  2226. sendMeta: false,
  2227. }.Init(),
  2228. }, { // 10 meta
  2229. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  2230. p: ProjectPlan{
  2231. baseLogicalPlan: baseLogicalPlan{
  2232. children: []LogicalPlan{
  2233. FilterPlan{
  2234. baseLogicalPlan: baseLogicalPlan{
  2235. children: []LogicalPlan{
  2236. DataSourcePlan{
  2237. name: "src1",
  2238. streamFields: map[string]*ast.JsonStreamField{
  2239. "temp": nil,
  2240. },
  2241. isSchemaless: true,
  2242. streamStmt: streams["src1"],
  2243. metaFields: []string{"Humidity", "device", "id"},
  2244. }.Init(),
  2245. },
  2246. },
  2247. condition: &ast.BinaryExpr{
  2248. LHS: &ast.Call{
  2249. Name: "meta",
  2250. FuncId: 2,
  2251. Args: []ast.Expr{&ast.MetaRef{
  2252. Name: "device",
  2253. StreamName: ast.DefaultStream,
  2254. }},
  2255. },
  2256. OP: ast.EQ,
  2257. RHS: &ast.StringLiteral{
  2258. Val: "demo2",
  2259. },
  2260. },
  2261. }.Init(),
  2262. },
  2263. },
  2264. fields: []ast.Field{
  2265. {
  2266. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2267. Name: "temp",
  2268. AName: "",
  2269. }, {
  2270. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2271. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  2272. Name: "id",
  2273. StreamName: ast.DefaultStream,
  2274. }}},
  2275. []ast.StreamName{},
  2276. nil,
  2277. )},
  2278. Name: "meta",
  2279. AName: "eid",
  2280. }, {
  2281. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2282. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  2283. &ast.BinaryExpr{
  2284. OP: ast.ARROW,
  2285. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  2286. RHS: &ast.JsonFieldRef{Name: "Device"},
  2287. },
  2288. }},
  2289. []ast.StreamName{},
  2290. nil,
  2291. )},
  2292. Name: "meta",
  2293. AName: "hdevice",
  2294. },
  2295. },
  2296. isAggregate: false,
  2297. sendMeta: false,
  2298. }.Init(),
  2299. }, { // 11 join with same name field and aliased
  2300. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  2301. p: ProjectPlan{
  2302. baseLogicalPlan: baseLogicalPlan{
  2303. children: []LogicalPlan{
  2304. JoinPlan{
  2305. baseLogicalPlan: baseLogicalPlan{
  2306. children: []LogicalPlan{
  2307. JoinAlignPlan{
  2308. baseLogicalPlan: baseLogicalPlan{
  2309. children: []LogicalPlan{
  2310. DataSourcePlan{
  2311. name: "src2",
  2312. streamFields: map[string]*ast.JsonStreamField{
  2313. "hum": nil,
  2314. "id": nil,
  2315. "id2": nil,
  2316. },
  2317. isSchemaless: true,
  2318. streamStmt: streams["src2"],
  2319. metaFields: []string{},
  2320. }.Init(),
  2321. DataSourcePlan{
  2322. name: "tableInPlanner",
  2323. streamFields: map[string]*ast.JsonStreamField{
  2324. "hum": {
  2325. Type: "bigint",
  2326. },
  2327. "id": {
  2328. Type: "bigint",
  2329. },
  2330. },
  2331. streamStmt: streams["tableInPlanner"],
  2332. metaFields: []string{},
  2333. }.Init(),
  2334. },
  2335. },
  2336. Emitters: []string{"tableInPlanner"},
  2337. }.Init(),
  2338. },
  2339. },
  2340. from: &ast.Table{
  2341. Name: "src2",
  2342. },
  2343. joins: []ast.Join{
  2344. {
  2345. Name: "tableInPlanner",
  2346. Alias: "",
  2347. JoinType: ast.INNER_JOIN,
  2348. Expr: &ast.BinaryExpr{
  2349. RHS: &ast.BinaryExpr{
  2350. OP: ast.EQ,
  2351. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2352. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2353. },
  2354. OP: ast.AND,
  2355. LHS: &ast.BinaryExpr{
  2356. OP: ast.GT,
  2357. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2358. &ast.FieldRef{
  2359. Name: "hum",
  2360. StreamName: "src2",
  2361. },
  2362. []ast.StreamName{"src2"},
  2363. &boolFalse,
  2364. )},
  2365. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2366. &ast.FieldRef{
  2367. Name: "hum",
  2368. StreamName: "tableInPlanner",
  2369. },
  2370. []ast.StreamName{"tableInPlanner"},
  2371. &boolFalse,
  2372. )},
  2373. },
  2374. },
  2375. },
  2376. },
  2377. }.Init(),
  2378. },
  2379. },
  2380. fields: []ast.Field{
  2381. {
  2382. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2383. &ast.FieldRef{
  2384. Name: "hum",
  2385. StreamName: "src2",
  2386. },
  2387. []ast.StreamName{"src2"},
  2388. &boolFalse,
  2389. )},
  2390. Name: "hum",
  2391. AName: "hum1",
  2392. }, {
  2393. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2394. &ast.FieldRef{
  2395. Name: "hum",
  2396. StreamName: "tableInPlanner",
  2397. },
  2398. []ast.StreamName{"tableInPlanner"},
  2399. &boolFalse,
  2400. )},
  2401. Name: "hum",
  2402. AName: "hum2",
  2403. },
  2404. },
  2405. isAggregate: false,
  2406. sendMeta: false,
  2407. }.Init(),
  2408. }, { // 12
  2409. sql: `SELECT name->first, name->last FROM src1`,
  2410. p: ProjectPlan{
  2411. baseLogicalPlan: baseLogicalPlan{
  2412. children: []LogicalPlan{
  2413. DataSourcePlan{
  2414. baseLogicalPlan: baseLogicalPlan{},
  2415. name: "src1",
  2416. streamFields: map[string]*ast.JsonStreamField{
  2417. "name": nil,
  2418. },
  2419. isSchemaless: true,
  2420. streamStmt: streams["src1"],
  2421. metaFields: []string{},
  2422. }.Init(),
  2423. },
  2424. },
  2425. fields: []ast.Field{
  2426. {
  2427. Expr: &ast.BinaryExpr{
  2428. OP: ast.ARROW,
  2429. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2430. RHS: &ast.JsonFieldRef{Name: "first"},
  2431. },
  2432. Name: "kuiper_field_0",
  2433. AName: "",
  2434. }, {
  2435. Expr: &ast.BinaryExpr{
  2436. OP: ast.ARROW,
  2437. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2438. RHS: &ast.JsonFieldRef{Name: "last"},
  2439. },
  2440. Name: "kuiper_field_1",
  2441. AName: "",
  2442. },
  2443. },
  2444. isAggregate: false,
  2445. sendMeta: false,
  2446. }.Init(),
  2447. },
  2448. }
  2449. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2450. for i, tt := range tests {
  2451. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2452. if err != nil {
  2453. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2454. continue
  2455. }
  2456. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2457. IsEventTime: false,
  2458. LateTol: 0,
  2459. Concurrency: 0,
  2460. BufferLength: 0,
  2461. SendMetaToSink: false,
  2462. Qos: 0,
  2463. CheckpointInterval: 0,
  2464. SendError: true,
  2465. }, store)
  2466. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2467. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2468. } else if !reflect.DeepEqual(tt.p, p) {
  2469. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2470. }
  2471. }
  2472. }
  2473. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2474. err, store := store.GetKV("stream")
  2475. if err != nil {
  2476. t.Error(err)
  2477. return
  2478. }
  2479. streamSqls := map[string]string{
  2480. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2481. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2482. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2483. }
  2484. types := map[string]ast.StreamType{
  2485. "src1": ast.TypeStream,
  2486. "table1": ast.TypeTable,
  2487. "table2": ast.TypeTable,
  2488. }
  2489. for name, sql := range streamSqls {
  2490. s, err := json.Marshal(&xsql.StreamInfo{
  2491. StreamType: types[name],
  2492. Statement: sql,
  2493. })
  2494. if err != nil {
  2495. t.Error(err)
  2496. t.Fail()
  2497. }
  2498. err = store.Set(name, string(s))
  2499. if err != nil {
  2500. t.Error(err)
  2501. t.Fail()
  2502. }
  2503. }
  2504. streams := make(map[string]*ast.StreamStmt)
  2505. for n := range streamSqls {
  2506. streamStmt, err := xsql.GetDataSource(store, n)
  2507. if err != nil {
  2508. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2509. return
  2510. }
  2511. streams[n] = streamStmt
  2512. }
  2513. var tests = []struct {
  2514. sql string
  2515. p LogicalPlan
  2516. err string
  2517. }{
  2518. { // 0
  2519. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2520. p: ProjectPlan{
  2521. baseLogicalPlan: baseLogicalPlan{
  2522. children: []LogicalPlan{
  2523. LookupPlan{
  2524. baseLogicalPlan: baseLogicalPlan{
  2525. children: []LogicalPlan{
  2526. DataSourcePlan{
  2527. baseLogicalPlan: baseLogicalPlan{},
  2528. name: "src1",
  2529. streamFields: map[string]*ast.JsonStreamField{
  2530. "a": nil,
  2531. },
  2532. isSchemaless: true,
  2533. streamStmt: streams["src1"],
  2534. metaFields: []string{},
  2535. }.Init(),
  2536. },
  2537. },
  2538. joinExpr: ast.Join{
  2539. Name: "table1",
  2540. Alias: "",
  2541. JoinType: ast.INNER_JOIN,
  2542. Expr: &ast.BinaryExpr{
  2543. OP: ast.EQ,
  2544. LHS: &ast.FieldRef{
  2545. StreamName: "src1",
  2546. Name: "id",
  2547. },
  2548. RHS: &ast.FieldRef{
  2549. StreamName: "table1",
  2550. Name: "id",
  2551. },
  2552. },
  2553. },
  2554. keys: []string{"id"},
  2555. fields: []string{"b"},
  2556. valvars: []ast.Expr{
  2557. &ast.FieldRef{
  2558. StreamName: "src1",
  2559. Name: "id",
  2560. },
  2561. },
  2562. options: &ast.Options{
  2563. DATASOURCE: "table1",
  2564. TYPE: "sql",
  2565. STRICT_VALIDATION: true,
  2566. KIND: "lookup",
  2567. },
  2568. conditions: nil,
  2569. }.Init(),
  2570. },
  2571. },
  2572. fields: []ast.Field{
  2573. {
  2574. Expr: &ast.FieldRef{
  2575. StreamName: "src1",
  2576. Name: "a",
  2577. },
  2578. Name: "a",
  2579. AName: "",
  2580. },
  2581. {
  2582. Expr: &ast.FieldRef{
  2583. StreamName: "table1",
  2584. Name: "b",
  2585. },
  2586. Name: "b",
  2587. AName: "",
  2588. },
  2589. },
  2590. isAggregate: false,
  2591. sendMeta: false,
  2592. }.Init(),
  2593. },
  2594. { // 1
  2595. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2596. p: ProjectPlan{
  2597. baseLogicalPlan: baseLogicalPlan{
  2598. children: []LogicalPlan{
  2599. FilterPlan{
  2600. baseLogicalPlan: baseLogicalPlan{
  2601. children: []LogicalPlan{
  2602. LookupPlan{
  2603. baseLogicalPlan: baseLogicalPlan{
  2604. children: []LogicalPlan{
  2605. FilterPlan{
  2606. baseLogicalPlan: baseLogicalPlan{
  2607. children: []LogicalPlan{
  2608. DataSourcePlan{
  2609. baseLogicalPlan: baseLogicalPlan{},
  2610. name: "src1",
  2611. streamFields: map[string]*ast.JsonStreamField{
  2612. "a": nil,
  2613. },
  2614. isSchemaless: true,
  2615. streamStmt: streams["src1"],
  2616. metaFields: []string{},
  2617. }.Init(),
  2618. },
  2619. },
  2620. condition: &ast.BinaryExpr{
  2621. OP: ast.LT,
  2622. LHS: &ast.FieldRef{
  2623. StreamName: "src1",
  2624. Name: "c",
  2625. },
  2626. RHS: &ast.IntegerLiteral{Val: 40},
  2627. },
  2628. }.Init(),
  2629. },
  2630. },
  2631. joinExpr: ast.Join{
  2632. Name: "table1",
  2633. Alias: "",
  2634. JoinType: ast.INNER_JOIN,
  2635. Expr: &ast.BinaryExpr{
  2636. OP: ast.AND,
  2637. RHS: &ast.BinaryExpr{
  2638. OP: ast.EQ,
  2639. LHS: &ast.FieldRef{
  2640. StreamName: "src1",
  2641. Name: "id",
  2642. },
  2643. RHS: &ast.FieldRef{
  2644. StreamName: "table1",
  2645. Name: "id",
  2646. },
  2647. },
  2648. LHS: &ast.BinaryExpr{
  2649. OP: ast.AND,
  2650. LHS: &ast.BinaryExpr{
  2651. OP: ast.GT,
  2652. LHS: &ast.FieldRef{
  2653. StreamName: "table1",
  2654. Name: "b",
  2655. },
  2656. RHS: &ast.IntegerLiteral{Val: 20},
  2657. },
  2658. RHS: &ast.BinaryExpr{
  2659. OP: ast.LT,
  2660. LHS: &ast.FieldRef{
  2661. StreamName: "src1",
  2662. Name: "c",
  2663. },
  2664. RHS: &ast.IntegerLiteral{Val: 40},
  2665. },
  2666. },
  2667. },
  2668. },
  2669. keys: []string{"id"},
  2670. valvars: []ast.Expr{
  2671. &ast.FieldRef{
  2672. StreamName: "src1",
  2673. Name: "id",
  2674. },
  2675. },
  2676. options: &ast.Options{
  2677. DATASOURCE: "table1",
  2678. TYPE: "sql",
  2679. STRICT_VALIDATION: true,
  2680. KIND: "lookup",
  2681. },
  2682. conditions: &ast.BinaryExpr{
  2683. OP: ast.AND,
  2684. LHS: &ast.BinaryExpr{
  2685. OP: ast.GT,
  2686. LHS: &ast.FieldRef{
  2687. StreamName: "table1",
  2688. Name: "b",
  2689. },
  2690. RHS: &ast.IntegerLiteral{Val: 20},
  2691. },
  2692. RHS: &ast.BinaryExpr{
  2693. OP: ast.LT,
  2694. LHS: &ast.FieldRef{
  2695. StreamName: "src1",
  2696. Name: "c",
  2697. },
  2698. RHS: &ast.IntegerLiteral{Val: 40},
  2699. },
  2700. },
  2701. }.Init(),
  2702. },
  2703. },
  2704. condition: &ast.BinaryExpr{
  2705. OP: ast.GT,
  2706. LHS: &ast.FieldRef{
  2707. StreamName: "table1",
  2708. Name: "b",
  2709. },
  2710. RHS: &ast.IntegerLiteral{Val: 20},
  2711. },
  2712. }.Init(),
  2713. },
  2714. },
  2715. fields: []ast.Field{
  2716. {
  2717. Expr: &ast.FieldRef{
  2718. StreamName: "src1",
  2719. Name: "a",
  2720. },
  2721. Name: "a",
  2722. AName: "",
  2723. },
  2724. {
  2725. Expr: &ast.FieldRef{
  2726. StreamName: "table1",
  2727. Name: "*",
  2728. },
  2729. Name: "*",
  2730. AName: "",
  2731. },
  2732. },
  2733. isAggregate: false,
  2734. sendMeta: false,
  2735. }.Init(),
  2736. },
  2737. { // 2
  2738. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2739. p: ProjectPlan{
  2740. baseLogicalPlan: baseLogicalPlan{
  2741. children: []LogicalPlan{
  2742. LookupPlan{
  2743. baseLogicalPlan: baseLogicalPlan{
  2744. children: []LogicalPlan{
  2745. LookupPlan{
  2746. baseLogicalPlan: baseLogicalPlan{
  2747. children: []LogicalPlan{
  2748. DataSourcePlan{
  2749. baseLogicalPlan: baseLogicalPlan{},
  2750. name: "src1",
  2751. streamFields: map[string]*ast.JsonStreamField{
  2752. "a": nil,
  2753. },
  2754. isSchemaless: true,
  2755. streamStmt: streams["src1"],
  2756. metaFields: []string{},
  2757. }.Init(),
  2758. },
  2759. },
  2760. joinExpr: ast.Join{
  2761. Name: "table1",
  2762. Alias: "",
  2763. JoinType: ast.INNER_JOIN,
  2764. Expr: &ast.BinaryExpr{
  2765. OP: ast.EQ,
  2766. LHS: &ast.FieldRef{
  2767. StreamName: "src1",
  2768. Name: "id",
  2769. },
  2770. RHS: &ast.FieldRef{
  2771. StreamName: "table1",
  2772. Name: "id",
  2773. },
  2774. },
  2775. },
  2776. keys: []string{"id"},
  2777. fields: []string{"b"},
  2778. valvars: []ast.Expr{
  2779. &ast.FieldRef{
  2780. StreamName: "src1",
  2781. Name: "id",
  2782. },
  2783. },
  2784. options: &ast.Options{
  2785. DATASOURCE: "table1",
  2786. TYPE: "sql",
  2787. STRICT_VALIDATION: true,
  2788. KIND: "lookup",
  2789. },
  2790. conditions: nil,
  2791. }.Init(),
  2792. },
  2793. },
  2794. joinExpr: ast.Join{
  2795. Name: "table2",
  2796. Alias: "",
  2797. JoinType: ast.INNER_JOIN,
  2798. Expr: &ast.BinaryExpr{
  2799. OP: ast.EQ,
  2800. LHS: &ast.FieldRef{
  2801. StreamName: "table1",
  2802. Name: "id",
  2803. },
  2804. RHS: &ast.FieldRef{
  2805. StreamName: "table2",
  2806. Name: "id",
  2807. },
  2808. },
  2809. },
  2810. keys: []string{"id"},
  2811. fields: []string{"c"},
  2812. valvars: []ast.Expr{
  2813. &ast.FieldRef{
  2814. StreamName: "table1",
  2815. Name: "id",
  2816. },
  2817. },
  2818. options: &ast.Options{
  2819. DATASOURCE: "table2",
  2820. TYPE: "sql",
  2821. STRICT_VALIDATION: true,
  2822. KIND: "lookup",
  2823. },
  2824. }.Init(),
  2825. },
  2826. },
  2827. fields: []ast.Field{
  2828. {
  2829. Expr: &ast.FieldRef{
  2830. StreamName: "src1",
  2831. Name: "a",
  2832. },
  2833. Name: "a",
  2834. AName: "",
  2835. },
  2836. {
  2837. Expr: &ast.FieldRef{
  2838. StreamName: "table1",
  2839. Name: "b",
  2840. },
  2841. Name: "b",
  2842. AName: "",
  2843. },
  2844. {
  2845. Expr: &ast.FieldRef{
  2846. StreamName: "table2",
  2847. Name: "c",
  2848. },
  2849. Name: "c",
  2850. AName: "",
  2851. },
  2852. },
  2853. isAggregate: false,
  2854. sendMeta: false,
  2855. }.Init(),
  2856. },
  2857. { // 3
  2858. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2859. p: ProjectPlan{
  2860. baseLogicalPlan: baseLogicalPlan{
  2861. children: []LogicalPlan{
  2862. LookupPlan{
  2863. baseLogicalPlan: baseLogicalPlan{
  2864. children: []LogicalPlan{
  2865. WindowPlan{
  2866. baseLogicalPlan: baseLogicalPlan{
  2867. children: []LogicalPlan{
  2868. DataSourcePlan{
  2869. baseLogicalPlan: baseLogicalPlan{},
  2870. name: "src1",
  2871. streamStmt: streams["src1"],
  2872. streamFields: map[string]*ast.JsonStreamField{},
  2873. metaFields: []string{},
  2874. isWildCard: true,
  2875. isSchemaless: true,
  2876. }.Init(),
  2877. },
  2878. },
  2879. condition: nil,
  2880. wtype: ast.TUMBLING_WINDOW,
  2881. length: 10000,
  2882. interval: 0,
  2883. limit: 0,
  2884. }.Init(),
  2885. },
  2886. },
  2887. joinExpr: ast.Join{
  2888. Name: "table1",
  2889. Alias: "",
  2890. JoinType: ast.INNER_JOIN,
  2891. Expr: &ast.BinaryExpr{
  2892. OP: ast.EQ,
  2893. LHS: &ast.FieldRef{
  2894. StreamName: "src1",
  2895. Name: "id",
  2896. },
  2897. RHS: &ast.FieldRef{
  2898. StreamName: "table1",
  2899. Name: "id",
  2900. },
  2901. },
  2902. },
  2903. keys: []string{"id"},
  2904. valvars: []ast.Expr{
  2905. &ast.FieldRef{
  2906. StreamName: "src1",
  2907. Name: "id",
  2908. },
  2909. },
  2910. options: &ast.Options{
  2911. DATASOURCE: "table1",
  2912. TYPE: "sql",
  2913. STRICT_VALIDATION: true,
  2914. KIND: "lookup",
  2915. },
  2916. conditions: nil,
  2917. }.Init(),
  2918. },
  2919. },
  2920. fields: []ast.Field{
  2921. {
  2922. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2923. Name: "*",
  2924. AName: "",
  2925. },
  2926. },
  2927. isAggregate: false,
  2928. sendMeta: false,
  2929. }.Init(),
  2930. },
  2931. }
  2932. for i, tt := range tests {
  2933. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2934. if err != nil {
  2935. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2936. continue
  2937. }
  2938. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2939. IsEventTime: false,
  2940. LateTol: 0,
  2941. Concurrency: 0,
  2942. BufferLength: 0,
  2943. SendMetaToSink: false,
  2944. Qos: 0,
  2945. CheckpointInterval: 0,
  2946. SendError: true,
  2947. }, store)
  2948. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2949. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2950. } else if !reflect.DeepEqual(tt.p, p) {
  2951. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2952. }
  2953. }
  2954. }