planner_test.go 118 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "reflect"
  19. "strings"
  20. "testing"
  21. "github.com/gdexlab/go-render/render"
  22. "github.com/lf-edge/ekuiper/internal/pkg/store"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/internal/topo/node"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/ast"
  28. )
  // init runs at package initialization, before any test in this package
  // executes, and calls testx.InitEnv().
  // NOTE(review): InitEnv presumably prepares the environment the tests rely
  // on (e.g. the store opened via store.GetKV) — confirm against the testx package.
  29. func init() {
  30. testx.InitEnv()
  31. }
  32. func Test_createLogicalPlan(t *testing.T) {
  33. kv, err := store.GetKV("stream")
  34. if err != nil {
  35. t.Error(err)
  36. return
  37. }
  38. streamSqls := map[string]string{
  39. "src1": `CREATE STREAM src1 (
  40. id1 BIGINT,
  41. temp BIGINT,
  42. name string,
  43. myarray array(string)
  44. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  45. "src2": `CREATE STREAM src2 (
  46. id2 BIGINT,
  47. hum BIGINT
  48. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  49. "tableInPlanner": `CREATE TABLE tableInPlanner (
  50. id BIGINT,
  51. name STRING,
  52. value STRING,
  53. hum BIGINT
  54. ) WITH (TYPE="file");`,
  55. }
  56. types := map[string]ast.StreamType{
  57. "src1": ast.TypeStream,
  58. "src2": ast.TypeStream,
  59. "tableInPlanner": ast.TypeTable,
  60. }
  61. for name, sql := range streamSqls {
  62. s, err := json.Marshal(&xsql.StreamInfo{
  63. StreamType: types[name],
  64. Statement: sql,
  65. })
  66. if err != nil {
  67. t.Error(err)
  68. t.Fail()
  69. }
  70. err = kv.Set(name, string(s))
  71. if err != nil {
  72. t.Error(err)
  73. t.Fail()
  74. }
  75. }
  76. streams := make(map[string]*ast.StreamStmt)
  77. for n := range streamSqls {
  78. streamStmt, err := xsql.GetDataSource(kv, n)
  79. if err != nil {
  80. t.Errorf("fail to get stream %s, please check if stream is created", n)
  81. return
  82. }
  83. streams[n] = streamStmt
  84. }
  85. // boolTrue = true
  86. boolFalse := false
  87. ref := &ast.AliasRef{
  88. Expression: &ast.Call{
  89. Name: "row_number",
  90. FuncType: ast.FuncTypeWindow,
  91. },
  92. }
  93. ref.SetRefSource([]string{})
  94. tests := []struct {
  95. sql string
  96. p LogicalPlan
  97. err string
  98. }{
  99. {
  100. sql: "select name, row_number() as index from src1",
  101. p: ProjectPlan{
  102. baseLogicalPlan: baseLogicalPlan{
  103. children: []LogicalPlan{
  104. WindowFuncPlan{
  105. baseLogicalPlan: baseLogicalPlan{
  106. children: []LogicalPlan{
  107. DataSourcePlan{
  108. baseLogicalPlan: baseLogicalPlan{},
  109. name: "src1",
  110. streamFields: map[string]*ast.JsonStreamField{
  111. "name": {
  112. Type: "string",
  113. },
  114. },
  115. streamStmt: streams["src1"],
  116. metaFields: []string{},
  117. pruneFields: []string{},
  118. }.Init(),
  119. },
  120. },
  121. windowFuncFields: []ast.Field{
  122. {
  123. Name: "row_number",
  124. AName: "index",
  125. Expr: &ast.FieldRef{
  126. StreamName: ast.AliasStream,
  127. Name: "index",
  128. AliasRef: ref,
  129. },
  130. },
  131. },
  132. }.Init(),
  133. },
  134. },
  135. fields: []ast.Field{
  136. {
  137. Name: "row_number",
  138. AName: "index",
  139. Expr: &ast.FieldRef{
  140. StreamName: ast.AliasStream,
  141. Name: "index",
  142. AliasRef: ref,
  143. },
  144. },
  145. {
  146. Name: "name",
  147. Expr: &ast.FieldRef{
  148. StreamName: "src1",
  149. Name: "name",
  150. },
  151. },
  152. },
  153. windowFuncNames: map[string]struct{}{
  154. "index": {},
  155. },
  156. }.Init(),
  157. },
  158. {
  159. sql: "select name, row_number() from src1",
  160. p: ProjectPlan{
  161. baseLogicalPlan: baseLogicalPlan{
  162. children: []LogicalPlan{
  163. WindowFuncPlan{
  164. baseLogicalPlan: baseLogicalPlan{
  165. children: []LogicalPlan{
  166. DataSourcePlan{
  167. baseLogicalPlan: baseLogicalPlan{},
  168. name: "src1",
  169. streamFields: map[string]*ast.JsonStreamField{
  170. "name": {
  171. Type: "string",
  172. },
  173. },
  174. streamStmt: streams["src1"],
  175. metaFields: []string{},
  176. pruneFields: []string{},
  177. }.Init(),
  178. },
  179. },
  180. windowFuncFields: []ast.Field{
  181. {
  182. Name: "row_number",
  183. Expr: &ast.Call{
  184. Name: "row_number",
  185. FuncType: ast.FuncTypeWindow,
  186. },
  187. },
  188. },
  189. }.Init(),
  190. },
  191. },
  192. fields: []ast.Field{
  193. {
  194. Name: "name",
  195. Expr: &ast.FieldRef{
  196. StreamName: "src1",
  197. Name: "name",
  198. },
  199. },
  200. {
  201. Name: "row_number",
  202. Expr: &ast.Call{
  203. Name: "row_number",
  204. FuncType: ast.FuncTypeWindow,
  205. },
  206. },
  207. },
  208. windowFuncNames: map[string]struct{}{
  209. "row_number": {},
  210. },
  211. }.Init(),
  212. },
  213. {
  214. sql: "select name from src1 where true limit 1",
  215. p: ProjectPlan{
  216. baseLogicalPlan: baseLogicalPlan{
  217. children: []LogicalPlan{
  218. FilterPlan{
  219. baseLogicalPlan: baseLogicalPlan{
  220. children: []LogicalPlan{
  221. DataSourcePlan{
  222. baseLogicalPlan: baseLogicalPlan{},
  223. name: "src1",
  224. streamFields: map[string]*ast.JsonStreamField{
  225. "name": {
  226. Type: "string",
  227. },
  228. },
  229. streamStmt: streams["src1"],
  230. metaFields: []string{},
  231. pruneFields: []string{},
  232. }.Init(),
  233. },
  234. },
  235. condition: &ast.BooleanLiteral{
  236. Val: true,
  237. },
  238. }.Init(),
  239. },
  240. },
  241. fields: []ast.Field{
  242. {
  243. Name: "name",
  244. Expr: &ast.FieldRef{
  245. StreamName: "src1",
  246. Name: "name",
  247. },
  248. },
  249. },
  250. limitCount: 1,
  251. enableLimit: true,
  252. }.Init(),
  253. },
  254. {
  255. sql: "select name from src1 limit 1",
  256. p: ProjectPlan{
  257. baseLogicalPlan: baseLogicalPlan{
  258. children: []LogicalPlan{
  259. DataSourcePlan{
  260. baseLogicalPlan: baseLogicalPlan{},
  261. name: "src1",
  262. streamFields: map[string]*ast.JsonStreamField{
  263. "name": {
  264. Type: "string",
  265. },
  266. },
  267. streamStmt: streams["src1"],
  268. metaFields: []string{},
  269. pruneFields: []string{},
  270. }.Init(),
  271. },
  272. },
  273. fields: []ast.Field{
  274. {
  275. Name: "name",
  276. Expr: &ast.FieldRef{
  277. StreamName: "src1",
  278. Name: "name",
  279. },
  280. },
  281. },
  282. limitCount: 1,
  283. enableLimit: true,
  284. }.Init(),
  285. },
  286. {
  287. sql: "select unnest(myarray) as col from src1 limit 1",
  288. p: ProjectSetPlan{
  289. SrfMapping: map[string]struct{}{
  290. "col": {},
  291. },
  292. limitCount: 1,
  293. enableLimit: true,
  294. baseLogicalPlan: baseLogicalPlan{
  295. children: []LogicalPlan{
  296. ProjectPlan{
  297. baseLogicalPlan: baseLogicalPlan{
  298. children: []LogicalPlan{
  299. DataSourcePlan{
  300. baseLogicalPlan: baseLogicalPlan{},
  301. name: "src1",
  302. streamFields: map[string]*ast.JsonStreamField{
  303. "myarray": {
  304. Type: "array",
  305. Items: &ast.JsonStreamField{
  306. Type: "string",
  307. },
  308. },
  309. },
  310. streamStmt: streams["src1"],
  311. metaFields: []string{},
  312. pruneFields: []string{},
  313. }.Init(),
  314. },
  315. },
  316. fields: []ast.Field{
  317. {
  318. Name: "unnest",
  319. AName: "col",
  320. Expr: func() *ast.FieldRef {
  321. fr := &ast.FieldRef{
  322. StreamName: ast.AliasStream,
  323. Name: "col",
  324. AliasRef: &ast.AliasRef{
  325. Expression: &ast.Call{
  326. Name: "unnest",
  327. FuncType: ast.FuncTypeSrf,
  328. Args: []ast.Expr{
  329. &ast.FieldRef{
  330. StreamName: "src1",
  331. Name: "myarray",
  332. },
  333. },
  334. },
  335. },
  336. }
  337. fr.SetRefSource([]ast.StreamName{"src1"})
  338. return fr
  339. }(),
  340. },
  341. },
  342. }.Init(),
  343. },
  344. },
  345. }.Init(),
  346. },
  347. { // 0
  348. sql: "SELECT unnest(myarray), name from src1",
  349. p: ProjectSetPlan{
  350. SrfMapping: map[string]struct{}{
  351. "unnest": {},
  352. },
  353. baseLogicalPlan: baseLogicalPlan{
  354. children: []LogicalPlan{
  355. ProjectPlan{
  356. baseLogicalPlan: baseLogicalPlan{
  357. children: []LogicalPlan{
  358. DataSourcePlan{
  359. baseLogicalPlan: baseLogicalPlan{},
  360. name: "src1",
  361. streamFields: map[string]*ast.JsonStreamField{
  362. "myarray": {
  363. Type: "array",
  364. Items: &ast.JsonStreamField{
  365. Type: "string",
  366. },
  367. },
  368. "name": {
  369. Type: "string",
  370. },
  371. },
  372. streamStmt: streams["src1"],
  373. metaFields: []string{},
  374. pruneFields: []string{},
  375. }.Init(),
  376. },
  377. },
  378. fields: []ast.Field{
  379. {
  380. Expr: &ast.Call{
  381. Name: "unnest",
  382. FuncType: ast.FuncTypeSrf,
  383. Args: []ast.Expr{
  384. &ast.FieldRef{
  385. StreamName: "src1",
  386. Name: "myarray",
  387. },
  388. },
  389. },
  390. Name: "unnest",
  391. },
  392. {
  393. Name: "name",
  394. Expr: &ast.FieldRef{
  395. StreamName: "src1",
  396. Name: "name",
  397. },
  398. },
  399. },
  400. }.Init(),
  401. },
  402. },
  403. }.Init(),
  404. },
  405. { // 0
  406. sql: `SELECT myarray[temp] FROM src1`,
  407. p: ProjectPlan{
  408. baseLogicalPlan: baseLogicalPlan{
  409. children: []LogicalPlan{
  410. DataSourcePlan{
  411. baseLogicalPlan: baseLogicalPlan{},
  412. name: "src1",
  413. streamFields: map[string]*ast.JsonStreamField{
  414. "myarray": {
  415. Type: "array",
  416. Items: &ast.JsonStreamField{
  417. Type: "string",
  418. },
  419. },
  420. "temp": {
  421. Type: "bigint",
  422. },
  423. },
  424. streamStmt: streams["src1"],
  425. metaFields: []string{},
  426. pruneFields: []string{},
  427. }.Init(),
  428. },
  429. },
  430. fields: []ast.Field{
  431. {
  432. Expr: &ast.BinaryExpr{
  433. OP: ast.SUBSET,
  434. LHS: &ast.FieldRef{
  435. StreamName: "src1",
  436. Name: "myarray",
  437. },
  438. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  439. StreamName: "src1",
  440. Name: "temp",
  441. }},
  442. },
  443. Name: "kuiper_field_0",
  444. AName: "",
  445. },
  446. },
  447. isAggregate: false,
  448. sendMeta: false,
  449. }.Init(),
  450. },
  451. { // 1 optimize where to data source
  452. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  453. p: ProjectPlan{
  454. baseLogicalPlan: baseLogicalPlan{
  455. children: []LogicalPlan{
  456. WindowPlan{
  457. baseLogicalPlan: baseLogicalPlan{
  458. children: []LogicalPlan{
  459. FilterPlan{
  460. baseLogicalPlan: baseLogicalPlan{
  461. children: []LogicalPlan{
  462. DataSourcePlan{
  463. name: "src1",
  464. streamFields: map[string]*ast.JsonStreamField{
  465. "name": {
  466. Type: "string",
  467. },
  468. "temp": {
  469. Type: "bigint",
  470. },
  471. },
  472. streamStmt: streams["src1"],
  473. metaFields: []string{},
  474. pruneFields: []string{},
  475. }.Init(),
  476. },
  477. },
  478. condition: &ast.BinaryExpr{
  479. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  480. OP: ast.EQ,
  481. RHS: &ast.StringLiteral{Val: "v1"},
  482. },
  483. }.Init(),
  484. },
  485. },
  486. condition: nil,
  487. wtype: ast.TUMBLING_WINDOW,
  488. length: 10,
  489. timeUnit: ast.SS,
  490. interval: 0,
  491. limit: 0,
  492. }.Init(),
  493. },
  494. },
  495. fields: []ast.Field{
  496. {
  497. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  498. Name: "temp",
  499. AName: "",
  500. },
  501. },
  502. isAggregate: false,
  503. sendMeta: false,
  504. }.Init(),
  505. },
  506. { // 2 condition that cannot be optimized
  507. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  508. p: ProjectPlan{
  509. baseLogicalPlan: baseLogicalPlan{
  510. children: []LogicalPlan{
  511. JoinPlan{
  512. baseLogicalPlan: baseLogicalPlan{
  513. children: []LogicalPlan{
  514. WindowPlan{
  515. baseLogicalPlan: baseLogicalPlan{
  516. children: []LogicalPlan{
  517. DataSourcePlan{
  518. name: "src1",
  519. streamFields: map[string]*ast.JsonStreamField{
  520. "id1": {
  521. Type: "bigint",
  522. },
  523. "temp": {
  524. Type: "bigint",
  525. },
  526. },
  527. streamStmt: streams["src1"],
  528. metaFields: []string{},
  529. pruneFields: []string{},
  530. }.Init(),
  531. DataSourcePlan{
  532. name: "src2",
  533. streamFields: map[string]*ast.JsonStreamField{
  534. "hum": {
  535. Type: "bigint",
  536. },
  537. "id2": {
  538. Type: "bigint",
  539. },
  540. },
  541. streamStmt: streams["src2"],
  542. metaFields: []string{},
  543. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  544. pruneFields: []string{},
  545. }.Init(),
  546. },
  547. },
  548. condition: nil,
  549. wtype: ast.TUMBLING_WINDOW,
  550. length: 10,
  551. timeUnit: ast.SS,
  552. interval: 0,
  553. limit: 0,
  554. }.Init(),
  555. },
  556. },
  557. from: &ast.Table{Name: "src1"},
  558. joins: ast.Joins{ast.Join{
  559. Name: "src2",
  560. JoinType: ast.INNER_JOIN,
  561. Expr: &ast.BinaryExpr{
  562. OP: ast.AND,
  563. LHS: &ast.BinaryExpr{
  564. LHS: &ast.BinaryExpr{
  565. OP: ast.GT,
  566. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  567. RHS: &ast.IntegerLiteral{Val: 20},
  568. },
  569. OP: ast.OR,
  570. RHS: &ast.BinaryExpr{
  571. OP: ast.GT,
  572. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  573. RHS: &ast.IntegerLiteral{Val: 60},
  574. },
  575. },
  576. RHS: &ast.BinaryExpr{
  577. OP: ast.EQ,
  578. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  579. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  580. },
  581. },
  582. }},
  583. }.Init(),
  584. },
  585. },
  586. fields: []ast.Field{
  587. {
  588. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  589. Name: "id1",
  590. AName: "",
  591. },
  592. },
  593. isAggregate: false,
  594. sendMeta: false,
  595. }.Init(),
  596. },
  597. { // 3 optimize window filter
  598. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  599. p: ProjectPlan{
  600. baseLogicalPlan: baseLogicalPlan{
  601. children: []LogicalPlan{
  602. WindowPlan{
  603. baseLogicalPlan: baseLogicalPlan{
  604. children: []LogicalPlan{
  605. FilterPlan{
  606. baseLogicalPlan: baseLogicalPlan{
  607. children: []LogicalPlan{
  608. DataSourcePlan{
  609. name: "src1",
  610. streamFields: map[string]*ast.JsonStreamField{
  611. "id1": {
  612. Type: "bigint",
  613. },
  614. "name": {
  615. Type: "string",
  616. },
  617. "temp": {
  618. Type: "bigint",
  619. },
  620. },
  621. streamStmt: streams["src1"],
  622. metaFields: []string{},
  623. pruneFields: []string{},
  624. }.Init(),
  625. },
  626. },
  627. condition: &ast.BinaryExpr{
  628. OP: ast.AND,
  629. LHS: &ast.BinaryExpr{
  630. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  631. OP: ast.EQ,
  632. RHS: &ast.StringLiteral{Val: "v1"},
  633. },
  634. RHS: &ast.BinaryExpr{
  635. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  636. OP: ast.GT,
  637. RHS: &ast.IntegerLiteral{Val: 2},
  638. },
  639. },
  640. }.Init(),
  641. },
  642. },
  643. condition: nil,
  644. wtype: ast.TUMBLING_WINDOW,
  645. length: 10,
  646. timeUnit: ast.SS,
  647. interval: 0,
  648. limit: 0,
  649. }.Init(),
  650. },
  651. },
  652. fields: []ast.Field{
  653. {
  654. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  655. Name: "id1",
  656. AName: "",
  657. },
  658. },
  659. isAggregate: false,
  660. sendMeta: false,
  661. }.Init(),
  662. },
  663. { // 4. do not optimize count window
  664. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  665. p: ProjectPlan{
  666. baseLogicalPlan: baseLogicalPlan{
  667. children: []LogicalPlan{
  668. HavingPlan{
  669. baseLogicalPlan: baseLogicalPlan{
  670. children: []LogicalPlan{
  671. FilterPlan{
  672. baseLogicalPlan: baseLogicalPlan{
  673. children: []LogicalPlan{
  674. WindowPlan{
  675. baseLogicalPlan: baseLogicalPlan{
  676. children: []LogicalPlan{
  677. DataSourcePlan{
  678. name: "src1",
  679. isWildCard: true,
  680. streamFields: map[string]*ast.JsonStreamField{
  681. "id1": {
  682. Type: "bigint",
  683. },
  684. "temp": {
  685. Type: "bigint",
  686. },
  687. "name": {
  688. Type: "string",
  689. },
  690. "myarray": {
  691. Type: "array",
  692. Items: &ast.JsonStreamField{
  693. Type: "string",
  694. },
  695. },
  696. },
  697. streamStmt: streams["src1"],
  698. metaFields: []string{},
  699. pruneFields: []string{},
  700. }.Init(),
  701. },
  702. },
  703. condition: nil,
  704. wtype: ast.COUNT_WINDOW,
  705. length: 5,
  706. interval: 1,
  707. limit: 0,
  708. }.Init(),
  709. },
  710. },
  711. condition: &ast.BinaryExpr{
  712. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  713. OP: ast.GT,
  714. RHS: &ast.IntegerLiteral{Val: 20},
  715. },
  716. }.Init(),
  717. },
  718. },
  719. condition: &ast.BinaryExpr{
  720. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  721. Token: ast.ASTERISK,
  722. }}, FuncType: ast.FuncTypeAgg},
  723. OP: ast.GT,
  724. RHS: &ast.IntegerLiteral{Val: 2},
  725. },
  726. }.Init(),
  727. },
  728. },
  729. fields: []ast.Field{
  730. {
  731. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  732. Name: "*",
  733. AName: "",
  734. },
  735. },
  736. isAggregate: false,
  737. sendMeta: false,
  738. }.Init(),
  739. },
  740. { // 5. optimize join on
  741. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  742. p: ProjectPlan{
  743. baseLogicalPlan: baseLogicalPlan{
  744. children: []LogicalPlan{
  745. JoinPlan{
  746. baseLogicalPlan: baseLogicalPlan{
  747. children: []LogicalPlan{
  748. WindowPlan{
  749. baseLogicalPlan: baseLogicalPlan{
  750. children: []LogicalPlan{
  751. FilterPlan{
  752. baseLogicalPlan: baseLogicalPlan{
  753. children: []LogicalPlan{
  754. DataSourcePlan{
  755. name: "src1",
  756. streamFields: map[string]*ast.JsonStreamField{
  757. "id1": {
  758. Type: "bigint",
  759. },
  760. "temp": {
  761. Type: "bigint",
  762. },
  763. },
  764. streamStmt: streams["src1"],
  765. metaFields: []string{},
  766. pruneFields: []string{},
  767. }.Init(),
  768. },
  769. },
  770. condition: &ast.BinaryExpr{
  771. RHS: &ast.BinaryExpr{
  772. OP: ast.GT,
  773. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  774. RHS: &ast.IntegerLiteral{Val: 20},
  775. },
  776. OP: ast.AND,
  777. LHS: &ast.BinaryExpr{
  778. OP: ast.GT,
  779. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  780. RHS: &ast.IntegerLiteral{Val: 111},
  781. },
  782. },
  783. }.Init(),
  784. FilterPlan{
  785. baseLogicalPlan: baseLogicalPlan{
  786. children: []LogicalPlan{
  787. DataSourcePlan{
  788. name: "src2",
  789. streamFields: map[string]*ast.JsonStreamField{
  790. "hum": {
  791. Type: "bigint",
  792. },
  793. "id2": {
  794. Type: "bigint",
  795. },
  796. },
  797. streamStmt: streams["src2"],
  798. metaFields: []string{},
  799. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  800. pruneFields: []string{},
  801. }.Init(),
  802. },
  803. },
  804. condition: &ast.BinaryExpr{
  805. OP: ast.LT,
  806. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  807. RHS: &ast.IntegerLiteral{Val: 60},
  808. },
  809. }.Init(),
  810. },
  811. },
  812. condition: nil,
  813. wtype: ast.TUMBLING_WINDOW,
  814. length: 10,
  815. timeUnit: ast.SS,
  816. interval: 0,
  817. limit: 0,
  818. }.Init(),
  819. },
  820. },
  821. from: &ast.Table{
  822. Name: "src1",
  823. },
  824. joins: []ast.Join{
  825. {
  826. Name: "src2",
  827. Alias: "",
  828. JoinType: ast.INNER_JOIN,
  829. Expr: &ast.BinaryExpr{
  830. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  831. OP: ast.EQ,
  832. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  833. },
  834. },
  835. },
  836. }.Init(),
  837. },
  838. },
  839. fields: []ast.Field{
  840. {
  841. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  842. Name: "id1",
  843. AName: "",
  844. },
  845. },
  846. isAggregate: false,
  847. sendMeta: false,
  848. }.Init(),
  849. },
  850. { // 6. optimize outer join on
  851. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  852. p: ProjectPlan{
  853. baseLogicalPlan: baseLogicalPlan{
  854. children: []LogicalPlan{
  855. JoinPlan{
  856. baseLogicalPlan: baseLogicalPlan{
  857. children: []LogicalPlan{
  858. WindowPlan{
  859. baseLogicalPlan: baseLogicalPlan{
  860. children: []LogicalPlan{
  861. FilterPlan{
  862. baseLogicalPlan: baseLogicalPlan{
  863. children: []LogicalPlan{
  864. DataSourcePlan{
  865. name: "src1",
  866. streamFields: map[string]*ast.JsonStreamField{
  867. "id1": {
  868. Type: "bigint",
  869. },
  870. "temp": {
  871. Type: "bigint",
  872. },
  873. },
  874. streamStmt: streams["src1"],
  875. metaFields: []string{},
  876. pruneFields: []string{},
  877. }.Init(),
  878. },
  879. },
  880. condition: &ast.BinaryExpr{
  881. OP: ast.GT,
  882. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  883. RHS: &ast.IntegerLiteral{Val: 111},
  884. },
  885. }.Init(),
  886. DataSourcePlan{
  887. name: "src2",
  888. streamFields: map[string]*ast.JsonStreamField{
  889. "hum": {
  890. Type: "bigint",
  891. },
  892. "id2": {
  893. Type: "bigint",
  894. },
  895. },
  896. streamStmt: streams["src2"],
  897. metaFields: []string{},
  898. pruneFields: []string{},
  899. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  900. }.Init(),
  901. },
  902. },
  903. condition: nil,
  904. wtype: ast.TUMBLING_WINDOW,
  905. length: 10,
  906. timeUnit: ast.SS,
  907. interval: 0,
  908. limit: 0,
  909. }.Init(),
  910. },
  911. },
  912. from: &ast.Table{
  913. Name: "src1",
  914. },
  915. joins: []ast.Join{
  916. {
  917. Name: "src2",
  918. Alias: "",
  919. JoinType: ast.FULL_JOIN,
  920. Expr: &ast.BinaryExpr{
  921. OP: ast.AND,
  922. LHS: &ast.BinaryExpr{
  923. OP: ast.AND,
  924. LHS: &ast.BinaryExpr{
  925. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  926. OP: ast.EQ,
  927. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  928. },
  929. RHS: &ast.BinaryExpr{
  930. OP: ast.GT,
  931. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  932. RHS: &ast.IntegerLiteral{Val: 20},
  933. },
  934. },
  935. RHS: &ast.BinaryExpr{
  936. OP: ast.LT,
  937. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  938. RHS: &ast.IntegerLiteral{Val: 60},
  939. },
  940. },
  941. },
  942. },
  943. }.Init(),
  944. },
  945. },
  946. fields: []ast.Field{
  947. {
  948. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  949. Name: "id1",
  950. AName: "",
  951. },
  952. },
  953. isAggregate: false,
  954. sendMeta: false,
  955. }.Init(),
  956. },
  957. { // 7 window error for table
  958. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  959. p: nil,
  960. err: "cannot run window for TABLE sources",
  961. },
  962. { // 8 join table without window
  963. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  964. p: ProjectPlan{
  965. baseLogicalPlan: baseLogicalPlan{
  966. children: []LogicalPlan{
  967. JoinPlan{
  968. baseLogicalPlan: baseLogicalPlan{
  969. children: []LogicalPlan{
  970. JoinAlignPlan{
  971. baseLogicalPlan: baseLogicalPlan{
  972. children: []LogicalPlan{
  973. FilterPlan{
  974. baseLogicalPlan: baseLogicalPlan{
  975. children: []LogicalPlan{
  976. DataSourcePlan{
  977. name: "src1",
  978. streamFields: map[string]*ast.JsonStreamField{
  979. "id1": {
  980. Type: "bigint",
  981. },
  982. "temp": {
  983. Type: "bigint",
  984. },
  985. },
  986. streamStmt: streams["src1"],
  987. metaFields: []string{},
  988. pruneFields: []string{},
  989. }.Init(),
  990. },
  991. },
  992. condition: &ast.BinaryExpr{
  993. RHS: &ast.BinaryExpr{
  994. OP: ast.GT,
  995. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  996. RHS: &ast.IntegerLiteral{Val: 20},
  997. },
  998. OP: ast.AND,
  999. LHS: &ast.BinaryExpr{
  1000. OP: ast.GT,
  1001. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1002. RHS: &ast.IntegerLiteral{Val: 111},
  1003. },
  1004. },
  1005. }.Init(),
  1006. DataSourcePlan{
  1007. name: "tableInPlanner",
  1008. streamFields: map[string]*ast.JsonStreamField{
  1009. "hum": {
  1010. Type: "bigint",
  1011. },
  1012. "id": {
  1013. Type: "bigint",
  1014. },
  1015. },
  1016. streamStmt: streams["tableInPlanner"],
  1017. metaFields: []string{},
  1018. pruneFields: []string{},
  1019. }.Init(),
  1020. },
  1021. },
  1022. Emitters: []string{"tableInPlanner"},
  1023. }.Init(),
  1024. },
  1025. },
  1026. from: &ast.Table{
  1027. Name: "src1",
  1028. },
  1029. joins: []ast.Join{
  1030. {
  1031. Name: "tableInPlanner",
  1032. Alias: "",
  1033. JoinType: ast.INNER_JOIN,
  1034. Expr: &ast.BinaryExpr{
  1035. OP: ast.AND,
  1036. LHS: &ast.BinaryExpr{
  1037. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1038. OP: ast.EQ,
  1039. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1040. },
  1041. RHS: &ast.BinaryExpr{
  1042. OP: ast.LT,
  1043. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1044. RHS: &ast.IntegerLiteral{Val: 60},
  1045. },
  1046. },
  1047. },
  1048. },
  1049. }.Init(),
  1050. },
  1051. },
  1052. fields: []ast.Field{
  1053. {
  1054. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1055. Name: "id1",
  1056. AName: "",
  1057. },
  1058. },
  1059. isAggregate: false,
  1060. sendMeta: false,
  1061. }.Init(),
  1062. },
  1063. { // 9 join table with window
  1064. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1065. p: ProjectPlan{
  1066. baseLogicalPlan: baseLogicalPlan{
  1067. children: []LogicalPlan{
  1068. JoinPlan{
  1069. baseLogicalPlan: baseLogicalPlan{
  1070. children: []LogicalPlan{
  1071. JoinAlignPlan{
  1072. baseLogicalPlan: baseLogicalPlan{
  1073. children: []LogicalPlan{
  1074. WindowPlan{
  1075. baseLogicalPlan: baseLogicalPlan{
  1076. children: []LogicalPlan{
  1077. DataSourcePlan{
  1078. name: "src1",
  1079. streamFields: map[string]*ast.JsonStreamField{
  1080. "id1": {
  1081. Type: "bigint",
  1082. },
  1083. "temp": {
  1084. Type: "bigint",
  1085. },
  1086. },
  1087. streamStmt: streams["src1"],
  1088. metaFields: []string{},
  1089. pruneFields: []string{},
  1090. }.Init(),
  1091. },
  1092. },
  1093. condition: nil,
  1094. wtype: ast.TUMBLING_WINDOW,
  1095. length: 10,
  1096. timeUnit: ast.SS,
  1097. interval: 0,
  1098. limit: 0,
  1099. }.Init(),
  1100. DataSourcePlan{
  1101. name: "tableInPlanner",
  1102. streamFields: map[string]*ast.JsonStreamField{
  1103. "hum": {
  1104. Type: "bigint",
  1105. },
  1106. "id": {
  1107. Type: "bigint",
  1108. },
  1109. },
  1110. streamStmt: streams["tableInPlanner"],
  1111. metaFields: []string{},
  1112. pruneFields: []string{},
  1113. }.Init(),
  1114. },
  1115. },
  1116. Emitters: []string{"tableInPlanner"},
  1117. }.Init(),
  1118. },
  1119. },
  1120. from: &ast.Table{
  1121. Name: "src1",
  1122. },
  1123. joins: []ast.Join{
  1124. {
  1125. Name: "tableInPlanner",
  1126. Alias: "",
  1127. JoinType: ast.INNER_JOIN,
  1128. Expr: &ast.BinaryExpr{
  1129. OP: ast.AND,
  1130. LHS: &ast.BinaryExpr{
  1131. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1132. OP: ast.EQ,
  1133. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1134. },
  1135. RHS: &ast.BinaryExpr{
  1136. RHS: &ast.BinaryExpr{
  1137. OP: ast.AND,
  1138. LHS: &ast.BinaryExpr{
  1139. OP: ast.GT,
  1140. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1141. RHS: &ast.IntegerLiteral{Val: 20},
  1142. },
  1143. RHS: &ast.BinaryExpr{
  1144. OP: ast.LT,
  1145. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1146. RHS: &ast.IntegerLiteral{Val: 60},
  1147. },
  1148. },
  1149. OP: ast.AND,
  1150. LHS: &ast.BinaryExpr{
  1151. OP: ast.GT,
  1152. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1153. RHS: &ast.IntegerLiteral{Val: 111},
  1154. },
  1155. },
  1156. },
  1157. },
  1158. },
  1159. }.Init(),
  1160. },
  1161. },
  1162. fields: []ast.Field{
  1163. {
  1164. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1165. Name: "id1",
  1166. AName: "",
  1167. },
  1168. },
  1169. isAggregate: false,
  1170. sendMeta: false,
  1171. }.Init(),
  1172. },
  1173. { // 10 meta
  1174. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1175. p: ProjectPlan{
  1176. baseLogicalPlan: baseLogicalPlan{
  1177. children: []LogicalPlan{
  1178. FilterPlan{
  1179. baseLogicalPlan: baseLogicalPlan{
  1180. children: []LogicalPlan{
  1181. DataSourcePlan{
  1182. name: "src1",
  1183. streamFields: map[string]*ast.JsonStreamField{
  1184. "temp": {
  1185. Type: "bigint",
  1186. },
  1187. },
  1188. streamStmt: streams["src1"],
  1189. metaFields: []string{"Humidity", "device", "id"},
  1190. pruneFields: []string{},
  1191. }.Init(),
  1192. },
  1193. },
  1194. condition: &ast.BinaryExpr{
  1195. LHS: &ast.Call{
  1196. Name: "meta",
  1197. FuncId: 2,
  1198. Args: []ast.Expr{&ast.MetaRef{
  1199. Name: "device",
  1200. StreamName: ast.DefaultStream,
  1201. }},
  1202. },
  1203. OP: ast.EQ,
  1204. RHS: &ast.StringLiteral{
  1205. Val: "demo2",
  1206. },
  1207. },
  1208. }.Init(),
  1209. },
  1210. },
  1211. fields: []ast.Field{
  1212. {
  1213. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1214. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1215. Name: "id",
  1216. StreamName: ast.DefaultStream,
  1217. }}},
  1218. []ast.StreamName{},
  1219. nil,
  1220. )},
  1221. Name: "meta",
  1222. AName: "eid",
  1223. },
  1224. {
  1225. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1226. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1227. &ast.BinaryExpr{
  1228. OP: ast.ARROW,
  1229. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1230. RHS: &ast.JsonFieldRef{Name: "Device"},
  1231. },
  1232. }},
  1233. []ast.StreamName{},
  1234. nil,
  1235. )},
  1236. Name: "meta",
  1237. AName: "hdevice",
  1238. },
  1239. {
  1240. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1241. Name: "temp",
  1242. AName: "",
  1243. },
  1244. },
  1245. isAggregate: false,
  1246. sendMeta: false,
  1247. }.Init(),
  1248. },
  1249. { // 11 join with same name field and aliased
  1250. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1251. p: ProjectPlan{
  1252. baseLogicalPlan: baseLogicalPlan{
  1253. children: []LogicalPlan{
  1254. JoinPlan{
  1255. baseLogicalPlan: baseLogicalPlan{
  1256. children: []LogicalPlan{
  1257. JoinAlignPlan{
  1258. baseLogicalPlan: baseLogicalPlan{
  1259. children: []LogicalPlan{
  1260. DataSourcePlan{
  1261. name: "src2",
  1262. streamFields: map[string]*ast.JsonStreamField{
  1263. "hum": {
  1264. Type: "bigint",
  1265. },
  1266. "id2": {
  1267. Type: "bigint",
  1268. },
  1269. },
  1270. streamStmt: streams["src2"],
  1271. metaFields: []string{},
  1272. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  1273. pruneFields: []string{},
  1274. }.Init(),
  1275. DataSourcePlan{
  1276. name: "tableInPlanner",
  1277. streamFields: map[string]*ast.JsonStreamField{
  1278. "hum": {
  1279. Type: "bigint",
  1280. },
  1281. "id": {
  1282. Type: "bigint",
  1283. },
  1284. },
  1285. streamStmt: streams["tableInPlanner"],
  1286. metaFields: []string{},
  1287. pruneFields: []string{},
  1288. }.Init(),
  1289. },
  1290. },
  1291. Emitters: []string{"tableInPlanner"},
  1292. }.Init(),
  1293. },
  1294. },
  1295. from: &ast.Table{
  1296. Name: "src2",
  1297. },
  1298. joins: []ast.Join{
  1299. {
  1300. Name: "tableInPlanner",
  1301. Alias: "",
  1302. JoinType: ast.INNER_JOIN,
  1303. Expr: &ast.BinaryExpr{
  1304. RHS: &ast.BinaryExpr{
  1305. OP: ast.EQ,
  1306. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1307. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1308. },
  1309. OP: ast.AND,
  1310. LHS: &ast.BinaryExpr{
  1311. OP: ast.GT,
  1312. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1313. &ast.FieldRef{
  1314. Name: "hum",
  1315. StreamName: "src2",
  1316. },
  1317. []ast.StreamName{"src2"},
  1318. &boolFalse,
  1319. )},
  1320. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1321. &ast.FieldRef{
  1322. Name: "hum",
  1323. StreamName: "tableInPlanner",
  1324. },
  1325. []ast.StreamName{"tableInPlanner"},
  1326. &boolFalse,
  1327. )},
  1328. },
  1329. },
  1330. },
  1331. },
  1332. }.Init(),
  1333. },
  1334. },
  1335. fields: []ast.Field{
  1336. {
  1337. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1338. &ast.FieldRef{
  1339. Name: "hum",
  1340. StreamName: "src2",
  1341. },
  1342. []ast.StreamName{"src2"},
  1343. &boolFalse,
  1344. )},
  1345. Name: "hum",
  1346. AName: "hum1",
  1347. }, {
  1348. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1349. &ast.FieldRef{
  1350. Name: "hum",
  1351. StreamName: "tableInPlanner",
  1352. },
  1353. []ast.StreamName{"tableInPlanner"},
  1354. &boolFalse,
  1355. )},
  1356. Name: "hum",
  1357. AName: "hum2",
  1358. },
  1359. },
  1360. isAggregate: false,
  1361. sendMeta: false,
  1362. }.Init(),
  1363. },
  1364. { // 12 meta with more fields
  1365. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1366. p: ProjectPlan{
  1367. baseLogicalPlan: baseLogicalPlan{
  1368. children: []LogicalPlan{
  1369. FilterPlan{
  1370. baseLogicalPlan: baseLogicalPlan{
  1371. children: []LogicalPlan{
  1372. DataSourcePlan{
  1373. name: "src1",
  1374. streamFields: map[string]*ast.JsonStreamField{
  1375. "temp": {
  1376. Type: "bigint",
  1377. },
  1378. },
  1379. streamStmt: streams["src1"],
  1380. metaFields: []string{},
  1381. allMeta: true,
  1382. pruneFields: []string{},
  1383. }.Init(),
  1384. },
  1385. },
  1386. condition: &ast.BinaryExpr{
  1387. LHS: &ast.Call{
  1388. Name: "meta",
  1389. FuncId: 1,
  1390. Args: []ast.Expr{&ast.MetaRef{
  1391. Name: "device",
  1392. StreamName: ast.DefaultStream,
  1393. }},
  1394. },
  1395. OP: ast.EQ,
  1396. RHS: &ast.StringLiteral{
  1397. Val: "demo2",
  1398. },
  1399. },
  1400. }.Init(),
  1401. },
  1402. },
  1403. fields: []ast.Field{
  1404. {
  1405. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1406. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1407. Name: "*",
  1408. StreamName: ast.DefaultStream,
  1409. }}},
  1410. []ast.StreamName{},
  1411. nil,
  1412. )},
  1413. Name: "meta",
  1414. AName: "m",
  1415. },
  1416. {
  1417. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1418. Name: "temp",
  1419. AName: "",
  1420. },
  1421. },
  1422. isAggregate: false,
  1423. sendMeta: false,
  1424. }.Init(),
  1425. },
  1426. { // 13 analytic function plan
  1427. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1428. p: ProjectPlan{
  1429. baseLogicalPlan: baseLogicalPlan{
  1430. children: []LogicalPlan{
  1431. FilterPlan{
  1432. baseLogicalPlan: baseLogicalPlan{
  1433. children: []LogicalPlan{
  1434. AnalyticFuncsPlan{
  1435. baseLogicalPlan: baseLogicalPlan{
  1436. children: []LogicalPlan{
  1437. DataSourcePlan{
  1438. name: "src1",
  1439. streamFields: map[string]*ast.JsonStreamField{
  1440. "id1": {
  1441. Type: "bigint",
  1442. },
  1443. "name": {
  1444. Type: "string",
  1445. },
  1446. "temp": {
  1447. Type: "bigint",
  1448. },
  1449. },
  1450. streamStmt: streams["src1"],
  1451. metaFields: []string{},
  1452. pruneFields: []string{},
  1453. }.Init(),
  1454. },
  1455. },
  1456. funcs: []*ast.Call{
  1457. {
  1458. Name: "lag",
  1459. FuncId: 2,
  1460. CachedField: "$$a_lag_2",
  1461. Args: []ast.Expr{&ast.FieldRef{
  1462. Name: "temp",
  1463. StreamName: "src1",
  1464. }},
  1465. },
  1466. },
  1467. fieldFuncs: []*ast.Call{
  1468. {
  1469. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1470. },
  1471. {
  1472. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1473. },
  1474. },
  1475. }.Init(),
  1476. },
  1477. },
  1478. condition: &ast.BinaryExpr{
  1479. LHS: &ast.Call{
  1480. Name: "lag",
  1481. FuncId: 2,
  1482. Args: []ast.Expr{&ast.FieldRef{
  1483. Name: "temp",
  1484. StreamName: "src1",
  1485. }},
  1486. CachedField: "$$a_lag_2",
  1487. Cached: true,
  1488. },
  1489. OP: ast.GT,
  1490. RHS: &ast.FieldRef{
  1491. Name: "temp",
  1492. StreamName: "src1",
  1493. },
  1494. },
  1495. }.Init(),
  1496. },
  1497. },
  1498. fields: []ast.Field{
  1499. {
  1500. Expr: &ast.Call{
  1501. Name: "latest",
  1502. FuncId: 1,
  1503. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1504. CachedField: "$$a_latest_1",
  1505. Cached: true,
  1506. },
  1507. Name: "latest",
  1508. }, {
  1509. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1510. Name: "id1",
  1511. },
  1512. },
  1513. isAggregate: false,
  1514. sendMeta: false,
  1515. }.Init(),
  1516. },
  1517. { // 14 wildcard combined with an explicit field and a meta call
  1518. sql: `SELECT name, *, meta(device) FROM src1`,
  1519. p: ProjectPlan{
  1520. baseLogicalPlan: baseLogicalPlan{
  1521. children: []LogicalPlan{
  1522. DataSourcePlan{
  1523. baseLogicalPlan: baseLogicalPlan{},
  1524. name: "src1",
  1525. streamFields: map[string]*ast.JsonStreamField{
  1526. "id1": {
  1527. Type: "bigint",
  1528. },
  1529. "temp": {
  1530. Type: "bigint",
  1531. },
  1532. "name": {
  1533. Type: "string",
  1534. },
  1535. "myarray": {
  1536. Type: "array",
  1537. Items: &ast.JsonStreamField{
  1538. Type: "string",
  1539. },
  1540. },
  1541. },
  1542. streamStmt: streams["src1"],
  1543. metaFields: []string{"device"},
  1544. isWildCard: true,
  1545. pruneFields: []string{},
  1546. }.Init(),
  1547. },
  1548. },
  1549. fields: []ast.Field{
  1550. {
  1551. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1552. Name: "name",
  1553. AName: "",
  1554. },
  1555. {
  1556. Name: "*",
  1557. Expr: &ast.Wildcard{
  1558. Token: ast.ASTERISK,
  1559. },
  1560. },
  1561. {
  1562. Name: "meta",
  1563. Expr: &ast.Call{
  1564. Name: "meta",
  1565. Args: []ast.Expr{
  1566. &ast.MetaRef{
  1567. StreamName: ast.DefaultStream,
  1568. Name: "device",
  1569. },
  1570. },
  1571. },
  1572. },
  1573. },
  1574. isAggregate: false,
  1575. allWildcard: true,
  1576. sendMeta: false,
  1577. }.Init(),
  1578. },
  1579. { // 15 analytic function over partition plan
  1580. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1581. p: ProjectPlan{
  1582. baseLogicalPlan: baseLogicalPlan{
  1583. children: []LogicalPlan{
  1584. FilterPlan{
  1585. baseLogicalPlan: baseLogicalPlan{
  1586. children: []LogicalPlan{
  1587. AnalyticFuncsPlan{
  1588. baseLogicalPlan: baseLogicalPlan{
  1589. children: []LogicalPlan{
  1590. DataSourcePlan{
  1591. name: "src1",
  1592. streamFields: map[string]*ast.JsonStreamField{
  1593. "id1": {
  1594. Type: "bigint",
  1595. },
  1596. "name": {
  1597. Type: "string",
  1598. },
  1599. "temp": {
  1600. Type: "bigint",
  1601. },
  1602. },
  1603. streamStmt: streams["src1"],
  1604. metaFields: []string{},
  1605. pruneFields: []string{},
  1606. }.Init(),
  1607. },
  1608. },
  1609. funcs: []*ast.Call{
  1610. {
  1611. Name: "lag",
  1612. FuncId: 2,
  1613. CachedField: "$$a_lag_2",
  1614. Args: []ast.Expr{&ast.FieldRef{
  1615. Name: "temp",
  1616. StreamName: "src1",
  1617. }},
  1618. },
  1619. },
  1620. fieldFuncs: []*ast.Call{
  1621. {
  1622. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1623. },
  1624. {
  1625. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1626. },
  1627. },
  1628. }.Init(),
  1629. },
  1630. },
  1631. condition: &ast.BinaryExpr{
  1632. LHS: &ast.Call{
  1633. Name: "lag",
  1634. FuncId: 2,
  1635. Args: []ast.Expr{&ast.FieldRef{
  1636. Name: "temp",
  1637. StreamName: "src1",
  1638. }},
  1639. CachedField: "$$a_lag_2",
  1640. Cached: true,
  1641. },
  1642. OP: ast.GT,
  1643. RHS: &ast.FieldRef{
  1644. Name: "temp",
  1645. StreamName: "src1",
  1646. },
  1647. },
  1648. }.Init(),
  1649. },
  1650. },
  1651. fields: []ast.Field{
  1652. {
  1653. Expr: &ast.Call{
  1654. Name: "latest",
  1655. FuncId: 1,
  1656. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1657. CachedField: "$$a_latest_1",
  1658. Cached: true,
  1659. Partition: &ast.PartitionExpr{
  1660. Exprs: []ast.Expr{
  1661. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1662. },
  1663. },
  1664. },
  1665. Name: "latest",
  1666. }, {
  1667. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1668. Name: "id1",
  1669. },
  1670. },
  1671. isAggregate: false,
  1672. sendMeta: false,
  1673. }.Init(),
  1674. },
  1675. { // 16 analytic function over partition when plan
  1676. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1677. p: ProjectPlan{
  1678. baseLogicalPlan: baseLogicalPlan{
  1679. children: []LogicalPlan{
  1680. FilterPlan{
  1681. baseLogicalPlan: baseLogicalPlan{
  1682. children: []LogicalPlan{
  1683. AnalyticFuncsPlan{
  1684. baseLogicalPlan: baseLogicalPlan{
  1685. children: []LogicalPlan{
  1686. DataSourcePlan{
  1687. name: "src1",
  1688. streamFields: map[string]*ast.JsonStreamField{
  1689. "id1": {
  1690. Type: "bigint",
  1691. },
  1692. "name": {
  1693. Type: "string",
  1694. },
  1695. "temp": {
  1696. Type: "bigint",
  1697. },
  1698. },
  1699. streamStmt: streams["src1"],
  1700. metaFields: []string{},
  1701. pruneFields: []string{},
  1702. }.Init(),
  1703. },
  1704. },
  1705. funcs: []*ast.Call{
  1706. {
  1707. Name: "lag",
  1708. FuncId: 2,
  1709. CachedField: "$$a_lag_2",
  1710. Args: []ast.Expr{&ast.FieldRef{
  1711. Name: "temp",
  1712. StreamName: "src1",
  1713. }},
  1714. },
  1715. },
  1716. fieldFuncs: []*ast.Call{
  1717. {
  1718. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1719. },
  1720. {
  1721. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1722. },
  1723. },
  1724. }.Init(),
  1725. },
  1726. },
  1727. condition: &ast.BinaryExpr{
  1728. LHS: &ast.Call{
  1729. Name: "lag",
  1730. FuncId: 2,
  1731. Args: []ast.Expr{&ast.FieldRef{
  1732. Name: "temp",
  1733. StreamName: "src1",
  1734. }},
  1735. CachedField: "$$a_lag_2",
  1736. Cached: true,
  1737. },
  1738. OP: ast.GT,
  1739. RHS: &ast.FieldRef{
  1740. Name: "temp",
  1741. StreamName: "src1",
  1742. },
  1743. },
  1744. }.Init(),
  1745. },
  1746. },
  1747. fields: []ast.Field{
  1748. {
  1749. Expr: &ast.Call{
  1750. Name: "latest",
  1751. FuncId: 1,
  1752. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1753. CachedField: "$$a_latest_1",
  1754. Cached: true,
  1755. Partition: &ast.PartitionExpr{
  1756. Exprs: []ast.Expr{
  1757. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1758. },
  1759. },
  1760. WhenExpr: &ast.BinaryExpr{
  1761. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1762. OP: ast.GT,
  1763. RHS: &ast.IntegerLiteral{Val: 12},
  1764. },
  1765. },
  1766. Name: "latest",
  1767. }, {
  1768. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1769. Name: "id1",
  1770. },
  1771. },
  1772. isAggregate: false,
  1773. sendMeta: false,
  1774. }.Init(),
  1775. },
  1776. { // 17. do not optimize sliding window
  1777. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1778. p: ProjectPlan{
  1779. baseLogicalPlan: baseLogicalPlan{
  1780. children: []LogicalPlan{
  1781. HavingPlan{
  1782. baseLogicalPlan: baseLogicalPlan{
  1783. children: []LogicalPlan{
  1784. FilterPlan{
  1785. baseLogicalPlan: baseLogicalPlan{
  1786. children: []LogicalPlan{
  1787. WindowPlan{
  1788. baseLogicalPlan: baseLogicalPlan{
  1789. children: []LogicalPlan{
  1790. DataSourcePlan{
  1791. name: "src1",
  1792. isWildCard: true,
  1793. streamFields: map[string]*ast.JsonStreamField{
  1794. "id1": {
  1795. Type: "bigint",
  1796. },
  1797. "temp": {
  1798. Type: "bigint",
  1799. },
  1800. "name": {
  1801. Type: "string",
  1802. },
  1803. "myarray": {
  1804. Type: "array",
  1805. Items: &ast.JsonStreamField{
  1806. Type: "string",
  1807. },
  1808. },
  1809. },
  1810. streamStmt: streams["src1"],
  1811. metaFields: []string{},
  1812. pruneFields: []string{},
  1813. }.Init(),
  1814. },
  1815. },
  1816. condition: nil,
  1817. wtype: ast.SLIDING_WINDOW,
  1818. length: 10,
  1819. timeUnit: ast.SS,
  1820. interval: 0,
  1821. limit: 0,
  1822. }.Init(),
  1823. },
  1824. },
  1825. condition: &ast.BinaryExpr{
  1826. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1827. OP: ast.GT,
  1828. RHS: &ast.IntegerLiteral{Val: 20},
  1829. },
  1830. }.Init(),
  1831. },
  1832. },
  1833. condition: &ast.BinaryExpr{
  1834. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1835. Token: ast.ASTERISK,
  1836. }}, FuncType: ast.FuncTypeAgg},
  1837. OP: ast.GT,
  1838. RHS: &ast.IntegerLiteral{Val: 2},
  1839. },
  1840. }.Init(),
  1841. },
  1842. },
  1843. fields: []ast.Field{
  1844. {
  1845. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1846. Name: "*",
  1847. AName: "",
  1848. },
  1849. },
  1850. isAggregate: false,
  1851. sendMeta: false,
  1852. }.Init(),
  1853. },
  1854. {
  1855. // 18 analytic function over when plan
  1856. sql: `SELECT CASE WHEN lag(temp) OVER (WHEN lag(id1) > 1) BETWEEN 0 AND 10 THEN 1 ELSE 0 END FROM src1`,
  1857. p: ProjectPlan{
  1858. baseLogicalPlan: baseLogicalPlan{
  1859. children: []LogicalPlan{
  1860. AnalyticFuncsPlan{
  1861. baseLogicalPlan: baseLogicalPlan{
  1862. children: []LogicalPlan{
  1863. DataSourcePlan{
  1864. name: "src1",
  1865. streamFields: map[string]*ast.JsonStreamField{
  1866. "id1": {
  1867. Type: "bigint",
  1868. },
  1869. "temp": {
  1870. Type: "bigint",
  1871. },
  1872. },
  1873. streamStmt: &ast.StreamStmt{
  1874. Name: "src1",
  1875. StreamFields: []ast.StreamField{
  1876. {
  1877. Name: "id1",
  1878. FieldType: &ast.BasicType{
  1879. Type: ast.DataType(1),
  1880. },
  1881. },
  1882. {
  1883. Name: "temp",
  1884. FieldType: &ast.BasicType{
  1885. Type: ast.DataType(1),
  1886. },
  1887. },
  1888. {
  1889. Name: "name",
  1890. FieldType: &ast.BasicType{
  1891. Type: ast.DataType(3),
  1892. },
  1893. },
  1894. {
  1895. Name: "myarray",
  1896. FieldType: &ast.ArrayType{
  1897. Type: ast.DataType(3),
  1898. },
  1899. },
  1900. },
  1901. Options: &ast.Options{
  1902. DATASOURCE: "src1",
  1903. KEY: "ts",
  1904. FORMAT: "json",
  1905. },
  1906. StreamType: ast.StreamType(0),
  1907. },
  1908. metaFields: []string{},
  1909. pruneFields: []string{},
  1910. }.Init(),
  1911. },
  1912. },
  1913. fieldFuncs: []*ast.Call{
  1914. {
  1915. Name: "lag",
  1916. FuncId: 1,
  1917. FuncType: ast.FuncType(0),
  1918. Args: []ast.Expr{
  1919. &ast.FieldRef{
  1920. StreamName: "src1",
  1921. Name: "id1",
  1922. },
  1923. },
  1924. CachedField: "$$a_lag_1",
  1925. },
  1926. {
  1927. Name: "lag",
  1928. FuncId: 0,
  1929. FuncType: ast.FuncType(0),
  1930. Args: []ast.Expr{
  1931. &ast.FieldRef{
  1932. StreamName: "src1",
  1933. Name: "temp",
  1934. },
  1935. },
  1936. CachedField: "$$a_lag_0",
  1937. WhenExpr: &ast.BinaryExpr{
  1938. OP: ast.GT,
  1939. LHS: &ast.Call{
  1940. Name: "lag",
  1941. FuncId: 1,
  1942. FuncType: ast.FuncType(0),
  1943. Args: []ast.Expr{
  1944. &ast.FieldRef{
  1945. StreamName: "src1",
  1946. Name: "id1",
  1947. },
  1948. },
  1949. CachedField: "$$a_lag_1",
  1950. Cached: true,
  1951. },
  1952. RHS: &ast.IntegerLiteral{
  1953. Val: 1,
  1954. },
  1955. },
  1956. },
  1957. },
  1958. }.Init(),
  1959. },
  1960. },
  1961. fields: []ast.Field{
  1962. {
  1963. Name: "kuiper_field_0",
  1964. Expr: &ast.CaseExpr{
  1965. WhenClauses: []*ast.WhenClause{
  1966. {
  1967. Expr: &ast.BinaryExpr{
  1968. OP: ast.BETWEEN,
  1969. LHS: &ast.Call{
  1970. Name: "lag",
  1971. FuncId: 0,
  1972. FuncType: ast.FuncType(0),
  1973. Args: []ast.Expr{
  1974. &ast.FieldRef{
  1975. StreamName: "src1",
  1976. Name: "temp",
  1977. },
  1978. },
  1979. CachedField: "$$a_lag_0",
  1980. Cached: true,
  1981. WhenExpr: &ast.BinaryExpr{
  1982. OP: ast.GT,
  1983. LHS: &ast.Call{
  1984. Name: "lag",
  1985. FuncId: 1,
  1986. FuncType: ast.FuncType(0),
  1987. Args: []ast.Expr{
  1988. &ast.FieldRef{
  1989. StreamName: "src1",
  1990. Name: "id1",
  1991. },
  1992. },
  1993. CachedField: "$$a_lag_1",
  1994. Cached: true,
  1995. },
  1996. RHS: &ast.IntegerLiteral{
  1997. Val: 1,
  1998. },
  1999. },
  2000. },
  2001. RHS: &ast.BetweenExpr{
  2002. Lower: &ast.IntegerLiteral{
  2003. Val: 0,
  2004. },
  2005. Higher: &ast.IntegerLiteral{
  2006. Val: 10,
  2007. },
  2008. },
  2009. },
  2010. Result: &ast.IntegerLiteral{
  2011. Val: 1,
  2012. },
  2013. },
  2014. },
  2015. ElseClause: &ast.IntegerLiteral{
  2016. Val: 0,
  2017. },
  2018. },
  2019. },
  2020. },
  2021. }.Init(),
  2022. },
  2023. { // 19
  2024. sql: `SELECT * EXCEPT(id1, name), meta(device) FROM src1`,
  2025. p: ProjectPlan{
  2026. baseLogicalPlan: baseLogicalPlan{
  2027. children: []LogicalPlan{
  2028. DataSourcePlan{
  2029. baseLogicalPlan: baseLogicalPlan{},
  2030. name: "src1",
  2031. streamFields: map[string]*ast.JsonStreamField{
  2032. "temp": {
  2033. Type: "bigint",
  2034. },
  2035. "myarray": {
  2036. Type: "array",
  2037. Items: &ast.JsonStreamField{
  2038. Type: "string",
  2039. },
  2040. },
  2041. },
  2042. streamStmt: streams["src1"],
  2043. metaFields: []string{"device"},
  2044. isWildCard: false,
  2045. pruneFields: []string{"id1", "name"},
  2046. }.Init(),
  2047. },
  2048. },
  2049. fields: []ast.Field{
  2050. {
  2051. Name: "*",
  2052. Expr: &ast.Wildcard{
  2053. Token: ast.ASTERISK,
  2054. Except: []string{"id1", "name"},
  2055. },
  2056. },
  2057. {
  2058. Name: "meta",
  2059. Expr: &ast.Call{
  2060. Name: "meta",
  2061. Args: []ast.Expr{
  2062. &ast.MetaRef{
  2063. StreamName: ast.DefaultStream,
  2064. Name: "device",
  2065. },
  2066. },
  2067. },
  2068. },
  2069. },
  2070. isAggregate: false,
  2071. allWildcard: true,
  2072. sendMeta: false,
  2073. }.Init(),
  2074. },
  2075. { // 20
  2076. sql: `SELECT * REPLACE(temp * 2 AS id1, myarray * 2 AS name), meta(device) FROM src1`,
  2077. p: ProjectPlan{
  2078. baseLogicalPlan: baseLogicalPlan{
  2079. children: []LogicalPlan{
  2080. DataSourcePlan{
  2081. baseLogicalPlan: baseLogicalPlan{},
  2082. name: "src1",
  2083. streamFields: map[string]*ast.JsonStreamField{
  2084. "temp": {
  2085. Type: "bigint",
  2086. },
  2087. "myarray": {
  2088. Type: "array",
  2089. Items: &ast.JsonStreamField{
  2090. Type: "string",
  2091. },
  2092. },
  2093. },
  2094. streamStmt: streams["src1"],
  2095. metaFields: []string{"device"},
  2096. isWildCard: false,
  2097. pruneFields: []string{"id1", "name"},
  2098. }.Init(),
  2099. },
  2100. },
  2101. fields: []ast.Field{
  2102. {
  2103. Name: "*",
  2104. Expr: &ast.Wildcard{
  2105. Token: ast.ASTERISK,
  2106. Replace: []ast.Field{
  2107. {
  2108. AName: "id1",
  2109. Expr: &ast.BinaryExpr{
  2110. OP: ast.MUL,
  2111. LHS: &ast.FieldRef{
  2112. Name: "temp",
  2113. StreamName: "src1",
  2114. },
  2115. RHS: &ast.IntegerLiteral{Val: 2},
  2116. },
  2117. },
  2118. {
  2119. AName: "name",
  2120. Expr: &ast.BinaryExpr{
  2121. OP: ast.MUL,
  2122. LHS: &ast.FieldRef{
  2123. Name: "myarray",
  2124. StreamName: "src1",
  2125. },
  2126. RHS: &ast.IntegerLiteral{Val: 2},
  2127. },
  2128. },
  2129. },
  2130. },
  2131. },
  2132. {
  2133. Name: "meta",
  2134. Expr: &ast.Call{
  2135. Name: "meta",
  2136. Args: []ast.Expr{
  2137. &ast.MetaRef{
  2138. StreamName: ast.DefaultStream,
  2139. Name: "device",
  2140. },
  2141. },
  2142. },
  2143. },
  2144. },
  2145. isAggregate: false,
  2146. allWildcard: true,
  2147. sendMeta: false,
  2148. }.Init(),
  2149. },
  2150. { // 21
  2151. sql: `SELECT collect( * EXCEPT(id1, name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2152. p: ProjectPlan{
  2153. baseLogicalPlan: baseLogicalPlan{
  2154. children: []LogicalPlan{
  2155. WindowPlan{
  2156. baseLogicalPlan: baseLogicalPlan{
  2157. children: []LogicalPlan{
  2158. DataSourcePlan{
  2159. baseLogicalPlan: baseLogicalPlan{},
  2160. name: "src1",
  2161. streamFields: map[string]*ast.JsonStreamField{
  2162. "temp": {
  2163. Type: "bigint",
  2164. },
  2165. "myarray": {
  2166. Type: "array",
  2167. Items: &ast.JsonStreamField{
  2168. Type: "string",
  2169. },
  2170. },
  2171. },
  2172. streamStmt: streams["src1"],
  2173. metaFields: []string{},
  2174. isWildCard: false,
  2175. pruneFields: []string{"id1", "name"},
  2176. }.Init(),
  2177. },
  2178. },
  2179. condition: nil,
  2180. wtype: ast.TUMBLING_WINDOW,
  2181. length: 10,
  2182. timeUnit: ast.SS,
  2183. interval: 0,
  2184. limit: 0,
  2185. }.Init(),
  2186. },
  2187. },
  2188. fields: []ast.Field{
  2189. {
  2190. Name: "collect",
  2191. Expr: &ast.Call{
  2192. Name: "collect",
  2193. FuncType: ast.FuncTypeAgg,
  2194. Args: []ast.Expr{
  2195. &ast.Wildcard{
  2196. Token: ast.ASTERISK,
  2197. Except: []string{"id1", "name"},
  2198. },
  2199. },
  2200. },
  2201. },
  2202. },
  2203. isAggregate: true,
  2204. allWildcard: false,
  2205. sendMeta: false,
  2206. }.Init(),
  2207. },
  2208. { // 22
  2209. sql: `SELECT collect( * REPLACE(temp * 2 AS id1, myarray * 2 AS name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2210. p: ProjectPlan{
  2211. baseLogicalPlan: baseLogicalPlan{
  2212. children: []LogicalPlan{
  2213. WindowPlan{
  2214. baseLogicalPlan: baseLogicalPlan{
  2215. children: []LogicalPlan{
  2216. DataSourcePlan{
  2217. baseLogicalPlan: baseLogicalPlan{},
  2218. name: "src1",
  2219. streamFields: map[string]*ast.JsonStreamField{
  2220. "temp": {
  2221. Type: "bigint",
  2222. },
  2223. "myarray": {
  2224. Type: "array",
  2225. Items: &ast.JsonStreamField{
  2226. Type: "string",
  2227. },
  2228. },
  2229. },
  2230. streamStmt: streams["src1"],
  2231. metaFields: []string{},
  2232. isWildCard: false,
  2233. pruneFields: []string{"id1", "name"},
  2234. }.Init(),
  2235. },
  2236. },
  2237. condition: nil,
  2238. wtype: ast.TUMBLING_WINDOW,
  2239. length: 10,
  2240. timeUnit: ast.SS,
  2241. interval: 0,
  2242. limit: 0,
  2243. }.Init(),
  2244. },
  2245. },
  2246. fields: []ast.Field{
  2247. {
  2248. Name: "collect",
  2249. Expr: &ast.Call{
  2250. Name: "collect",
  2251. FuncType: ast.FuncTypeAgg,
  2252. Args: []ast.Expr{
  2253. &ast.Wildcard{
  2254. Token: ast.ASTERISK,
  2255. Replace: []ast.Field{
  2256. {
  2257. AName: "id1",
  2258. Expr: &ast.BinaryExpr{
  2259. OP: ast.MUL,
  2260. LHS: &ast.FieldRef{
  2261. Name: "temp",
  2262. StreamName: "src1",
  2263. },
  2264. RHS: &ast.IntegerLiteral{Val: 2},
  2265. },
  2266. },
  2267. {
  2268. AName: "name",
  2269. Expr: &ast.BinaryExpr{
  2270. OP: ast.MUL,
  2271. LHS: &ast.FieldRef{
  2272. Name: "myarray",
  2273. StreamName: "src1",
  2274. },
  2275. RHS: &ast.IntegerLiteral{Val: 2},
  2276. },
  2277. },
  2278. },
  2279. },
  2280. },
  2281. },
  2282. },
  2283. },
  2284. isAggregate: true,
  2285. allWildcard: false,
  2286. sendMeta: false,
  2287. }.Init(),
  2288. },
  2289. { // 23
  2290. sql: `SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* EXCEPT(id1, name)) > 0`,
  2291. p: ProjectPlan{
  2292. baseLogicalPlan: baseLogicalPlan{
  2293. children: []LogicalPlan{
  2294. HavingPlan{
  2295. baseLogicalPlan: baseLogicalPlan{
  2296. children: []LogicalPlan{
  2297. WindowPlan{
  2298. baseLogicalPlan: baseLogicalPlan{
  2299. children: []LogicalPlan{
  2300. DataSourcePlan{
  2301. baseLogicalPlan: baseLogicalPlan{},
  2302. name: "src1",
  2303. streamFields: map[string]*ast.JsonStreamField{
  2304. "id1": {
  2305. Type: "bigint",
  2306. },
  2307. "temp": {
  2308. Type: "bigint",
  2309. },
  2310. "myarray": {
  2311. Type: "array",
  2312. Items: &ast.JsonStreamField{
  2313. Type: "string",
  2314. },
  2315. },
  2316. },
  2317. streamStmt: streams["src1"],
  2318. metaFields: []string{},
  2319. isWildCard: false,
  2320. pruneFields: []string{"id1", "name"},
  2321. }.Init(),
  2322. },
  2323. },
  2324. condition: nil,
  2325. wtype: ast.TUMBLING_WINDOW,
  2326. length: 10,
  2327. timeUnit: ast.SS,
  2328. interval: 0,
  2329. limit: 0,
  2330. }.Init(),
  2331. },
  2332. },
  2333. condition: &ast.BinaryExpr{
  2334. LHS: &ast.Call{
  2335. Name: "count",
  2336. FuncId: 0,
  2337. Args: []ast.Expr{
  2338. &ast.Wildcard{
  2339. Token: ast.ASTERISK,
  2340. Except: []string{"id1", "name"},
  2341. },
  2342. },
  2343. FuncType: ast.FuncTypeAgg,
  2344. },
  2345. OP: ast.GT,
  2346. RHS: &ast.IntegerLiteral{Val: 0},
  2347. },
  2348. }.Init(),
  2349. },
  2350. },
  2351. fields: []ast.Field{
  2352. {
  2353. Name: "id1",
  2354. Expr: &ast.FieldRef{
  2355. Name: "id1",
  2356. StreamName: "src1",
  2357. },
  2358. },
  2359. },
  2360. isAggregate: false,
  2361. allWildcard: false,
  2362. sendMeta: false,
  2363. }.Init(),
  2364. },
  2365. { // 24
  2366. sql: `SELECT temp FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* REPLACE(temp * 2 AS id1, myarray * 2 AS name)) > 0`,
  2367. p: ProjectPlan{
  2368. baseLogicalPlan: baseLogicalPlan{
  2369. children: []LogicalPlan{
  2370. HavingPlan{
  2371. baseLogicalPlan: baseLogicalPlan{
  2372. children: []LogicalPlan{
  2373. WindowPlan{
  2374. baseLogicalPlan: baseLogicalPlan{
  2375. children: []LogicalPlan{
  2376. DataSourcePlan{
  2377. baseLogicalPlan: baseLogicalPlan{},
  2378. name: "src1",
  2379. streamFields: map[string]*ast.JsonStreamField{
  2380. "temp": {
  2381. Type: "bigint",
  2382. },
  2383. "myarray": {
  2384. Type: "array",
  2385. Items: &ast.JsonStreamField{
  2386. Type: "string",
  2387. },
  2388. },
  2389. },
  2390. streamStmt: streams["src1"],
  2391. metaFields: []string{},
  2392. isWildCard: false,
  2393. pruneFields: []string{"id1", "name"},
  2394. }.Init(),
  2395. },
  2396. },
  2397. condition: nil,
  2398. wtype: ast.TUMBLING_WINDOW,
  2399. length: 10,
  2400. timeUnit: ast.SS,
  2401. interval: 0,
  2402. limit: 0,
  2403. }.Init(),
  2404. },
  2405. },
  2406. condition: &ast.BinaryExpr{
  2407. LHS: &ast.Call{
  2408. Name: "count",
  2409. FuncId: 0,
  2410. Args: []ast.Expr{
  2411. &ast.Wildcard{
  2412. Token: ast.ASTERISK,
  2413. Replace: []ast.Field{
  2414. {
  2415. AName: "id1",
  2416. Expr: &ast.BinaryExpr{
  2417. OP: ast.MUL,
  2418. LHS: &ast.FieldRef{
  2419. Name: "temp",
  2420. StreamName: "src1",
  2421. },
  2422. RHS: &ast.IntegerLiteral{Val: 2},
  2423. },
  2424. },
  2425. {
  2426. AName: "name",
  2427. Expr: &ast.BinaryExpr{
  2428. OP: ast.MUL,
  2429. LHS: &ast.FieldRef{
  2430. Name: "myarray",
  2431. StreamName: "src1",
  2432. },
  2433. RHS: &ast.IntegerLiteral{Val: 2},
  2434. },
  2435. },
  2436. },
  2437. },
  2438. },
  2439. FuncType: ast.FuncTypeAgg,
  2440. },
  2441. OP: ast.GT,
  2442. RHS: &ast.IntegerLiteral{Val: 0},
  2443. },
  2444. }.Init(),
  2445. },
  2446. },
  2447. fields: []ast.Field{
  2448. {
  2449. Name: "temp",
  2450. Expr: &ast.FieldRef{
  2451. Name: "temp",
  2452. StreamName: "src1",
  2453. },
  2454. },
  2455. },
  2456. isAggregate: false,
  2457. allWildcard: false,
  2458. sendMeta: false,
  2459. }.Init(),
  2460. },
  2461. }
  2462. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2463. for i, tt := range tests {
  2464. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2465. if err != nil {
  2466. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2467. continue
  2468. }
  2469. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2470. IsEventTime: false,
  2471. LateTol: 0,
  2472. Concurrency: 0,
  2473. BufferLength: 0,
  2474. SendMetaToSink: false,
  2475. Qos: 0,
  2476. CheckpointInterval: 0,
  2477. SendError: true,
  2478. }, kv)
  2479. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2480. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2481. } else if !reflect.DeepEqual(tt.p, p) {
  2482. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2483. }
  2484. }
  2485. }
  2486. func Test_createLogicalPlanSchemaless(t *testing.T) {
  2487. kv, err := store.GetKV("stream")
  2488. if err != nil {
  2489. t.Error(err)
  2490. return
  2491. }
  2492. streamSqls := map[string]string{
  2493. "src1": `CREATE STREAM src1 (
  2494. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2495. "src2": `CREATE STREAM src2 (
  2496. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  2497. "tableInPlanner": `CREATE TABLE tableInPlanner (
  2498. id BIGINT,
  2499. name STRING,
  2500. value STRING,
  2501. hum BIGINT
  2502. ) WITH (TYPE="file");`,
  2503. }
  2504. types := map[string]ast.StreamType{
  2505. "src1": ast.TypeStream,
  2506. "src2": ast.TypeStream,
  2507. "tableInPlanner": ast.TypeTable,
  2508. }
  2509. for name, sql := range streamSqls {
  2510. s, err := json.Marshal(&xsql.StreamInfo{
  2511. StreamType: types[name],
  2512. Statement: sql,
  2513. })
  2514. if err != nil {
  2515. t.Error(err)
  2516. t.Fail()
  2517. }
  2518. err = kv.Set(name, string(s))
  2519. if err != nil {
  2520. t.Error(err)
  2521. t.Fail()
  2522. }
  2523. }
  2524. streams := make(map[string]*ast.StreamStmt)
  2525. for n := range streamSqls {
  2526. streamStmt, err := xsql.GetDataSource(kv, n)
  2527. if err != nil {
  2528. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2529. return
  2530. }
  2531. streams[n] = streamStmt
  2532. }
  2533. // boolTrue = true
  2534. boolFalse := false
  2535. tests := []struct {
  2536. sql string
  2537. p LogicalPlan
  2538. err string
  2539. }{
  2540. { // 0
  2541. sql: `SELECT name FROM src1`,
  2542. p: ProjectPlan{
  2543. baseLogicalPlan: baseLogicalPlan{
  2544. children: []LogicalPlan{
  2545. DataSourcePlan{
  2546. baseLogicalPlan: baseLogicalPlan{},
  2547. name: "src1",
  2548. streamFields: map[string]*ast.JsonStreamField{
  2549. "name": nil,
  2550. },
  2551. streamStmt: streams["src1"],
  2552. isSchemaless: true,
  2553. metaFields: []string{},
  2554. pruneFields: []string{},
  2555. }.Init(),
  2556. },
  2557. },
  2558. fields: []ast.Field{
  2559. {
  2560. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2561. Name: "name",
  2562. AName: "",
  2563. },
  2564. },
  2565. isAggregate: false,
  2566. sendMeta: false,
  2567. }.Init(),
  2568. }, { // 1 optimize where to data source
  2569. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2570. p: ProjectPlan{
  2571. baseLogicalPlan: baseLogicalPlan{
  2572. children: []LogicalPlan{
  2573. WindowPlan{
  2574. baseLogicalPlan: baseLogicalPlan{
  2575. children: []LogicalPlan{
  2576. FilterPlan{
  2577. baseLogicalPlan: baseLogicalPlan{
  2578. children: []LogicalPlan{
  2579. DataSourcePlan{
  2580. name: "src1",
  2581. streamFields: map[string]*ast.JsonStreamField{
  2582. "name": nil,
  2583. "temp": nil,
  2584. },
  2585. streamStmt: streams["src1"],
  2586. metaFields: []string{},
  2587. isSchemaless: true,
  2588. pruneFields: []string{},
  2589. }.Init(),
  2590. },
  2591. },
  2592. condition: &ast.BinaryExpr{
  2593. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2594. OP: ast.EQ,
  2595. RHS: &ast.StringLiteral{Val: "v1"},
  2596. },
  2597. }.Init(),
  2598. },
  2599. },
  2600. condition: nil,
  2601. wtype: ast.TUMBLING_WINDOW,
  2602. length: 10,
  2603. timeUnit: ast.SS,
  2604. interval: 0,
  2605. limit: 0,
  2606. }.Init(),
  2607. },
  2608. },
  2609. fields: []ast.Field{
  2610. {
  2611. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2612. Name: "temp",
  2613. AName: "",
  2614. },
  2615. },
  2616. isAggregate: false,
  2617. sendMeta: false,
  2618. }.Init(),
  2619. }, { // 2 condition that cannot be optimized
  2620. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2621. p: ProjectPlan{
  2622. baseLogicalPlan: baseLogicalPlan{
  2623. children: []LogicalPlan{
  2624. JoinPlan{
  2625. baseLogicalPlan: baseLogicalPlan{
  2626. children: []LogicalPlan{
  2627. WindowPlan{
  2628. baseLogicalPlan: baseLogicalPlan{
  2629. children: []LogicalPlan{
  2630. DataSourcePlan{
  2631. name: "src1",
  2632. streamFields: map[string]*ast.JsonStreamField{
  2633. "id1": nil,
  2634. "temp": nil,
  2635. },
  2636. streamStmt: streams["src1"],
  2637. metaFields: []string{},
  2638. isSchemaless: true,
  2639. pruneFields: []string{},
  2640. }.Init(),
  2641. DataSourcePlan{
  2642. name: "src2",
  2643. streamFields: map[string]*ast.JsonStreamField{ // can't determine which stream id1 belongs to
  2644. "hum": nil,
  2645. "id1": nil,
  2646. "id2": nil,
  2647. },
  2648. isSchemaless: true,
  2649. streamStmt: streams["src2"],
  2650. metaFields: []string{},
  2651. pruneFields: []string{},
  2652. }.Init(),
  2653. },
  2654. },
  2655. condition: nil,
  2656. wtype: ast.TUMBLING_WINDOW,
  2657. length: 10,
  2658. timeUnit: ast.SS,
  2659. interval: 0,
  2660. limit: 0,
  2661. }.Init(),
  2662. },
  2663. },
  2664. from: &ast.Table{Name: "src1"},
  2665. joins: ast.Joins{ast.Join{
  2666. Name: "src2",
  2667. JoinType: ast.INNER_JOIN,
  2668. Expr: &ast.BinaryExpr{
  2669. OP: ast.AND,
  2670. LHS: &ast.BinaryExpr{
  2671. LHS: &ast.BinaryExpr{
  2672. OP: ast.GT,
  2673. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2674. RHS: &ast.IntegerLiteral{Val: 20},
  2675. },
  2676. OP: ast.OR,
  2677. RHS: &ast.BinaryExpr{
  2678. OP: ast.GT,
  2679. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2680. RHS: &ast.IntegerLiteral{Val: 60},
  2681. },
  2682. },
  2683. RHS: &ast.BinaryExpr{
  2684. OP: ast.EQ,
  2685. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2686. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2687. },
  2688. },
  2689. }},
  2690. }.Init(),
  2691. },
  2692. },
  2693. fields: []ast.Field{
  2694. {
  2695. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2696. Name: "id1",
  2697. AName: "",
  2698. },
  2699. },
  2700. isAggregate: false,
  2701. sendMeta: false,
  2702. }.Init(),
  2703. }, { // 3 optimize window filter
  2704. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  2705. p: ProjectPlan{
  2706. baseLogicalPlan: baseLogicalPlan{
  2707. children: []LogicalPlan{
  2708. WindowPlan{
  2709. baseLogicalPlan: baseLogicalPlan{
  2710. children: []LogicalPlan{
  2711. FilterPlan{
  2712. baseLogicalPlan: baseLogicalPlan{
  2713. children: []LogicalPlan{
  2714. DataSourcePlan{
  2715. name: "src1",
  2716. streamFields: map[string]*ast.JsonStreamField{
  2717. "id1": nil,
  2718. "name": nil,
  2719. "temp": nil,
  2720. },
  2721. isSchemaless: true,
  2722. streamStmt: streams["src1"],
  2723. metaFields: []string{},
  2724. pruneFields: []string{},
  2725. }.Init(),
  2726. },
  2727. },
  2728. condition: &ast.BinaryExpr{
  2729. OP: ast.AND,
  2730. LHS: &ast.BinaryExpr{
  2731. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  2732. OP: ast.EQ,
  2733. RHS: &ast.StringLiteral{Val: "v1"},
  2734. },
  2735. RHS: &ast.BinaryExpr{
  2736. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2737. OP: ast.GT,
  2738. RHS: &ast.IntegerLiteral{Val: 2},
  2739. },
  2740. },
  2741. }.Init(),
  2742. },
  2743. },
  2744. condition: nil,
  2745. wtype: ast.TUMBLING_WINDOW,
  2746. length: 10,
  2747. timeUnit: ast.SS,
  2748. interval: 0,
  2749. limit: 0,
  2750. }.Init(),
  2751. },
  2752. },
  2753. fields: []ast.Field{
  2754. {
  2755. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2756. Name: "id1",
  2757. AName: "",
  2758. },
  2759. },
  2760. isAggregate: false,
  2761. sendMeta: false,
  2762. }.Init(),
  2763. }, { // 4. do not optimize count window
  2764. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  2765. p: ProjectPlan{
  2766. baseLogicalPlan: baseLogicalPlan{
  2767. children: []LogicalPlan{
  2768. HavingPlan{
  2769. baseLogicalPlan: baseLogicalPlan{
  2770. children: []LogicalPlan{
  2771. FilterPlan{
  2772. baseLogicalPlan: baseLogicalPlan{
  2773. children: []LogicalPlan{
  2774. WindowPlan{
  2775. baseLogicalPlan: baseLogicalPlan{
  2776. children: []LogicalPlan{
  2777. DataSourcePlan{
  2778. name: "src1",
  2779. isWildCard: true,
  2780. streamFields: map[string]*ast.JsonStreamField{},
  2781. streamStmt: streams["src1"],
  2782. metaFields: []string{},
  2783. isSchemaless: true,
  2784. pruneFields: []string{},
  2785. }.Init(),
  2786. },
  2787. },
  2788. condition: nil,
  2789. wtype: ast.COUNT_WINDOW,
  2790. length: 5,
  2791. interval: 1,
  2792. limit: 0,
  2793. }.Init(),
  2794. },
  2795. },
  2796. condition: &ast.BinaryExpr{
  2797. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2798. OP: ast.GT,
  2799. RHS: &ast.IntegerLiteral{Val: 20},
  2800. },
  2801. }.Init(),
  2802. },
  2803. },
  2804. condition: &ast.BinaryExpr{
  2805. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  2806. Token: ast.ASTERISK,
  2807. }}, FuncType: ast.FuncTypeAgg},
  2808. OP: ast.GT,
  2809. RHS: &ast.IntegerLiteral{Val: 2},
  2810. },
  2811. }.Init(),
  2812. },
  2813. },
  2814. fields: []ast.Field{
  2815. {
  2816. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2817. Name: "*",
  2818. AName: "",
  2819. },
  2820. },
  2821. isAggregate: false,
  2822. sendMeta: false,
  2823. }.Init(),
  2824. }, { // 5. optimize join on
  2825. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2826. p: ProjectPlan{
  2827. baseLogicalPlan: baseLogicalPlan{
  2828. children: []LogicalPlan{
  2829. JoinPlan{
  2830. baseLogicalPlan: baseLogicalPlan{
  2831. children: []LogicalPlan{
  2832. WindowPlan{
  2833. baseLogicalPlan: baseLogicalPlan{
  2834. children: []LogicalPlan{
  2835. FilterPlan{
  2836. baseLogicalPlan: baseLogicalPlan{
  2837. children: []LogicalPlan{
  2838. DataSourcePlan{
  2839. name: "src1",
  2840. streamFields: map[string]*ast.JsonStreamField{
  2841. "id1": nil,
  2842. "temp": nil,
  2843. },
  2844. isSchemaless: true,
  2845. streamStmt: streams["src1"],
  2846. metaFields: []string{},
  2847. pruneFields: []string{},
  2848. }.Init(),
  2849. },
  2850. },
  2851. condition: &ast.BinaryExpr{
  2852. RHS: &ast.BinaryExpr{
  2853. OP: ast.GT,
  2854. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2855. RHS: &ast.IntegerLiteral{Val: 20},
  2856. },
  2857. OP: ast.AND,
  2858. LHS: &ast.BinaryExpr{
  2859. OP: ast.GT,
  2860. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2861. RHS: &ast.IntegerLiteral{Val: 111},
  2862. },
  2863. },
  2864. }.Init(),
  2865. FilterPlan{
  2866. baseLogicalPlan: baseLogicalPlan{
  2867. children: []LogicalPlan{
  2868. DataSourcePlan{
  2869. name: "src2",
  2870. streamFields: map[string]*ast.JsonStreamField{
  2871. "hum": nil,
  2872. "id1": nil,
  2873. "id2": nil,
  2874. },
  2875. isSchemaless: true,
  2876. streamStmt: streams["src2"],
  2877. metaFields: []string{},
  2878. pruneFields: []string{},
  2879. }.Init(),
  2880. },
  2881. },
  2882. condition: &ast.BinaryExpr{
  2883. OP: ast.LT,
  2884. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2885. RHS: &ast.IntegerLiteral{Val: 60},
  2886. },
  2887. }.Init(),
  2888. },
  2889. },
  2890. condition: nil,
  2891. wtype: ast.TUMBLING_WINDOW,
  2892. length: 10,
  2893. timeUnit: ast.SS,
  2894. interval: 0,
  2895. limit: 0,
  2896. }.Init(),
  2897. },
  2898. },
  2899. from: &ast.Table{
  2900. Name: "src1",
  2901. },
  2902. joins: []ast.Join{
  2903. {
  2904. Name: "src2",
  2905. Alias: "",
  2906. JoinType: ast.INNER_JOIN,
  2907. Expr: &ast.BinaryExpr{
  2908. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2909. OP: ast.EQ,
  2910. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2911. },
  2912. },
  2913. },
  2914. }.Init(),
  2915. },
  2916. },
  2917. fields: []ast.Field{
  2918. {
  2919. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2920. Name: "id1",
  2921. AName: "",
  2922. },
  2923. },
  2924. isAggregate: false,
  2925. sendMeta: false,
  2926. }.Init(),
  2927. }, { // 6. optimize outer join on
  2928. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2929. p: ProjectPlan{
  2930. baseLogicalPlan: baseLogicalPlan{
  2931. children: []LogicalPlan{
  2932. JoinPlan{
  2933. baseLogicalPlan: baseLogicalPlan{
  2934. children: []LogicalPlan{
  2935. WindowPlan{
  2936. baseLogicalPlan: baseLogicalPlan{
  2937. children: []LogicalPlan{
  2938. FilterPlan{
  2939. baseLogicalPlan: baseLogicalPlan{
  2940. children: []LogicalPlan{
  2941. DataSourcePlan{
  2942. name: "src1",
  2943. streamFields: map[string]*ast.JsonStreamField{
  2944. "id1": nil,
  2945. "temp": nil,
  2946. },
  2947. isSchemaless: true,
  2948. streamStmt: streams["src1"],
  2949. metaFields: []string{},
  2950. pruneFields: []string{},
  2951. }.Init(),
  2952. },
  2953. },
  2954. condition: &ast.BinaryExpr{
  2955. OP: ast.GT,
  2956. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2957. RHS: &ast.IntegerLiteral{Val: 111},
  2958. },
  2959. }.Init(),
  2960. DataSourcePlan{
  2961. name: "src2",
  2962. streamFields: map[string]*ast.JsonStreamField{
  2963. "hum": nil,
  2964. "id1": nil,
  2965. "id2": nil,
  2966. },
  2967. isSchemaless: true,
  2968. streamStmt: streams["src2"],
  2969. metaFields: []string{},
  2970. pruneFields: []string{},
  2971. }.Init(),
  2972. },
  2973. },
  2974. condition: nil,
  2975. wtype: ast.TUMBLING_WINDOW,
  2976. length: 10,
  2977. timeUnit: ast.SS,
  2978. interval: 0,
  2979. limit: 0,
  2980. }.Init(),
  2981. },
  2982. },
  2983. from: &ast.Table{
  2984. Name: "src1",
  2985. },
  2986. joins: []ast.Join{
  2987. {
  2988. Name: "src2",
  2989. Alias: "",
  2990. JoinType: ast.FULL_JOIN,
  2991. Expr: &ast.BinaryExpr{
  2992. OP: ast.AND,
  2993. LHS: &ast.BinaryExpr{
  2994. OP: ast.AND,
  2995. LHS: &ast.BinaryExpr{
  2996. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2997. OP: ast.EQ,
  2998. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2999. },
  3000. RHS: &ast.BinaryExpr{
  3001. OP: ast.GT,
  3002. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3003. RHS: &ast.IntegerLiteral{Val: 20},
  3004. },
  3005. },
  3006. RHS: &ast.BinaryExpr{
  3007. OP: ast.LT,
  3008. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  3009. RHS: &ast.IntegerLiteral{Val: 60},
  3010. },
  3011. },
  3012. },
  3013. },
  3014. }.Init(),
  3015. },
  3016. },
  3017. fields: []ast.Field{
  3018. {
  3019. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  3020. Name: "id1",
  3021. AName: "",
  3022. },
  3023. },
  3024. isAggregate: false,
  3025. sendMeta: false,
  3026. }.Init(),
  3027. }, { // 7 window error for table
  3028. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3029. p: nil,
  3030. err: "cannot run window for TABLE sources",
  3031. }, { // 8 join table without window
  3032. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  3033. p: ProjectPlan{
  3034. baseLogicalPlan: baseLogicalPlan{
  3035. children: []LogicalPlan{
  3036. JoinPlan{
  3037. baseLogicalPlan: baseLogicalPlan{
  3038. children: []LogicalPlan{
  3039. JoinAlignPlan{
  3040. baseLogicalPlan: baseLogicalPlan{
  3041. children: []LogicalPlan{
  3042. FilterPlan{
  3043. baseLogicalPlan: baseLogicalPlan{
  3044. children: []LogicalPlan{
  3045. DataSourcePlan{
  3046. name: "src1",
  3047. streamFields: map[string]*ast.JsonStreamField{
  3048. "hum": nil,
  3049. "id1": nil,
  3050. "temp": nil,
  3051. },
  3052. isSchemaless: true,
  3053. streamStmt: streams["src1"],
  3054. metaFields: []string{},
  3055. pruneFields: []string{},
  3056. }.Init(),
  3057. },
  3058. },
  3059. condition: &ast.BinaryExpr{
  3060. RHS: &ast.BinaryExpr{
  3061. OP: ast.GT,
  3062. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3063. RHS: &ast.IntegerLiteral{Val: 20},
  3064. },
  3065. OP: ast.AND,
  3066. LHS: &ast.BinaryExpr{
  3067. OP: ast.GT,
  3068. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3069. RHS: &ast.IntegerLiteral{Val: 111},
  3070. },
  3071. },
  3072. }.Init(),
  3073. DataSourcePlan{
  3074. name: "tableInPlanner",
  3075. streamFields: map[string]*ast.JsonStreamField{
  3076. "hum": {
  3077. Type: "bigint",
  3078. },
  3079. "id": {
  3080. Type: "bigint",
  3081. },
  3082. },
  3083. streamStmt: streams["tableInPlanner"],
  3084. metaFields: []string{},
  3085. pruneFields: []string{},
  3086. }.Init(),
  3087. },
  3088. },
  3089. Emitters: []string{"tableInPlanner"},
  3090. }.Init(),
  3091. },
  3092. },
  3093. from: &ast.Table{
  3094. Name: "src1",
  3095. },
  3096. joins: []ast.Join{
  3097. {
  3098. Name: "tableInPlanner",
  3099. Alias: "",
  3100. JoinType: ast.INNER_JOIN,
  3101. Expr: &ast.BinaryExpr{
  3102. OP: ast.AND,
  3103. LHS: &ast.BinaryExpr{
  3104. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3105. OP: ast.EQ,
  3106. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  3107. },
  3108. RHS: &ast.BinaryExpr{
  3109. OP: ast.LT,
  3110. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  3111. RHS: &ast.IntegerLiteral{Val: 60},
  3112. },
  3113. },
  3114. },
  3115. },
  3116. }.Init(),
  3117. },
  3118. },
  3119. fields: []ast.Field{
  3120. {
  3121. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  3122. Name: "id1",
  3123. AName: "",
  3124. },
  3125. },
  3126. isAggregate: false,
  3127. sendMeta: false,
  3128. }.Init(),
  3129. }, { // 9 join table with window
  3130. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3131. p: ProjectPlan{
  3132. baseLogicalPlan: baseLogicalPlan{
  3133. children: []LogicalPlan{
  3134. JoinPlan{
  3135. baseLogicalPlan: baseLogicalPlan{
  3136. children: []LogicalPlan{
  3137. JoinAlignPlan{
  3138. baseLogicalPlan: baseLogicalPlan{
  3139. children: []LogicalPlan{
  3140. WindowPlan{
  3141. baseLogicalPlan: baseLogicalPlan{
  3142. children: []LogicalPlan{
  3143. DataSourcePlan{
  3144. name: "src1",
  3145. streamFields: map[string]*ast.JsonStreamField{
  3146. "id1": nil,
  3147. "temp": nil,
  3148. },
  3149. isSchemaless: true,
  3150. streamStmt: streams["src1"],
  3151. metaFields: []string{},
  3152. pruneFields: []string{},
  3153. }.Init(),
  3154. },
  3155. },
  3156. condition: nil,
  3157. wtype: ast.TUMBLING_WINDOW,
  3158. length: 10,
  3159. timeUnit: ast.SS,
  3160. interval: 0,
  3161. limit: 0,
  3162. }.Init(),
  3163. DataSourcePlan{
  3164. name: "tableInPlanner",
  3165. streamFields: map[string]*ast.JsonStreamField{
  3166. "hum": {
  3167. Type: "bigint",
  3168. },
  3169. "id": {
  3170. Type: "bigint",
  3171. },
  3172. },
  3173. streamStmt: streams["tableInPlanner"],
  3174. metaFields: []string{},
  3175. pruneFields: []string{},
  3176. }.Init(),
  3177. },
  3178. },
  3179. Emitters: []string{"tableInPlanner"},
  3180. }.Init(),
  3181. },
  3182. },
  3183. from: &ast.Table{
  3184. Name: "src1",
  3185. },
  3186. joins: []ast.Join{
  3187. {
  3188. Name: "tableInPlanner",
  3189. Alias: "",
  3190. JoinType: ast.INNER_JOIN,
  3191. Expr: &ast.BinaryExpr{
  3192. OP: ast.AND,
  3193. LHS: &ast.BinaryExpr{
  3194. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3195. OP: ast.EQ,
  3196. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  3197. },
  3198. RHS: &ast.BinaryExpr{
  3199. RHS: &ast.BinaryExpr{
  3200. OP: ast.AND,
  3201. LHS: &ast.BinaryExpr{
  3202. OP: ast.GT,
  3203. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3204. RHS: &ast.IntegerLiteral{Val: 20},
  3205. },
  3206. RHS: &ast.BinaryExpr{
  3207. OP: ast.LT,
  3208. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  3209. RHS: &ast.IntegerLiteral{Val: 60},
  3210. },
  3211. },
  3212. OP: ast.AND,
  3213. LHS: &ast.BinaryExpr{
  3214. OP: ast.GT,
  3215. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  3216. RHS: &ast.IntegerLiteral{Val: 111},
  3217. },
  3218. },
  3219. },
  3220. },
  3221. },
  3222. }.Init(),
  3223. },
  3224. },
  3225. fields: []ast.Field{
  3226. {
  3227. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  3228. Name: "id1",
  3229. AName: "",
  3230. },
  3231. },
  3232. isAggregate: false,
  3233. sendMeta: false,
  3234. }.Init(),
  3235. }, { // 10 meta
  3236. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  3237. p: ProjectPlan{
  3238. baseLogicalPlan: baseLogicalPlan{
  3239. children: []LogicalPlan{
  3240. FilterPlan{
  3241. baseLogicalPlan: baseLogicalPlan{
  3242. children: []LogicalPlan{
  3243. DataSourcePlan{
  3244. name: "src1",
  3245. streamFields: map[string]*ast.JsonStreamField{
  3246. "temp": nil,
  3247. },
  3248. isSchemaless: true,
  3249. streamStmt: streams["src1"],
  3250. metaFields: []string{"Humidity", "device", "id"},
  3251. pruneFields: []string{},
  3252. }.Init(),
  3253. },
  3254. },
  3255. condition: &ast.BinaryExpr{
  3256. LHS: &ast.Call{
  3257. Name: "meta",
  3258. FuncId: 2,
  3259. Args: []ast.Expr{&ast.MetaRef{
  3260. Name: "device",
  3261. StreamName: ast.DefaultStream,
  3262. }},
  3263. },
  3264. OP: ast.EQ,
  3265. RHS: &ast.StringLiteral{
  3266. Val: "demo2",
  3267. },
  3268. },
  3269. }.Init(),
  3270. },
  3271. },
  3272. fields: []ast.Field{
  3273. {
  3274. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  3275. Name: "temp",
  3276. AName: "",
  3277. }, {
  3278. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3279. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  3280. Name: "id",
  3281. StreamName: ast.DefaultStream,
  3282. }}},
  3283. []ast.StreamName{},
  3284. nil,
  3285. )},
  3286. Name: "meta",
  3287. AName: "eid",
  3288. }, {
  3289. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3290. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  3291. &ast.BinaryExpr{
  3292. OP: ast.ARROW,
  3293. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  3294. RHS: &ast.JsonFieldRef{Name: "Device"},
  3295. },
  3296. }},
  3297. []ast.StreamName{},
  3298. nil,
  3299. )},
  3300. Name: "meta",
  3301. AName: "hdevice",
  3302. },
  3303. },
  3304. isAggregate: false,
  3305. sendMeta: false,
  3306. }.Init(),
  3307. }, { // 11 join with same name field and aliased
  3308. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  3309. p: ProjectPlan{
  3310. baseLogicalPlan: baseLogicalPlan{
  3311. children: []LogicalPlan{
  3312. JoinPlan{
  3313. baseLogicalPlan: baseLogicalPlan{
  3314. children: []LogicalPlan{
  3315. JoinAlignPlan{
  3316. baseLogicalPlan: baseLogicalPlan{
  3317. children: []LogicalPlan{
  3318. DataSourcePlan{
  3319. name: "src2",
  3320. streamFields: map[string]*ast.JsonStreamField{
  3321. "hum": nil,
  3322. "id": nil,
  3323. "id2": nil,
  3324. },
  3325. isSchemaless: true,
  3326. streamStmt: streams["src2"],
  3327. metaFields: []string{},
  3328. pruneFields: []string{},
  3329. }.Init(),
  3330. DataSourcePlan{
  3331. name: "tableInPlanner",
  3332. streamFields: map[string]*ast.JsonStreamField{
  3333. "hum": {
  3334. Type: "bigint",
  3335. },
  3336. "id": {
  3337. Type: "bigint",
  3338. },
  3339. },
  3340. streamStmt: streams["tableInPlanner"],
  3341. metaFields: []string{},
  3342. pruneFields: []string{},
  3343. }.Init(),
  3344. },
  3345. },
  3346. Emitters: []string{"tableInPlanner"},
  3347. }.Init(),
  3348. },
  3349. },
  3350. from: &ast.Table{
  3351. Name: "src2",
  3352. },
  3353. joins: []ast.Join{
  3354. {
  3355. Name: "tableInPlanner",
  3356. Alias: "",
  3357. JoinType: ast.INNER_JOIN,
  3358. Expr: &ast.BinaryExpr{
  3359. RHS: &ast.BinaryExpr{
  3360. OP: ast.EQ,
  3361. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  3362. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  3363. },
  3364. OP: ast.AND,
  3365. LHS: &ast.BinaryExpr{
  3366. OP: ast.GT,
  3367. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3368. &ast.FieldRef{
  3369. Name: "hum",
  3370. StreamName: "src2",
  3371. },
  3372. []ast.StreamName{"src2"},
  3373. &boolFalse,
  3374. )},
  3375. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3376. &ast.FieldRef{
  3377. Name: "hum",
  3378. StreamName: "tableInPlanner",
  3379. },
  3380. []ast.StreamName{"tableInPlanner"},
  3381. &boolFalse,
  3382. )},
  3383. },
  3384. },
  3385. },
  3386. },
  3387. }.Init(),
  3388. },
  3389. },
  3390. fields: []ast.Field{
  3391. {
  3392. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3393. &ast.FieldRef{
  3394. Name: "hum",
  3395. StreamName: "src2",
  3396. },
  3397. []ast.StreamName{"src2"},
  3398. &boolFalse,
  3399. )},
  3400. Name: "hum",
  3401. AName: "hum1",
  3402. },
  3403. {
  3404. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  3405. &ast.FieldRef{
  3406. Name: "hum",
  3407. StreamName: "tableInPlanner",
  3408. },
  3409. []ast.StreamName{"tableInPlanner"},
  3410. &boolFalse,
  3411. )},
  3412. Name: "hum",
  3413. AName: "hum2",
  3414. },
  3415. },
  3416. isAggregate: false,
  3417. sendMeta: false,
  3418. }.Init(),
  3419. }, { // 12
  3420. sql: `SELECT name->first, name->last FROM src1`,
  3421. p: ProjectPlan{
  3422. baseLogicalPlan: baseLogicalPlan{
  3423. children: []LogicalPlan{
  3424. DataSourcePlan{
  3425. baseLogicalPlan: baseLogicalPlan{},
  3426. name: "src1",
  3427. streamFields: map[string]*ast.JsonStreamField{
  3428. "name": nil,
  3429. },
  3430. isSchemaless: true,
  3431. streamStmt: streams["src1"],
  3432. metaFields: []string{},
  3433. pruneFields: []string{},
  3434. }.Init(),
  3435. },
  3436. },
  3437. fields: []ast.Field{
  3438. {
  3439. Expr: &ast.BinaryExpr{
  3440. OP: ast.ARROW,
  3441. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  3442. RHS: &ast.JsonFieldRef{Name: "first"},
  3443. },
  3444. Name: "kuiper_field_0",
  3445. AName: "",
  3446. }, {
  3447. Expr: &ast.BinaryExpr{
  3448. OP: ast.ARROW,
  3449. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  3450. RHS: &ast.JsonFieldRef{Name: "last"},
  3451. },
  3452. Name: "kuiper_field_1",
  3453. AName: "",
  3454. },
  3455. },
  3456. isAggregate: false,
  3457. sendMeta: false,
  3458. }.Init(),
  3459. }, { // 13
  3460. sql: `SELECT * EXCEPT(id1, name), meta(device) FROM src1`,
  3461. p: ProjectPlan{
  3462. baseLogicalPlan: baseLogicalPlan{
  3463. children: []LogicalPlan{
  3464. DataSourcePlan{
  3465. baseLogicalPlan: baseLogicalPlan{},
  3466. name: "src1",
  3467. streamFields: map[string]*ast.JsonStreamField{},
  3468. streamStmt: streams["src1"],
  3469. metaFields: []string{"device"},
  3470. isWildCard: false,
  3471. pruneFields: []string{"id1", "name"},
  3472. isSchemaless: true,
  3473. }.Init(),
  3474. },
  3475. },
  3476. fields: []ast.Field{
  3477. {
  3478. Name: "*",
  3479. Expr: &ast.Wildcard{
  3480. Token: ast.ASTERISK,
  3481. Except: []string{"id1", "name"},
  3482. },
  3483. },
  3484. {
  3485. Name: "meta",
  3486. Expr: &ast.Call{
  3487. Name: "meta",
  3488. Args: []ast.Expr{
  3489. &ast.MetaRef{
  3490. StreamName: ast.DefaultStream,
  3491. Name: "device",
  3492. },
  3493. },
  3494. },
  3495. },
  3496. },
  3497. isAggregate: false,
  3498. allWildcard: true,
  3499. sendMeta: false,
  3500. }.Init(),
  3501. }, { // 14
  3502. sql: `SELECT * REPLACE(temp * 2 AS id1, myarray * 2 AS name), meta(device) FROM src1`,
  3503. p: ProjectPlan{
  3504. baseLogicalPlan: baseLogicalPlan{
  3505. children: []LogicalPlan{
  3506. DataSourcePlan{
  3507. baseLogicalPlan: baseLogicalPlan{},
  3508. name: "src1",
  3509. streamFields: map[string]*ast.JsonStreamField{},
  3510. streamStmt: streams["src1"],
  3511. metaFields: []string{"device"},
  3512. isWildCard: false,
  3513. pruneFields: []string{"id1", "name"},
  3514. isSchemaless: true,
  3515. }.Init(),
  3516. },
  3517. },
  3518. fields: []ast.Field{
  3519. {
  3520. Name: "*",
  3521. Expr: &ast.Wildcard{
  3522. Token: ast.ASTERISK,
  3523. Replace: []ast.Field{
  3524. {
  3525. AName: "id1",
  3526. Expr: &ast.BinaryExpr{
  3527. OP: ast.MUL,
  3528. LHS: &ast.FieldRef{
  3529. Name: "temp",
  3530. StreamName: "src1",
  3531. },
  3532. RHS: &ast.IntegerLiteral{Val: 2},
  3533. },
  3534. },
  3535. {
  3536. AName: "name",
  3537. Expr: &ast.BinaryExpr{
  3538. OP: ast.MUL,
  3539. LHS: &ast.FieldRef{
  3540. Name: "myarray",
  3541. StreamName: "src1",
  3542. },
  3543. RHS: &ast.IntegerLiteral{Val: 2},
  3544. },
  3545. },
  3546. },
  3547. },
  3548. },
  3549. {
  3550. Name: "meta",
  3551. Expr: &ast.Call{
  3552. Name: "meta",
  3553. Args: []ast.Expr{
  3554. &ast.MetaRef{
  3555. StreamName: ast.DefaultStream,
  3556. Name: "device",
  3557. },
  3558. },
  3559. },
  3560. },
  3561. },
  3562. isAggregate: false,
  3563. allWildcard: true,
  3564. sendMeta: false,
  3565. }.Init(),
  3566. }, { // 15
  3567. sql: `SELECT collect( * EXCEPT(id1, name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3568. p: ProjectPlan{
  3569. baseLogicalPlan: baseLogicalPlan{
  3570. children: []LogicalPlan{
  3571. WindowPlan{
  3572. baseLogicalPlan: baseLogicalPlan{
  3573. children: []LogicalPlan{
  3574. DataSourcePlan{
  3575. baseLogicalPlan: baseLogicalPlan{},
  3576. name: "src1",
  3577. streamFields: map[string]*ast.JsonStreamField{},
  3578. streamStmt: streams["src1"],
  3579. metaFields: []string{},
  3580. isWildCard: false,
  3581. pruneFields: []string{"id1", "name"},
  3582. isSchemaless: true,
  3583. }.Init(),
  3584. },
  3585. },
  3586. condition: nil,
  3587. wtype: ast.TUMBLING_WINDOW,
  3588. length: 10,
  3589. timeUnit: ast.SS,
  3590. interval: 0,
  3591. limit: 0,
  3592. }.Init(),
  3593. },
  3594. },
  3595. fields: []ast.Field{
  3596. {
  3597. Name: "collect",
  3598. Expr: &ast.Call{
  3599. Name: "collect",
  3600. FuncType: ast.FuncTypeAgg,
  3601. Args: []ast.Expr{
  3602. &ast.Wildcard{
  3603. Token: ast.ASTERISK,
  3604. Except: []string{"id1", "name"},
  3605. },
  3606. },
  3607. },
  3608. },
  3609. },
  3610. isAggregate: true,
  3611. allWildcard: false,
  3612. sendMeta: false,
  3613. }.Init(),
  3614. }, { // 16
  3615. sql: `SELECT collect( * REPLACE(temp * 2 AS id1, myarray * 2 AS name)) FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  3616. p: ProjectPlan{
  3617. baseLogicalPlan: baseLogicalPlan{
  3618. children: []LogicalPlan{
  3619. WindowPlan{
  3620. baseLogicalPlan: baseLogicalPlan{
  3621. children: []LogicalPlan{
  3622. DataSourcePlan{
  3623. baseLogicalPlan: baseLogicalPlan{},
  3624. name: "src1",
  3625. streamFields: map[string]*ast.JsonStreamField{},
  3626. streamStmt: streams["src1"],
  3627. metaFields: []string{},
  3628. isWildCard: false,
  3629. pruneFields: []string{"id1", "name"},
  3630. isSchemaless: true,
  3631. }.Init(),
  3632. },
  3633. },
  3634. condition: nil,
  3635. wtype: ast.TUMBLING_WINDOW,
  3636. length: 10,
  3637. timeUnit: ast.SS,
  3638. interval: 0,
  3639. limit: 0,
  3640. }.Init(),
  3641. },
  3642. },
  3643. fields: []ast.Field{
  3644. {
  3645. Name: "collect",
  3646. Expr: &ast.Call{
  3647. Name: "collect",
  3648. FuncType: ast.FuncTypeAgg,
  3649. Args: []ast.Expr{
  3650. &ast.Wildcard{
  3651. Token: ast.ASTERISK,
  3652. Replace: []ast.Field{
  3653. {
  3654. AName: "id1",
  3655. Expr: &ast.BinaryExpr{
  3656. OP: ast.MUL,
  3657. LHS: &ast.FieldRef{
  3658. Name: "temp",
  3659. StreamName: "src1",
  3660. },
  3661. RHS: &ast.IntegerLiteral{Val: 2},
  3662. },
  3663. },
  3664. {
  3665. AName: "name",
  3666. Expr: &ast.BinaryExpr{
  3667. OP: ast.MUL,
  3668. LHS: &ast.FieldRef{
  3669. Name: "myarray",
  3670. StreamName: "src1",
  3671. },
  3672. RHS: &ast.IntegerLiteral{Val: 2},
  3673. },
  3674. },
  3675. },
  3676. },
  3677. },
  3678. },
  3679. },
  3680. },
  3681. isAggregate: true,
  3682. allWildcard: false,
  3683. sendMeta: false,
  3684. }.Init(),
  3685. }, { // 17
  3686. sql: `SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* EXCEPT(id1, name)) > 0`,
  3687. p: ProjectPlan{
  3688. baseLogicalPlan: baseLogicalPlan{
  3689. children: []LogicalPlan{
  3690. HavingPlan{
  3691. baseLogicalPlan: baseLogicalPlan{
  3692. children: []LogicalPlan{
  3693. WindowPlan{
  3694. baseLogicalPlan: baseLogicalPlan{
  3695. children: []LogicalPlan{
  3696. DataSourcePlan{
  3697. baseLogicalPlan: baseLogicalPlan{},
  3698. name: "src1",
  3699. streamFields: map[string]*ast.JsonStreamField{},
  3700. streamStmt: streams["src1"],
  3701. metaFields: []string{},
  3702. isWildCard: false,
  3703. pruneFields: []string{"id1", "name"},
  3704. isSchemaless: true,
  3705. }.Init(),
  3706. },
  3707. },
  3708. condition: nil,
  3709. wtype: ast.TUMBLING_WINDOW,
  3710. length: 10,
  3711. timeUnit: ast.SS,
  3712. interval: 0,
  3713. limit: 0,
  3714. }.Init(),
  3715. },
  3716. },
  3717. condition: &ast.BinaryExpr{
  3718. LHS: &ast.Call{
  3719. Name: "count",
  3720. FuncId: 0,
  3721. Args: []ast.Expr{
  3722. &ast.Wildcard{
  3723. Token: ast.ASTERISK,
  3724. Except: []string{"id1", "name"},
  3725. },
  3726. },
  3727. FuncType: ast.FuncTypeAgg,
  3728. },
  3729. OP: ast.GT,
  3730. RHS: &ast.IntegerLiteral{Val: 0},
  3731. },
  3732. }.Init(),
  3733. },
  3734. },
  3735. fields: []ast.Field{
  3736. {
  3737. Name: "id1",
  3738. Expr: &ast.FieldRef{
  3739. Name: "id1",
  3740. StreamName: "src1",
  3741. },
  3742. },
  3743. },
  3744. isAggregate: false,
  3745. allWildcard: false,
  3746. sendMeta: false,
  3747. }.Init(),
  3748. }, { // 18
  3749. sql: `SELECT temp FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) HAVING count(* REPLACE(temp * 2 AS id1, myarray * 2 AS name)) > 0`,
  3750. p: ProjectPlan{
  3751. baseLogicalPlan: baseLogicalPlan{
  3752. children: []LogicalPlan{
  3753. HavingPlan{
  3754. baseLogicalPlan: baseLogicalPlan{
  3755. children: []LogicalPlan{
  3756. WindowPlan{
  3757. baseLogicalPlan: baseLogicalPlan{
  3758. children: []LogicalPlan{
  3759. DataSourcePlan{
  3760. baseLogicalPlan: baseLogicalPlan{},
  3761. name: "src1",
  3762. streamFields: map[string]*ast.JsonStreamField{},
  3763. streamStmt: streams["src1"],
  3764. metaFields: []string{},
  3765. isWildCard: false,
  3766. pruneFields: []string{"id1", "name"},
  3767. isSchemaless: true,
  3768. }.Init(),
  3769. },
  3770. },
  3771. condition: nil,
  3772. wtype: ast.TUMBLING_WINDOW,
  3773. length: 10,
  3774. timeUnit: ast.SS,
  3775. interval: 0,
  3776. limit: 0,
  3777. }.Init(),
  3778. },
  3779. },
  3780. condition: &ast.BinaryExpr{
  3781. LHS: &ast.Call{
  3782. Name: "count",
  3783. FuncId: 0,
  3784. Args: []ast.Expr{
  3785. &ast.Wildcard{
  3786. Token: ast.ASTERISK,
  3787. Replace: []ast.Field{
  3788. {
  3789. AName: "id1",
  3790. Expr: &ast.BinaryExpr{
  3791. OP: ast.MUL,
  3792. LHS: &ast.FieldRef{
  3793. Name: "temp",
  3794. StreamName: "src1",
  3795. },
  3796. RHS: &ast.IntegerLiteral{Val: 2},
  3797. },
  3798. },
  3799. {
  3800. AName: "name",
  3801. Expr: &ast.BinaryExpr{
  3802. OP: ast.MUL,
  3803. LHS: &ast.FieldRef{
  3804. Name: "myarray",
  3805. StreamName: "src1",
  3806. },
  3807. RHS: &ast.IntegerLiteral{Val: 2},
  3808. },
  3809. },
  3810. },
  3811. },
  3812. },
  3813. FuncType: ast.FuncTypeAgg,
  3814. },
  3815. OP: ast.GT,
  3816. RHS: &ast.IntegerLiteral{Val: 0},
  3817. },
  3818. }.Init(),
  3819. },
  3820. },
  3821. fields: []ast.Field{
  3822. {
  3823. Name: "temp",
  3824. Expr: &ast.FieldRef{
  3825. Name: "temp",
  3826. StreamName: "src1",
  3827. },
  3828. },
  3829. },
  3830. isAggregate: false,
  3831. allWildcard: false,
  3832. sendMeta: false,
  3833. }.Init(),
  3834. },
  3835. }
  3836. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  3837. for i, tt := range tests {
  3838. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  3839. if err != nil {
  3840. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  3841. continue
  3842. }
  3843. p, err := createLogicalPlan(stmt, &api.RuleOption{
  3844. IsEventTime: false,
  3845. LateTol: 0,
  3846. Concurrency: 0,
  3847. BufferLength: 0,
  3848. SendMetaToSink: false,
  3849. Qos: 0,
  3850. CheckpointInterval: 0,
  3851. SendError: true,
  3852. }, kv)
  3853. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  3854. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  3855. } else if !reflect.DeepEqual(tt.p, p) {
  3856. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  3857. }
  3858. }
  3859. }
  3860. func Test_createLogicalPlan4Lookup(t *testing.T) {
  3861. kv, err := store.GetKV("stream")
  3862. if err != nil {
  3863. t.Error(err)
  3864. return
  3865. }
  3866. streamSqls := map[string]string{
  3867. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  3868. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  3869. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  3870. }
  3871. types := map[string]ast.StreamType{
  3872. "src1": ast.TypeStream,
  3873. "table1": ast.TypeTable,
  3874. "table2": ast.TypeTable,
  3875. }
  3876. for name, sql := range streamSqls {
  3877. s, err := json.Marshal(&xsql.StreamInfo{
  3878. StreamType: types[name],
  3879. Statement: sql,
  3880. })
  3881. if err != nil {
  3882. t.Error(err)
  3883. t.Fail()
  3884. }
  3885. err = kv.Set(name, string(s))
  3886. if err != nil {
  3887. t.Error(err)
  3888. t.Fail()
  3889. }
  3890. }
  3891. streams := make(map[string]*ast.StreamStmt)
  3892. for n := range streamSqls {
  3893. streamStmt, err := xsql.GetDataSource(kv, n)
  3894. if err != nil {
  3895. t.Errorf("fail to get stream %s, please check if stream is created", n)
  3896. return
  3897. }
  3898. streams[n] = streamStmt
  3899. }
  3900. tests := []struct {
  3901. sql string
  3902. p LogicalPlan
  3903. err string
  3904. }{
  3905. { // 0
  3906. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  3907. p: ProjectPlan{
  3908. baseLogicalPlan: baseLogicalPlan{
  3909. children: []LogicalPlan{
  3910. LookupPlan{
  3911. baseLogicalPlan: baseLogicalPlan{
  3912. children: []LogicalPlan{
  3913. DataSourcePlan{
  3914. baseLogicalPlan: baseLogicalPlan{},
  3915. name: "src1",
  3916. streamFields: map[string]*ast.JsonStreamField{
  3917. "a": nil,
  3918. },
  3919. isSchemaless: true,
  3920. streamStmt: streams["src1"],
  3921. metaFields: []string{},
  3922. pruneFields: []string{},
  3923. }.Init(),
  3924. },
  3925. },
  3926. joinExpr: ast.Join{
  3927. Name: "table1",
  3928. Alias: "",
  3929. JoinType: ast.INNER_JOIN,
  3930. Expr: &ast.BinaryExpr{
  3931. OP: ast.EQ,
  3932. LHS: &ast.FieldRef{
  3933. StreamName: "src1",
  3934. Name: "id",
  3935. },
  3936. RHS: &ast.FieldRef{
  3937. StreamName: "table1",
  3938. Name: "id",
  3939. },
  3940. },
  3941. },
  3942. keys: []string{"id"},
  3943. fields: []string{"b"},
  3944. valvars: []ast.Expr{
  3945. &ast.FieldRef{
  3946. StreamName: "src1",
  3947. Name: "id",
  3948. },
  3949. },
  3950. options: &ast.Options{
  3951. DATASOURCE: "table1",
  3952. TYPE: "sql",
  3953. KIND: "lookup",
  3954. },
  3955. conditions: nil,
  3956. }.Init(),
  3957. },
  3958. },
  3959. fields: []ast.Field{
  3960. {
  3961. Expr: &ast.FieldRef{
  3962. StreamName: "src1",
  3963. Name: "a",
  3964. },
  3965. Name: "a",
  3966. AName: "",
  3967. },
  3968. {
  3969. Expr: &ast.FieldRef{
  3970. StreamName: "table1",
  3971. Name: "b",
  3972. },
  3973. Name: "b",
  3974. AName: "",
  3975. },
  3976. },
  3977. isAggregate: false,
  3978. sendMeta: false,
  3979. }.Init(),
  3980. },
  3981. { // 1
  3982. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  3983. p: ProjectPlan{
  3984. baseLogicalPlan: baseLogicalPlan{
  3985. children: []LogicalPlan{
  3986. FilterPlan{
  3987. baseLogicalPlan: baseLogicalPlan{
  3988. children: []LogicalPlan{
  3989. LookupPlan{
  3990. baseLogicalPlan: baseLogicalPlan{
  3991. children: []LogicalPlan{
  3992. FilterPlan{
  3993. baseLogicalPlan: baseLogicalPlan{
  3994. children: []LogicalPlan{
  3995. DataSourcePlan{
  3996. baseLogicalPlan: baseLogicalPlan{},
  3997. name: "src1",
  3998. streamFields: map[string]*ast.JsonStreamField{
  3999. "a": nil,
  4000. },
  4001. isSchemaless: true,
  4002. streamStmt: streams["src1"],
  4003. metaFields: []string{},
  4004. pruneFields: []string{},
  4005. }.Init(),
  4006. },
  4007. },
  4008. condition: &ast.BinaryExpr{
  4009. OP: ast.LT,
  4010. LHS: &ast.FieldRef{
  4011. StreamName: "src1",
  4012. Name: "c",
  4013. },
  4014. RHS: &ast.IntegerLiteral{Val: 40},
  4015. },
  4016. }.Init(),
  4017. },
  4018. },
  4019. joinExpr: ast.Join{
  4020. Name: "table1",
  4021. Alias: "",
  4022. JoinType: ast.INNER_JOIN,
  4023. Expr: &ast.BinaryExpr{
  4024. OP: ast.AND,
  4025. RHS: &ast.BinaryExpr{
  4026. OP: ast.EQ,
  4027. LHS: &ast.FieldRef{
  4028. StreamName: "src1",
  4029. Name: "id",
  4030. },
  4031. RHS: &ast.FieldRef{
  4032. StreamName: "table1",
  4033. Name: "id",
  4034. },
  4035. },
  4036. LHS: &ast.BinaryExpr{
  4037. OP: ast.AND,
  4038. LHS: &ast.BinaryExpr{
  4039. OP: ast.GT,
  4040. LHS: &ast.FieldRef{
  4041. StreamName: "table1",
  4042. Name: "b",
  4043. },
  4044. RHS: &ast.IntegerLiteral{Val: 20},
  4045. },
  4046. RHS: &ast.BinaryExpr{
  4047. OP: ast.LT,
  4048. LHS: &ast.FieldRef{
  4049. StreamName: "src1",
  4050. Name: "c",
  4051. },
  4052. RHS: &ast.IntegerLiteral{Val: 40},
  4053. },
  4054. },
  4055. },
  4056. },
  4057. keys: []string{"id"},
  4058. valvars: []ast.Expr{
  4059. &ast.FieldRef{
  4060. StreamName: "src1",
  4061. Name: "id",
  4062. },
  4063. },
  4064. options: &ast.Options{
  4065. DATASOURCE: "table1",
  4066. TYPE: "sql",
  4067. KIND: "lookup",
  4068. },
  4069. conditions: &ast.BinaryExpr{
  4070. OP: ast.AND,
  4071. LHS: &ast.BinaryExpr{
  4072. OP: ast.GT,
  4073. LHS: &ast.FieldRef{
  4074. StreamName: "table1",
  4075. Name: "b",
  4076. },
  4077. RHS: &ast.IntegerLiteral{Val: 20},
  4078. },
  4079. RHS: &ast.BinaryExpr{
  4080. OP: ast.LT,
  4081. LHS: &ast.FieldRef{
  4082. StreamName: "src1",
  4083. Name: "c",
  4084. },
  4085. RHS: &ast.IntegerLiteral{Val: 40},
  4086. },
  4087. },
  4088. }.Init(),
  4089. },
  4090. },
  4091. condition: &ast.BinaryExpr{
  4092. OP: ast.GT,
  4093. LHS: &ast.FieldRef{
  4094. StreamName: "table1",
  4095. Name: "b",
  4096. },
  4097. RHS: &ast.IntegerLiteral{Val: 20},
  4098. },
  4099. }.Init(),
  4100. },
  4101. },
  4102. fields: []ast.Field{
  4103. {
  4104. Expr: &ast.FieldRef{
  4105. StreamName: "src1",
  4106. Name: "a",
  4107. },
  4108. Name: "a",
  4109. AName: "",
  4110. },
  4111. {
  4112. Expr: &ast.FieldRef{
  4113. StreamName: "table1",
  4114. Name: "*",
  4115. },
  4116. Name: "*",
  4117. AName: "",
  4118. },
  4119. },
  4120. isAggregate: false,
  4121. sendMeta: false,
  4122. }.Init(),
  4123. },
  4124. { // 2
  4125. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  4126. p: ProjectPlan{
  4127. baseLogicalPlan: baseLogicalPlan{
  4128. children: []LogicalPlan{
  4129. LookupPlan{
  4130. baseLogicalPlan: baseLogicalPlan{
  4131. children: []LogicalPlan{
  4132. LookupPlan{
  4133. baseLogicalPlan: baseLogicalPlan{
  4134. children: []LogicalPlan{
  4135. DataSourcePlan{
  4136. baseLogicalPlan: baseLogicalPlan{},
  4137. name: "src1",
  4138. streamFields: map[string]*ast.JsonStreamField{
  4139. "a": nil,
  4140. },
  4141. isSchemaless: true,
  4142. streamStmt: streams["src1"],
  4143. metaFields: []string{},
  4144. pruneFields: []string{},
  4145. }.Init(),
  4146. },
  4147. },
  4148. joinExpr: ast.Join{
  4149. Name: "table1",
  4150. Alias: "",
  4151. JoinType: ast.INNER_JOIN,
  4152. Expr: &ast.BinaryExpr{
  4153. OP: ast.EQ,
  4154. LHS: &ast.FieldRef{
  4155. StreamName: "src1",
  4156. Name: "id",
  4157. },
  4158. RHS: &ast.FieldRef{
  4159. StreamName: "table1",
  4160. Name: "id",
  4161. },
  4162. },
  4163. },
  4164. keys: []string{"id"},
  4165. fields: []string{"b"},
  4166. valvars: []ast.Expr{
  4167. &ast.FieldRef{
  4168. StreamName: "src1",
  4169. Name: "id",
  4170. },
  4171. },
  4172. options: &ast.Options{
  4173. DATASOURCE: "table1",
  4174. TYPE: "sql",
  4175. KIND: "lookup",
  4176. },
  4177. conditions: nil,
  4178. }.Init(),
  4179. },
  4180. },
  4181. joinExpr: ast.Join{
  4182. Name: "table2",
  4183. Alias: "",
  4184. JoinType: ast.INNER_JOIN,
  4185. Expr: &ast.BinaryExpr{
  4186. OP: ast.EQ,
  4187. LHS: &ast.FieldRef{
  4188. StreamName: "table1",
  4189. Name: "id",
  4190. },
  4191. RHS: &ast.FieldRef{
  4192. StreamName: "table2",
  4193. Name: "id",
  4194. },
  4195. },
  4196. },
  4197. keys: []string{"id"},
  4198. fields: []string{"c"},
  4199. valvars: []ast.Expr{
  4200. &ast.FieldRef{
  4201. StreamName: "table1",
  4202. Name: "id",
  4203. },
  4204. },
  4205. options: &ast.Options{
  4206. DATASOURCE: "table2",
  4207. TYPE: "sql",
  4208. KIND: "lookup",
  4209. },
  4210. }.Init(),
  4211. },
  4212. },
  4213. fields: []ast.Field{
  4214. {
  4215. Expr: &ast.FieldRef{
  4216. StreamName: "src1",
  4217. Name: "a",
  4218. },
  4219. Name: "a",
  4220. AName: "",
  4221. },
  4222. {
  4223. Expr: &ast.FieldRef{
  4224. StreamName: "table1",
  4225. Name: "b",
  4226. },
  4227. Name: "b",
  4228. AName: "",
  4229. },
  4230. {
  4231. Expr: &ast.FieldRef{
  4232. StreamName: "table2",
  4233. Name: "c",
  4234. },
  4235. Name: "c",
  4236. AName: "",
  4237. },
  4238. },
  4239. isAggregate: false,
  4240. sendMeta: false,
  4241. }.Init(),
  4242. },
  4243. { // 3
  4244. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  4245. p: ProjectPlan{
  4246. baseLogicalPlan: baseLogicalPlan{
  4247. children: []LogicalPlan{
  4248. LookupPlan{
  4249. baseLogicalPlan: baseLogicalPlan{
  4250. children: []LogicalPlan{
  4251. WindowPlan{
  4252. baseLogicalPlan: baseLogicalPlan{
  4253. children: []LogicalPlan{
  4254. DataSourcePlan{
  4255. baseLogicalPlan: baseLogicalPlan{},
  4256. name: "src1",
  4257. streamStmt: streams["src1"],
  4258. streamFields: map[string]*ast.JsonStreamField{},
  4259. metaFields: []string{},
  4260. isWildCard: true,
  4261. isSchemaless: true,
  4262. pruneFields: []string{},
  4263. }.Init(),
  4264. },
  4265. },
  4266. condition: nil,
  4267. wtype: ast.TUMBLING_WINDOW,
  4268. length: 10,
  4269. timeUnit: ast.SS,
  4270. interval: 0,
  4271. limit: 0,
  4272. }.Init(),
  4273. },
  4274. },
  4275. joinExpr: ast.Join{
  4276. Name: "table1",
  4277. Alias: "",
  4278. JoinType: ast.INNER_JOIN,
  4279. Expr: &ast.BinaryExpr{
  4280. OP: ast.EQ,
  4281. LHS: &ast.FieldRef{
  4282. StreamName: "src1",
  4283. Name: "id",
  4284. },
  4285. RHS: &ast.FieldRef{
  4286. StreamName: "table1",
  4287. Name: "id",
  4288. },
  4289. },
  4290. },
  4291. keys: []string{"id"},
  4292. valvars: []ast.Expr{
  4293. &ast.FieldRef{
  4294. StreamName: "src1",
  4295. Name: "id",
  4296. },
  4297. },
  4298. options: &ast.Options{
  4299. DATASOURCE: "table1",
  4300. TYPE: "sql",
  4301. KIND: "lookup",
  4302. },
  4303. conditions: nil,
  4304. }.Init(),
  4305. },
  4306. },
  4307. fields: []ast.Field{
  4308. {
  4309. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  4310. Name: "*",
  4311. AName: "",
  4312. },
  4313. },
  4314. isAggregate: false,
  4315. sendMeta: false,
  4316. }.Init(),
  4317. },
  4318. }
  4319. for i, tt := range tests {
  4320. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  4321. if err != nil {
  4322. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  4323. continue
  4324. }
  4325. p, err := createLogicalPlan(stmt, &api.RuleOption{
  4326. IsEventTime: false,
  4327. LateTol: 0,
  4328. Concurrency: 0,
  4329. BufferLength: 0,
  4330. SendMetaToSink: false,
  4331. Qos: 0,
  4332. CheckpointInterval: 0,
  4333. SendError: true,
  4334. }, kv)
  4335. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  4336. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  4337. } else if !reflect.DeepEqual(tt.p, p) {
  4338. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  4339. }
  4340. }
  4341. }
  4342. func TestTransformSourceNode(t *testing.T) {
  4343. schema := map[string]*ast.JsonStreamField{
  4344. "a": {
  4345. Type: "bigint",
  4346. },
  4347. }
  4348. testCases := []struct {
  4349. name string
  4350. plan *DataSourcePlan
  4351. node *node.SourceNode
  4352. }{
  4353. {
  4354. name: "normal source node",
  4355. plan: &DataSourcePlan{
  4356. name: "test",
  4357. streamStmt: &ast.StreamStmt{
  4358. StreamType: ast.TypeStream,
  4359. Options: &ast.Options{
  4360. TYPE: "file",
  4361. },
  4362. },
  4363. streamFields: nil,
  4364. allMeta: false,
  4365. metaFields: []string{},
  4366. iet: false,
  4367. isBinary: false,
  4368. },
  4369. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  4370. TYPE: "file",
  4371. }, false, nil),
  4372. },
  4373. {
  4374. name: "schema source node",
  4375. plan: &DataSourcePlan{
  4376. name: "test",
  4377. streamStmt: &ast.StreamStmt{
  4378. StreamType: ast.TypeStream,
  4379. Options: &ast.Options{
  4380. TYPE: "file",
  4381. },
  4382. },
  4383. streamFields: schema,
  4384. allMeta: false,
  4385. metaFields: []string{},
  4386. iet: false,
  4387. isBinary: false,
  4388. },
  4389. node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  4390. TYPE: "file",
  4391. }, false, schema),
  4392. },
  4393. }
  4394. for _, tc := range testCases {
  4395. t.Run(tc.name, func(t *testing.T) {
  4396. sourceNode, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
  4397. if err != nil {
  4398. t.Errorf("unexpected error: %v", err)
  4399. return
  4400. }
  4401. if !reflect.DeepEqual(sourceNode, tc.node) {
  4402. t.Errorf("unexpected result: got %v, want %v", sourceNode, tc.node)
  4403. }
  4404. })
  4405. }
  4406. }