planner_test.go 115 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "reflect"
  19. "strings"
  20. "testing"
  21. "github.com/gdexlab/go-render/render"
  22. "github.com/lf-edge/ekuiper/internal/pkg/store"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/node"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/ast"
  28. )
  29. func init() {
  30. testx.InitEnv()
  31. }
  32. func Test_createLogicalPlan(t *testing.T) {
  33. kv, err := store.GetKV("stream")
  34. if err != nil {
  35. t.Error(err)
  36. return
  37. }
  38. streamSqls := map[string]string{
  39. "src1": `CREATE STREAM src1 (
  40. id1 BIGINT,
  41. temp BIGINT,
  42. name string,
  43. myarray array(string)
  44. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  45. "src2": `CREATE STREAM src2 (
  46. id2 BIGINT,
  47. hum BIGINT
  48. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  49. "tableInPlanner": `CREATE TABLE tableInPlanner (
  50. id BIGINT,
  51. name STRING,
  52. value STRING,
  53. hum BIGINT
  54. ) WITH (TYPE="file");`,
  55. }
  56. types := map[string]ast.StreamType{
  57. "src1": ast.TypeStream,
  58. "src2": ast.TypeStream,
  59. "tableInPlanner": ast.TypeTable,
  60. }
  61. for name, sql := range streamSqls {
  62. s, err := json.Marshal(&xsql.StreamInfo{
  63. StreamType: types[name],
  64. Statement: sql,
  65. })
  66. if err != nil {
  67. t.Error(err)
  68. t.Fail()
  69. }
  70. err = kv.Set(name, string(s))
  71. if err != nil {
  72. t.Error(err)
  73. t.Fail()
  74. }
  75. }
  76. streams := make(map[string]*ast.StreamStmt)
  77. for n := range streamSqls {
  78. streamStmt, err := xsql.GetDataSource(kv, n)
  79. if err != nil {
  80. t.Errorf("fail to get stream %s, please check if stream is created", n)
  81. return
  82. }
  83. streams[n] = streamStmt
  84. }
  85. // boolTrue = true
  86. boolFalse := false
  87. tests := []struct {
  88. sql string
  89. p LogicalPlan
  90. err string
  91. }{
  92. {
  93. sql: "select name from src1 where true limit 1",
  94. p: ProjectPlan{
  95. baseLogicalPlan: baseLogicalPlan{
  96. children: []LogicalPlan{
  97. FilterPlan{
  98. baseLogicalPlan: baseLogicalPlan{
  99. children: []LogicalPlan{
  100. DataSourcePlan{
  101. baseLogicalPlan: baseLogicalPlan{},
  102. name: "src1",
  103. streamFields: map[string]*ast.JsonStreamField{
  104. "name": {
  105. Type: "string",
  106. },
  107. },
  108. streamStmt: streams["src1"],
  109. metaFields: []string{},
  110. pruneFields: []string{},
  111. }.Init(),
  112. },
  113. },
  114. condition: &ast.BooleanLiteral{
  115. Val: true,
  116. },
  117. }.Init(),
  118. },
  119. },
  120. fields: []ast.Field{
  121. {
  122. Name: "name",
  123. Expr: &ast.FieldRef{
  124. StreamName: "src1",
  125. Name: "name",
  126. },
  127. },
  128. },
  129. limitCount: 1,
  130. enableLimit: true,
  131. }.Init(),
  132. },
  133. {
  134. sql: "select name from src1 limit 1",
  135. p: ProjectPlan{
  136. baseLogicalPlan: baseLogicalPlan{
  137. children: []LogicalPlan{
  138. DataSourcePlan{
  139. baseLogicalPlan: baseLogicalPlan{},
  140. name: "src1",
  141. streamFields: map[string]*ast.JsonStreamField{
  142. "name": {
  143. Type: "string",
  144. },
  145. },
  146. streamStmt: streams["src1"],
  147. metaFields: []string{},
  148. pruneFields: []string{},
  149. }.Init(),
  150. },
  151. },
  152. fields: []ast.Field{
  153. {
  154. Name: "name",
  155. Expr: &ast.FieldRef{
  156. StreamName: "src1",
  157. Name: "name",
  158. },
  159. },
  160. },
  161. limitCount: 1,
  162. enableLimit: true,
  163. }.Init(),
  164. },
  165. {
  166. sql: "select unnest(myarray) as col from src1 limit 1",
  167. p: ProjectSetPlan{
  168. SrfMapping: map[string]struct{}{
  169. "col": {},
  170. },
  171. limitCount: 1,
  172. enableLimit: true,
  173. baseLogicalPlan: baseLogicalPlan{
  174. children: []LogicalPlan{
  175. ProjectPlan{
  176. baseLogicalPlan: baseLogicalPlan{
  177. children: []LogicalPlan{
  178. DataSourcePlan{
  179. baseLogicalPlan: baseLogicalPlan{},
  180. name: "src1",
  181. streamFields: map[string]*ast.JsonStreamField{
  182. "myarray": {
  183. Type: "array",
  184. Items: &ast.JsonStreamField{
  185. Type: "string",
  186. },
  187. },
  188. },
  189. streamStmt: streams["src1"],
  190. metaFields: []string{},
  191. pruneFields: []string{},
  192. }.Init(),
  193. },
  194. },
  195. fields: []ast.Field{
  196. {
  197. Name: "unnest",
  198. AName: "col",
  199. Expr: func() *ast.FieldRef {
  200. fr := &ast.FieldRef{
  201. StreamName: ast.AliasStream,
  202. Name: "col",
  203. AliasRef: &ast.AliasRef{
  204. Expression: &ast.Call{
  205. Name: "unnest",
  206. FuncType: ast.FuncTypeSrf,
  207. Args: []ast.Expr{
  208. &ast.FieldRef{
  209. StreamName: "src1",
  210. Name: "myarray",
  211. },
  212. },
  213. },
  214. },
  215. }
  216. fr.SetRefSource([]ast.StreamName{"src1"})
  217. return fr
  218. }(),
  219. },
  220. },
  221. }.Init(),
  222. },
  223. },
  224. }.Init(),
  225. },
  226. { // 0
  227. sql: "SELECT unnest(myarray), name from src1",
  228. p: ProjectSetPlan{
  229. SrfMapping: map[string]struct{}{
  230. "unnest": {},
  231. },
  232. baseLogicalPlan: baseLogicalPlan{
  233. children: []LogicalPlan{
  234. ProjectPlan{
  235. baseLogicalPlan: baseLogicalPlan{
  236. children: []LogicalPlan{
  237. DataSourcePlan{
  238. baseLogicalPlan: baseLogicalPlan{},
  239. name: "src1",
  240. streamFields: map[string]*ast.JsonStreamField{
  241. "myarray": {
  242. Type: "array",
  243. Items: &ast.JsonStreamField{
  244. Type: "string",
  245. },
  246. },
  247. "name": {
  248. Type: "string",
  249. },
  250. },
  251. streamStmt: streams["src1"],
  252. metaFields: []string{},
  253. pruneFields: []string{},
  254. }.Init(),
  255. },
  256. },
  257. fields: []ast.Field{
  258. {
  259. Expr: &ast.Call{
  260. Name: "unnest",
  261. FuncType: ast.FuncTypeSrf,
  262. Args: []ast.Expr{
  263. &ast.FieldRef{
  264. StreamName: "src1",
  265. Name: "myarray",
  266. },
  267. },
  268. },
  269. Name: "unnest",
  270. },
  271. {
  272. Name: "name",
  273. Expr: &ast.FieldRef{
  274. StreamName: "src1",
  275. Name: "name",
  276. },
  277. },
  278. },
  279. }.Init(),
  280. },
  281. },
  282. }.Init(),
  283. },
  284. { // 0
  285. sql: `SELECT myarray[temp] FROM src1`,
  286. p: ProjectPlan{
  287. baseLogicalPlan: baseLogicalPlan{
  288. children: []LogicalPlan{
  289. DataSourcePlan{
  290. baseLogicalPlan: baseLogicalPlan{},
  291. name: "src1",
  292. streamFields: map[string]*ast.JsonStreamField{
  293. "myarray": {
  294. Type: "array",
  295. Items: &ast.JsonStreamField{
  296. Type: "string",
  297. },
  298. },
  299. "temp": {
  300. Type: "bigint",
  301. },
  302. },
  303. streamStmt: streams["src1"],
  304. metaFields: []string{},
  305. pruneFields: []string{},
  306. }.Init(),
  307. },
  308. },
  309. fields: []ast.Field{
  310. {
  311. Expr: &ast.BinaryExpr{
  312. OP: ast.SUBSET,
  313. LHS: &ast.FieldRef{
  314. StreamName: "src1",
  315. Name: "myarray",
  316. },
  317. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  318. StreamName: "src1",
  319. Name: "temp",
  320. }},
  321. },
  322. Name: "kuiper_field_0",
  323. AName: "",
  324. },
  325. },
  326. isAggregate: false,
  327. sendMeta: false,
  328. }.Init(),
  329. },
  330. { // 1 optimize where to data source
  331. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  332. p: ProjectPlan{
  333. baseLogicalPlan: baseLogicalPlan{
  334. children: []LogicalPlan{
  335. WindowPlan{
  336. baseLogicalPlan: baseLogicalPlan{
  337. children: []LogicalPlan{
  338. FilterPlan{
  339. baseLogicalPlan: baseLogicalPlan{
  340. children: []LogicalPlan{
  341. DataSourcePlan{
  342. name: "src1",
  343. streamFields: map[string]*ast.JsonStreamField{
  344. "name": {
  345. Type: "string",
  346. },
  347. "temp": {
  348. Type: "bigint",
  349. },
  350. },
  351. streamStmt: streams["src1"],
  352. metaFields: []string{},
  353. pruneFields: []string{},
  354. }.Init(),
  355. },
  356. },
  357. condition: &ast.BinaryExpr{
  358. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  359. OP: ast.EQ,
  360. RHS: &ast.StringLiteral{Val: "v1"},
  361. },
  362. }.Init(),
  363. },
  364. },
  365. condition: nil,
  366. wtype: ast.TUMBLING_WINDOW,
  367. length: 10,
  368. timeUnit: ast.SS,
  369. interval: 0,
  370. limit: 0,
  371. }.Init(),
  372. },
  373. },
  374. fields: []ast.Field{
  375. {
  376. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  377. Name: "temp",
  378. AName: "",
  379. },
  380. },
  381. isAggregate: false,
  382. sendMeta: false,
  383. }.Init(),
  384. },
  385. { // 2 condition that cannot be optimized
  386. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  387. p: ProjectPlan{
  388. baseLogicalPlan: baseLogicalPlan{
  389. children: []LogicalPlan{
  390. JoinPlan{
  391. baseLogicalPlan: baseLogicalPlan{
  392. children: []LogicalPlan{
  393. WindowPlan{
  394. baseLogicalPlan: baseLogicalPlan{
  395. children: []LogicalPlan{
  396. DataSourcePlan{
  397. name: "src1",
  398. streamFields: map[string]*ast.JsonStreamField{
  399. "id1": {
  400. Type: "bigint",
  401. },
  402. "temp": {
  403. Type: "bigint",
  404. },
  405. },
  406. streamStmt: streams["src1"],
  407. metaFields: []string{},
  408. pruneFields: []string{},
  409. }.Init(),
  410. DataSourcePlan{
  411. name: "src2",
  412. streamFields: map[string]*ast.JsonStreamField{
  413. "hum": {
  414. Type: "bigint",
  415. },
  416. "id2": {
  417. Type: "bigint",
  418. },
  419. },
  420. streamStmt: streams["src2"],
  421. metaFields: []string{},
  422. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  423. pruneFields: []string{},
  424. }.Init(),
  425. },
  426. },
  427. condition: nil,
  428. wtype: ast.TUMBLING_WINDOW,
  429. length: 10,
  430. timeUnit: ast.SS,
  431. interval: 0,
  432. limit: 0,
  433. }.Init(),
  434. },
  435. },
  436. from: &ast.Table{Name: "src1"},
  437. joins: ast.Joins{ast.Join{
  438. Name: "src2",
  439. JoinType: ast.INNER_JOIN,
  440. Expr: &ast.BinaryExpr{
  441. OP: ast.AND,
  442. LHS: &ast.BinaryExpr{
  443. LHS: &ast.BinaryExpr{
  444. OP: ast.GT,
  445. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  446. RHS: &ast.IntegerLiteral{Val: 20},
  447. },
  448. OP: ast.OR,
  449. RHS: &ast.BinaryExpr{
  450. OP: ast.GT,
  451. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  452. RHS: &ast.IntegerLiteral{Val: 60},
  453. },
  454. },
  455. RHS: &ast.BinaryExpr{
  456. OP: ast.EQ,
  457. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  458. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  459. },
  460. },
  461. }},
  462. }.Init(),
  463. },
  464. },
  465. fields: []ast.Field{
  466. {
  467. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  468. Name: "id1",
  469. AName: "",
  470. },
  471. },
  472. isAggregate: false,
  473. sendMeta: false,
  474. }.Init(),
  475. },
  476. { // 3 optimize window filter
  477. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  478. p: ProjectPlan{
  479. baseLogicalPlan: baseLogicalPlan{
  480. children: []LogicalPlan{
  481. WindowPlan{
  482. baseLogicalPlan: baseLogicalPlan{
  483. children: []LogicalPlan{
  484. FilterPlan{
  485. baseLogicalPlan: baseLogicalPlan{
  486. children: []LogicalPlan{
  487. DataSourcePlan{
  488. name: "src1",
  489. streamFields: map[string]*ast.JsonStreamField{
  490. "id1": {
  491. Type: "bigint",
  492. },
  493. "name": {
  494. Type: "string",
  495. },
  496. "temp": {
  497. Type: "bigint",
  498. },
  499. },
  500. streamStmt: streams["src1"],
  501. metaFields: []string{},
  502. pruneFields: []string{},
  503. }.Init(),
  504. },
  505. },
  506. condition: &ast.BinaryExpr{
  507. OP: ast.AND,
  508. LHS: &ast.BinaryExpr{
  509. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  510. OP: ast.EQ,
  511. RHS: &ast.StringLiteral{Val: "v1"},
  512. },
  513. RHS: &ast.BinaryExpr{
  514. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  515. OP: ast.GT,
  516. RHS: &ast.IntegerLiteral{Val: 2},
  517. },
  518. },
  519. }.Init(),
  520. },
  521. },
  522. condition: nil,
  523. wtype: ast.TUMBLING_WINDOW,
  524. length: 10,
  525. timeUnit: ast.SS,
  526. interval: 0,
  527. limit: 0,
  528. }.Init(),
  529. },
  530. },
  531. fields: []ast.Field{
  532. {
  533. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  534. Name: "id1",
  535. AName: "",
  536. },
  537. },
  538. isAggregate: false,
  539. sendMeta: false,
  540. }.Init(),
  541. },
  542. { // 4. do not optimize count window
  543. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  544. p: ProjectPlan{
  545. baseLogicalPlan: baseLogicalPlan{
  546. children: []LogicalPlan{
  547. HavingPlan{
  548. baseLogicalPlan: baseLogicalPlan{
  549. children: []LogicalPlan{
  550. FilterPlan{
  551. baseLogicalPlan: baseLogicalPlan{
  552. children: []LogicalPlan{
  553. WindowPlan{
  554. baseLogicalPlan: baseLogicalPlan{
  555. children: []LogicalPlan{
  556. DataSourcePlan{
  557. name: "src1",
  558. isWildCard: true,
  559. streamFields: map[string]*ast.JsonStreamField{
  560. "id1": {
  561. Type: "bigint",
  562. },
  563. "temp": {
  564. Type: "bigint",
  565. },
  566. "name": {
  567. Type: "string",
  568. },
  569. "myarray": {
  570. Type: "array",
  571. Items: &ast.JsonStreamField{
  572. Type: "string",
  573. },
  574. },
  575. },
  576. streamStmt: streams["src1"],
  577. metaFields: []string{},
  578. pruneFields: []string{},
  579. }.Init(),
  580. },
  581. },
  582. condition: nil,
  583. wtype: ast.COUNT_WINDOW,
  584. length: 5,
  585. interval: 1,
  586. limit: 0,
  587. }.Init(),
  588. },
  589. },
  590. condition: &ast.BinaryExpr{
  591. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  592. OP: ast.GT,
  593. RHS: &ast.IntegerLiteral{Val: 20},
  594. },
  595. }.Init(),
  596. },
  597. },
  598. condition: &ast.BinaryExpr{
  599. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  600. Token: ast.ASTERISK,
  601. }}, FuncType: ast.FuncTypeAgg},
  602. OP: ast.GT,
  603. RHS: &ast.IntegerLiteral{Val: 2},
  604. },
  605. }.Init(),
  606. },
  607. },
  608. fields: []ast.Field{
  609. {
  610. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  611. Name: "*",
  612. AName: "",
  613. },
  614. },
  615. isAggregate: false,
  616. sendMeta: false,
  617. }.Init(),
  618. },
  619. { // 5. optimize join on
  620. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  621. p: ProjectPlan{
  622. baseLogicalPlan: baseLogicalPlan{
  623. children: []LogicalPlan{
  624. JoinPlan{
  625. baseLogicalPlan: baseLogicalPlan{
  626. children: []LogicalPlan{
  627. WindowPlan{
  628. baseLogicalPlan: baseLogicalPlan{
  629. children: []LogicalPlan{
  630. FilterPlan{
  631. baseLogicalPlan: baseLogicalPlan{
  632. children: []LogicalPlan{
  633. DataSourcePlan{
  634. name: "src1",
  635. streamFields: map[string]*ast.JsonStreamField{
  636. "id1": {
  637. Type: "bigint",
  638. },
  639. "temp": {
  640. Type: "bigint",
  641. },
  642. },
  643. streamStmt: streams["src1"],
  644. metaFields: []string{},
  645. pruneFields: []string{},
  646. }.Init(),
  647. },
  648. },
  649. condition: &ast.BinaryExpr{
  650. RHS: &ast.BinaryExpr{
  651. OP: ast.GT,
  652. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  653. RHS: &ast.IntegerLiteral{Val: 20},
  654. },
  655. OP: ast.AND,
  656. LHS: &ast.BinaryExpr{
  657. OP: ast.GT,
  658. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  659. RHS: &ast.IntegerLiteral{Val: 111},
  660. },
  661. },
  662. }.Init(),
  663. FilterPlan{
  664. baseLogicalPlan: baseLogicalPlan{
  665. children: []LogicalPlan{
  666. DataSourcePlan{
  667. name: "src2",
  668. streamFields: map[string]*ast.JsonStreamField{
  669. "hum": {
  670. Type: "bigint",
  671. },
  672. "id2": {
  673. Type: "bigint",
  674. },
  675. },
  676. streamStmt: streams["src2"],
  677. metaFields: []string{},
  678. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  679. pruneFields: []string{},
  680. }.Init(),
  681. },
  682. },
  683. condition: &ast.BinaryExpr{
  684. OP: ast.LT,
  685. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  686. RHS: &ast.IntegerLiteral{Val: 60},
  687. },
  688. }.Init(),
  689. },
  690. },
  691. condition: nil,
  692. wtype: ast.TUMBLING_WINDOW,
  693. length: 10,
  694. timeUnit: ast.SS,
  695. interval: 0,
  696. limit: 0,
  697. }.Init(),
  698. },
  699. },
  700. from: &ast.Table{
  701. Name: "src1",
  702. },
  703. joins: []ast.Join{
  704. {
  705. Name: "src2",
  706. Alias: "",
  707. JoinType: ast.INNER_JOIN,
  708. Expr: &ast.BinaryExpr{
  709. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  710. OP: ast.EQ,
  711. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  712. },
  713. },
  714. },
  715. }.Init(),
  716. },
  717. },
  718. fields: []ast.Field{
  719. {
  720. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  721. Name: "id1",
  722. AName: "",
  723. },
  724. },
  725. isAggregate: false,
  726. sendMeta: false,
  727. }.Init(),
  728. },
  729. { // 6. optimize outer join on
  730. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  731. p: ProjectPlan{
  732. baseLogicalPlan: baseLogicalPlan{
  733. children: []LogicalPlan{
  734. JoinPlan{
  735. baseLogicalPlan: baseLogicalPlan{
  736. children: []LogicalPlan{
  737. WindowPlan{
  738. baseLogicalPlan: baseLogicalPlan{
  739. children: []LogicalPlan{
  740. FilterPlan{
  741. baseLogicalPlan: baseLogicalPlan{
  742. children: []LogicalPlan{
  743. DataSourcePlan{
  744. name: "src1",
  745. streamFields: map[string]*ast.JsonStreamField{
  746. "id1": {
  747. Type: "bigint",
  748. },
  749. "temp": {
  750. Type: "bigint",
  751. },
  752. },
  753. streamStmt: streams["src1"],
  754. metaFields: []string{},
  755. pruneFields: []string{},
  756. }.Init(),
  757. },
  758. },
  759. condition: &ast.BinaryExpr{
  760. OP: ast.GT,
  761. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  762. RHS: &ast.IntegerLiteral{Val: 111},
  763. },
  764. }.Init(),
  765. DataSourcePlan{
  766. name: "src2",
  767. streamFields: map[string]*ast.JsonStreamField{
  768. "hum": {
  769. Type: "bigint",
  770. },
  771. "id2": {
  772. Type: "bigint",
  773. },
  774. },
  775. streamStmt: streams["src2"],
  776. metaFields: []string{},
  777. pruneFields: []string{},
  778. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  779. }.Init(),
  780. },
  781. },
  782. condition: nil,
  783. wtype: ast.TUMBLING_WINDOW,
  784. length: 10,
  785. timeUnit: ast.SS,
  786. interval: 0,
  787. limit: 0,
  788. }.Init(),
  789. },
  790. },
  791. from: &ast.Table{
  792. Name: "src1",
  793. },
  794. joins: []ast.Join{
  795. {
  796. Name: "src2",
  797. Alias: "",
  798. JoinType: ast.FULL_JOIN,
  799. Expr: &ast.BinaryExpr{
  800. OP: ast.AND,
  801. LHS: &ast.BinaryExpr{
  802. OP: ast.AND,
  803. LHS: &ast.BinaryExpr{
  804. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  805. OP: ast.EQ,
  806. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  807. },
  808. RHS: &ast.BinaryExpr{
  809. OP: ast.GT,
  810. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  811. RHS: &ast.IntegerLiteral{Val: 20},
  812. },
  813. },
  814. RHS: &ast.BinaryExpr{
  815. OP: ast.LT,
  816. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  817. RHS: &ast.IntegerLiteral{Val: 60},
  818. },
  819. },
  820. },
  821. },
  822. }.Init(),
  823. },
  824. },
  825. fields: []ast.Field{
  826. {
  827. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  828. Name: "id1",
  829. AName: "",
  830. },
  831. },
  832. isAggregate: false,
  833. sendMeta: false,
  834. }.Init(),
  835. },
  836. { // 7 window error for table
  837. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  838. p: nil,
  839. err: "cannot run window for TABLE sources",
  840. },
  841. { // 8 join table without window
  842. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  843. p: ProjectPlan{
  844. baseLogicalPlan: baseLogicalPlan{
  845. children: []LogicalPlan{
  846. JoinPlan{
  847. baseLogicalPlan: baseLogicalPlan{
  848. children: []LogicalPlan{
  849. JoinAlignPlan{
  850. baseLogicalPlan: baseLogicalPlan{
  851. children: []LogicalPlan{
  852. FilterPlan{
  853. baseLogicalPlan: baseLogicalPlan{
  854. children: []LogicalPlan{
  855. DataSourcePlan{
  856. name: "src1",
  857. streamFields: map[string]*ast.JsonStreamField{
  858. "id1": {
  859. Type: "bigint",
  860. },
  861. "temp": {
  862. Type: "bigint",
  863. },
  864. },
  865. streamStmt: streams["src1"],
  866. metaFields: []string{},
  867. pruneFields: []string{},
  868. }.Init(),
  869. },
  870. },
  871. condition: &ast.BinaryExpr{
  872. RHS: &ast.BinaryExpr{
  873. OP: ast.GT,
  874. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  875. RHS: &ast.IntegerLiteral{Val: 20},
  876. },
  877. OP: ast.AND,
  878. LHS: &ast.BinaryExpr{
  879. OP: ast.GT,
  880. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  881. RHS: &ast.IntegerLiteral{Val: 111},
  882. },
  883. },
  884. }.Init(),
  885. DataSourcePlan{
  886. name: "tableInPlanner",
  887. streamFields: map[string]*ast.JsonStreamField{
  888. "hum": {
  889. Type: "bigint",
  890. },
  891. "id": {
  892. Type: "bigint",
  893. },
  894. },
  895. streamStmt: streams["tableInPlanner"],
  896. metaFields: []string{},
  897. pruneFields: []string{},
  898. }.Init(),
  899. },
  900. },
  901. Emitters: []string{"tableInPlanner"},
  902. }.Init(),
  903. },
  904. },
  905. from: &ast.Table{
  906. Name: "src1",
  907. },
  908. joins: []ast.Join{
  909. {
  910. Name: "tableInPlanner",
  911. Alias: "",
  912. JoinType: ast.INNER_JOIN,
  913. Expr: &ast.BinaryExpr{
  914. OP: ast.AND,
  915. LHS: &ast.BinaryExpr{
  916. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  917. OP: ast.EQ,
  918. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  919. },
  920. RHS: &ast.BinaryExpr{
  921. OP: ast.LT,
  922. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  923. RHS: &ast.IntegerLiteral{Val: 60},
  924. },
  925. },
  926. },
  927. },
  928. }.Init(),
  929. },
  930. },
  931. fields: []ast.Field{
  932. {
  933. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  934. Name: "id1",
  935. AName: "",
  936. },
  937. },
  938. isAggregate: false,
  939. sendMeta: false,
  940. }.Init(),
  941. },
  942. { // 9 join table with window
  943. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  944. p: ProjectPlan{
  945. baseLogicalPlan: baseLogicalPlan{
  946. children: []LogicalPlan{
  947. JoinPlan{
  948. baseLogicalPlan: baseLogicalPlan{
  949. children: []LogicalPlan{
  950. JoinAlignPlan{
  951. baseLogicalPlan: baseLogicalPlan{
  952. children: []LogicalPlan{
  953. WindowPlan{
  954. baseLogicalPlan: baseLogicalPlan{
  955. children: []LogicalPlan{
  956. DataSourcePlan{
  957. name: "src1",
  958. streamFields: map[string]*ast.JsonStreamField{
  959. "id1": {
  960. Type: "bigint",
  961. },
  962. "temp": {
  963. Type: "bigint",
  964. },
  965. },
  966. streamStmt: streams["src1"],
  967. metaFields: []string{},
  968. pruneFields: []string{},
  969. }.Init(),
  970. },
  971. },
  972. condition: nil,
  973. wtype: ast.TUMBLING_WINDOW,
  974. length: 10,
  975. timeUnit: ast.SS,
  976. interval: 0,
  977. limit: 0,
  978. }.Init(),
  979. DataSourcePlan{
  980. name: "tableInPlanner",
  981. streamFields: map[string]*ast.JsonStreamField{
  982. "hum": {
  983. Type: "bigint",
  984. },
  985. "id": {
  986. Type: "bigint",
  987. },
  988. },
  989. streamStmt: streams["tableInPlanner"],
  990. metaFields: []string{},
  991. pruneFields: []string{},
  992. }.Init(),
  993. },
  994. },
  995. Emitters: []string{"tableInPlanner"},
  996. }.Init(),
  997. },
  998. },
  999. from: &ast.Table{
  1000. Name: "src1",
  1001. },
  1002. joins: []ast.Join{
  1003. {
  1004. Name: "tableInPlanner",
  1005. Alias: "",
  1006. JoinType: ast.INNER_JOIN,
  1007. Expr: &ast.BinaryExpr{
  1008. OP: ast.AND,
  1009. LHS: &ast.BinaryExpr{
  1010. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1011. OP: ast.EQ,
  1012. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1013. },
  1014. RHS: &ast.BinaryExpr{
  1015. RHS: &ast.BinaryExpr{
  1016. OP: ast.AND,
  1017. LHS: &ast.BinaryExpr{
  1018. OP: ast.GT,
  1019. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1020. RHS: &ast.IntegerLiteral{Val: 20},
  1021. },
  1022. RHS: &ast.BinaryExpr{
  1023. OP: ast.LT,
  1024. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1025. RHS: &ast.IntegerLiteral{Val: 60},
  1026. },
  1027. },
  1028. OP: ast.AND,
  1029. LHS: &ast.BinaryExpr{
  1030. OP: ast.GT,
  1031. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1032. RHS: &ast.IntegerLiteral{Val: 111},
  1033. },
  1034. },
  1035. },
  1036. },
  1037. },
  1038. }.Init(),
  1039. },
  1040. },
  1041. fields: []ast.Field{
  1042. {
  1043. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1044. Name: "id1",
  1045. AName: "",
  1046. },
  1047. },
  1048. isAggregate: false,
  1049. sendMeta: false,
  1050. }.Init(),
  1051. },
  1052. { // 10 meta
  1053. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1054. p: ProjectPlan{
  1055. baseLogicalPlan: baseLogicalPlan{
  1056. children: []LogicalPlan{
  1057. FilterPlan{
  1058. baseLogicalPlan: baseLogicalPlan{
  1059. children: []LogicalPlan{
  1060. DataSourcePlan{
  1061. name: "src1",
  1062. streamFields: map[string]*ast.JsonStreamField{
  1063. "temp": {
  1064. Type: "bigint",
  1065. },
  1066. },
  1067. streamStmt: streams["src1"],
  1068. metaFields: []string{"Humidity", "device", "id"},
  1069. pruneFields: []string{},
  1070. }.Init(),
  1071. },
  1072. },
  1073. condition: &ast.BinaryExpr{
  1074. LHS: &ast.Call{
  1075. Name: "meta",
  1076. FuncId: 2,
  1077. Args: []ast.Expr{&ast.MetaRef{
  1078. Name: "device",
  1079. StreamName: ast.DefaultStream,
  1080. }},
  1081. },
  1082. OP: ast.EQ,
  1083. RHS: &ast.StringLiteral{
  1084. Val: "demo2",
  1085. },
  1086. },
  1087. }.Init(),
  1088. },
  1089. },
  1090. fields: []ast.Field{
  1091. {
  1092. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1093. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1094. Name: "id",
  1095. StreamName: ast.DefaultStream,
  1096. }}},
  1097. []ast.StreamName{},
  1098. nil,
  1099. )},
  1100. Name: "meta",
  1101. AName: "eid",
  1102. },
  1103. {
  1104. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1105. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1106. &ast.BinaryExpr{
  1107. OP: ast.ARROW,
  1108. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1109. RHS: &ast.JsonFieldRef{Name: "Device"},
  1110. },
  1111. }},
  1112. []ast.StreamName{},
  1113. nil,
  1114. )},
  1115. Name: "meta",
  1116. AName: "hdevice",
  1117. },
  1118. {
  1119. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1120. Name: "temp",
  1121. AName: "",
  1122. },
  1123. },
  1124. isAggregate: false,
  1125. sendMeta: false,
  1126. }.Init(),
  1127. },
  1128. { // 11 join with same name field and aliased
  1129. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1130. p: ProjectPlan{
  1131. baseLogicalPlan: baseLogicalPlan{
  1132. children: []LogicalPlan{
  1133. JoinPlan{
  1134. baseLogicalPlan: baseLogicalPlan{
  1135. children: []LogicalPlan{
  1136. JoinAlignPlan{
  1137. baseLogicalPlan: baseLogicalPlan{
  1138. children: []LogicalPlan{
  1139. DataSourcePlan{
  1140. name: "src2",
  1141. streamFields: map[string]*ast.JsonStreamField{
  1142. "hum": {
  1143. Type: "bigint",
  1144. },
  1145. "id2": {
  1146. Type: "bigint",
  1147. },
  1148. },
  1149. streamStmt: streams["src2"],
  1150. metaFields: []string{},
  1151. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  1152. pruneFields: []string{},
  1153. }.Init(),
  1154. DataSourcePlan{
  1155. name: "tableInPlanner",
  1156. streamFields: map[string]*ast.JsonStreamField{
  1157. "hum": {
  1158. Type: "bigint",
  1159. },
  1160. "id": {
  1161. Type: "bigint",
  1162. },
  1163. },
  1164. streamStmt: streams["tableInPlanner"],
  1165. metaFields: []string{},
  1166. pruneFields: []string{},
  1167. }.Init(),
  1168. },
  1169. },
  1170. Emitters: []string{"tableInPlanner"},
  1171. }.Init(),
  1172. },
  1173. },
  1174. from: &ast.Table{
  1175. Name: "src2",
  1176. },
  1177. joins: []ast.Join{
  1178. {
  1179. Name: "tableInPlanner",
  1180. Alias: "",
  1181. JoinType: ast.INNER_JOIN,
  1182. Expr: &ast.BinaryExpr{
  1183. RHS: &ast.BinaryExpr{
  1184. OP: ast.EQ,
  1185. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1186. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1187. },
  1188. OP: ast.AND,
  1189. LHS: &ast.BinaryExpr{
  1190. OP: ast.GT,
  1191. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1192. &ast.FieldRef{
  1193. Name: "hum",
  1194. StreamName: "src2",
  1195. },
  1196. []ast.StreamName{"src2"},
  1197. &boolFalse,
  1198. )},
  1199. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1200. &ast.FieldRef{
  1201. Name: "hum",
  1202. StreamName: "tableInPlanner",
  1203. },
  1204. []ast.StreamName{"tableInPlanner"},
  1205. &boolFalse,
  1206. )},
  1207. },
  1208. },
  1209. },
  1210. },
  1211. }.Init(),
  1212. },
  1213. },
  1214. fields: []ast.Field{
  1215. {
  1216. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1217. &ast.FieldRef{
  1218. Name: "hum",
  1219. StreamName: "src2",
  1220. },
  1221. []ast.StreamName{"src2"},
  1222. &boolFalse,
  1223. )},
  1224. Name: "hum",
  1225. AName: "hum1",
  1226. }, {
  1227. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1228. &ast.FieldRef{
  1229. Name: "hum",
  1230. StreamName: "tableInPlanner",
  1231. },
  1232. []ast.StreamName{"tableInPlanner"},
  1233. &boolFalse,
  1234. )},
  1235. Name: "hum",
  1236. AName: "hum2",
  1237. },
  1238. },
  1239. isAggregate: false,
  1240. sendMeta: false,
  1241. }.Init(),
  1242. },
  1243. { // 12 meta with more fields
  1244. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1245. p: ProjectPlan{
  1246. baseLogicalPlan: baseLogicalPlan{
  1247. children: []LogicalPlan{
  1248. FilterPlan{
  1249. baseLogicalPlan: baseLogicalPlan{
  1250. children: []LogicalPlan{
  1251. DataSourcePlan{
  1252. name: "src1",
  1253. streamFields: map[string]*ast.JsonStreamField{
  1254. "temp": {
  1255. Type: "bigint",
  1256. },
  1257. },
  1258. streamStmt: streams["src1"],
  1259. metaFields: []string{},
  1260. allMeta: true,
  1261. pruneFields: []string{},
  1262. }.Init(),
  1263. },
  1264. },
  1265. condition: &ast.BinaryExpr{
  1266. LHS: &ast.Call{
  1267. Name: "meta",
  1268. FuncId: 1,
  1269. Args: []ast.Expr{&ast.MetaRef{
  1270. Name: "device",
  1271. StreamName: ast.DefaultStream,
  1272. }},
  1273. },
  1274. OP: ast.EQ,
  1275. RHS: &ast.StringLiteral{
  1276. Val: "demo2",
  1277. },
  1278. },
  1279. }.Init(),
  1280. },
  1281. },
  1282. fields: []ast.Field{
  1283. {
  1284. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1285. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1286. Name: "*",
  1287. StreamName: ast.DefaultStream,
  1288. }}},
  1289. []ast.StreamName{},
  1290. nil,
  1291. )},
  1292. Name: "meta",
  1293. AName: "m",
  1294. },
  1295. {
  1296. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1297. Name: "temp",
  1298. AName: "",
  1299. },
  1300. },
  1301. isAggregate: false,
  1302. sendMeta: false,
  1303. }.Init(),
  1304. },
  1305. { // 13 analytic function plan
  1306. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1307. p: ProjectPlan{
  1308. baseLogicalPlan: baseLogicalPlan{
  1309. children: []LogicalPlan{
  1310. FilterPlan{
  1311. baseLogicalPlan: baseLogicalPlan{
  1312. children: []LogicalPlan{
  1313. AnalyticFuncsPlan{
  1314. baseLogicalPlan: baseLogicalPlan{
  1315. children: []LogicalPlan{
  1316. DataSourcePlan{
  1317. name: "src1",
  1318. streamFields: map[string]*ast.JsonStreamField{
  1319. "id1": {
  1320. Type: "bigint",
  1321. },
  1322. "name": {
  1323. Type: "string",
  1324. },
  1325. "temp": {
  1326. Type: "bigint",
  1327. },
  1328. },
  1329. streamStmt: streams["src1"],
  1330. metaFields: []string{},
  1331. pruneFields: []string{},
  1332. }.Init(),
  1333. },
  1334. },
  1335. funcs: []*ast.Call{
  1336. {
  1337. Name: "lag",
  1338. FuncId: 2,
  1339. CachedField: "$$a_lag_2",
  1340. Args: []ast.Expr{&ast.FieldRef{
  1341. Name: "temp",
  1342. StreamName: "src1",
  1343. }},
  1344. },
  1345. {
  1346. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1347. },
  1348. {
  1349. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1350. },
  1351. },
  1352. }.Init(),
  1353. },
  1354. },
  1355. condition: &ast.BinaryExpr{
  1356. LHS: &ast.Call{
  1357. Name: "lag",
  1358. FuncId: 2,
  1359. Args: []ast.Expr{&ast.FieldRef{
  1360. Name: "temp",
  1361. StreamName: "src1",
  1362. }},
  1363. CachedField: "$$a_lag_2",
  1364. Cached: true,
  1365. },
  1366. OP: ast.GT,
  1367. RHS: &ast.FieldRef{
  1368. Name: "temp",
  1369. StreamName: "src1",
  1370. },
  1371. },
  1372. }.Init(),
  1373. },
  1374. },
  1375. fields: []ast.Field{
  1376. {
  1377. Expr: &ast.Call{
  1378. Name: "latest",
  1379. FuncId: 1,
  1380. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1381. CachedField: "$$a_latest_1",
  1382. Cached: true,
  1383. },
  1384. Name: "latest",
  1385. }, {
  1386. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1387. Name: "id1",
  1388. },
  1389. },
  1390. isAggregate: false,
  1391. sendMeta: false,
  1392. }.Init(),
  1393. },
  1394. { // 14 wildcard combined with an explicit field and a meta call
  1395. sql: `SELECT name, *, meta(device) FROM src1`,
  1396. p: ProjectPlan{
  1397. baseLogicalPlan: baseLogicalPlan{
  1398. children: []LogicalPlan{
  1399. DataSourcePlan{
  1400. baseLogicalPlan: baseLogicalPlan{},
  1401. name: "src1",
  1402. streamFields: map[string]*ast.JsonStreamField{
  1403. "id1": {
  1404. Type: "bigint",
  1405. },
  1406. "temp": {
  1407. Type: "bigint",
  1408. },
  1409. "name": {
  1410. Type: "string",
  1411. },
  1412. "myarray": {
  1413. Type: "array",
  1414. Items: &ast.JsonStreamField{
  1415. Type: "string",
  1416. },
  1417. },
  1418. },
  1419. streamStmt: streams["src1"],
  1420. metaFields: []string{"device"},
  1421. isWildCard: true,
  1422. pruneFields: []string{},
  1423. }.Init(),
  1424. },
  1425. },
  1426. fields: []ast.Field{
  1427. {
  1428. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1429. Name: "name",
  1430. AName: "",
  1431. },
  1432. {
  1433. Name: "*",
  1434. Expr: &ast.Wildcard{
  1435. Token: ast.ASTERISK,
  1436. },
  1437. },
  1438. {
  1439. Name: "meta",
  1440. Expr: &ast.Call{
  1441. Name: "meta",
  1442. Args: []ast.Expr{
  1443. &ast.MetaRef{
  1444. StreamName: ast.DefaultStream,
  1445. Name: "device",
  1446. },
  1447. },
  1448. },
  1449. },
  1450. },
  1451. isAggregate: false,
  1452. allWildcard: true,
  1453. sendMeta: false,
  1454. }.Init(),
  1455. },
  1456. { // 15 analytic function over partition plan
  1457. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1458. p: ProjectPlan{
  1459. baseLogicalPlan: baseLogicalPlan{
  1460. children: []LogicalPlan{
  1461. FilterPlan{
  1462. baseLogicalPlan: baseLogicalPlan{
  1463. children: []LogicalPlan{
  1464. AnalyticFuncsPlan{
  1465. baseLogicalPlan: baseLogicalPlan{
  1466. children: []LogicalPlan{
  1467. DataSourcePlan{
  1468. name: "src1",
  1469. streamFields: map[string]*ast.JsonStreamField{
  1470. "id1": {
  1471. Type: "bigint",
  1472. },
  1473. "name": {
  1474. Type: "string",
  1475. },
  1476. "temp": {
  1477. Type: "bigint",
  1478. },
  1479. },
  1480. streamStmt: streams["src1"],
  1481. metaFields: []string{},
  1482. pruneFields: []string{},
  1483. }.Init(),
  1484. },
  1485. },
  1486. funcs: []*ast.Call{
  1487. {
  1488. Name: "lag",
  1489. FuncId: 2,
  1490. CachedField: "$$a_lag_2",
  1491. Args: []ast.Expr{&ast.FieldRef{
  1492. Name: "temp",
  1493. StreamName: "src1",
  1494. }},
  1495. },
  1496. {
  1497. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1498. },
  1499. {
  1500. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1501. },
  1502. },
  1503. }.Init(),
  1504. },
  1505. },
  1506. condition: &ast.BinaryExpr{
  1507. LHS: &ast.Call{
  1508. Name: "lag",
  1509. FuncId: 2,
  1510. Args: []ast.Expr{&ast.FieldRef{
  1511. Name: "temp",
  1512. StreamName: "src1",
  1513. }},
  1514. CachedField: "$$a_lag_2",
  1515. Cached: true,
  1516. },
  1517. OP: ast.GT,
  1518. RHS: &ast.FieldRef{
  1519. Name: "temp",
  1520. StreamName: "src1",
  1521. },
  1522. },
  1523. }.Init(),
  1524. },
  1525. },
  1526. fields: []ast.Field{
  1527. {
  1528. Expr: &ast.Call{
  1529. Name: "latest",
  1530. FuncId: 1,
  1531. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1532. CachedField: "$$a_latest_1",
  1533. Cached: true,
  1534. Partition: &ast.PartitionExpr{
  1535. Exprs: []ast.Expr{
  1536. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1537. },
  1538. },
  1539. },
  1540. Name: "latest",
  1541. }, {
  1542. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1543. Name: "id1",
  1544. },
  1545. },
  1546. isAggregate: false,
  1547. sendMeta: false,
  1548. }.Init(),
  1549. },
  1550. { // 16 analytic function over partition when plan
  1551. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1552. p: ProjectPlan{
  1553. baseLogicalPlan: baseLogicalPlan{
  1554. children: []LogicalPlan{
  1555. FilterPlan{
  1556. baseLogicalPlan: baseLogicalPlan{
  1557. children: []LogicalPlan{
  1558. AnalyticFuncsPlan{
  1559. baseLogicalPlan: baseLogicalPlan{
  1560. children: []LogicalPlan{
  1561. DataSourcePlan{
  1562. name: "src1",
  1563. streamFields: map[string]*ast.JsonStreamField{
  1564. "id1": {
  1565. Type: "bigint",
  1566. },
  1567. "name": {
  1568. Type: "string",
  1569. },
  1570. "temp": {
  1571. Type: "bigint",
  1572. },
  1573. },
  1574. streamStmt: streams["src1"],
  1575. metaFields: []string{},
  1576. pruneFields: []string{},
  1577. }.Init(),
  1578. },
  1579. },
  1580. funcs: []*ast.Call{
  1581. {
  1582. Name: "lag",
  1583. FuncId: 2,
  1584. CachedField: "$$a_lag_2",
  1585. Args: []ast.Expr{&ast.FieldRef{
  1586. Name: "temp",
  1587. StreamName: "src1",
  1588. }},
  1589. },
  1590. {
  1591. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1592. },
  1593. {
  1594. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1595. },
  1596. },
  1597. }.Init(),
  1598. },
  1599. },
  1600. condition: &ast.BinaryExpr{
  1601. LHS: &ast.Call{
  1602. Name: "lag",
  1603. FuncId: 2,
  1604. Args: []ast.Expr{&ast.FieldRef{
  1605. Name: "temp",
  1606. StreamName: "src1",
  1607. }},
  1608. CachedField: "$$a_lag_2",
  1609. Cached: true,
  1610. },
  1611. OP: ast.GT,
  1612. RHS: &ast.FieldRef{
  1613. Name: "temp",
  1614. StreamName: "src1",
  1615. },
  1616. },
  1617. }.Init(),
  1618. },
  1619. },
  1620. fields: []ast.Field{
  1621. {
  1622. Expr: &ast.Call{
  1623. Name: "latest",
  1624. FuncId: 1,
  1625. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1626. CachedField: "$$a_latest_1",
  1627. Cached: true,
  1628. Partition: &ast.PartitionExpr{
  1629. Exprs: []ast.Expr{
  1630. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1631. },
  1632. },
  1633. WhenExpr: &ast.BinaryExpr{
  1634. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1635. OP: ast.GT,
  1636. RHS: &ast.IntegerLiteral{Val: 12},
  1637. },
  1638. },
  1639. Name: "latest",
  1640. }, {
  1641. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1642. Name: "id1",
  1643. },
  1644. },
  1645. isAggregate: false,
  1646. sendMeta: false,
  1647. }.Init(),
  1648. },
  1649. { // 17. do not optimize sliding window
  1650. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1651. p: ProjectPlan{
  1652. baseLogicalPlan: baseLogicalPlan{
  1653. children: []LogicalPlan{
  1654. HavingPlan{
  1655. baseLogicalPlan: baseLogicalPlan{
  1656. children: []LogicalPlan{
  1657. FilterPlan{
  1658. baseLogicalPlan: baseLogicalPlan{
  1659. children: []LogicalPlan{
  1660. WindowPlan{
  1661. baseLogicalPlan: baseLogicalPlan{
  1662. children: []LogicalPlan{
  1663. DataSourcePlan{
  1664. name: "src1",
  1665. isWildCard: true,
  1666. streamFields: map[string]*ast.JsonStreamField{
  1667. "id1": {
  1668. Type: "bigint",
  1669. },
  1670. "temp": {
  1671. Type: "bigint",
  1672. },
  1673. "name": {
  1674. Type: "string",
  1675. },
  1676. "myarray": {
  1677. Type: "array",
  1678. Items: &ast.JsonStreamField{
  1679. Type: "string",
  1680. },
  1681. },
  1682. },
  1683. streamStmt: streams["src1"],
  1684. metaFields: []string{},
  1685. pruneFields: []string{},
  1686. }.Init(),
  1687. },
  1688. },
  1689. condition: nil,
  1690. wtype: ast.SLIDING_WINDOW,
  1691. length: 10,
  1692. timeUnit: ast.SS,
  1693. interval: 0,
  1694. limit: 0,
  1695. }.Init(),
  1696. },
  1697. },
  1698. condition: &ast.BinaryExpr{
  1699. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1700. OP: ast.GT,
  1701. RHS: &ast.IntegerLiteral{Val: 20},
  1702. },
  1703. }.Init(),
  1704. },
  1705. },
  1706. condition: &ast.BinaryExpr{
  1707. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1708. Token: ast.ASTERISK,
  1709. }}, FuncType: ast.FuncTypeAgg},
  1710. OP: ast.GT,
  1711. RHS: &ast.IntegerLiteral{Val: 2},
  1712. },
  1713. }.Init(),
  1714. },
  1715. },
  1716. fields: []ast.Field{
  1717. {
  1718. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1719. Name: "*",
  1720. AName: "",
  1721. },
  1722. },
  1723. isAggregate: false,
  1724. sendMeta: false,
  1725. }.Init(),
  1726. },
  1727. {
  1728. // 18 analytic function over when plan
  1729. sql: `SELECT CASE WHEN lag(temp) OVER (WHEN lag(id1) > 1) BETWEEN 0 AND 10 THEN 1 ELSE 0 END FROM src1`,
  1730. p: ProjectPlan{
  1731. baseLogicalPlan: baseLogicalPlan{
  1732. children: []LogicalPlan{
  1733. AnalyticFuncsPlan{
  1734. baseLogicalPlan: baseLogicalPlan{
  1735. children: []LogicalPlan{
  1736. DataSourcePlan{
  1737. name: "src1",
  1738. streamFields: map[string]*ast.JsonStreamField{
  1739. "id1": {
  1740. Type: "bigint",
  1741. },
  1742. "temp": {
  1743. Type: "bigint",
  1744. },
  1745. },
  1746. streamStmt: &ast.StreamStmt{
  1747. Name: "src1",
  1748. StreamFields: []ast.StreamField{
  1749. {
  1750. Name: "id1",
  1751. FieldType: &ast.BasicType{
  1752. Type: ast.DataType(1),
  1753. },
  1754. },
  1755. {
  1756. Name: "temp",
  1757. FieldType: &ast.BasicType{
  1758. Type: ast.DataType(1),
  1759. },
  1760. },
  1761. {
  1762. Name: "name",
  1763. FieldType: &ast.BasicType{
  1764. Type: ast.DataType(3),
  1765. },
  1766. },
  1767. {
  1768. Name: "myarray",
  1769. FieldType: &ast.ArrayType{
  1770. Type: ast.DataType(3),
  1771. },
  1772. },
  1773. },
  1774. Options: &ast.Options{
  1775. DATASOURCE: "src1",
  1776. KEY: "ts",
  1777. FORMAT: "json",
  1778. },
  1779. StreamType: ast.StreamType(0),
  1780. },
  1781. metaFields: []string{},
  1782. pruneFields: []string{},
  1783. }.Init(),
  1784. },
  1785. },
  1786. funcs: []*ast.Call{
  1787. {
  1788. Name: "lag",
  1789. FuncId: 0,
  1790. FuncType: ast.FuncType(0),
  1791. Args: []ast.Expr{
  1792. &ast.FieldRef{
  1793. StreamName: "src1",
  1794. Name: "temp",
  1795. },
  1796. },
  1797. CachedField: "$$a_lag_0",
  1798. WhenExpr: &ast.BinaryExpr{
  1799. OP: ast.GT,
  1800. LHS: &ast.Call{
  1801. Name: "lag",
  1802. FuncId: 1,
  1803. FuncType: ast.FuncType(0),
  1804. Args: []ast.Expr{
  1805. &ast.FieldRef{
  1806. StreamName: "src1",
  1807. Name: "id1",
  1808. },
  1809. },
  1810. CachedField: "$$a_lag_1",
  1811. Cached: true,
  1812. },
  1813. RHS: &ast.IntegerLiteral{
  1814. Val: 1,
  1815. },
  1816. },
  1817. },
  1818. {
  1819. Name: "lag",
  1820. FuncId: 1,
  1821. FuncType: ast.FuncType(0),
  1822. Args: []ast.Expr{
  1823. &ast.FieldRef{
  1824. StreamName: "src1",
  1825. Name: "id1",
  1826. },
  1827. },
  1828. CachedField: "$$a_lag_1",
  1829. },
  1830. },
  1831. }.Init(),
  1832. },
  1833. },
  1834. fields: []ast.Field{
  1835. {
  1836. Name: "kuiper_field_0",
  1837. Expr: &ast.CaseExpr{
  1838. WhenClauses: []*ast.WhenClause{
  1839. {
  1840. Expr: &ast.BinaryExpr{
  1841. OP: ast.BETWEEN,
  1842. LHS: &ast.Call{
  1843. Name: "lag",
  1844. FuncId: 0,
  1845. FuncType: ast.FuncType(0),
  1846. Args: []ast.Expr{
  1847. &ast.FieldRef{
  1848. StreamName: "src1",
  1849. Name: "temp",
  1850. },
  1851. },
  1852. CachedField: "$$a_lag_0",
  1853. Cached: true,
  1854. WhenExpr: &ast.BinaryExpr{
  1855. OP: ast.GT,
  1856. LHS: &ast.Call{
  1857. Name: "lag",
  1858. FuncId: 1,
  1859. FuncType: ast.FuncType(0),
  1860. Args: []ast.Expr{
  1861. &ast.FieldRef{
  1862. StreamName: "src1",
  1863. Name: "id1",
  1864. },
  1865. },
  1866. CachedField: "$$a_lag_1",
  1867. Cached: true,
  1868. },
  1869. RHS: &ast.IntegerLiteral{
  1870. Val: 1,
  1871. },
  1872. },
  1873. },
  1874. RHS: &ast.BetweenExpr{
  1875. Lower: &ast.IntegerLiteral{
  1876. Val: 0,
  1877. },
  1878. Higher: &ast.IntegerLiteral{
  1879. Val: 10,
  1880. },
  1881. },
  1882. },
  1883. Result: &ast.IntegerLiteral{
  1884. Val: 1,
  1885. },
  1886. },
  1887. },
  1888. ElseClause: &ast.IntegerLiteral{
  1889. Val: 0,
  1890. },
  1891. },
  1892. },
  1893. },
  1894. }.Init(),
  1895. },
  1896. { // 19
  1897. sql: `SELECT * EXCEPT(id1, name), meta(device) FROM src1`,
  1898. p: ProjectPlan{
  1899. baseLogicalPlan: baseLogicalPlan{
  1900. children: []LogicalPlan{
  1901. DataSourcePlan{
  1902. baseLogicalPlan: baseLogicalPlan{},
  1903. name: "src1",
  1904. streamFields: map[string]*ast.JsonStreamField{
  1905. "temp": {
  1906. Type: "bigint",
  1907. },
  1908. "myarray": {
  1909. Type: "array",
  1910. Items: &ast.JsonStreamField{
  1911. Type: "string",
  1912. },
  1913. },
  1914. },
  1915. streamStmt: streams["src1"],
  1916. metaFields: []string{"device"},
  1917. isWildCard: false,
  1918. pruneFields: []string{"id1", "name"},
  1919. }.Init(),
  1920. },
  1921. },
  1922. fields: []ast.Field{
  1923. {
  1924. Name: "*",
  1925. Expr: &ast.Wildcard{
  1926. Token: ast.ASTERISK,
  1927. Except: []string{"id1", "name"},
  1928. },
  1929. },
  1930. {
  1931. Name: "meta",
  1932. Expr: &ast.Call{
  1933. Name: "meta",
  1934. Args: []ast.Expr{
  1935. &ast.MetaRef{
  1936. StreamName: ast.DefaultStream,
  1937. Name: "device",
  1938. },
  1939. },
  1940. },
  1941. },
  1942. },
  1943. isAggregate: false,
  1944. allWildcard: true,
  1945. sendMeta: false,
  1946. }.Init(),
  1947. },
  1948. { // 20
  1949. sql: `SELECT * REPLACE(temp * 2 AS id1, myarray * 2 AS name), meta(device) FROM src1`,
  1950. p: ProjectPlan{
  1951. baseLogicalPlan: baseLogicalPlan{
  1952. children: []LogicalPlan{
  1953. DataSourcePlan{
  1954. baseLogicalPlan: baseLogicalPlan{},
  1955. name: "src1",
  1956. streamFields: map[string]*ast.JsonStreamField{
  1957. "temp": {
  1958. Type: "bigint",
  1959. },
  1960. "myarray": {
  1961. Type: "array",
  1962. Items: &ast.JsonStreamField{
  1963. Type: "string",
  1964. },
  1965. },
  1966. },
  1967. streamStmt: streams["src1"],
  1968. metaFields: []string{"device"},
  1969. isWildCard: false,
  1970. pruneFields: []string{"id1", "name"},
  1971. }.Init(),
  1972. },
  1973. },
  1974. fields: []ast.Field{
  1975. {
  1976. Name: "*",
  1977. Expr: &ast.Wildcard{
  1978. Token: ast.ASTERISK,
  1979. Replace: []ast.Field{
  1980. {
  1981. AName: "id1",
  1982. Expr: &ast.BinaryExpr{
  1983. OP: ast.MUL,
  1984. LHS: &ast.FieldRef{
  1985. Name: "temp",
  1986. StreamName: "src1",
  1987. },
  1988. RHS: &ast.IntegerLiteral{Val: 2},
  1989. },
  1990. },
  1991. {
  1992. AName: "name",
  1993. Expr: &ast.BinaryExpr{
  1994. OP: ast.MUL,
  1995. LHS: &ast.FieldRef{
  1996. Name: "myarray",
  1997. StreamName: "src1",
  1998. },
  1999. RHS: &ast.IntegerLiteral{Val: 2},
  2000. },
  2001. },
  2002. },
  2003. },
  2004. },
  2005. {
  2006. Name: "meta",
  2007. Expr: &ast.Call{
  2008. Name: "meta",
  2009. Args: []ast.Expr{
  2010. &ast.MetaRef{
  2011. StreamName: ast.DefaultStream,
  2012. Name: "device",
  2013. },
  2014. },
  2015. },
  2016. },
  2017. },
  2018. isAggregate: false,
  2019. allWildcard: true,
  2020. sendMeta: false,
  2021. }.Init(),
  2022. },
  2023. { // 21
  2024. sql: `SELECT collect( * EXCEPT(id1, name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2025. p: ProjectPlan{
  2026. baseLogicalPlan: baseLogicalPlan{
  2027. children: []LogicalPlan{
  2028. WindowPlan{
  2029. baseLogicalPlan: baseLogicalPlan{
  2030. children: []LogicalPlan{
  2031. DataSourcePlan{
  2032. baseLogicalPlan: baseLogicalPlan{},
  2033. name: "src1",
  2034. streamFields: map[string]*ast.JsonStreamField{
  2035. "temp": {
  2036. Type: "bigint",
  2037. },
  2038. "myarray": {
  2039. Type: "array",
  2040. Items: &ast.JsonStreamField{
  2041. Type: "string",
  2042. },
  2043. },
  2044. },
  2045. streamStmt: streams["src1"],
  2046. metaFields: []string{},
  2047. isWildCard: false,
  2048. pruneFields: []string{"id1", "name"},
  2049. }.Init(),
  2050. },
  2051. },
  2052. condition: nil,
  2053. wtype: ast.TUMBLING_WINDOW,
  2054. length: 10,
  2055. timeUnit: ast.SS,
  2056. interval: 0,
  2057. limit: 0,
  2058. }.Init(),
  2059. },
  2060. },
  2061. fields: []ast.Field{
  2062. {
  2063. Name: "collect",
  2064. Expr: &ast.Call{
  2065. Name: "collect",
  2066. FuncType: ast.FuncTypeAgg,
  2067. Args: []ast.Expr{
  2068. &ast.Wildcard{
  2069. Token: ast.ASTERISK,
  2070. Except: []string{"id1", "name"},
  2071. },
  2072. },
  2073. },
  2074. },
  2075. },
  2076. isAggregate: true,
  2077. allWildcard: false,
  2078. sendMeta: false,
  2079. }.Init(),
  2080. },
  2081. { // 22
  2082. sql: `SELECT collect( * REPLACE(temp * 2 AS id1, myarray * 2 AS name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2083. p: ProjectPlan{
  2084. baseLogicalPlan: baseLogicalPlan{
  2085. children: []LogicalPlan{
  2086. WindowPlan{
  2087. baseLogicalPlan: baseLogicalPlan{
  2088. children: []LogicalPlan{
  2089. DataSourcePlan{
  2090. baseLogicalPlan: baseLogicalPlan{},
  2091. name: "src1",
  2092. streamFields: map[string]*ast.JsonStreamField{
  2093. "temp": {
  2094. Type: "bigint",
  2095. },
  2096. "myarray": {
  2097. Type: "array",
  2098. Items: &ast.JsonStreamField{
  2099. Type: "string",
  2100. },
  2101. },
  2102. },
  2103. streamStmt: streams["src1"],
  2104. metaFields: []string{},
  2105. isWildCard: false,
  2106. pruneFields: []string{"id1", "name"},
  2107. }.Init(),
  2108. },
  2109. },
  2110. condition: nil,
  2111. wtype: ast.TUMBLING_WINDOW,
  2112. length: 10,
  2113. timeUnit: ast.SS,
  2114. interval: 0,
  2115. limit: 0,
  2116. }.Init(),
  2117. },
  2118. },
  2119. fields: []ast.Field{
  2120. {
  2121. Name: "collect",
  2122. Expr: &ast.Call{
  2123. Name: "collect",
  2124. FuncType: ast.FuncTypeAgg,
  2125. Args: []ast.Expr{
  2126. &ast.Wildcard{
  2127. Token: ast.ASTERISK,
  2128. Replace: []ast.Field{
  2129. {
  2130. AName: "id1",
  2131. Expr: &ast.BinaryExpr{
  2132. OP: ast.MUL,
  2133. LHS: &ast.FieldRef{
  2134. Name: "temp",
  2135. StreamName: "src1",
  2136. },
  2137. RHS: &ast.IntegerLiteral{Val: 2},
  2138. },
  2139. },
  2140. {
  2141. AName: "name",
  2142. Expr: &ast.BinaryExpr{
  2143. OP: ast.MUL,
  2144. LHS: &ast.FieldRef{
  2145. Name: "myarray",
  2146. StreamName: "src1",
  2147. },
  2148. RHS: &ast.IntegerLiteral{Val: 2},
  2149. },
  2150. },
  2151. },
  2152. },
  2153. },
  2154. },
  2155. },
  2156. },
  2157. isAggregate: true,
  2158. allWildcard: false,
  2159. sendMeta: false,
  2160. }.Init(),
  2161. },
  2162. { // 23
  2163. sql: `SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* EXCEPT(id1, name)) > 0`,
  2164. p: ProjectPlan{
  2165. baseLogicalPlan: baseLogicalPlan{
  2166. children: []LogicalPlan{
  2167. HavingPlan{
  2168. baseLogicalPlan: baseLogicalPlan{
  2169. children: []LogicalPlan{
  2170. WindowPlan{
  2171. baseLogicalPlan: baseLogicalPlan{
  2172. children: []LogicalPlan{
  2173. DataSourcePlan{
  2174. baseLogicalPlan: baseLogicalPlan{},
  2175. name: "src1",
  2176. streamFields: map[string]*ast.JsonStreamField{
  2177. "id1": {
  2178. Type: "bigint",
  2179. },
  2180. "temp": {
  2181. Type: "bigint",
  2182. },
  2183. "myarray": {
  2184. Type: "array",
  2185. Items: &ast.JsonStreamField{
  2186. Type: "string",
  2187. },
  2188. },
  2189. },
  2190. streamStmt: streams["src1"],
  2191. metaFields: []string{},
  2192. isWildCard: false,
  2193. pruneFields: []string{"id1", "name"},
  2194. }.Init(),
  2195. },
  2196. },
  2197. condition: nil,
  2198. wtype: ast.TUMBLING_WINDOW,
  2199. length: 10,
  2200. timeUnit: ast.SS,
  2201. interval: 0,
  2202. limit: 0,
  2203. }.Init(),
  2204. },
  2205. },
  2206. condition: &ast.BinaryExpr{
  2207. LHS: &ast.Call{
  2208. Name: "count",
  2209. FuncId: 0,
  2210. Args: []ast.Expr{
  2211. &ast.Wildcard{
  2212. Token: ast.ASTERISK,
  2213. Except: []string{"id1", "name"},
  2214. },
  2215. },
  2216. FuncType: ast.FuncTypeAgg,
  2217. },
  2218. OP: ast.GT,
  2219. RHS: &ast.IntegerLiteral{Val: 0},
  2220. },
  2221. }.Init(),
  2222. },
  2223. },
  2224. fields: []ast.Field{
  2225. {
  2226. Name: "id1",
  2227. Expr: &ast.FieldRef{
  2228. Name: "id1",
  2229. StreamName: "src1",
  2230. },
  2231. },
  2232. },
  2233. isAggregate: false,
  2234. allWildcard: false,
  2235. sendMeta: false,
  2236. }.Init(),
  2237. },
  2238. { // 24
  2239. sql: `SELECT temp FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* REPLACE(temp * 2 AS id1, myarray * 2 AS name)) > 0`,
  2240. p: ProjectPlan{
  2241. baseLogicalPlan: baseLogicalPlan{
  2242. children: []LogicalPlan{
  2243. HavingPlan{
  2244. baseLogicalPlan: baseLogicalPlan{
  2245. children: []LogicalPlan{
  2246. WindowPlan{
  2247. baseLogicalPlan: baseLogicalPlan{
  2248. children: []LogicalPlan{
  2249. DataSourcePlan{
  2250. baseLogicalPlan: baseLogicalPlan{},
  2251. name: "src1",
  2252. streamFields: map[string]*ast.JsonStreamField{
  2253. "temp": {
  2254. Type: "bigint",
  2255. },
  2256. "myarray": {
  2257. Type: "array",
  2258. Items: &ast.JsonStreamField{
  2259. Type: "string",
  2260. },
  2261. },
  2262. },
  2263. streamStmt: streams["src1"],
  2264. metaFields: []string{},
  2265. isWildCard: false,
  2266. pruneFields: []string{"id1", "name"},
  2267. }.Init(),
  2268. },
  2269. },
  2270. condition: nil,
  2271. wtype: ast.TUMBLING_WINDOW,
  2272. length: 10,
  2273. timeUnit: ast.SS,
  2274. interval: 0,
  2275. limit: 0,
  2276. }.Init(),
  2277. },
  2278. },
  2279. condition: &ast.BinaryExpr{
  2280. LHS: &ast.Call{
  2281. Name: "count",
  2282. FuncId: 0,
  2283. Args: []ast.Expr{
  2284. &ast.Wildcard{
  2285. Token: ast.ASTERISK,
  2286. Replace: []ast.Field{
  2287. {
  2288. AName: "id1",
  2289. Expr: &ast.BinaryExpr{
  2290. OP: ast.MUL,
  2291. LHS: &ast.FieldRef{
  2292. Name: "temp",
  2293. StreamName: "src1",
  2294. },
  2295. RHS: &ast.IntegerLiteral{Val: 2},
  2296. },
  2297. },
  2298. {
  2299. AName: "name",
  2300. Expr: &ast.BinaryExpr{
  2301. OP: ast.MUL,
  2302. LHS: &ast.FieldRef{
  2303. Name: "myarray",
  2304. StreamName: "src1",
  2305. },
  2306. RHS: &ast.IntegerLiteral{Val: 2},
  2307. },
  2308. },
  2309. },
  2310. },
  2311. },
  2312. FuncType: ast.FuncTypeAgg,
  2313. },
  2314. OP: ast.GT,
  2315. RHS: &ast.IntegerLiteral{Val: 0},
  2316. },
  2317. }.Init(),
  2318. },
  2319. },
  2320. fields: []ast.Field{
  2321. {
  2322. Name: "temp",
  2323. Expr: &ast.FieldRef{
  2324. Name: "temp",
  2325. StreamName: "src1",
  2326. },
  2327. },
  2328. },
  2329. isAggregate: false,
  2330. allWildcard: false,
  2331. sendMeta: false,
  2332. }.Init(),
  2333. },
  2334. }
  2335. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2336. for i, tt := range tests {
  2337. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2338. if err != nil {
  2339. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2340. continue
  2341. }
  2342. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2343. IsEventTime: false,
  2344. LateTol: 0,
  2345. Concurrency: 0,
  2346. BufferLength: 0,
  2347. SendMetaToSink: false,
  2348. Qos: 0,
  2349. CheckpointInterval: 0,
  2350. SendError: true,
  2351. }, kv)
  2352. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2353. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2354. } else if !reflect.DeepEqual(tt.p, p) {
  2355. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2356. }
  2357. }
  2358. }
  2359. func Test_createLogicalPlanSchemaless(t *testing.T) {
  2360. kv, err := store.GetKV("stream")
  2361. if err != nil {
  2362. t.Error(err)
  2363. return
  2364. }
  2365. streamSqls := map[string]string{
  2366. "src1": `CREATE STREAM src1 (
  2367. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2368. "src2": `CREATE STREAM src2 (
  2369. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  2370. "tableInPlanner": `CREATE TABLE tableInPlanner (
  2371. id BIGINT,
  2372. name STRING,
  2373. value STRING,
  2374. hum BIGINT
  2375. ) WITH (TYPE="file");`,
  2376. }
  2377. types := map[string]ast.StreamType{
  2378. "src1": ast.TypeStream,
  2379. "src2": ast.TypeStream,
  2380. "tableInPlanner": ast.TypeTable,
  2381. }
  2382. for name, sql := range streamSqls {
  2383. s, err := json.Marshal(&xsql.StreamInfo{
  2384. StreamType: types[name],
  2385. Statement: sql,
  2386. })
  2387. if err != nil {
  2388. t.Error(err)
  2389. t.Fail()
  2390. }
  2391. err = kv.Set(name, string(s))
  2392. if err != nil {
  2393. t.Error(err)
  2394. t.Fail()
  2395. }
  2396. }
  2397. streams := make(map[string]*ast.StreamStmt)
  2398. for n := range streamSqls {
  2399. streamStmt, err := xsql.GetDataSource(kv, n)
  2400. if err != nil {
  2401. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2402. return
  2403. }
  2404. streams[n] = streamStmt
  2405. }
  2406. // boolTrue = true
  2407. boolFalse := false
  2408. tests := []struct {
  2409. sql string
  2410. p LogicalPlan
  2411. err string
  2412. }{
  2413. { // 0
  2414. sql: `SELECT name FROM src1`,
  2415. p: ProjectPlan{
  2416. baseLogicalPlan: baseLogicalPlan{
  2417. children: []LogicalPlan{
  2418. DataSourcePlan{
  2419. baseLogicalPlan: baseLogicalPlan{},
  2420. name: "src1",
  2421. streamFields: map[string]*ast.JsonStreamField{
  2422. "name": nil,
  2423. },
  2424. streamStmt: streams["src1"],
  2425. isSchemaless: true,
  2426. metaFields: []string{},
  2427. pruneFields: []string{},
  2428. }.Init(),
  2429. },
  2430. },
  2431. fields: []ast.Field{
  2432. {
  2433. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2434. Name: "name",
  2435. AName: "",
  2436. },
  2437. },
  2438. isAggregate: false,
  2439. sendMeta: false,
  2440. }.Init(),
  2441. }, { // 1 optimize where to data source
  2442. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2443. p: ProjectPlan{
  2444. baseLogicalPlan: baseLogicalPlan{
  2445. children: []LogicalPlan{
  2446. WindowPlan{
  2447. baseLogicalPlan: baseLogicalPlan{
  2448. children: []LogicalPlan{
  2449. FilterPlan{
  2450. baseLogicalPlan: baseLogicalPlan{
  2451. children: []LogicalPlan{
  2452. DataSourcePlan{
  2453. name: "src1",
  2454. streamFields: map[string]*ast.JsonStreamField{
  2455. "name": nil,
  2456. "temp": nil,
  2457. },
  2458. streamStmt: streams["src1"],
  2459. metaFields: []string{},
  2460. isSchemaless: true,
  2461. pruneFields: []string{},
  2462. }.Init(),
  2463. },
  2464. },
  2465. condition: &ast.BinaryExpr{
  2466. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2467. OP: ast.EQ,
  2468. RHS: &ast.StringLiteral{Val: "v1"},
  2469. },
  2470. }.Init(),
  2471. },
  2472. },
  2473. condition: nil,
  2474. wtype: ast.TUMBLING_WINDOW,
  2475. length: 10,
  2476. timeUnit: ast.SS,
  2477. interval: 0,
  2478. limit: 0,
  2479. }.Init(),
  2480. },
  2481. },
  2482. fields: []ast.Field{
  2483. {
  2484. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2485. Name: "temp",
  2486. AName: "",
  2487. },
  2488. },
  2489. isAggregate: false,
  2490. sendMeta: false,
  2491. }.Init(),
  2492. }, { // 2 condition that cannot be optimized
  2493. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2494. p: ProjectPlan{
  2495. baseLogicalPlan: baseLogicalPlan{
  2496. children: []LogicalPlan{
  2497. JoinPlan{
  2498. baseLogicalPlan: baseLogicalPlan{
  2499. children: []LogicalPlan{
  2500. WindowPlan{
  2501. baseLogicalPlan: baseLogicalPlan{
  2502. children: []LogicalPlan{
  2503. DataSourcePlan{
  2504. name: "src1",
  2505. streamFields: map[string]*ast.JsonStreamField{
  2506. "id1": nil,
  2507. "temp": nil,
  2508. },
  2509. streamStmt: streams["src1"],
  2510. metaFields: []string{},
  2511. isSchemaless: true,
  2512. pruneFields: []string{},
  2513. }.Init(),
  2514. DataSourcePlan{
  2515. name: "src2",
  2516. streamFields: map[string]*ast.JsonStreamField{ // can't determine which stream the unqualified id1 belongs to
  2517. "hum": nil,
  2518. "id1": nil,
  2519. "id2": nil,
  2520. },
  2521. isSchemaless: true,
  2522. streamStmt: streams["src2"],
  2523. metaFields: []string{},
  2524. pruneFields: []string{},
  2525. }.Init(),
  2526. },
  2527. },
  2528. condition: nil,
  2529. wtype: ast.TUMBLING_WINDOW,
  2530. length: 10,
  2531. timeUnit: ast.SS,
  2532. interval: 0,
  2533. limit: 0,
  2534. }.Init(),
  2535. },
  2536. },
  2537. from: &ast.Table{Name: "src1"},
  2538. joins: ast.Joins{ast.Join{
  2539. Name: "src2",
  2540. JoinType: ast.INNER_JOIN,
  2541. Expr: &ast.BinaryExpr{
  2542. OP: ast.AND,
  2543. LHS: &ast.BinaryExpr{
  2544. LHS: &ast.BinaryExpr{
  2545. OP: ast.GT,
  2546. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2547. RHS: &ast.IntegerLiteral{Val: 20},
  2548. },
  2549. OP: ast.OR,
  2550. RHS: &ast.BinaryExpr{
  2551. OP: ast.GT,
  2552. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2553. RHS: &ast.IntegerLiteral{Val: 60},
  2554. },
  2555. },
  2556. RHS: &ast.BinaryExpr{
  2557. OP: ast.EQ,
  2558. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2559. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2560. },
  2561. },
  2562. }},
  2563. }.Init(),
  2564. },
  2565. },
  2566. fields: []ast.Field{
  2567. {
  2568. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2569. Name: "id1",
  2570. AName: "",
  2571. },
  2572. },
  2573. isAggregate: false,
  2574. sendMeta: false,
  2575. }.Init(),
  2576. }, { // 3 optimize window filter
  2577. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  2578. p: ProjectPlan{
  2579. baseLogicalPlan: baseLogicalPlan{
  2580. children: []LogicalPlan{
  2581. WindowPlan{
  2582. baseLogicalPlan: baseLogicalPlan{
  2583. children: []LogicalPlan{
  2584. FilterPlan{
  2585. baseLogicalPlan: baseLogicalPlan{
  2586. children: []LogicalPlan{
  2587. DataSourcePlan{
  2588. name: "src1",
  2589. streamFields: map[string]*ast.JsonStreamField{
  2590. "id1": nil,
  2591. "name": nil,
  2592. "temp": nil,
  2593. },
  2594. isSchemaless: true,
  2595. streamStmt: streams["src1"],
  2596. metaFields: []string{},
  2597. pruneFields: []string{},
  2598. }.Init(),
  2599. },
  2600. },
  2601. condition: &ast.BinaryExpr{
  2602. OP: ast.AND,
  2603. LHS: &ast.BinaryExpr{
  2604. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2605. OP: ast.EQ,
  2606. RHS: &ast.StringLiteral{Val: "v1"},
  2607. },
  2608. RHS: &ast.BinaryExpr{
  2609. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2610. OP: ast.GT,
  2611. RHS: &ast.IntegerLiteral{Val: 2},
  2612. },
  2613. },
  2614. }.Init(),
  2615. },
  2616. },
  2617. condition: nil,
  2618. wtype: ast.TUMBLING_WINDOW,
  2619. length: 10,
  2620. timeUnit: ast.SS,
  2621. interval: 0,
  2622. limit: 0,
  2623. }.Init(),
  2624. },
  2625. },
  2626. fields: []ast.Field{
  2627. {
  2628. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2629. Name: "id1",
  2630. AName: "",
  2631. },
  2632. },
  2633. isAggregate: false,
  2634. sendMeta: false,
  2635. }.Init(),
  2636. }, { // 4. do not optimize count window
  2637. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  2638. p: ProjectPlan{
  2639. baseLogicalPlan: baseLogicalPlan{
  2640. children: []LogicalPlan{
  2641. HavingPlan{
  2642. baseLogicalPlan: baseLogicalPlan{
  2643. children: []LogicalPlan{
  2644. FilterPlan{
  2645. baseLogicalPlan: baseLogicalPlan{
  2646. children: []LogicalPlan{
  2647. WindowPlan{
  2648. baseLogicalPlan: baseLogicalPlan{
  2649. children: []LogicalPlan{
  2650. DataSourcePlan{
  2651. name: "src1",
  2652. isWildCard: true,
  2653. streamFields: map[string]*ast.JsonStreamField{},
  2654. streamStmt: streams["src1"],
  2655. metaFields: []string{},
  2656. isSchemaless: true,
  2657. pruneFields: []string{},
  2658. }.Init(),
  2659. },
  2660. },
  2661. condition: nil,
  2662. wtype: ast.COUNT_WINDOW,
  2663. length: 5,
  2664. interval: 1,
  2665. limit: 0,
  2666. }.Init(),
  2667. },
  2668. },
  2669. condition: &ast.BinaryExpr{
  2670. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2671. OP: ast.GT,
  2672. RHS: &ast.IntegerLiteral{Val: 20},
  2673. },
  2674. }.Init(),
  2675. },
  2676. },
  2677. condition: &ast.BinaryExpr{
  2678. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  2679. Token: ast.ASTERISK,
  2680. }}, FuncType: ast.FuncTypeAgg},
  2681. OP: ast.GT,
  2682. RHS: &ast.IntegerLiteral{Val: 2},
  2683. },
  2684. }.Init(),
  2685. },
  2686. },
  2687. fields: []ast.Field{
  2688. {
  2689. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2690. Name: "*",
  2691. AName: "",
  2692. },
  2693. },
  2694. isAggregate: false,
  2695. sendMeta: false,
  2696. }.Init(),
  2697. }, { // 5. optimize join on
  2698. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2699. p: ProjectPlan{
  2700. baseLogicalPlan: baseLogicalPlan{
  2701. children: []LogicalPlan{
  2702. JoinPlan{
  2703. baseLogicalPlan: baseLogicalPlan{
  2704. children: []LogicalPlan{
  2705. WindowPlan{
  2706. baseLogicalPlan: baseLogicalPlan{
  2707. children: []LogicalPlan{
  2708. FilterPlan{
  2709. baseLogicalPlan: baseLogicalPlan{
  2710. children: []LogicalPlan{
  2711. DataSourcePlan{
  2712. name: "src1",
  2713. streamFields: map[string]*ast.JsonStreamField{
  2714. "id1": nil,
  2715. "temp": nil,
  2716. },
  2717. isSchemaless: true,
  2718. streamStmt: streams["src1"],
  2719. metaFields: []string{},
  2720. pruneFields: []string{},
  2721. }.Init(),
  2722. },
  2723. },
  2724. condition: &ast.BinaryExpr{
  2725. RHS: &ast.BinaryExpr{
  2726. OP: ast.GT,
  2727. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2728. RHS: &ast.IntegerLiteral{Val: 20},
  2729. },
  2730. OP: ast.AND,
  2731. LHS: &ast.BinaryExpr{
  2732. OP: ast.GT,
  2733. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2734. RHS: &ast.IntegerLiteral{Val: 111},
  2735. },
  2736. },
  2737. }.Init(),
  2738. FilterPlan{
  2739. baseLogicalPlan: baseLogicalPlan{
  2740. children: []LogicalPlan{
  2741. DataSourcePlan{
  2742. name: "src2",
  2743. streamFields: map[string]*ast.JsonStreamField{
  2744. "hum": nil,
  2745. "id1": nil,
  2746. "id2": nil,
  2747. },
  2748. isSchemaless: true,
  2749. streamStmt: streams["src2"],
  2750. metaFields: []string{},
  2751. pruneFields: []string{},
  2752. }.Init(),
  2753. },
  2754. },
  2755. condition: &ast.BinaryExpr{
  2756. OP: ast.LT,
  2757. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2758. RHS: &ast.IntegerLiteral{Val: 60},
  2759. },
  2760. }.Init(),
  2761. },
  2762. },
  2763. condition: nil,
  2764. wtype: ast.TUMBLING_WINDOW,
  2765. length: 10,
  2766. timeUnit: ast.SS,
  2767. interval: 0,
  2768. limit: 0,
  2769. }.Init(),
  2770. },
  2771. },
  2772. from: &ast.Table{
  2773. Name: "src1",
  2774. },
  2775. joins: []ast.Join{
  2776. {
  2777. Name: "src2",
  2778. Alias: "",
  2779. JoinType: ast.INNER_JOIN,
  2780. Expr: &ast.BinaryExpr{
  2781. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2782. OP: ast.EQ,
  2783. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2784. },
  2785. },
  2786. },
  2787. }.Init(),
  2788. },
  2789. },
  2790. fields: []ast.Field{
  2791. {
  2792. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2793. Name: "id1",
  2794. AName: "",
  2795. },
  2796. },
  2797. isAggregate: false,
  2798. sendMeta: false,
  2799. }.Init(),
  2800. }, { // 6. optimize outer join on
  2801. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2802. p: ProjectPlan{
  2803. baseLogicalPlan: baseLogicalPlan{
  2804. children: []LogicalPlan{
  2805. JoinPlan{
  2806. baseLogicalPlan: baseLogicalPlan{
  2807. children: []LogicalPlan{
  2808. WindowPlan{
  2809. baseLogicalPlan: baseLogicalPlan{
  2810. children: []LogicalPlan{
  2811. FilterPlan{
  2812. baseLogicalPlan: baseLogicalPlan{
  2813. children: []LogicalPlan{
  2814. DataSourcePlan{
  2815. name: "src1",
  2816. streamFields: map[string]*ast.JsonStreamField{
  2817. "id1": nil,
  2818. "temp": nil,
  2819. },
  2820. isSchemaless: true,
  2821. streamStmt: streams["src1"],
  2822. metaFields: []string{},
  2823. pruneFields: []string{},
  2824. }.Init(),
  2825. },
  2826. },
  2827. condition: &ast.BinaryExpr{
  2828. OP: ast.GT,
  2829. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2830. RHS: &ast.IntegerLiteral{Val: 111},
  2831. },
  2832. }.Init(),
  2833. DataSourcePlan{
  2834. name: "src2",
  2835. streamFields: map[string]*ast.JsonStreamField{
  2836. "hum": nil,
  2837. "id1": nil,
  2838. "id2": nil,
  2839. },
  2840. isSchemaless: true,
  2841. streamStmt: streams["src2"],
  2842. metaFields: []string{},
  2843. pruneFields: []string{},
  2844. }.Init(),
  2845. },
  2846. },
  2847. condition: nil,
  2848. wtype: ast.TUMBLING_WINDOW,
  2849. length: 10,
  2850. timeUnit: ast.SS,
  2851. interval: 0,
  2852. limit: 0,
  2853. }.Init(),
  2854. },
  2855. },
  2856. from: &ast.Table{
  2857. Name: "src1",
  2858. },
  2859. joins: []ast.Join{
  2860. {
  2861. Name: "src2",
  2862. Alias: "",
  2863. JoinType: ast.FULL_JOIN,
  2864. Expr: &ast.BinaryExpr{
  2865. OP: ast.AND,
  2866. LHS: &ast.BinaryExpr{
  2867. OP: ast.AND,
  2868. LHS: &ast.BinaryExpr{
  2869. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2870. OP: ast.EQ,
  2871. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2872. },
  2873. RHS: &ast.BinaryExpr{
  2874. OP: ast.GT,
  2875. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2876. RHS: &ast.IntegerLiteral{Val: 20},
  2877. },
  2878. },
  2879. RHS: &ast.BinaryExpr{
  2880. OP: ast.LT,
  2881. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2882. RHS: &ast.IntegerLiteral{Val: 60},
  2883. },
  2884. },
  2885. },
  2886. },
  2887. }.Init(),
  2888. },
  2889. },
  2890. fields: []ast.Field{
  2891. {
  2892. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2893. Name: "id1",
  2894. AName: "",
  2895. },
  2896. },
  2897. isAggregate: false,
  2898. sendMeta: false,
  2899. }.Init(),
  2900. }, { // 7 window error for table
  2901. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2902. p: nil,
  2903. err: "cannot run window for TABLE sources",
  2904. }, { // 8 join table without window
  2905. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  2906. p: ProjectPlan{
  2907. baseLogicalPlan: baseLogicalPlan{
  2908. children: []LogicalPlan{
  2909. JoinPlan{
  2910. baseLogicalPlan: baseLogicalPlan{
  2911. children: []LogicalPlan{
  2912. JoinAlignPlan{
  2913. baseLogicalPlan: baseLogicalPlan{
  2914. children: []LogicalPlan{
  2915. FilterPlan{
  2916. baseLogicalPlan: baseLogicalPlan{
  2917. children: []LogicalPlan{
  2918. DataSourcePlan{
  2919. name: "src1",
  2920. streamFields: map[string]*ast.JsonStreamField{
  2921. "hum": nil,
  2922. "id1": nil,
  2923. "temp": nil,
  2924. },
  2925. isSchemaless: true,
  2926. streamStmt: streams["src1"],
  2927. metaFields: []string{},
  2928. pruneFields: []string{},
  2929. }.Init(),
  2930. },
  2931. },
  2932. condition: &ast.BinaryExpr{
  2933. RHS: &ast.BinaryExpr{
  2934. OP: ast.GT,
  2935. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2936. RHS: &ast.IntegerLiteral{Val: 20},
  2937. },
  2938. OP: ast.AND,
  2939. LHS: &ast.BinaryExpr{
  2940. OP: ast.GT,
  2941. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2942. RHS: &ast.IntegerLiteral{Val: 111},
  2943. },
  2944. },
  2945. }.Init(),
  2946. DataSourcePlan{
  2947. name: "tableInPlanner",
  2948. streamFields: map[string]*ast.JsonStreamField{
  2949. "hum": {
  2950. Type: "bigint",
  2951. },
  2952. "id": {
  2953. Type: "bigint",
  2954. },
  2955. },
  2956. streamStmt: streams["tableInPlanner"],
  2957. metaFields: []string{},
  2958. pruneFields: []string{},
  2959. }.Init(),
  2960. },
  2961. },
  2962. Emitters: []string{"tableInPlanner"},
  2963. }.Init(),
  2964. },
  2965. },
  2966. from: &ast.Table{
  2967. Name: "src1",
  2968. },
  2969. joins: []ast.Join{
  2970. {
  2971. Name: "tableInPlanner",
  2972. Alias: "",
  2973. JoinType: ast.INNER_JOIN,
  2974. Expr: &ast.BinaryExpr{
  2975. OP: ast.AND,
  2976. LHS: &ast.BinaryExpr{
  2977. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2978. OP: ast.EQ,
  2979. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2980. },
  2981. RHS: &ast.BinaryExpr{
  2982. OP: ast.LT,
  2983. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  2984. RHS: &ast.IntegerLiteral{Val: 60},
  2985. },
  2986. },
  2987. },
  2988. },
  2989. }.Init(),
  2990. },
  2991. },
  2992. fields: []ast.Field{
  2993. {
  2994. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2995. Name: "id1",
  2996. AName: "",
  2997. },
  2998. },
  2999. isAggregate: false,
  3000. sendMeta: false,
  3001. }.Init(),
  3002. }, { // 9 join table with window
  3003. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3004. p: ProjectPlan{
  3005. baseLogicalPlan: baseLogicalPlan{
  3006. children: []LogicalPlan{
  3007. JoinPlan{
  3008. baseLogicalPlan: baseLogicalPlan{
  3009. children: []LogicalPlan{
  3010. JoinAlignPlan{
  3011. baseLogicalPlan: baseLogicalPlan{
  3012. children: []LogicalPlan{
  3013. WindowPlan{
  3014. baseLogicalPlan: baseLogicalPlan{
  3015. children: []LogicalPlan{
  3016. DataSourcePlan{
  3017. name: "src1",
  3018. streamFields: map[string]*ast.JsonStreamField{
  3019. "id1": nil,
  3020. "temp": nil,
  3021. },
  3022. isSchemaless: true,
  3023. streamStmt: streams["src1"],
  3024. metaFields: []string{},
  3025. pruneFields: []string{},
  3026. }.Init(),
  3027. },
  3028. },
  3029. condition: nil,
  3030. wtype: ast.TUMBLING_WINDOW,
  3031. length: 10,
  3032. timeUnit: ast.SS,
  3033. interval: 0,
  3034. limit: 0,
  3035. }.Init(),
  3036. DataSourcePlan{
  3037. name: "tableInPlanner",
  3038. streamFields: map[string]*ast.JsonStreamField{
  3039. "hum": {
  3040. Type: "bigint",
  3041. },
  3042. "id": {
  3043. Type: "bigint",
  3044. },
  3045. },
  3046. streamStmt: streams["tableInPlanner"],
  3047. metaFields: []string{},
  3048. pruneFields: []string{},
  3049. }.Init(),
  3050. },
  3051. },
  3052. Emitters: []string{"tableInPlanner"},
  3053. }.Init(),
  3054. },
  3055. },
  3056. from: &ast.Table{
  3057. Name: "src1",
  3058. },
  3059. joins: []ast.Join{
  3060. {
  3061. Name: "tableInPlanner",
  3062. Alias: "",
  3063. JoinType: ast.INNER_JOIN,
  3064. Expr: &ast.BinaryExpr{
  3065. OP: ast.AND,
  3066. LHS: &ast.BinaryExpr{
  3067. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3068. OP: ast.EQ,
  3069. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  3070. },
  3071. RHS: &ast.BinaryExpr{
  3072. RHS: &ast.BinaryExpr{
  3073. OP: ast.AND,
  3074. LHS: &ast.BinaryExpr{
  3075. OP: ast.GT,
  3076. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3077. RHS: &ast.IntegerLiteral{Val: 20},
  3078. },
  3079. RHS: &ast.BinaryExpr{
  3080. OP: ast.LT,
  3081. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  3082. RHS: &ast.IntegerLiteral{Val: 60},
  3083. },
  3084. },
  3085. OP: ast.AND,
  3086. LHS: &ast.BinaryExpr{
  3087. OP: ast.GT,
  3088. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3089. RHS: &ast.IntegerLiteral{Val: 111},
  3090. },
  3091. },
  3092. },
  3093. },
  3094. },
  3095. }.Init(),
  3096. },
  3097. },
  3098. fields: []ast.Field{
  3099. {
  3100. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  3101. Name: "id1",
  3102. AName: "",
  3103. },
  3104. },
  3105. isAggregate: false,
  3106. sendMeta: false,
  3107. }.Init(),
  3108. }, { // 10 meta
  3109. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  3110. p: ProjectPlan{
  3111. baseLogicalPlan: baseLogicalPlan{
  3112. children: []LogicalPlan{
  3113. FilterPlan{
  3114. baseLogicalPlan: baseLogicalPlan{
  3115. children: []LogicalPlan{
  3116. DataSourcePlan{
  3117. name: "src1",
  3118. streamFields: map[string]*ast.JsonStreamField{
  3119. "temp": nil,
  3120. },
  3121. isSchemaless: true,
  3122. streamStmt: streams["src1"],
  3123. metaFields: []string{"Humidity", "device", "id"},
  3124. pruneFields: []string{},
  3125. }.Init(),
  3126. },
  3127. },
  3128. condition: &ast.BinaryExpr{
  3129. LHS: &ast.Call{
  3130. Name: "meta",
  3131. FuncId: 2,
  3132. Args: []ast.Expr{&ast.MetaRef{
  3133. Name: "device",
  3134. StreamName: ast.DefaultStream,
  3135. }},
  3136. },
  3137. OP: ast.EQ,
  3138. RHS: &ast.StringLiteral{
  3139. Val: "demo2",
  3140. },
  3141. },
  3142. }.Init(),
  3143. },
  3144. },
  3145. fields: []ast.Field{
  3146. {
  3147. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3148. Name: "temp",
  3149. AName: "",
  3150. }, {
  3151. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3152. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  3153. Name: "id",
  3154. StreamName: ast.DefaultStream,
  3155. }}},
  3156. []ast.StreamName{},
  3157. nil,
  3158. )},
  3159. Name: "meta",
  3160. AName: "eid",
  3161. }, {
  3162. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3163. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  3164. &ast.BinaryExpr{
  3165. OP: ast.ARROW,
  3166. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  3167. RHS: &ast.JsonFieldRef{Name: "Device"},
  3168. },
  3169. }},
  3170. []ast.StreamName{},
  3171. nil,
  3172. )},
  3173. Name: "meta",
  3174. AName: "hdevice",
  3175. },
  3176. },
  3177. isAggregate: false,
  3178. sendMeta: false,
  3179. }.Init(),
  3180. }, { // 11 join with same name field and aliased
  3181. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  3182. p: ProjectPlan{
  3183. baseLogicalPlan: baseLogicalPlan{
  3184. children: []LogicalPlan{
  3185. JoinPlan{
  3186. baseLogicalPlan: baseLogicalPlan{
  3187. children: []LogicalPlan{
  3188. JoinAlignPlan{
  3189. baseLogicalPlan: baseLogicalPlan{
  3190. children: []LogicalPlan{
  3191. DataSourcePlan{
  3192. name: "src2",
  3193. streamFields: map[string]*ast.JsonStreamField{
  3194. "hum": nil,
  3195. "id": nil,
  3196. "id2": nil,
  3197. },
  3198. isSchemaless: true,
  3199. streamStmt: streams["src2"],
  3200. metaFields: []string{},
  3201. pruneFields: []string{},
  3202. }.Init(),
  3203. DataSourcePlan{
  3204. name: "tableInPlanner",
  3205. streamFields: map[string]*ast.JsonStreamField{
  3206. "hum": {
  3207. Type: "bigint",
  3208. },
  3209. "id": {
  3210. Type: "bigint",
  3211. },
  3212. },
  3213. streamStmt: streams["tableInPlanner"],
  3214. metaFields: []string{},
  3215. pruneFields: []string{},
  3216. }.Init(),
  3217. },
  3218. },
  3219. Emitters: []string{"tableInPlanner"},
  3220. }.Init(),
  3221. },
  3222. },
  3223. from: &ast.Table{
  3224. Name: "src2",
  3225. },
  3226. joins: []ast.Join{
  3227. {
  3228. Name: "tableInPlanner",
  3229. Alias: "",
  3230. JoinType: ast.INNER_JOIN,
  3231. Expr: &ast.BinaryExpr{
  3232. RHS: &ast.BinaryExpr{
  3233. OP: ast.EQ,
  3234. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  3235. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  3236. },
  3237. OP: ast.AND,
  3238. LHS: &ast.BinaryExpr{
  3239. OP: ast.GT,
  3240. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3241. &ast.FieldRef{
  3242. Name: "hum",
  3243. StreamName: "src2",
  3244. },
  3245. []ast.StreamName{"src2"},
  3246. &boolFalse,
  3247. )},
  3248. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3249. &ast.FieldRef{
  3250. Name: "hum",
  3251. StreamName: "tableInPlanner",
  3252. },
  3253. []ast.StreamName{"tableInPlanner"},
  3254. &boolFalse,
  3255. )},
  3256. },
  3257. },
  3258. },
  3259. },
  3260. }.Init(),
  3261. },
  3262. },
  3263. fields: []ast.Field{
  3264. {
  3265. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3266. &ast.FieldRef{
  3267. Name: "hum",
  3268. StreamName: "src2",
  3269. },
  3270. []ast.StreamName{"src2"},
  3271. &boolFalse,
  3272. )},
  3273. Name: "hum",
  3274. AName: "hum1",
  3275. }, {
  3276. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3277. &ast.FieldRef{
  3278. Name: "hum",
  3279. StreamName: "tableInPlanner",
  3280. },
  3281. []ast.StreamName{"tableInPlanner"},
  3282. &boolFalse,
  3283. )},
  3284. Name: "hum",
  3285. AName: "hum2",
  3286. },
  3287. },
  3288. isAggregate: false,
  3289. sendMeta: false,
  3290. }.Init(),
  3291. }, { // 12
  3292. sql: `SELECT name->first, name->last FROM src1`,
  3293. p: ProjectPlan{
  3294. baseLogicalPlan: baseLogicalPlan{
  3295. children: []LogicalPlan{
  3296. DataSourcePlan{
  3297. baseLogicalPlan: baseLogicalPlan{},
  3298. name: "src1",
  3299. streamFields: map[string]*ast.JsonStreamField{
  3300. "name": nil,
  3301. },
  3302. isSchemaless: true,
  3303. streamStmt: streams["src1"],
  3304. metaFields: []string{},
  3305. pruneFields: []string{},
  3306. }.Init(),
  3307. },
  3308. },
  3309. fields: []ast.Field{
  3310. {
  3311. Expr: &ast.BinaryExpr{
  3312. OP: ast.ARROW,
  3313. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  3314. RHS: &ast.JsonFieldRef{Name: "first"},
  3315. },
  3316. Name: "kuiper_field_0",
  3317. AName: "",
  3318. }, {
  3319. Expr: &ast.BinaryExpr{
  3320. OP: ast.ARROW,
  3321. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  3322. RHS: &ast.JsonFieldRef{Name: "last"},
  3323. },
  3324. Name: "kuiper_field_1",
  3325. AName: "",
  3326. },
  3327. },
  3328. isAggregate: false,
  3329. sendMeta: false,
  3330. }.Init(),
  3331. }, { // 13
  3332. sql: `SELECT * EXCEPT(id1, name), meta(device) FROM src1`,
  3333. p: ProjectPlan{
  3334. baseLogicalPlan: baseLogicalPlan{
  3335. children: []LogicalPlan{
  3336. DataSourcePlan{
  3337. baseLogicalPlan: baseLogicalPlan{},
  3338. name: "src1",
  3339. streamFields: map[string]*ast.JsonStreamField{},
  3340. streamStmt: streams["src1"],
  3341. metaFields: []string{"device"},
  3342. isWildCard: false,
  3343. pruneFields: []string{"id1", "name"},
  3344. isSchemaless: true,
  3345. }.Init(),
  3346. },
  3347. },
  3348. fields: []ast.Field{
  3349. {
  3350. Name: "*",
  3351. Expr: &ast.Wildcard{
  3352. Token: ast.ASTERISK,
  3353. Except: []string{"id1", "name"},
  3354. },
  3355. },
  3356. {
  3357. Name: "meta",
  3358. Expr: &ast.Call{
  3359. Name: "meta",
  3360. Args: []ast.Expr{
  3361. &ast.MetaRef{
  3362. StreamName: ast.DefaultStream,
  3363. Name: "device",
  3364. },
  3365. },
  3366. },
  3367. },
  3368. },
  3369. isAggregate: false,
  3370. allWildcard: true,
  3371. sendMeta: false,
  3372. }.Init(),
  3373. }, { // 14
  3374. sql: `SELECT * REPLACE(temp * 2 AS id1, myarray * 2 AS name), meta(device) FROM src1`,
  3375. p: ProjectPlan{
  3376. baseLogicalPlan: baseLogicalPlan{
  3377. children: []LogicalPlan{
  3378. DataSourcePlan{
  3379. baseLogicalPlan: baseLogicalPlan{},
  3380. name: "src1",
  3381. streamFields: map[string]*ast.JsonStreamField{},
  3382. streamStmt: streams["src1"],
  3383. metaFields: []string{"device"},
  3384. isWildCard: false,
  3385. pruneFields: []string{"id1", "name"},
  3386. isSchemaless: true,
  3387. }.Init(),
  3388. },
  3389. },
  3390. fields: []ast.Field{
  3391. {
  3392. Name: "*",
  3393. Expr: &ast.Wildcard{
  3394. Token: ast.ASTERISK,
  3395. Replace: []ast.Field{
  3396. {
  3397. AName: "id1",
  3398. Expr: &ast.BinaryExpr{
  3399. OP: ast.MUL,
  3400. LHS: &ast.FieldRef{
  3401. Name: "temp",
  3402. StreamName: "src1",
  3403. },
  3404. RHS: &ast.IntegerLiteral{Val: 2},
  3405. },
  3406. },
  3407. {
  3408. AName: "name",
  3409. Expr: &ast.BinaryExpr{
  3410. OP: ast.MUL,
  3411. LHS: &ast.FieldRef{
  3412. Name: "myarray",
  3413. StreamName: "src1",
  3414. },
  3415. RHS: &ast.IntegerLiteral{Val: 2},
  3416. },
  3417. },
  3418. },
  3419. },
  3420. },
  3421. {
  3422. Name: "meta",
  3423. Expr: &ast.Call{
  3424. Name: "meta",
  3425. Args: []ast.Expr{
  3426. &ast.MetaRef{
  3427. StreamName: ast.DefaultStream,
  3428. Name: "device",
  3429. },
  3430. },
  3431. },
  3432. },
  3433. },
  3434. isAggregate: false,
  3435. allWildcard: true,
  3436. sendMeta: false,
  3437. }.Init(),
  3438. }, { // 15
  3439. sql: `SELECT collect( * EXCEPT(id1, name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3440. p: ProjectPlan{
  3441. baseLogicalPlan: baseLogicalPlan{
  3442. children: []LogicalPlan{
  3443. WindowPlan{
  3444. baseLogicalPlan: baseLogicalPlan{
  3445. children: []LogicalPlan{
  3446. DataSourcePlan{
  3447. baseLogicalPlan: baseLogicalPlan{},
  3448. name: "src1",
  3449. streamFields: map[string]*ast.JsonStreamField{},
  3450. streamStmt: streams["src1"],
  3451. metaFields: []string{},
  3452. isWildCard: false,
  3453. pruneFields: []string{"id1", "name"},
  3454. isSchemaless: true,
  3455. }.Init(),
  3456. },
  3457. },
  3458. condition: nil,
  3459. wtype: ast.TUMBLING_WINDOW,
  3460. length: 10,
  3461. timeUnit: ast.SS,
  3462. interval: 0,
  3463. limit: 0,
  3464. }.Init(),
  3465. },
  3466. },
  3467. fields: []ast.Field{
  3468. {
  3469. Name: "collect",
  3470. Expr: &ast.Call{
  3471. Name: "collect",
  3472. FuncType: ast.FuncTypeAgg,
  3473. Args: []ast.Expr{
  3474. &ast.Wildcard{
  3475. Token: ast.ASTERISK,
  3476. Except: []string{"id1", "name"},
  3477. },
  3478. },
  3479. },
  3480. },
  3481. },
  3482. isAggregate: true,
  3483. allWildcard: false,
  3484. sendMeta: false,
  3485. }.Init(),
  3486. }, { // 16
  3487. sql: `SELECT collect( * REPLACE(temp * 2 AS id1, myarray * 2 AS name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3488. p: ProjectPlan{
  3489. baseLogicalPlan: baseLogicalPlan{
  3490. children: []LogicalPlan{
  3491. WindowPlan{
  3492. baseLogicalPlan: baseLogicalPlan{
  3493. children: []LogicalPlan{
  3494. DataSourcePlan{
  3495. baseLogicalPlan: baseLogicalPlan{},
  3496. name: "src1",
  3497. streamFields: map[string]*ast.JsonStreamField{},
  3498. streamStmt: streams["src1"],
  3499. metaFields: []string{},
  3500. isWildCard: false,
  3501. pruneFields: []string{"id1", "name"},
  3502. isSchemaless: true,
  3503. }.Init(),
  3504. },
  3505. },
  3506. condition: nil,
  3507. wtype: ast.TUMBLING_WINDOW,
  3508. length: 10,
  3509. timeUnit: ast.SS,
  3510. interval: 0,
  3511. limit: 0,
  3512. }.Init(),
  3513. },
  3514. },
  3515. fields: []ast.Field{
  3516. {
  3517. Name: "collect",
  3518. Expr: &ast.Call{
  3519. Name: "collect",
  3520. FuncType: ast.FuncTypeAgg,
  3521. Args: []ast.Expr{
  3522. &ast.Wildcard{
  3523. Token: ast.ASTERISK,
  3524. Replace: []ast.Field{
  3525. {
  3526. AName: "id1",
  3527. Expr: &ast.BinaryExpr{
  3528. OP: ast.MUL,
  3529. LHS: &ast.FieldRef{
  3530. Name: "temp",
  3531. StreamName: "src1",
  3532. },
  3533. RHS: &ast.IntegerLiteral{Val: 2},
  3534. },
  3535. },
  3536. {
  3537. AName: "name",
  3538. Expr: &ast.BinaryExpr{
  3539. OP: ast.MUL,
  3540. LHS: &ast.FieldRef{
  3541. Name: "myarray",
  3542. StreamName: "src1",
  3543. },
  3544. RHS: &ast.IntegerLiteral{Val: 2},
  3545. },
  3546. },
  3547. },
  3548. },
  3549. },
  3550. },
  3551. },
  3552. },
  3553. isAggregate: true,
  3554. allWildcard: false,
  3555. sendMeta: false,
  3556. }.Init(),
  3557. }, { // 17
  3558. sql: `SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* EXCEPT(id1, name)) > 0`,
  3559. p: ProjectPlan{
  3560. baseLogicalPlan: baseLogicalPlan{
  3561. children: []LogicalPlan{
  3562. HavingPlan{
  3563. baseLogicalPlan: baseLogicalPlan{
  3564. children: []LogicalPlan{
  3565. WindowPlan{
  3566. baseLogicalPlan: baseLogicalPlan{
  3567. children: []LogicalPlan{
  3568. DataSourcePlan{
  3569. baseLogicalPlan: baseLogicalPlan{},
  3570. name: "src1",
  3571. streamFields: map[string]*ast.JsonStreamField{},
  3572. streamStmt: streams["src1"],
  3573. metaFields: []string{},
  3574. isWildCard: false,
  3575. pruneFields: []string{"id1", "name"},
  3576. isSchemaless: true,
  3577. }.Init(),
  3578. },
  3579. },
  3580. condition: nil,
  3581. wtype: ast.TUMBLING_WINDOW,
  3582. length: 10,
  3583. timeUnit: ast.SS,
  3584. interval: 0,
  3585. limit: 0,
  3586. }.Init(),
  3587. },
  3588. },
  3589. condition: &ast.BinaryExpr{
  3590. LHS: &ast.Call{
  3591. Name: "count",
  3592. FuncId: 0,
  3593. Args: []ast.Expr{
  3594. &ast.Wildcard{
  3595. Token: ast.ASTERISK,
  3596. Except: []string{"id1", "name"},
  3597. },
  3598. },
  3599. FuncType: ast.FuncTypeAgg,
  3600. },
  3601. OP: ast.GT,
  3602. RHS: &ast.IntegerLiteral{Val: 0},
  3603. },
  3604. }.Init(),
  3605. },
  3606. },
  3607. fields: []ast.Field{
  3608. {
  3609. Name: "id1",
  3610. Expr: &ast.FieldRef{
  3611. Name: "id1",
  3612. StreamName: "src1",
  3613. },
  3614. },
  3615. },
  3616. isAggregate: false,
  3617. allWildcard: false,
  3618. sendMeta: false,
  3619. }.Init(),
  3620. }, { // 18
  3621. sql: `SELECT temp FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* REPLACE(temp * 2 AS id1, myarray * 2 AS name)) > 0`,
  3622. p: ProjectPlan{
  3623. baseLogicalPlan: baseLogicalPlan{
  3624. children: []LogicalPlan{
  3625. HavingPlan{
  3626. baseLogicalPlan: baseLogicalPlan{
  3627. children: []LogicalPlan{
  3628. WindowPlan{
  3629. baseLogicalPlan: baseLogicalPlan{
  3630. children: []LogicalPlan{
  3631. DataSourcePlan{
  3632. baseLogicalPlan: baseLogicalPlan{},
  3633. name: "src1",
  3634. streamFields: map[string]*ast.JsonStreamField{},
  3635. streamStmt: streams["src1"],
  3636. metaFields: []string{},
  3637. isWildCard: false,
  3638. pruneFields: []string{"id1", "name"},
  3639. isSchemaless: true,
  3640. }.Init(),
  3641. },
  3642. },
  3643. condition: nil,
  3644. wtype: ast.TUMBLING_WINDOW,
  3645. length: 10,
  3646. timeUnit: ast.SS,
  3647. interval: 0,
  3648. limit: 0,
  3649. }.Init(),
  3650. },
  3651. },
  3652. condition: &ast.BinaryExpr{
  3653. LHS: &ast.Call{
  3654. Name: "count",
  3655. FuncId: 0,
  3656. Args: []ast.Expr{
  3657. &ast.Wildcard{
  3658. Token: ast.ASTERISK,
  3659. Replace: []ast.Field{
  3660. {
  3661. AName: "id1",
  3662. Expr: &ast.BinaryExpr{
  3663. OP: ast.MUL,
  3664. LHS: &ast.FieldRef{
  3665. Name: "temp",
  3666. StreamName: "src1",
  3667. },
  3668. RHS: &ast.IntegerLiteral{Val: 2},
  3669. },
  3670. },
  3671. {
  3672. AName: "name",
  3673. Expr: &ast.BinaryExpr{
  3674. OP: ast.MUL,
  3675. LHS: &ast.FieldRef{
  3676. Name: "myarray",
  3677. StreamName: "src1",
  3678. },
  3679. RHS: &ast.IntegerLiteral{Val: 2},
  3680. },
  3681. },
  3682. },
  3683. },
  3684. },
  3685. FuncType: ast.FuncTypeAgg,
  3686. },
  3687. OP: ast.GT,
  3688. RHS: &ast.IntegerLiteral{Val: 0},
  3689. },
  3690. }.Init(),
  3691. },
  3692. },
  3693. fields: []ast.Field{
  3694. {
  3695. Name: "temp",
  3696. Expr: &ast.FieldRef{
  3697. Name: "temp",
  3698. StreamName: "src1",
  3699. },
  3700. },
  3701. },
  3702. isAggregate: false,
  3703. allWildcard: false,
  3704. sendMeta: false,
  3705. }.Init(),
  3706. },
  3707. }
  3708. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  3709. for i, tt := range tests {
  3710. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  3711. if err != nil {
  3712. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  3713. continue
  3714. }
  3715. p, err := createLogicalPlan(stmt, &api.RuleOption{
  3716. IsEventTime: false,
  3717. LateTol: 0,
  3718. Concurrency: 0,
  3719. BufferLength: 0,
  3720. SendMetaToSink: false,
  3721. Qos: 0,
  3722. CheckpointInterval: 0,
  3723. SendError: true,
  3724. }, kv)
  3725. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  3726. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  3727. } else if !reflect.DeepEqual(tt.p, p) {
  3728. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  3729. }
  3730. }
  3731. }
  3732. func Test_createLogicalPlan4Lookup(t *testing.T) {
  3733. kv, err := store.GetKV("stream")
  3734. if err != nil {
  3735. t.Error(err)
  3736. return
  3737. }
  3738. streamSqls := map[string]string{
  3739. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  3740. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  3741. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  3742. }
  3743. types := map[string]ast.StreamType{
  3744. "src1": ast.TypeStream,
  3745. "table1": ast.TypeTable,
  3746. "table2": ast.TypeTable,
  3747. }
  3748. for name, sql := range streamSqls {
  3749. s, err := json.Marshal(&xsql.StreamInfo{
  3750. StreamType: types[name],
  3751. Statement: sql,
  3752. })
  3753. if err != nil {
  3754. t.Error(err)
  3755. t.Fail()
  3756. }
  3757. err = kv.Set(name, string(s))
  3758. if err != nil {
  3759. t.Error(err)
  3760. t.Fail()
  3761. }
  3762. }
  3763. streams := make(map[string]*ast.StreamStmt)
  3764. for n := range streamSqls {
  3765. streamStmt, err := xsql.GetDataSource(kv, n)
  3766. if err != nil {
  3767. t.Errorf("fail to get stream %s, please check if stream is created", n)
  3768. return
  3769. }
  3770. streams[n] = streamStmt
  3771. }
  3772. tests := []struct {
  3773. sql string
  3774. p LogicalPlan
  3775. err string
  3776. }{
  3777. { // 0
  3778. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  3779. p: ProjectPlan{
  3780. baseLogicalPlan: baseLogicalPlan{
  3781. children: []LogicalPlan{
  3782. LookupPlan{
  3783. baseLogicalPlan: baseLogicalPlan{
  3784. children: []LogicalPlan{
  3785. DataSourcePlan{
  3786. baseLogicalPlan: baseLogicalPlan{},
  3787. name: "src1",
  3788. streamFields: map[string]*ast.JsonStreamField{
  3789. "a": nil,
  3790. },
  3791. isSchemaless: true,
  3792. streamStmt: streams["src1"],
  3793. metaFields: []string{},
  3794. pruneFields: []string{},
  3795. }.Init(),
  3796. },
  3797. },
  3798. joinExpr: ast.Join{
  3799. Name: "table1",
  3800. Alias: "",
  3801. JoinType: ast.INNER_JOIN,
  3802. Expr: &ast.BinaryExpr{
  3803. OP: ast.EQ,
  3804. LHS: &ast.FieldRef{
  3805. StreamName: "src1",
  3806. Name: "id",
  3807. },
  3808. RHS: &ast.FieldRef{
  3809. StreamName: "table1",
  3810. Name: "id",
  3811. },
  3812. },
  3813. },
  3814. keys: []string{"id"},
  3815. fields: []string{"b"},
  3816. valvars: []ast.Expr{
  3817. &ast.FieldRef{
  3818. StreamName: "src1",
  3819. Name: "id",
  3820. },
  3821. },
  3822. options: &ast.Options{
  3823. DATASOURCE: "table1",
  3824. TYPE: "sql",
  3825. KIND: "lookup",
  3826. },
  3827. conditions: nil,
  3828. }.Init(),
  3829. },
  3830. },
  3831. fields: []ast.Field{
  3832. {
  3833. Expr: &ast.FieldRef{
  3834. StreamName: "src1",
  3835. Name: "a",
  3836. },
  3837. Name: "a",
  3838. AName: "",
  3839. },
  3840. {
  3841. Expr: &ast.FieldRef{
  3842. StreamName: "table1",
  3843. Name: "b",
  3844. },
  3845. Name: "b",
  3846. AName: "",
  3847. },
  3848. },
  3849. isAggregate: false,
  3850. sendMeta: false,
  3851. }.Init(),
  3852. },
  3853. { // 1
  3854. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  3855. p: ProjectPlan{
  3856. baseLogicalPlan: baseLogicalPlan{
  3857. children: []LogicalPlan{
  3858. FilterPlan{
  3859. baseLogicalPlan: baseLogicalPlan{
  3860. children: []LogicalPlan{
  3861. LookupPlan{
  3862. baseLogicalPlan: baseLogicalPlan{
  3863. children: []LogicalPlan{
  3864. FilterPlan{
  3865. baseLogicalPlan: baseLogicalPlan{
  3866. children: []LogicalPlan{
  3867. DataSourcePlan{
  3868. baseLogicalPlan: baseLogicalPlan{},
  3869. name: "src1",
  3870. streamFields: map[string]*ast.JsonStreamField{
  3871. "a": nil,
  3872. },
  3873. isSchemaless: true,
  3874. streamStmt: streams["src1"],
  3875. metaFields: []string{},
  3876. pruneFields: []string{},
  3877. }.Init(),
  3878. },
  3879. },
  3880. condition: &ast.BinaryExpr{
  3881. OP: ast.LT,
  3882. LHS: &ast.FieldRef{
  3883. StreamName: "src1",
  3884. Name: "c",
  3885. },
  3886. RHS: &ast.IntegerLiteral{Val: 40},
  3887. },
  3888. }.Init(),
  3889. },
  3890. },
  3891. joinExpr: ast.Join{
  3892. Name: "table1",
  3893. Alias: "",
  3894. JoinType: ast.INNER_JOIN,
  3895. Expr: &ast.BinaryExpr{
  3896. OP: ast.AND,
  3897. RHS: &ast.BinaryExpr{
  3898. OP: ast.EQ,
  3899. LHS: &ast.FieldRef{
  3900. StreamName: "src1",
  3901. Name: "id",
  3902. },
  3903. RHS: &ast.FieldRef{
  3904. StreamName: "table1",
  3905. Name: "id",
  3906. },
  3907. },
  3908. LHS: &ast.BinaryExpr{
  3909. OP: ast.AND,
  3910. LHS: &ast.BinaryExpr{
  3911. OP: ast.GT,
  3912. LHS: &ast.FieldRef{
  3913. StreamName: "table1",
  3914. Name: "b",
  3915. },
  3916. RHS: &ast.IntegerLiteral{Val: 20},
  3917. },
  3918. RHS: &ast.BinaryExpr{
  3919. OP: ast.LT,
  3920. LHS: &ast.FieldRef{
  3921. StreamName: "src1",
  3922. Name: "c",
  3923. },
  3924. RHS: &ast.IntegerLiteral{Val: 40},
  3925. },
  3926. },
  3927. },
  3928. },
  3929. keys: []string{"id"},
  3930. valvars: []ast.Expr{
  3931. &ast.FieldRef{
  3932. StreamName: "src1",
  3933. Name: "id",
  3934. },
  3935. },
  3936. options: &ast.Options{
  3937. DATASOURCE: "table1",
  3938. TYPE: "sql",
  3939. KIND: "lookup",
  3940. },
  3941. conditions: &ast.BinaryExpr{
  3942. OP: ast.AND,
  3943. LHS: &ast.BinaryExpr{
  3944. OP: ast.GT,
  3945. LHS: &ast.FieldRef{
  3946. StreamName: "table1",
  3947. Name: "b",
  3948. },
  3949. RHS: &ast.IntegerLiteral{Val: 20},
  3950. },
  3951. RHS: &ast.BinaryExpr{
  3952. OP: ast.LT,
  3953. LHS: &ast.FieldRef{
  3954. StreamName: "src1",
  3955. Name: "c",
  3956. },
  3957. RHS: &ast.IntegerLiteral{Val: 40},
  3958. },
  3959. },
  3960. }.Init(),
  3961. },
  3962. },
  3963. condition: &ast.BinaryExpr{
  3964. OP: ast.GT,
  3965. LHS: &ast.FieldRef{
  3966. StreamName: "table1",
  3967. Name: "b",
  3968. },
  3969. RHS: &ast.IntegerLiteral{Val: 20},
  3970. },
  3971. }.Init(),
  3972. },
  3973. },
  3974. fields: []ast.Field{
  3975. {
  3976. Expr: &ast.FieldRef{
  3977. StreamName: "src1",
  3978. Name: "a",
  3979. },
  3980. Name: "a",
  3981. AName: "",
  3982. },
  3983. {
  3984. Expr: &ast.FieldRef{
  3985. StreamName: "table1",
  3986. Name: "*",
  3987. },
  3988. Name: "*",
  3989. AName: "",
  3990. },
  3991. },
  3992. isAggregate: false,
  3993. sendMeta: false,
  3994. }.Init(),
  3995. },
  3996. { // 2
  3997. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  3998. p: ProjectPlan{
  3999. baseLogicalPlan: baseLogicalPlan{
  4000. children: []LogicalPlan{
  4001. LookupPlan{
  4002. baseLogicalPlan: baseLogicalPlan{
  4003. children: []LogicalPlan{
  4004. LookupPlan{
  4005. baseLogicalPlan: baseLogicalPlan{
  4006. children: []LogicalPlan{
  4007. DataSourcePlan{
  4008. baseLogicalPlan: baseLogicalPlan{},
  4009. name: "src1",
  4010. streamFields: map[string]*ast.JsonStreamField{
  4011. "a": nil,
  4012. },
  4013. isSchemaless: true,
  4014. streamStmt: streams["src1"],
  4015. metaFields: []string{},
  4016. pruneFields: []string{},
  4017. }.Init(),
  4018. },
  4019. },
  4020. joinExpr: ast.Join{
  4021. Name: "table1",
  4022. Alias: "",
  4023. JoinType: ast.INNER_JOIN,
  4024. Expr: &ast.BinaryExpr{
  4025. OP: ast.EQ,
  4026. LHS: &ast.FieldRef{
  4027. StreamName: "src1",
  4028. Name: "id",
  4029. },
  4030. RHS: &ast.FieldRef{
  4031. StreamName: "table1",
  4032. Name: "id",
  4033. },
  4034. },
  4035. },
  4036. keys: []string{"id"},
  4037. fields: []string{"b"},
  4038. valvars: []ast.Expr{
  4039. &ast.FieldRef{
  4040. StreamName: "src1",
  4041. Name: "id",
  4042. },
  4043. },
  4044. options: &ast.Options{
  4045. DATASOURCE: "table1",
  4046. TYPE: "sql",
  4047. KIND: "lookup",
  4048. },
  4049. conditions: nil,
  4050. }.Init(),
  4051. },
  4052. },
  4053. joinExpr: ast.Join{
  4054. Name: "table2",
  4055. Alias: "",
  4056. JoinType: ast.INNER_JOIN,
  4057. Expr: &ast.BinaryExpr{
  4058. OP: ast.EQ,
  4059. LHS: &ast.FieldRef{
  4060. StreamName: "table1",
  4061. Name: "id",
  4062. },
  4063. RHS: &ast.FieldRef{
  4064. StreamName: "table2",
  4065. Name: "id",
  4066. },
  4067. },
  4068. },
  4069. keys: []string{"id"},
  4070. fields: []string{"c"},
  4071. valvars: []ast.Expr{
  4072. &ast.FieldRef{
  4073. StreamName: "table1",
  4074. Name: "id",
  4075. },
  4076. },
  4077. options: &ast.Options{
  4078. DATASOURCE: "table2",
  4079. TYPE: "sql",
  4080. KIND: "lookup",
  4081. },
  4082. }.Init(),
  4083. },
  4084. },
  4085. fields: []ast.Field{
  4086. {
  4087. Expr: &ast.FieldRef{
  4088. StreamName: "src1",
  4089. Name: "a",
  4090. },
  4091. Name: "a",
  4092. AName: "",
  4093. },
  4094. {
  4095. Expr: &ast.FieldRef{
  4096. StreamName: "table1",
  4097. Name: "b",
  4098. },
  4099. Name: "b",
  4100. AName: "",
  4101. },
  4102. {
  4103. Expr: &ast.FieldRef{
  4104. StreamName: "table2",
  4105. Name: "c",
  4106. },
  4107. Name: "c",
  4108. AName: "",
  4109. },
  4110. },
  4111. isAggregate: false,
  4112. sendMeta: false,
  4113. }.Init(),
  4114. },
  4115. { // 3
  4116. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  4117. p: ProjectPlan{
  4118. baseLogicalPlan: baseLogicalPlan{
  4119. children: []LogicalPlan{
  4120. LookupPlan{
  4121. baseLogicalPlan: baseLogicalPlan{
  4122. children: []LogicalPlan{
  4123. WindowPlan{
  4124. baseLogicalPlan: baseLogicalPlan{
  4125. children: []LogicalPlan{
  4126. DataSourcePlan{
  4127. baseLogicalPlan: baseLogicalPlan{},
  4128. name: "src1",
  4129. streamStmt: streams["src1"],
  4130. streamFields: map[string]*ast.JsonStreamField{},
  4131. metaFields: []string{},
  4132. isWildCard: true,
  4133. isSchemaless: true,
  4134. pruneFields: []string{},
  4135. }.Init(),
  4136. },
  4137. },
  4138. condition: nil,
  4139. wtype: ast.TUMBLING_WINDOW,
  4140. length: 10,
  4141. timeUnit: ast.SS,
  4142. interval: 0,
  4143. limit: 0,
  4144. }.Init(),
  4145. },
  4146. },
  4147. joinExpr: ast.Join{
  4148. Name: "table1",
  4149. Alias: "",
  4150. JoinType: ast.INNER_JOIN,
  4151. Expr: &ast.BinaryExpr{
  4152. OP: ast.EQ,
  4153. LHS: &ast.FieldRef{
  4154. StreamName: "src1",
  4155. Name: "id",
  4156. },
  4157. RHS: &ast.FieldRef{
  4158. StreamName: "table1",
  4159. Name: "id",
  4160. },
  4161. },
  4162. },
  4163. keys: []string{"id"},
  4164. valvars: []ast.Expr{
  4165. &ast.FieldRef{
  4166. StreamName: "src1",
  4167. Name: "id",
  4168. },
  4169. },
  4170. options: &ast.Options{
  4171. DATASOURCE: "table1",
  4172. TYPE: "sql",
  4173. KIND: "lookup",
  4174. },
  4175. conditions: nil,
  4176. }.Init(),
  4177. },
  4178. },
  4179. fields: []ast.Field{
  4180. {
  4181. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  4182. Name: "*",
  4183. AName: "",
  4184. },
  4185. },
  4186. isAggregate: false,
  4187. sendMeta: false,
  4188. }.Init(),
  4189. },
  4190. }
  4191. for i, tt := range tests {
  4192. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  4193. if err != nil {
  4194. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  4195. continue
  4196. }
  4197. p, err := createLogicalPlan(stmt, &api.RuleOption{
  4198. IsEventTime: false,
  4199. LateTol: 0,
  4200. Concurrency: 0,
  4201. BufferLength: 0,
  4202. SendMetaToSink: false,
  4203. Qos: 0,
  4204. CheckpointInterval: 0,
  4205. SendError: true,
  4206. }, kv)
  4207. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  4208. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  4209. } else if !reflect.DeepEqual(tt.p, p) {
  4210. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  4211. }
  4212. }
  4213. }
  4214. func TestTransformSourceNode(t *testing.T) {
  4215. schema := map[string]*ast.JsonStreamField{
  4216. "a": {
  4217. Type: "bigint",
  4218. },
  4219. }
  4220. testCases := []struct {
  4221. name string
  4222. plan *DataSourcePlan
  4223. node *node.SourceNode
  4224. }{
  4225. {
  4226. name: "normal source node",
  4227. plan: &DataSourcePlan{
  4228. name: "test",
  4229. streamStmt: &ast.StreamStmt{
  4230. StreamType: ast.TypeStream,
  4231. Options: &ast.Options{
  4232. TYPE: "file",
  4233. },
  4234. },
  4235. streamFields: nil,
  4236. allMeta: false,
  4237. metaFields: []string{},
  4238. iet: false,
  4239. isBinary: false,
  4240. },
  4241. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  4242. TYPE: "file",
  4243. }, false, nil),
  4244. },
  4245. {
  4246. name: "schema source node",
  4247. plan: &DataSourcePlan{
  4248. name: "test",
  4249. streamStmt: &ast.StreamStmt{
  4250. StreamType: ast.TypeStream,
  4251. Options: &ast.Options{
  4252. TYPE: "file",
  4253. },
  4254. },
  4255. streamFields: schema,
  4256. allMeta: false,
  4257. metaFields: []string{},
  4258. iet: false,
  4259. isBinary: false,
  4260. },
  4261. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  4262. TYPE: "file",
  4263. }, false, schema),
  4264. },
  4265. }
  4266. for _, tc := range testCases {
  4267. t.Run(tc.name, func(t *testing.T) {
  4268. sourceNode, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
  4269. if err != nil {
  4270. t.Errorf("unexpected error: %v", err)
  4271. return
  4272. }
  4273. if !reflect.DeepEqual(sourceNode, tc.node) {
  4274. t.Errorf("unexpected result: got %v, want %v", sourceNode, tc.node)
  4275. }
  4276. })
  4277. }
  4278. }