planner_test.go 86 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  // init runs during package initialization of the planner tests, i.e. before
  // any Test function in this package executes, and calls testx.InitEnv().
  // NOTE(review): presumably InitEnv prepares the environment these tests rely
  // on (such as the KV store opened through store.GetKV) — confirm in
  // internal/testx.
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. store, err := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. {
  94. sql: "select unnest(myarray) as col from src1",
  95. p: ProjectSetPlan{
  96. SrfMapping: map[string]struct{}{
  97. "col": {},
  98. },
  99. baseLogicalPlan: baseLogicalPlan{
  100. children: []LogicalPlan{
  101. ProjectPlan{
  102. baseLogicalPlan: baseLogicalPlan{
  103. children: []LogicalPlan{
  104. DataSourcePlan{
  105. baseLogicalPlan: baseLogicalPlan{},
  106. name: "src1",
  107. streamFields: map[string]*ast.JsonStreamField{
  108. "myarray": {
  109. Type: "array",
  110. Items: &ast.JsonStreamField{
  111. Type: "string",
  112. },
  113. },
  114. },
  115. streamStmt: streams["src1"],
  116. metaFields: []string{},
  117. }.Init(),
  118. },
  119. },
  120. fields: []ast.Field{
  121. {
  122. Name: "unnest",
  123. AName: "col",
  124. Expr: func() *ast.FieldRef {
  125. fr := &ast.FieldRef{
  126. StreamName: ast.AliasStream,
  127. Name: "col",
  128. AliasRef: &ast.AliasRef{
  129. Expression: &ast.Call{
  130. Name: "unnest",
  131. FuncType: ast.FuncTypeSrf,
  132. Args: []ast.Expr{
  133. &ast.FieldRef{
  134. StreamName: "src1",
  135. Name: "myarray",
  136. },
  137. },
  138. },
  139. },
  140. }
  141. fr.SetRefSource([]ast.StreamName{"src1"})
  142. return fr
  143. }(),
  144. },
  145. },
  146. }.Init(),
  147. },
  148. },
  149. }.Init(),
  150. },
  151. { // 0
  152. sql: "SELECT unnest(myarray), name from src1",
  153. p: ProjectSetPlan{
  154. SrfMapping: map[string]struct{}{
  155. "unnest": {},
  156. },
  157. baseLogicalPlan: baseLogicalPlan{
  158. children: []LogicalPlan{
  159. ProjectPlan{
  160. baseLogicalPlan: baseLogicalPlan{
  161. children: []LogicalPlan{
  162. DataSourcePlan{
  163. baseLogicalPlan: baseLogicalPlan{},
  164. name: "src1",
  165. streamFields: map[string]*ast.JsonStreamField{
  166. "myarray": {
  167. Type: "array",
  168. Items: &ast.JsonStreamField{
  169. Type: "string",
  170. },
  171. },
  172. "name": {
  173. Type: "string",
  174. },
  175. },
  176. streamStmt: streams["src1"],
  177. metaFields: []string{},
  178. }.Init(),
  179. },
  180. },
  181. fields: []ast.Field{
  182. {
  183. Expr: &ast.Call{
  184. Name: "unnest",
  185. FuncType: ast.FuncTypeSrf,
  186. Args: []ast.Expr{
  187. &ast.FieldRef{
  188. StreamName: "src1",
  189. Name: "myarray",
  190. },
  191. },
  192. },
  193. Name: "unnest",
  194. },
  195. {
  196. Name: "name",
  197. Expr: &ast.FieldRef{
  198. StreamName: "src1",
  199. Name: "name",
  200. },
  201. },
  202. },
  203. }.Init(),
  204. },
  205. },
  206. }.Init(),
  207. },
  208. { // 0
  209. sql: `SELECT myarray[temp] FROM src1`,
  210. p: ProjectPlan{
  211. baseLogicalPlan: baseLogicalPlan{
  212. children: []LogicalPlan{
  213. DataSourcePlan{
  214. baseLogicalPlan: baseLogicalPlan{},
  215. name: "src1",
  216. streamFields: map[string]*ast.JsonStreamField{
  217. "myarray": {
  218. Type: "array",
  219. Items: &ast.JsonStreamField{
  220. Type: "string",
  221. },
  222. },
  223. "temp": {
  224. Type: "bigint",
  225. },
  226. },
  227. streamStmt: streams["src1"],
  228. metaFields: []string{},
  229. }.Init(),
  230. },
  231. },
  232. fields: []ast.Field{
  233. {
  234. Expr: &ast.BinaryExpr{
  235. OP: ast.SUBSET,
  236. LHS: &ast.FieldRef{
  237. StreamName: "src1",
  238. Name: "myarray",
  239. },
  240. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  241. StreamName: "src1",
  242. Name: "temp",
  243. }},
  244. },
  245. Name: "kuiper_field_0",
  246. AName: "",
  247. },
  248. },
  249. isAggregate: false,
  250. sendMeta: false,
  251. }.Init(),
  252. }, { // 1 optimize where to data source
  253. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  254. p: ProjectPlan{
  255. baseLogicalPlan: baseLogicalPlan{
  256. children: []LogicalPlan{
  257. WindowPlan{
  258. baseLogicalPlan: baseLogicalPlan{
  259. children: []LogicalPlan{
  260. FilterPlan{
  261. baseLogicalPlan: baseLogicalPlan{
  262. children: []LogicalPlan{
  263. DataSourcePlan{
  264. name: "src1",
  265. streamFields: map[string]*ast.JsonStreamField{
  266. "name": {
  267. Type: "string",
  268. },
  269. "temp": {
  270. Type: "bigint",
  271. },
  272. },
  273. streamStmt: streams["src1"],
  274. metaFields: []string{},
  275. }.Init(),
  276. },
  277. },
  278. condition: &ast.BinaryExpr{
  279. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  280. OP: ast.EQ,
  281. RHS: &ast.StringLiteral{Val: "v1"},
  282. },
  283. }.Init(),
  284. },
  285. },
  286. condition: nil,
  287. wtype: ast.TUMBLING_WINDOW,
  288. length: 10000,
  289. interval: 0,
  290. limit: 0,
  291. }.Init(),
  292. },
  293. },
  294. fields: []ast.Field{
  295. {
  296. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  297. Name: "temp",
  298. AName: ""},
  299. },
  300. isAggregate: false,
  301. sendMeta: false,
  302. }.Init(),
  303. }, { // 2 condition that cannot be optimized
  304. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  305. p: ProjectPlan{
  306. baseLogicalPlan: baseLogicalPlan{
  307. children: []LogicalPlan{
  308. JoinPlan{
  309. baseLogicalPlan: baseLogicalPlan{
  310. children: []LogicalPlan{
  311. WindowPlan{
  312. baseLogicalPlan: baseLogicalPlan{
  313. children: []LogicalPlan{
  314. DataSourcePlan{
  315. name: "src1",
  316. streamFields: map[string]*ast.JsonStreamField{
  317. "id1": {
  318. Type: "bigint",
  319. },
  320. "temp": {
  321. Type: "bigint",
  322. },
  323. },
  324. streamStmt: streams["src1"],
  325. metaFields: []string{},
  326. }.Init(),
  327. DataSourcePlan{
  328. name: "src2",
  329. streamFields: map[string]*ast.JsonStreamField{
  330. "hum": {
  331. Type: "bigint",
  332. },
  333. "id2": {
  334. Type: "bigint",
  335. },
  336. },
  337. streamStmt: streams["src2"],
  338. metaFields: []string{},
  339. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  340. }.Init(),
  341. },
  342. },
  343. condition: nil,
  344. wtype: ast.TUMBLING_WINDOW,
  345. length: 10000,
  346. interval: 0,
  347. limit: 0,
  348. }.Init(),
  349. },
  350. },
  351. from: &ast.Table{Name: "src1"},
  352. joins: ast.Joins{ast.Join{
  353. Name: "src2",
  354. JoinType: ast.INNER_JOIN,
  355. Expr: &ast.BinaryExpr{
  356. OP: ast.AND,
  357. LHS: &ast.BinaryExpr{
  358. LHS: &ast.BinaryExpr{
  359. OP: ast.GT,
  360. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  361. RHS: &ast.IntegerLiteral{Val: 20},
  362. },
  363. OP: ast.OR,
  364. RHS: &ast.BinaryExpr{
  365. OP: ast.GT,
  366. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  367. RHS: &ast.IntegerLiteral{Val: 60},
  368. },
  369. },
  370. RHS: &ast.BinaryExpr{
  371. OP: ast.EQ,
  372. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  373. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  374. },
  375. },
  376. }},
  377. }.Init(),
  378. },
  379. },
  380. fields: []ast.Field{
  381. {
  382. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  383. Name: "id1",
  384. AName: ""},
  385. },
  386. isAggregate: false,
  387. sendMeta: false,
  388. }.Init(),
  389. }, { // 3 optimize window filter
  390. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  391. p: ProjectPlan{
  392. baseLogicalPlan: baseLogicalPlan{
  393. children: []LogicalPlan{
  394. WindowPlan{
  395. baseLogicalPlan: baseLogicalPlan{
  396. children: []LogicalPlan{
  397. FilterPlan{
  398. baseLogicalPlan: baseLogicalPlan{
  399. children: []LogicalPlan{
  400. DataSourcePlan{
  401. name: "src1",
  402. streamFields: map[string]*ast.JsonStreamField{
  403. "id1": {
  404. Type: "bigint",
  405. },
  406. "name": {
  407. Type: "string",
  408. },
  409. "temp": {
  410. Type: "bigint",
  411. },
  412. },
  413. streamStmt: streams["src1"],
  414. metaFields: []string{},
  415. }.Init(),
  416. },
  417. },
  418. condition: &ast.BinaryExpr{
  419. OP: ast.AND,
  420. LHS: &ast.BinaryExpr{
  421. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  422. OP: ast.EQ,
  423. RHS: &ast.StringLiteral{Val: "v1"},
  424. },
  425. RHS: &ast.BinaryExpr{
  426. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  427. OP: ast.GT,
  428. RHS: &ast.IntegerLiteral{Val: 2},
  429. },
  430. },
  431. }.Init(),
  432. },
  433. },
  434. condition: nil,
  435. wtype: ast.TUMBLING_WINDOW,
  436. length: 10000,
  437. interval: 0,
  438. limit: 0,
  439. }.Init(),
  440. },
  441. },
  442. fields: []ast.Field{
  443. {
  444. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  445. Name: "id1",
  446. AName: ""},
  447. },
  448. isAggregate: false,
  449. sendMeta: false,
  450. }.Init(),
  451. }, { // 4. do not optimize count window
  452. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  453. p: ProjectPlan{
  454. baseLogicalPlan: baseLogicalPlan{
  455. children: []LogicalPlan{
  456. HavingPlan{
  457. baseLogicalPlan: baseLogicalPlan{
  458. children: []LogicalPlan{
  459. FilterPlan{
  460. baseLogicalPlan: baseLogicalPlan{
  461. children: []LogicalPlan{
  462. WindowPlan{
  463. baseLogicalPlan: baseLogicalPlan{
  464. children: []LogicalPlan{
  465. DataSourcePlan{
  466. name: "src1",
  467. isWildCard: true,
  468. streamFields: map[string]*ast.JsonStreamField{
  469. "id1": {
  470. Type: "bigint",
  471. },
  472. "temp": {
  473. Type: "bigint",
  474. },
  475. "name": {
  476. Type: "string",
  477. },
  478. "myarray": {
  479. Type: "array",
  480. Items: &ast.JsonStreamField{
  481. Type: "string",
  482. },
  483. },
  484. },
  485. streamStmt: streams["src1"],
  486. metaFields: []string{},
  487. }.Init(),
  488. },
  489. },
  490. condition: nil,
  491. wtype: ast.COUNT_WINDOW,
  492. length: 5,
  493. interval: 1,
  494. limit: 0,
  495. }.Init(),
  496. },
  497. },
  498. condition: &ast.BinaryExpr{
  499. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  500. OP: ast.GT,
  501. RHS: &ast.IntegerLiteral{Val: 20},
  502. },
  503. }.Init(),
  504. },
  505. },
  506. condition: &ast.BinaryExpr{
  507. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  508. Token: ast.ASTERISK,
  509. }}, FuncType: ast.FuncTypeAgg},
  510. OP: ast.GT,
  511. RHS: &ast.IntegerLiteral{Val: 2},
  512. },
  513. }.Init(),
  514. },
  515. },
  516. fields: []ast.Field{
  517. {
  518. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  519. Name: "*",
  520. AName: ""},
  521. },
  522. isAggregate: false,
  523. sendMeta: false,
  524. }.Init(),
  525. }, { // 5. optimize join on
  526. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  527. p: ProjectPlan{
  528. baseLogicalPlan: baseLogicalPlan{
  529. children: []LogicalPlan{
  530. JoinPlan{
  531. baseLogicalPlan: baseLogicalPlan{
  532. children: []LogicalPlan{
  533. WindowPlan{
  534. baseLogicalPlan: baseLogicalPlan{
  535. children: []LogicalPlan{
  536. FilterPlan{
  537. baseLogicalPlan: baseLogicalPlan{
  538. children: []LogicalPlan{
  539. DataSourcePlan{
  540. name: "src1",
  541. streamFields: map[string]*ast.JsonStreamField{
  542. "id1": {
  543. Type: "bigint",
  544. },
  545. "temp": {
  546. Type: "bigint",
  547. },
  548. },
  549. streamStmt: streams["src1"],
  550. metaFields: []string{},
  551. }.Init(),
  552. },
  553. },
  554. condition: &ast.BinaryExpr{
  555. RHS: &ast.BinaryExpr{
  556. OP: ast.GT,
  557. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  558. RHS: &ast.IntegerLiteral{Val: 20},
  559. },
  560. OP: ast.AND,
  561. LHS: &ast.BinaryExpr{
  562. OP: ast.GT,
  563. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  564. RHS: &ast.IntegerLiteral{Val: 111},
  565. },
  566. },
  567. }.Init(),
  568. FilterPlan{
  569. baseLogicalPlan: baseLogicalPlan{
  570. children: []LogicalPlan{
  571. DataSourcePlan{
  572. name: "src2",
  573. streamFields: map[string]*ast.JsonStreamField{
  574. "hum": {
  575. Type: "bigint",
  576. },
  577. "id2": {
  578. Type: "bigint",
  579. },
  580. },
  581. streamStmt: streams["src2"],
  582. metaFields: []string{},
  583. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  584. }.Init(),
  585. },
  586. },
  587. condition: &ast.BinaryExpr{
  588. OP: ast.LT,
  589. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  590. RHS: &ast.IntegerLiteral{Val: 60},
  591. },
  592. }.Init(),
  593. },
  594. },
  595. condition: nil,
  596. wtype: ast.TUMBLING_WINDOW,
  597. length: 10000,
  598. interval: 0,
  599. limit: 0,
  600. }.Init(),
  601. },
  602. },
  603. from: &ast.Table{
  604. Name: "src1",
  605. },
  606. joins: []ast.Join{
  607. {
  608. Name: "src2",
  609. Alias: "",
  610. JoinType: ast.INNER_JOIN,
  611. Expr: &ast.BinaryExpr{
  612. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  613. OP: ast.EQ,
  614. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  615. },
  616. },
  617. },
  618. }.Init(),
  619. },
  620. },
  621. fields: []ast.Field{
  622. {
  623. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  624. Name: "id1",
  625. AName: ""},
  626. },
  627. isAggregate: false,
  628. sendMeta: false,
  629. }.Init(),
  630. }, { // 6. optimize outter join on
  631. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  632. p: ProjectPlan{
  633. baseLogicalPlan: baseLogicalPlan{
  634. children: []LogicalPlan{
  635. JoinPlan{
  636. baseLogicalPlan: baseLogicalPlan{
  637. children: []LogicalPlan{
  638. WindowPlan{
  639. baseLogicalPlan: baseLogicalPlan{
  640. children: []LogicalPlan{
  641. FilterPlan{
  642. baseLogicalPlan: baseLogicalPlan{
  643. children: []LogicalPlan{
  644. DataSourcePlan{
  645. name: "src1",
  646. streamFields: map[string]*ast.JsonStreamField{
  647. "id1": {
  648. Type: "bigint",
  649. },
  650. "temp": {
  651. Type: "bigint",
  652. },
  653. },
  654. streamStmt: streams["src1"],
  655. metaFields: []string{},
  656. }.Init(),
  657. },
  658. },
  659. condition: &ast.BinaryExpr{
  660. OP: ast.GT,
  661. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  662. RHS: &ast.IntegerLiteral{Val: 111},
  663. },
  664. }.Init(),
  665. DataSourcePlan{
  666. name: "src2",
  667. streamFields: map[string]*ast.JsonStreamField{
  668. "hum": {
  669. Type: "bigint",
  670. },
  671. "id2": {
  672. Type: "bigint",
  673. },
  674. },
  675. streamStmt: streams["src2"],
  676. metaFields: []string{},
  677. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  678. }.Init(),
  679. },
  680. },
  681. condition: nil,
  682. wtype: ast.TUMBLING_WINDOW,
  683. length: 10000,
  684. interval: 0,
  685. limit: 0,
  686. }.Init(),
  687. },
  688. },
  689. from: &ast.Table{
  690. Name: "src1",
  691. },
  692. joins: []ast.Join{
  693. {
  694. Name: "src2",
  695. Alias: "",
  696. JoinType: ast.FULL_JOIN,
  697. Expr: &ast.BinaryExpr{
  698. OP: ast.AND,
  699. LHS: &ast.BinaryExpr{
  700. OP: ast.AND,
  701. LHS: &ast.BinaryExpr{
  702. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  703. OP: ast.EQ,
  704. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  705. },
  706. RHS: &ast.BinaryExpr{
  707. OP: ast.GT,
  708. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  709. RHS: &ast.IntegerLiteral{Val: 20},
  710. },
  711. },
  712. RHS: &ast.BinaryExpr{
  713. OP: ast.LT,
  714. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  715. RHS: &ast.IntegerLiteral{Val: 60},
  716. },
  717. },
  718. },
  719. },
  720. }.Init(),
  721. },
  722. },
  723. fields: []ast.Field{
  724. {
  725. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  726. Name: "id1",
  727. AName: ""},
  728. },
  729. isAggregate: false,
  730. sendMeta: false,
  731. }.Init(),
  732. }, { // 7 window error for table
  733. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  734. p: nil,
  735. err: "cannot run window for TABLE sources",
  736. }, { // 8 join table without window
  737. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  738. p: ProjectPlan{
  739. baseLogicalPlan: baseLogicalPlan{
  740. children: []LogicalPlan{
  741. JoinPlan{
  742. baseLogicalPlan: baseLogicalPlan{
  743. children: []LogicalPlan{
  744. JoinAlignPlan{
  745. baseLogicalPlan: baseLogicalPlan{
  746. children: []LogicalPlan{
  747. FilterPlan{
  748. baseLogicalPlan: baseLogicalPlan{
  749. children: []LogicalPlan{
  750. DataSourcePlan{
  751. name: "src1",
  752. streamFields: map[string]*ast.JsonStreamField{
  753. "id1": {
  754. Type: "bigint",
  755. },
  756. "temp": {
  757. Type: "bigint",
  758. },
  759. },
  760. streamStmt: streams["src1"],
  761. metaFields: []string{},
  762. }.Init(),
  763. },
  764. },
  765. condition: &ast.BinaryExpr{
  766. RHS: &ast.BinaryExpr{
  767. OP: ast.GT,
  768. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  769. RHS: &ast.IntegerLiteral{Val: 20},
  770. },
  771. OP: ast.AND,
  772. LHS: &ast.BinaryExpr{
  773. OP: ast.GT,
  774. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  775. RHS: &ast.IntegerLiteral{Val: 111},
  776. },
  777. },
  778. }.Init(),
  779. DataSourcePlan{
  780. name: "tableInPlanner",
  781. streamFields: map[string]*ast.JsonStreamField{
  782. "hum": {
  783. Type: "bigint",
  784. },
  785. "id": {
  786. Type: "bigint",
  787. },
  788. },
  789. streamStmt: streams["tableInPlanner"],
  790. metaFields: []string{},
  791. }.Init(),
  792. },
  793. },
  794. Emitters: []string{"tableInPlanner"},
  795. }.Init(),
  796. },
  797. },
  798. from: &ast.Table{
  799. Name: "src1",
  800. },
  801. joins: []ast.Join{
  802. {
  803. Name: "tableInPlanner",
  804. Alias: "",
  805. JoinType: ast.INNER_JOIN,
  806. Expr: &ast.BinaryExpr{
  807. OP: ast.AND,
  808. LHS: &ast.BinaryExpr{
  809. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  810. OP: ast.EQ,
  811. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  812. },
  813. RHS: &ast.BinaryExpr{
  814. OP: ast.LT,
  815. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  816. RHS: &ast.IntegerLiteral{Val: 60},
  817. },
  818. },
  819. },
  820. },
  821. }.Init(),
  822. },
  823. },
  824. fields: []ast.Field{
  825. {
  826. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  827. Name: "id1",
  828. AName: ""},
  829. },
  830. isAggregate: false,
  831. sendMeta: false,
  832. }.Init(),
  833. }, { // 9 join table with window
  834. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  835. p: ProjectPlan{
  836. baseLogicalPlan: baseLogicalPlan{
  837. children: []LogicalPlan{
  838. JoinPlan{
  839. baseLogicalPlan: baseLogicalPlan{
  840. children: []LogicalPlan{
  841. JoinAlignPlan{
  842. baseLogicalPlan: baseLogicalPlan{
  843. children: []LogicalPlan{
  844. WindowPlan{
  845. baseLogicalPlan: baseLogicalPlan{
  846. children: []LogicalPlan{
  847. DataSourcePlan{
  848. name: "src1",
  849. streamFields: map[string]*ast.JsonStreamField{
  850. "id1": {
  851. Type: "bigint",
  852. },
  853. "temp": {
  854. Type: "bigint",
  855. },
  856. },
  857. streamStmt: streams["src1"],
  858. metaFields: []string{},
  859. }.Init(),
  860. },
  861. },
  862. condition: nil,
  863. wtype: ast.TUMBLING_WINDOW,
  864. length: 10000,
  865. interval: 0,
  866. limit: 0,
  867. }.Init(),
  868. DataSourcePlan{
  869. name: "tableInPlanner",
  870. streamFields: map[string]*ast.JsonStreamField{
  871. "hum": {
  872. Type: "bigint",
  873. },
  874. "id": {
  875. Type: "bigint",
  876. },
  877. },
  878. streamStmt: streams["tableInPlanner"],
  879. metaFields: []string{},
  880. }.Init(),
  881. },
  882. },
  883. Emitters: []string{"tableInPlanner"},
  884. }.Init(),
  885. },
  886. },
  887. from: &ast.Table{
  888. Name: "src1",
  889. },
  890. joins: []ast.Join{
  891. {
  892. Name: "tableInPlanner",
  893. Alias: "",
  894. JoinType: ast.INNER_JOIN,
  895. Expr: &ast.BinaryExpr{
  896. OP: ast.AND,
  897. LHS: &ast.BinaryExpr{
  898. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  899. OP: ast.EQ,
  900. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  901. },
  902. RHS: &ast.BinaryExpr{
  903. RHS: &ast.BinaryExpr{
  904. OP: ast.AND,
  905. LHS: &ast.BinaryExpr{
  906. OP: ast.GT,
  907. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  908. RHS: &ast.IntegerLiteral{Val: 20},
  909. },
  910. RHS: &ast.BinaryExpr{
  911. OP: ast.LT,
  912. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  913. RHS: &ast.IntegerLiteral{Val: 60},
  914. },
  915. },
  916. OP: ast.AND,
  917. LHS: &ast.BinaryExpr{
  918. OP: ast.GT,
  919. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  920. RHS: &ast.IntegerLiteral{Val: 111},
  921. },
  922. },
  923. },
  924. },
  925. },
  926. }.Init(),
  927. },
  928. },
  929. fields: []ast.Field{
  930. {
  931. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  932. Name: "id1",
  933. AName: ""},
  934. },
  935. isAggregate: false,
  936. sendMeta: false,
  937. }.Init(),
  938. }, { // 10 meta
  939. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  940. p: ProjectPlan{
  941. baseLogicalPlan: baseLogicalPlan{
  942. children: []LogicalPlan{
  943. FilterPlan{
  944. baseLogicalPlan: baseLogicalPlan{
  945. children: []LogicalPlan{
  946. DataSourcePlan{
  947. name: "src1",
  948. streamFields: map[string]*ast.JsonStreamField{
  949. "temp": {
  950. Type: "bigint",
  951. },
  952. },
  953. streamStmt: streams["src1"],
  954. metaFields: []string{"Humidity", "device", "id"},
  955. }.Init(),
  956. },
  957. },
  958. condition: &ast.BinaryExpr{
  959. LHS: &ast.Call{
  960. Name: "meta",
  961. FuncId: 2,
  962. Args: []ast.Expr{&ast.MetaRef{
  963. Name: "device",
  964. StreamName: ast.DefaultStream,
  965. }},
  966. },
  967. OP: ast.EQ,
  968. RHS: &ast.StringLiteral{
  969. Val: "demo2",
  970. },
  971. },
  972. }.Init(),
  973. },
  974. },
  975. fields: []ast.Field{
  976. {
  977. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  978. Name: "temp",
  979. AName: "",
  980. }, {
  981. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  982. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  983. Name: "id",
  984. StreamName: ast.DefaultStream,
  985. }}},
  986. []ast.StreamName{},
  987. nil,
  988. )},
  989. Name: "meta",
  990. AName: "eid",
  991. }, {
  992. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  993. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  994. &ast.BinaryExpr{
  995. OP: ast.ARROW,
  996. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  997. RHS: &ast.JsonFieldRef{Name: "Device"},
  998. },
  999. }},
  1000. []ast.StreamName{},
  1001. nil,
  1002. )},
  1003. Name: "meta",
  1004. AName: "hdevice",
  1005. },
  1006. },
  1007. isAggregate: false,
  1008. sendMeta: false,
  1009. }.Init(),
  1010. }, { // 11 join with same name field and aliased
  1011. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1012. p: ProjectPlan{
  1013. baseLogicalPlan: baseLogicalPlan{
  1014. children: []LogicalPlan{
  1015. JoinPlan{
  1016. baseLogicalPlan: baseLogicalPlan{
  1017. children: []LogicalPlan{
  1018. JoinAlignPlan{
  1019. baseLogicalPlan: baseLogicalPlan{
  1020. children: []LogicalPlan{
  1021. DataSourcePlan{
  1022. name: "src2",
  1023. streamFields: map[string]*ast.JsonStreamField{
  1024. "hum": {
  1025. Type: "bigint",
  1026. },
  1027. "id2": {
  1028. Type: "bigint",
  1029. },
  1030. },
  1031. streamStmt: streams["src2"],
  1032. metaFields: []string{},
  1033. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  1034. }.Init(),
  1035. DataSourcePlan{
  1036. name: "tableInPlanner",
  1037. streamFields: map[string]*ast.JsonStreamField{
  1038. "hum": {
  1039. Type: "bigint",
  1040. },
  1041. "id": {
  1042. Type: "bigint",
  1043. },
  1044. },
  1045. streamStmt: streams["tableInPlanner"],
  1046. metaFields: []string{},
  1047. }.Init(),
  1048. },
  1049. },
  1050. Emitters: []string{"tableInPlanner"},
  1051. }.Init(),
  1052. },
  1053. },
  1054. from: &ast.Table{
  1055. Name: "src2",
  1056. },
  1057. joins: []ast.Join{
  1058. {
  1059. Name: "tableInPlanner",
  1060. Alias: "",
  1061. JoinType: ast.INNER_JOIN,
  1062. Expr: &ast.BinaryExpr{
  1063. RHS: &ast.BinaryExpr{
  1064. OP: ast.EQ,
  1065. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1066. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1067. },
  1068. OP: ast.AND,
  1069. LHS: &ast.BinaryExpr{
  1070. OP: ast.GT,
  1071. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1072. &ast.FieldRef{
  1073. Name: "hum",
  1074. StreamName: "src2",
  1075. },
  1076. []ast.StreamName{"src2"},
  1077. &boolFalse,
  1078. )},
  1079. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1080. &ast.FieldRef{
  1081. Name: "hum",
  1082. StreamName: "tableInPlanner",
  1083. },
  1084. []ast.StreamName{"tableInPlanner"},
  1085. &boolFalse,
  1086. )},
  1087. },
  1088. },
  1089. },
  1090. },
  1091. }.Init(),
  1092. },
  1093. },
  1094. fields: []ast.Field{
  1095. {
  1096. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1097. &ast.FieldRef{
  1098. Name: "hum",
  1099. StreamName: "src2",
  1100. },
  1101. []ast.StreamName{"src2"},
  1102. &boolFalse,
  1103. )},
  1104. Name: "hum",
  1105. AName: "hum1",
  1106. }, {
  1107. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1108. &ast.FieldRef{
  1109. Name: "hum",
  1110. StreamName: "tableInPlanner",
  1111. },
  1112. []ast.StreamName{"tableInPlanner"},
  1113. &boolFalse,
  1114. )},
  1115. Name: "hum",
  1116. AName: "hum2",
  1117. },
  1118. },
  1119. isAggregate: false,
  1120. sendMeta: false,
  1121. }.Init(),
  1122. }, { // 12 meta with more fields
  1123. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1124. p: ProjectPlan{
  1125. baseLogicalPlan: baseLogicalPlan{
  1126. children: []LogicalPlan{
  1127. FilterPlan{
  1128. baseLogicalPlan: baseLogicalPlan{
  1129. children: []LogicalPlan{
  1130. DataSourcePlan{
  1131. name: "src1",
  1132. streamFields: map[string]*ast.JsonStreamField{
  1133. "temp": {
  1134. Type: "bigint",
  1135. },
  1136. },
  1137. streamStmt: streams["src1"],
  1138. metaFields: []string{},
  1139. allMeta: true,
  1140. }.Init(),
  1141. },
  1142. },
  1143. condition: &ast.BinaryExpr{
  1144. LHS: &ast.Call{
  1145. Name: "meta",
  1146. FuncId: 1,
  1147. Args: []ast.Expr{&ast.MetaRef{
  1148. Name: "device",
  1149. StreamName: ast.DefaultStream,
  1150. }},
  1151. },
  1152. OP: ast.EQ,
  1153. RHS: &ast.StringLiteral{
  1154. Val: "demo2",
  1155. },
  1156. },
  1157. }.Init(),
  1158. },
  1159. },
  1160. fields: []ast.Field{
  1161. {
  1162. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1163. Name: "temp",
  1164. AName: "",
  1165. }, {
  1166. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1167. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1168. Name: "*",
  1169. StreamName: ast.DefaultStream,
  1170. }}},
  1171. []ast.StreamName{},
  1172. nil,
  1173. )},
  1174. Name: "meta",
  1175. AName: "m",
  1176. },
  1177. },
  1178. isAggregate: false,
  1179. sendMeta: false,
  1180. }.Init(),
  1181. }, { // 13 analytic function plan
  1182. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1183. p: ProjectPlan{
  1184. baseLogicalPlan: baseLogicalPlan{
  1185. children: []LogicalPlan{
  1186. FilterPlan{
  1187. baseLogicalPlan: baseLogicalPlan{
  1188. children: []LogicalPlan{
  1189. AnalyticFuncsPlan{
  1190. baseLogicalPlan: baseLogicalPlan{
  1191. children: []LogicalPlan{
  1192. DataSourcePlan{
  1193. name: "src1",
  1194. streamFields: map[string]*ast.JsonStreamField{
  1195. "id1": {
  1196. Type: "bigint",
  1197. },
  1198. "name": {
  1199. Type: "string",
  1200. },
  1201. "temp": {
  1202. Type: "bigint",
  1203. },
  1204. },
  1205. streamStmt: streams["src1"],
  1206. metaFields: []string{},
  1207. }.Init(),
  1208. },
  1209. },
  1210. funcs: []*ast.Call{
  1211. {
  1212. Name: "lag",
  1213. FuncId: 2,
  1214. CachedField: "$$a_lag_2",
  1215. Args: []ast.Expr{&ast.FieldRef{
  1216. Name: "temp",
  1217. StreamName: "src1",
  1218. }},
  1219. },
  1220. {
  1221. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1222. },
  1223. {
  1224. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1225. },
  1226. },
  1227. }.Init(),
  1228. },
  1229. },
  1230. condition: &ast.BinaryExpr{
  1231. LHS: &ast.Call{
  1232. Name: "lag",
  1233. FuncId: 2,
  1234. Args: []ast.Expr{&ast.FieldRef{
  1235. Name: "temp",
  1236. StreamName: "src1",
  1237. }},
  1238. CachedField: "$$a_lag_2",
  1239. Cached: true,
  1240. },
  1241. OP: ast.GT,
  1242. RHS: &ast.FieldRef{
  1243. Name: "temp",
  1244. StreamName: "src1",
  1245. },
  1246. },
  1247. }.Init(),
  1248. },
  1249. },
  1250. fields: []ast.Field{
  1251. {
  1252. Expr: &ast.Call{
  1253. Name: "latest",
  1254. FuncId: 1,
  1255. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1256. CachedField: "$$a_latest_1",
  1257. Cached: true,
  1258. },
  1259. Name: "latest",
  1260. }, {
  1261. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1262. Name: "id1",
  1263. },
  1264. },
  1265. isAggregate: false,
  1266. sendMeta: false,
  1267. }.Init(),
  1268. },
  1269. { // 14
  1270. sql: `SELECT name, *, meta(device) FROM src1`,
  1271. p: ProjectPlan{
  1272. baseLogicalPlan: baseLogicalPlan{
  1273. children: []LogicalPlan{
  1274. DataSourcePlan{
  1275. baseLogicalPlan: baseLogicalPlan{},
  1276. name: "src1",
  1277. streamFields: map[string]*ast.JsonStreamField{
  1278. "id1": {
  1279. Type: "bigint",
  1280. },
  1281. "temp": {
  1282. Type: "bigint",
  1283. },
  1284. "name": {
  1285. Type: "string",
  1286. },
  1287. "myarray": {
  1288. Type: "array",
  1289. Items: &ast.JsonStreamField{
  1290. Type: "string",
  1291. },
  1292. },
  1293. },
  1294. streamStmt: streams["src1"],
  1295. metaFields: []string{"device"},
  1296. isWildCard: true,
  1297. }.Init(),
  1298. },
  1299. },
  1300. fields: []ast.Field{
  1301. {
  1302. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1303. Name: "name",
  1304. AName: "",
  1305. },
  1306. {
  1307. Name: "*",
  1308. Expr: &ast.Wildcard{
  1309. Token: ast.ASTERISK,
  1310. },
  1311. },
  1312. {
  1313. Name: "meta",
  1314. Expr: &ast.Call{
  1315. Name: "meta",
  1316. Args: []ast.Expr{
  1317. &ast.MetaRef{
  1318. StreamName: ast.DefaultStream,
  1319. Name: "device",
  1320. },
  1321. },
  1322. },
  1323. },
  1324. },
  1325. isAggregate: false,
  1326. allWildcard: true,
  1327. sendMeta: false,
  1328. }.Init(),
  1329. },
  1330. { // 15 analytic function over partition plan
  1331. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1332. p: ProjectPlan{
  1333. baseLogicalPlan: baseLogicalPlan{
  1334. children: []LogicalPlan{
  1335. FilterPlan{
  1336. baseLogicalPlan: baseLogicalPlan{
  1337. children: []LogicalPlan{
  1338. AnalyticFuncsPlan{
  1339. baseLogicalPlan: baseLogicalPlan{
  1340. children: []LogicalPlan{
  1341. DataSourcePlan{
  1342. name: "src1",
  1343. streamFields: map[string]*ast.JsonStreamField{
  1344. "id1": {
  1345. Type: "bigint",
  1346. },
  1347. "name": {
  1348. Type: "string",
  1349. },
  1350. "temp": {
  1351. Type: "bigint",
  1352. },
  1353. },
  1354. streamStmt: streams["src1"],
  1355. metaFields: []string{},
  1356. }.Init(),
  1357. },
  1358. },
  1359. funcs: []*ast.Call{
  1360. {
  1361. Name: "lag",
  1362. FuncId: 2,
  1363. CachedField: "$$a_lag_2",
  1364. Args: []ast.Expr{&ast.FieldRef{
  1365. Name: "temp",
  1366. StreamName: "src1",
  1367. }},
  1368. },
  1369. {
  1370. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1371. },
  1372. {
  1373. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1374. },
  1375. },
  1376. }.Init(),
  1377. },
  1378. },
  1379. condition: &ast.BinaryExpr{
  1380. LHS: &ast.Call{
  1381. Name: "lag",
  1382. FuncId: 2,
  1383. Args: []ast.Expr{&ast.FieldRef{
  1384. Name: "temp",
  1385. StreamName: "src1",
  1386. }},
  1387. CachedField: "$$a_lag_2",
  1388. Cached: true,
  1389. },
  1390. OP: ast.GT,
  1391. RHS: &ast.FieldRef{
  1392. Name: "temp",
  1393. StreamName: "src1",
  1394. },
  1395. },
  1396. }.Init(),
  1397. },
  1398. },
  1399. fields: []ast.Field{
  1400. {
  1401. Expr: &ast.Call{
  1402. Name: "latest",
  1403. FuncId: 1,
  1404. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1405. CachedField: "$$a_latest_1",
  1406. Cached: true,
  1407. Partition: &ast.PartitionExpr{
  1408. Exprs: []ast.Expr{
  1409. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1410. },
  1411. },
  1412. },
  1413. Name: "latest",
  1414. }, {
  1415. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1416. Name: "id1",
  1417. },
  1418. },
  1419. isAggregate: false,
  1420. sendMeta: false,
  1421. }.Init(),
  1422. },
  1423. { // 16 analytic function over partition when plan
  1424. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1425. p: ProjectPlan{
  1426. baseLogicalPlan: baseLogicalPlan{
  1427. children: []LogicalPlan{
  1428. FilterPlan{
  1429. baseLogicalPlan: baseLogicalPlan{
  1430. children: []LogicalPlan{
  1431. AnalyticFuncsPlan{
  1432. baseLogicalPlan: baseLogicalPlan{
  1433. children: []LogicalPlan{
  1434. DataSourcePlan{
  1435. name: "src1",
  1436. streamFields: map[string]*ast.JsonStreamField{
  1437. "id1": {
  1438. Type: "bigint",
  1439. },
  1440. "name": {
  1441. Type: "string",
  1442. },
  1443. "temp": {
  1444. Type: "bigint",
  1445. },
  1446. },
  1447. streamStmt: streams["src1"],
  1448. metaFields: []string{},
  1449. }.Init(),
  1450. },
  1451. },
  1452. funcs: []*ast.Call{
  1453. {
  1454. Name: "lag",
  1455. FuncId: 2,
  1456. CachedField: "$$a_lag_2",
  1457. Args: []ast.Expr{&ast.FieldRef{
  1458. Name: "temp",
  1459. StreamName: "src1",
  1460. }},
  1461. },
  1462. {
  1463. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1464. },
  1465. {
  1466. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1467. },
  1468. },
  1469. }.Init(),
  1470. },
  1471. },
  1472. condition: &ast.BinaryExpr{
  1473. LHS: &ast.Call{
  1474. Name: "lag",
  1475. FuncId: 2,
  1476. Args: []ast.Expr{&ast.FieldRef{
  1477. Name: "temp",
  1478. StreamName: "src1",
  1479. }},
  1480. CachedField: "$$a_lag_2",
  1481. Cached: true,
  1482. },
  1483. OP: ast.GT,
  1484. RHS: &ast.FieldRef{
  1485. Name: "temp",
  1486. StreamName: "src1",
  1487. },
  1488. },
  1489. }.Init(),
  1490. },
  1491. },
  1492. fields: []ast.Field{
  1493. {
  1494. Expr: &ast.Call{
  1495. Name: "latest",
  1496. FuncId: 1,
  1497. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1498. CachedField: "$$a_latest_1",
  1499. Cached: true,
  1500. Partition: &ast.PartitionExpr{
  1501. Exprs: []ast.Expr{
  1502. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1503. },
  1504. },
  1505. WhenExpr: &ast.BinaryExpr{
  1506. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1507. OP: ast.GT,
  1508. RHS: &ast.IntegerLiteral{Val: 12},
  1509. },
  1510. },
  1511. Name: "latest",
  1512. }, {
  1513. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1514. Name: "id1",
  1515. },
  1516. },
  1517. isAggregate: false,
  1518. sendMeta: false,
  1519. }.Init(),
  1520. }, { // 17. do not optimize sliding window
  1521. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1522. p: ProjectPlan{
  1523. baseLogicalPlan: baseLogicalPlan{
  1524. children: []LogicalPlan{
  1525. HavingPlan{
  1526. baseLogicalPlan: baseLogicalPlan{
  1527. children: []LogicalPlan{
  1528. FilterPlan{
  1529. baseLogicalPlan: baseLogicalPlan{
  1530. children: []LogicalPlan{
  1531. WindowPlan{
  1532. baseLogicalPlan: baseLogicalPlan{
  1533. children: []LogicalPlan{
  1534. DataSourcePlan{
  1535. name: "src1",
  1536. isWildCard: true,
  1537. streamFields: map[string]*ast.JsonStreamField{
  1538. "id1": {
  1539. Type: "bigint",
  1540. },
  1541. "temp": {
  1542. Type: "bigint",
  1543. },
  1544. "name": {
  1545. Type: "string",
  1546. },
  1547. "myarray": {
  1548. Type: "array",
  1549. Items: &ast.JsonStreamField{
  1550. Type: "string",
  1551. },
  1552. },
  1553. },
  1554. streamStmt: streams["src1"],
  1555. metaFields: []string{},
  1556. }.Init(),
  1557. },
  1558. },
  1559. condition: nil,
  1560. wtype: ast.SLIDING_WINDOW,
  1561. length: 10000,
  1562. interval: 0,
  1563. limit: 0,
  1564. }.Init(),
  1565. },
  1566. },
  1567. condition: &ast.BinaryExpr{
  1568. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1569. OP: ast.GT,
  1570. RHS: &ast.IntegerLiteral{Val: 20},
  1571. },
  1572. }.Init(),
  1573. },
  1574. },
  1575. condition: &ast.BinaryExpr{
  1576. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1577. Token: ast.ASTERISK,
  1578. }}, FuncType: ast.FuncTypeAgg},
  1579. OP: ast.GT,
  1580. RHS: &ast.IntegerLiteral{Val: 2},
  1581. },
  1582. }.Init(),
  1583. },
  1584. },
  1585. fields: []ast.Field{
  1586. {
  1587. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1588. Name: "*",
  1589. AName: ""},
  1590. },
  1591. isAggregate: false,
  1592. sendMeta: false,
  1593. }.Init(),
  1594. },
  1595. }
  1596. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1597. for i, tt := range tests {
  1598. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1599. if err != nil {
  1600. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1601. continue
  1602. }
  1603. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1604. IsEventTime: false,
  1605. LateTol: 0,
  1606. Concurrency: 0,
  1607. BufferLength: 0,
  1608. SendMetaToSink: false,
  1609. Qos: 0,
  1610. CheckpointInterval: 0,
  1611. SendError: true,
  1612. }, store)
  1613. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1614. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1615. } else if !reflect.DeepEqual(tt.p, p) {
  1616. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1617. }
  1618. }
  1619. }
  1620. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1621. store, err := store.GetKV("stream")
  1622. if err != nil {
  1623. t.Error(err)
  1624. return
  1625. }
  1626. streamSqls := map[string]string{
  1627. "src1": `CREATE STREAM src1 (
  1628. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1629. "src2": `CREATE STREAM src2 (
  1630. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1631. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1632. id BIGINT,
  1633. name STRING,
  1634. value STRING,
  1635. hum BIGINT
  1636. ) WITH (TYPE="file");`,
  1637. }
  1638. types := map[string]ast.StreamType{
  1639. "src1": ast.TypeStream,
  1640. "src2": ast.TypeStream,
  1641. "tableInPlanner": ast.TypeTable,
  1642. }
  1643. for name, sql := range streamSqls {
  1644. s, err := json.Marshal(&xsql.StreamInfo{
  1645. StreamType: types[name],
  1646. Statement: sql,
  1647. })
  1648. if err != nil {
  1649. t.Error(err)
  1650. t.Fail()
  1651. }
  1652. err = store.Set(name, string(s))
  1653. if err != nil {
  1654. t.Error(err)
  1655. t.Fail()
  1656. }
  1657. }
  1658. streams := make(map[string]*ast.StreamStmt)
  1659. for n := range streamSqls {
  1660. streamStmt, err := xsql.GetDataSource(store, n)
  1661. if err != nil {
  1662. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1663. return
  1664. }
  1665. streams[n] = streamStmt
  1666. }
  1667. var (
  1668. //boolTrue = true
  1669. boolFalse = false
  1670. )
  1671. var tests = []struct {
  1672. sql string
  1673. p LogicalPlan
  1674. err string
  1675. }{
  1676. { // 0
  1677. sql: `SELECT name FROM src1`,
  1678. p: ProjectPlan{
  1679. baseLogicalPlan: baseLogicalPlan{
  1680. children: []LogicalPlan{
  1681. DataSourcePlan{
  1682. baseLogicalPlan: baseLogicalPlan{},
  1683. name: "src1",
  1684. streamFields: map[string]*ast.JsonStreamField{
  1685. "name": nil,
  1686. },
  1687. streamStmt: streams["src1"],
  1688. isSchemaless: true,
  1689. metaFields: []string{},
  1690. }.Init(),
  1691. },
  1692. },
  1693. fields: []ast.Field{
  1694. {
  1695. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1696. Name: "name",
  1697. AName: "",
  1698. },
  1699. },
  1700. isAggregate: false,
  1701. sendMeta: false,
  1702. }.Init(),
  1703. }, { // 1 optimize where to data source
  1704. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1705. p: ProjectPlan{
  1706. baseLogicalPlan: baseLogicalPlan{
  1707. children: []LogicalPlan{
  1708. WindowPlan{
  1709. baseLogicalPlan: baseLogicalPlan{
  1710. children: []LogicalPlan{
  1711. FilterPlan{
  1712. baseLogicalPlan: baseLogicalPlan{
  1713. children: []LogicalPlan{
  1714. DataSourcePlan{
  1715. name: "src1",
  1716. streamFields: map[string]*ast.JsonStreamField{
  1717. "name": nil,
  1718. "temp": nil,
  1719. },
  1720. streamStmt: streams["src1"],
  1721. metaFields: []string{},
  1722. isSchemaless: true,
  1723. }.Init(),
  1724. },
  1725. },
  1726. condition: &ast.BinaryExpr{
  1727. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1728. OP: ast.EQ,
  1729. RHS: &ast.StringLiteral{Val: "v1"},
  1730. },
  1731. }.Init(),
  1732. },
  1733. },
  1734. condition: nil,
  1735. wtype: ast.TUMBLING_WINDOW,
  1736. length: 10000,
  1737. interval: 0,
  1738. limit: 0,
  1739. }.Init(),
  1740. },
  1741. },
  1742. fields: []ast.Field{
  1743. {
  1744. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1745. Name: "temp",
  1746. AName: ""},
  1747. },
  1748. isAggregate: false,
  1749. sendMeta: false,
  1750. }.Init(),
  1751. }, { // 2 condition that cannot be optimized
  1752. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1753. p: ProjectPlan{
  1754. baseLogicalPlan: baseLogicalPlan{
  1755. children: []LogicalPlan{
  1756. JoinPlan{
  1757. baseLogicalPlan: baseLogicalPlan{
  1758. children: []LogicalPlan{
  1759. WindowPlan{
  1760. baseLogicalPlan: baseLogicalPlan{
  1761. children: []LogicalPlan{
  1762. DataSourcePlan{
  1763. name: "src1",
  1764. streamFields: map[string]*ast.JsonStreamField{
  1765. "id1": nil,
  1766. "temp": nil,
  1767. },
  1768. streamStmt: streams["src1"],
  1769. metaFields: []string{},
  1770. isSchemaless: true,
  1771. }.Init(),
  1772. DataSourcePlan{
  1773. name: "src2",
  1774. streamFields: map[string]*ast.JsonStreamField{ // schemaless: cannot determine which stream the unqualified id1 belongs to, so it is listed here as well
  1775. "hum": nil,
  1776. "id1": nil,
  1777. "id2": nil,
  1778. },
  1779. isSchemaless: true,
  1780. streamStmt: streams["src2"],
  1781. metaFields: []string{},
  1782. }.Init(),
  1783. },
  1784. },
  1785. condition: nil,
  1786. wtype: ast.TUMBLING_WINDOW,
  1787. length: 10000,
  1788. interval: 0,
  1789. limit: 0,
  1790. }.Init(),
  1791. },
  1792. },
  1793. from: &ast.Table{Name: "src1"},
  1794. joins: ast.Joins{ast.Join{
  1795. Name: "src2",
  1796. JoinType: ast.INNER_JOIN,
  1797. Expr: &ast.BinaryExpr{
  1798. OP: ast.AND,
  1799. LHS: &ast.BinaryExpr{
  1800. LHS: &ast.BinaryExpr{
  1801. OP: ast.GT,
  1802. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1803. RHS: &ast.IntegerLiteral{Val: 20},
  1804. },
  1805. OP: ast.OR,
  1806. RHS: &ast.BinaryExpr{
  1807. OP: ast.GT,
  1808. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1809. RHS: &ast.IntegerLiteral{Val: 60},
  1810. },
  1811. },
  1812. RHS: &ast.BinaryExpr{
  1813. OP: ast.EQ,
  1814. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1815. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1816. },
  1817. },
  1818. }},
  1819. }.Init(),
  1820. },
  1821. },
  1822. fields: []ast.Field{
  1823. {
  1824. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1825. Name: "id1",
  1826. AName: ""},
  1827. },
  1828. isAggregate: false,
  1829. sendMeta: false,
  1830. }.Init(),
  1831. }, { // 3 optimize window filter
  1832. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1833. p: ProjectPlan{
  1834. baseLogicalPlan: baseLogicalPlan{
  1835. children: []LogicalPlan{
  1836. WindowPlan{
  1837. baseLogicalPlan: baseLogicalPlan{
  1838. children: []LogicalPlan{
  1839. FilterPlan{
  1840. baseLogicalPlan: baseLogicalPlan{
  1841. children: []LogicalPlan{
  1842. DataSourcePlan{
  1843. name: "src1",
  1844. streamFields: map[string]*ast.JsonStreamField{
  1845. "id1": nil,
  1846. "name": nil,
  1847. "temp": nil,
  1848. },
  1849. isSchemaless: true,
  1850. streamStmt: streams["src1"],
  1851. metaFields: []string{},
  1852. }.Init(),
  1853. },
  1854. },
  1855. condition: &ast.BinaryExpr{
  1856. OP: ast.AND,
  1857. LHS: &ast.BinaryExpr{
  1858. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1859. OP: ast.EQ,
  1860. RHS: &ast.StringLiteral{Val: "v1"},
  1861. },
  1862. RHS: &ast.BinaryExpr{
  1863. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1864. OP: ast.GT,
  1865. RHS: &ast.IntegerLiteral{Val: 2},
  1866. },
  1867. },
  1868. }.Init(),
  1869. },
  1870. },
  1871. condition: nil,
  1872. wtype: ast.TUMBLING_WINDOW,
  1873. length: 10000,
  1874. interval: 0,
  1875. limit: 0,
  1876. }.Init(),
  1877. },
  1878. },
  1879. fields: []ast.Field{
  1880. {
  1881. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1882. Name: "id1",
  1883. AName: ""},
  1884. },
  1885. isAggregate: false,
  1886. sendMeta: false,
  1887. }.Init(),
  1888. }, { // 4. do not optimize count window
  1889. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1890. p: ProjectPlan{
  1891. baseLogicalPlan: baseLogicalPlan{
  1892. children: []LogicalPlan{
  1893. HavingPlan{
  1894. baseLogicalPlan: baseLogicalPlan{
  1895. children: []LogicalPlan{
  1896. FilterPlan{
  1897. baseLogicalPlan: baseLogicalPlan{
  1898. children: []LogicalPlan{
  1899. WindowPlan{
  1900. baseLogicalPlan: baseLogicalPlan{
  1901. children: []LogicalPlan{
  1902. DataSourcePlan{
  1903. name: "src1",
  1904. isWildCard: true,
  1905. streamFields: map[string]*ast.JsonStreamField{},
  1906. streamStmt: streams["src1"],
  1907. metaFields: []string{},
  1908. isSchemaless: true,
  1909. }.Init(),
  1910. },
  1911. },
  1912. condition: nil,
  1913. wtype: ast.COUNT_WINDOW,
  1914. length: 5,
  1915. interval: 1,
  1916. limit: 0,
  1917. }.Init(),
  1918. },
  1919. },
  1920. condition: &ast.BinaryExpr{
  1921. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1922. OP: ast.GT,
  1923. RHS: &ast.IntegerLiteral{Val: 20},
  1924. },
  1925. }.Init(),
  1926. },
  1927. },
  1928. condition: &ast.BinaryExpr{
  1929. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1930. Token: ast.ASTERISK,
  1931. }}, FuncType: ast.FuncTypeAgg},
  1932. OP: ast.GT,
  1933. RHS: &ast.IntegerLiteral{Val: 2},
  1934. },
  1935. }.Init(),
  1936. },
  1937. },
  1938. fields: []ast.Field{
  1939. {
  1940. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1941. Name: "*",
  1942. AName: ""},
  1943. },
  1944. isAggregate: false,
  1945. sendMeta: false,
  1946. }.Init(),
  1947. }, { // 5. optimize join on
  1948. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1949. p: ProjectPlan{
  1950. baseLogicalPlan: baseLogicalPlan{
  1951. children: []LogicalPlan{
  1952. JoinPlan{
  1953. baseLogicalPlan: baseLogicalPlan{
  1954. children: []LogicalPlan{
  1955. WindowPlan{
  1956. baseLogicalPlan: baseLogicalPlan{
  1957. children: []LogicalPlan{
  1958. FilterPlan{
  1959. baseLogicalPlan: baseLogicalPlan{
  1960. children: []LogicalPlan{
  1961. DataSourcePlan{
  1962. name: "src1",
  1963. streamFields: map[string]*ast.JsonStreamField{
  1964. "id1": nil,
  1965. "temp": nil,
  1966. },
  1967. isSchemaless: true,
  1968. streamStmt: streams["src1"],
  1969. metaFields: []string{},
  1970. }.Init(),
  1971. },
  1972. },
  1973. condition: &ast.BinaryExpr{
  1974. RHS: &ast.BinaryExpr{
  1975. OP: ast.GT,
  1976. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1977. RHS: &ast.IntegerLiteral{Val: 20},
  1978. },
  1979. OP: ast.AND,
  1980. LHS: &ast.BinaryExpr{
  1981. OP: ast.GT,
  1982. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1983. RHS: &ast.IntegerLiteral{Val: 111},
  1984. },
  1985. },
  1986. }.Init(),
  1987. FilterPlan{
  1988. baseLogicalPlan: baseLogicalPlan{
  1989. children: []LogicalPlan{
  1990. DataSourcePlan{
  1991. name: "src2",
  1992. streamFields: map[string]*ast.JsonStreamField{
  1993. "hum": nil,
  1994. "id1": nil,
  1995. "id2": nil,
  1996. },
  1997. isSchemaless: true,
  1998. streamStmt: streams["src2"],
  1999. metaFields: []string{},
  2000. }.Init(),
  2001. },
  2002. },
  2003. condition: &ast.BinaryExpr{
  2004. OP: ast.LT,
  2005. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2006. RHS: &ast.IntegerLiteral{Val: 60},
  2007. },
  2008. }.Init(),
  2009. },
  2010. },
  2011. condition: nil,
  2012. wtype: ast.TUMBLING_WINDOW,
  2013. length: 10000,
  2014. interval: 0,
  2015. limit: 0,
  2016. }.Init(),
  2017. },
  2018. },
  2019. from: &ast.Table{
  2020. Name: "src1",
  2021. },
  2022. joins: []ast.Join{
  2023. {
  2024. Name: "src2",
  2025. Alias: "",
  2026. JoinType: ast.INNER_JOIN,
  2027. Expr: &ast.BinaryExpr{
  2028. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2029. OP: ast.EQ,
  2030. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2031. },
  2032. },
  2033. },
  2034. }.Init(),
  2035. },
  2036. },
  2037. fields: []ast.Field{
  2038. {
  2039. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2040. Name: "id1",
  2041. AName: ""},
  2042. },
  2043. isAggregate: false,
  2044. sendMeta: false,
  2045. }.Init(),
  2046. }, { // 6. optimize outter join on
  2047. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2048. p: ProjectPlan{
  2049. baseLogicalPlan: baseLogicalPlan{
  2050. children: []LogicalPlan{
  2051. JoinPlan{
  2052. baseLogicalPlan: baseLogicalPlan{
  2053. children: []LogicalPlan{
  2054. WindowPlan{
  2055. baseLogicalPlan: baseLogicalPlan{
  2056. children: []LogicalPlan{
  2057. FilterPlan{
  2058. baseLogicalPlan: baseLogicalPlan{
  2059. children: []LogicalPlan{
  2060. DataSourcePlan{
  2061. name: "src1",
  2062. streamFields: map[string]*ast.JsonStreamField{
  2063. "id1": nil,
  2064. "temp": nil,
  2065. },
  2066. isSchemaless: true,
  2067. streamStmt: streams["src1"],
  2068. metaFields: []string{},
  2069. }.Init(),
  2070. },
  2071. },
  2072. condition: &ast.BinaryExpr{
  2073. OP: ast.GT,
  2074. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2075. RHS: &ast.IntegerLiteral{Val: 111},
  2076. },
  2077. }.Init(),
  2078. DataSourcePlan{
  2079. name: "src2",
  2080. streamFields: map[string]*ast.JsonStreamField{
  2081. "hum": nil,
  2082. "id1": nil,
  2083. "id2": nil,
  2084. },
  2085. isSchemaless: true,
  2086. streamStmt: streams["src2"],
  2087. metaFields: []string{},
  2088. }.Init(),
  2089. },
  2090. },
  2091. condition: nil,
  2092. wtype: ast.TUMBLING_WINDOW,
  2093. length: 10000,
  2094. interval: 0,
  2095. limit: 0,
  2096. }.Init(),
  2097. },
  2098. },
  2099. from: &ast.Table{
  2100. Name: "src1",
  2101. },
  2102. joins: []ast.Join{
  2103. {
  2104. Name: "src2",
  2105. Alias: "",
  2106. JoinType: ast.FULL_JOIN,
  2107. Expr: &ast.BinaryExpr{
  2108. OP: ast.AND,
  2109. LHS: &ast.BinaryExpr{
  2110. OP: ast.AND,
  2111. LHS: &ast.BinaryExpr{
  2112. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2113. OP: ast.EQ,
  2114. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2115. },
  2116. RHS: &ast.BinaryExpr{
  2117. OP: ast.GT,
  2118. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2119. RHS: &ast.IntegerLiteral{Val: 20},
  2120. },
  2121. },
  2122. RHS: &ast.BinaryExpr{
  2123. OP: ast.LT,
  2124. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2125. RHS: &ast.IntegerLiteral{Val: 60},
  2126. },
  2127. },
  2128. },
  2129. },
  2130. }.Init(),
  2131. },
  2132. },
  2133. fields: []ast.Field{
  2134. {
  2135. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2136. Name: "id1",
  2137. AName: ""},
  2138. },
  2139. isAggregate: false,
  2140. sendMeta: false,
  2141. }.Init(),
  2142. }, { // 7 window error for table
  2143. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2144. p: nil,
  2145. err: "cannot run window for TABLE sources",
  2146. }, { // 8 join table without window
  2147. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  2148. p: ProjectPlan{
  2149. baseLogicalPlan: baseLogicalPlan{
  2150. children: []LogicalPlan{
  2151. JoinPlan{
  2152. baseLogicalPlan: baseLogicalPlan{
  2153. children: []LogicalPlan{
  2154. JoinAlignPlan{
  2155. baseLogicalPlan: baseLogicalPlan{
  2156. children: []LogicalPlan{
  2157. FilterPlan{
  2158. baseLogicalPlan: baseLogicalPlan{
  2159. children: []LogicalPlan{
  2160. DataSourcePlan{
  2161. name: "src1",
  2162. streamFields: map[string]*ast.JsonStreamField{
  2163. "hum": nil,
  2164. "id1": nil,
  2165. "temp": nil,
  2166. },
  2167. isSchemaless: true,
  2168. streamStmt: streams["src1"],
  2169. metaFields: []string{},
  2170. }.Init(),
  2171. },
  2172. },
  2173. condition: &ast.BinaryExpr{
  2174. RHS: &ast.BinaryExpr{
  2175. OP: ast.GT,
  2176. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2177. RHS: &ast.IntegerLiteral{Val: 20},
  2178. },
  2179. OP: ast.AND,
  2180. LHS: &ast.BinaryExpr{
  2181. OP: ast.GT,
  2182. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2183. RHS: &ast.IntegerLiteral{Val: 111},
  2184. },
  2185. },
  2186. }.Init(),
  2187. DataSourcePlan{
  2188. name: "tableInPlanner",
  2189. streamFields: map[string]*ast.JsonStreamField{
  2190. "hum": {
  2191. Type: "bigint",
  2192. },
  2193. "id": {
  2194. Type: "bigint",
  2195. },
  2196. },
  2197. streamStmt: streams["tableInPlanner"],
  2198. metaFields: []string{},
  2199. }.Init(),
  2200. },
  2201. },
  2202. Emitters: []string{"tableInPlanner"},
  2203. }.Init(),
  2204. },
  2205. },
  2206. from: &ast.Table{
  2207. Name: "src1",
  2208. },
  2209. joins: []ast.Join{
  2210. {
  2211. Name: "tableInPlanner",
  2212. Alias: "",
  2213. JoinType: ast.INNER_JOIN,
  2214. Expr: &ast.BinaryExpr{
  2215. OP: ast.AND,
  2216. LHS: &ast.BinaryExpr{
  2217. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2218. OP: ast.EQ,
  2219. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2220. },
  2221. RHS: &ast.BinaryExpr{
  2222. OP: ast.LT,
  2223. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  2224. RHS: &ast.IntegerLiteral{Val: 60},
  2225. },
  2226. },
  2227. },
  2228. },
  2229. }.Init(),
  2230. },
  2231. },
  2232. fields: []ast.Field{
  2233. {
  2234. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2235. Name: "id1",
  2236. AName: ""},
  2237. },
  2238. isAggregate: false,
  2239. sendMeta: false,
  2240. }.Init(),
  2241. }, { // 9 join table with window
  2242. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2243. p: ProjectPlan{
  2244. baseLogicalPlan: baseLogicalPlan{
  2245. children: []LogicalPlan{
  2246. JoinPlan{
  2247. baseLogicalPlan: baseLogicalPlan{
  2248. children: []LogicalPlan{
  2249. JoinAlignPlan{
  2250. baseLogicalPlan: baseLogicalPlan{
  2251. children: []LogicalPlan{
  2252. WindowPlan{
  2253. baseLogicalPlan: baseLogicalPlan{
  2254. children: []LogicalPlan{
  2255. DataSourcePlan{
  2256. name: "src1",
  2257. streamFields: map[string]*ast.JsonStreamField{
  2258. "id1": nil,
  2259. "temp": nil,
  2260. },
  2261. isSchemaless: true,
  2262. streamStmt: streams["src1"],
  2263. metaFields: []string{},
  2264. }.Init(),
  2265. },
  2266. },
  2267. condition: nil,
  2268. wtype: ast.TUMBLING_WINDOW,
  2269. length: 10000,
  2270. interval: 0,
  2271. limit: 0,
  2272. }.Init(),
  2273. DataSourcePlan{
  2274. name: "tableInPlanner",
  2275. streamFields: map[string]*ast.JsonStreamField{
  2276. "hum": {
  2277. Type: "bigint",
  2278. },
  2279. "id": {
  2280. Type: "bigint",
  2281. },
  2282. },
  2283. streamStmt: streams["tableInPlanner"],
  2284. metaFields: []string{},
  2285. }.Init(),
  2286. },
  2287. },
  2288. Emitters: []string{"tableInPlanner"},
  2289. }.Init(),
  2290. },
  2291. },
  2292. from: &ast.Table{
  2293. Name: "src1",
  2294. },
  2295. joins: []ast.Join{
  2296. {
  2297. Name: "tableInPlanner",
  2298. Alias: "",
  2299. JoinType: ast.INNER_JOIN,
  2300. Expr: &ast.BinaryExpr{
  2301. OP: ast.AND,
  2302. LHS: &ast.BinaryExpr{
  2303. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2304. OP: ast.EQ,
  2305. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2306. },
  2307. RHS: &ast.BinaryExpr{
  2308. RHS: &ast.BinaryExpr{
  2309. OP: ast.AND,
  2310. LHS: &ast.BinaryExpr{
  2311. OP: ast.GT,
  2312. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2313. RHS: &ast.IntegerLiteral{Val: 20},
  2314. },
  2315. RHS: &ast.BinaryExpr{
  2316. OP: ast.LT,
  2317. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  2318. RHS: &ast.IntegerLiteral{Val: 60},
  2319. },
  2320. },
  2321. OP: ast.AND,
  2322. LHS: &ast.BinaryExpr{
  2323. OP: ast.GT,
  2324. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2325. RHS: &ast.IntegerLiteral{Val: 111},
  2326. },
  2327. },
  2328. },
  2329. },
  2330. },
  2331. }.Init(),
  2332. },
  2333. },
  2334. fields: []ast.Field{
  2335. {
  2336. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2337. Name: "id1",
  2338. AName: ""},
  2339. },
  2340. isAggregate: false,
  2341. sendMeta: false,
  2342. }.Init(),
  2343. }, { // 10 meta
  2344. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  2345. p: ProjectPlan{
  2346. baseLogicalPlan: baseLogicalPlan{
  2347. children: []LogicalPlan{
  2348. FilterPlan{
  2349. baseLogicalPlan: baseLogicalPlan{
  2350. children: []LogicalPlan{
  2351. DataSourcePlan{
  2352. name: "src1",
  2353. streamFields: map[string]*ast.JsonStreamField{
  2354. "temp": nil,
  2355. },
  2356. isSchemaless: true,
  2357. streamStmt: streams["src1"],
  2358. metaFields: []string{"Humidity", "device", "id"},
  2359. }.Init(),
  2360. },
  2361. },
  2362. condition: &ast.BinaryExpr{
  2363. LHS: &ast.Call{
  2364. Name: "meta",
  2365. FuncId: 2,
  2366. Args: []ast.Expr{&ast.MetaRef{
  2367. Name: "device",
  2368. StreamName: ast.DefaultStream,
  2369. }},
  2370. },
  2371. OP: ast.EQ,
  2372. RHS: &ast.StringLiteral{
  2373. Val: "demo2",
  2374. },
  2375. },
  2376. }.Init(),
  2377. },
  2378. },
  2379. fields: []ast.Field{
  2380. {
  2381. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2382. Name: "temp",
  2383. AName: "",
  2384. }, {
  2385. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2386. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  2387. Name: "id",
  2388. StreamName: ast.DefaultStream,
  2389. }}},
  2390. []ast.StreamName{},
  2391. nil,
  2392. )},
  2393. Name: "meta",
  2394. AName: "eid",
  2395. }, {
  2396. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2397. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  2398. &ast.BinaryExpr{
  2399. OP: ast.ARROW,
  2400. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  2401. RHS: &ast.JsonFieldRef{Name: "Device"},
  2402. },
  2403. }},
  2404. []ast.StreamName{},
  2405. nil,
  2406. )},
  2407. Name: "meta",
  2408. AName: "hdevice",
  2409. },
  2410. },
  2411. isAggregate: false,
  2412. sendMeta: false,
  2413. }.Init(),
  2414. }, { // 11 join with same name field and aliased
  2415. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  2416. p: ProjectPlan{
  2417. baseLogicalPlan: baseLogicalPlan{
  2418. children: []LogicalPlan{
  2419. JoinPlan{
  2420. baseLogicalPlan: baseLogicalPlan{
  2421. children: []LogicalPlan{
  2422. JoinAlignPlan{
  2423. baseLogicalPlan: baseLogicalPlan{
  2424. children: []LogicalPlan{
  2425. DataSourcePlan{
  2426. name: "src2",
  2427. streamFields: map[string]*ast.JsonStreamField{
  2428. "hum": nil,
  2429. "id": nil,
  2430. "id2": nil,
  2431. },
  2432. isSchemaless: true,
  2433. streamStmt: streams["src2"],
  2434. metaFields: []string{},
  2435. }.Init(),
  2436. DataSourcePlan{
  2437. name: "tableInPlanner",
  2438. streamFields: map[string]*ast.JsonStreamField{
  2439. "hum": {
  2440. Type: "bigint",
  2441. },
  2442. "id": {
  2443. Type: "bigint",
  2444. },
  2445. },
  2446. streamStmt: streams["tableInPlanner"],
  2447. metaFields: []string{},
  2448. }.Init(),
  2449. },
  2450. },
  2451. Emitters: []string{"tableInPlanner"},
  2452. }.Init(),
  2453. },
  2454. },
  2455. from: &ast.Table{
  2456. Name: "src2",
  2457. },
  2458. joins: []ast.Join{
  2459. {
  2460. Name: "tableInPlanner",
  2461. Alias: "",
  2462. JoinType: ast.INNER_JOIN,
  2463. Expr: &ast.BinaryExpr{
  2464. RHS: &ast.BinaryExpr{
  2465. OP: ast.EQ,
  2466. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2467. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2468. },
  2469. OP: ast.AND,
  2470. LHS: &ast.BinaryExpr{
  2471. OP: ast.GT,
  2472. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2473. &ast.FieldRef{
  2474. Name: "hum",
  2475. StreamName: "src2",
  2476. },
  2477. []ast.StreamName{"src2"},
  2478. &boolFalse,
  2479. )},
  2480. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2481. &ast.FieldRef{
  2482. Name: "hum",
  2483. StreamName: "tableInPlanner",
  2484. },
  2485. []ast.StreamName{"tableInPlanner"},
  2486. &boolFalse,
  2487. )},
  2488. },
  2489. },
  2490. },
  2491. },
  2492. }.Init(),
  2493. },
  2494. },
  2495. fields: []ast.Field{
  2496. {
  2497. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2498. &ast.FieldRef{
  2499. Name: "hum",
  2500. StreamName: "src2",
  2501. },
  2502. []ast.StreamName{"src2"},
  2503. &boolFalse,
  2504. )},
  2505. Name: "hum",
  2506. AName: "hum1",
  2507. }, {
  2508. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2509. &ast.FieldRef{
  2510. Name: "hum",
  2511. StreamName: "tableInPlanner",
  2512. },
  2513. []ast.StreamName{"tableInPlanner"},
  2514. &boolFalse,
  2515. )},
  2516. Name: "hum",
  2517. AName: "hum2",
  2518. },
  2519. },
  2520. isAggregate: false,
  2521. sendMeta: false,
  2522. }.Init(),
  2523. }, { // 12
  2524. sql: `SELECT name->first, name->last FROM src1`,
  2525. p: ProjectPlan{
  2526. baseLogicalPlan: baseLogicalPlan{
  2527. children: []LogicalPlan{
  2528. DataSourcePlan{
  2529. baseLogicalPlan: baseLogicalPlan{},
  2530. name: "src1",
  2531. streamFields: map[string]*ast.JsonStreamField{
  2532. "name": nil,
  2533. },
  2534. isSchemaless: true,
  2535. streamStmt: streams["src1"],
  2536. metaFields: []string{},
  2537. }.Init(),
  2538. },
  2539. },
  2540. fields: []ast.Field{
  2541. {
  2542. Expr: &ast.BinaryExpr{
  2543. OP: ast.ARROW,
  2544. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2545. RHS: &ast.JsonFieldRef{Name: "first"},
  2546. },
  2547. Name: "kuiper_field_0",
  2548. AName: "",
  2549. }, {
  2550. Expr: &ast.BinaryExpr{
  2551. OP: ast.ARROW,
  2552. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2553. RHS: &ast.JsonFieldRef{Name: "last"},
  2554. },
  2555. Name: "kuiper_field_1",
  2556. AName: "",
  2557. },
  2558. },
  2559. isAggregate: false,
  2560. sendMeta: false,
  2561. }.Init(),
  2562. },
  2563. }
  2564. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2565. for i, tt := range tests {
  2566. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2567. if err != nil {
  2568. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2569. continue
  2570. }
  2571. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2572. IsEventTime: false,
  2573. LateTol: 0,
  2574. Concurrency: 0,
  2575. BufferLength: 0,
  2576. SendMetaToSink: false,
  2577. Qos: 0,
  2578. CheckpointInterval: 0,
  2579. SendError: true,
  2580. }, store)
  2581. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2582. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2583. } else if !reflect.DeepEqual(tt.p, p) {
  2584. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2585. }
  2586. }
  2587. }
  2588. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2589. store, err := store.GetKV("stream")
  2590. if err != nil {
  2591. t.Error(err)
  2592. return
  2593. }
  2594. streamSqls := map[string]string{
  2595. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2596. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2597. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2598. }
  2599. types := map[string]ast.StreamType{
  2600. "src1": ast.TypeStream,
  2601. "table1": ast.TypeTable,
  2602. "table2": ast.TypeTable,
  2603. }
  2604. for name, sql := range streamSqls {
  2605. s, err := json.Marshal(&xsql.StreamInfo{
  2606. StreamType: types[name],
  2607. Statement: sql,
  2608. })
  2609. if err != nil {
  2610. t.Error(err)
  2611. t.Fail()
  2612. }
  2613. err = store.Set(name, string(s))
  2614. if err != nil {
  2615. t.Error(err)
  2616. t.Fail()
  2617. }
  2618. }
  2619. streams := make(map[string]*ast.StreamStmt)
  2620. for n := range streamSqls {
  2621. streamStmt, err := xsql.GetDataSource(store, n)
  2622. if err != nil {
  2623. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2624. return
  2625. }
  2626. streams[n] = streamStmt
  2627. }
  2628. var tests = []struct {
  2629. sql string
  2630. p LogicalPlan
  2631. err string
  2632. }{
  2633. { // 0
  2634. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2635. p: ProjectPlan{
  2636. baseLogicalPlan: baseLogicalPlan{
  2637. children: []LogicalPlan{
  2638. LookupPlan{
  2639. baseLogicalPlan: baseLogicalPlan{
  2640. children: []LogicalPlan{
  2641. DataSourcePlan{
  2642. baseLogicalPlan: baseLogicalPlan{},
  2643. name: "src1",
  2644. streamFields: map[string]*ast.JsonStreamField{
  2645. "a": nil,
  2646. },
  2647. isSchemaless: true,
  2648. streamStmt: streams["src1"],
  2649. metaFields: []string{},
  2650. }.Init(),
  2651. },
  2652. },
  2653. joinExpr: ast.Join{
  2654. Name: "table1",
  2655. Alias: "",
  2656. JoinType: ast.INNER_JOIN,
  2657. Expr: &ast.BinaryExpr{
  2658. OP: ast.EQ,
  2659. LHS: &ast.FieldRef{
  2660. StreamName: "src1",
  2661. Name: "id",
  2662. },
  2663. RHS: &ast.FieldRef{
  2664. StreamName: "table1",
  2665. Name: "id",
  2666. },
  2667. },
  2668. },
  2669. keys: []string{"id"},
  2670. fields: []string{"b"},
  2671. valvars: []ast.Expr{
  2672. &ast.FieldRef{
  2673. StreamName: "src1",
  2674. Name: "id",
  2675. },
  2676. },
  2677. options: &ast.Options{
  2678. DATASOURCE: "table1",
  2679. TYPE: "sql",
  2680. KIND: "lookup",
  2681. },
  2682. conditions: nil,
  2683. }.Init(),
  2684. },
  2685. },
  2686. fields: []ast.Field{
  2687. {
  2688. Expr: &ast.FieldRef{
  2689. StreamName: "src1",
  2690. Name: "a",
  2691. },
  2692. Name: "a",
  2693. AName: "",
  2694. },
  2695. {
  2696. Expr: &ast.FieldRef{
  2697. StreamName: "table1",
  2698. Name: "b",
  2699. },
  2700. Name: "b",
  2701. AName: "",
  2702. },
  2703. },
  2704. isAggregate: false,
  2705. sendMeta: false,
  2706. }.Init(),
  2707. },
  2708. { // 1
  2709. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2710. p: ProjectPlan{
  2711. baseLogicalPlan: baseLogicalPlan{
  2712. children: []LogicalPlan{
  2713. FilterPlan{
  2714. baseLogicalPlan: baseLogicalPlan{
  2715. children: []LogicalPlan{
  2716. LookupPlan{
  2717. baseLogicalPlan: baseLogicalPlan{
  2718. children: []LogicalPlan{
  2719. FilterPlan{
  2720. baseLogicalPlan: baseLogicalPlan{
  2721. children: []LogicalPlan{
  2722. DataSourcePlan{
  2723. baseLogicalPlan: baseLogicalPlan{},
  2724. name: "src1",
  2725. streamFields: map[string]*ast.JsonStreamField{
  2726. "a": nil,
  2727. },
  2728. isSchemaless: true,
  2729. streamStmt: streams["src1"],
  2730. metaFields: []string{},
  2731. }.Init(),
  2732. },
  2733. },
  2734. condition: &ast.BinaryExpr{
  2735. OP: ast.LT,
  2736. LHS: &ast.FieldRef{
  2737. StreamName: "src1",
  2738. Name: "c",
  2739. },
  2740. RHS: &ast.IntegerLiteral{Val: 40},
  2741. },
  2742. }.Init(),
  2743. },
  2744. },
  2745. joinExpr: ast.Join{
  2746. Name: "table1",
  2747. Alias: "",
  2748. JoinType: ast.INNER_JOIN,
  2749. Expr: &ast.BinaryExpr{
  2750. OP: ast.AND,
  2751. RHS: &ast.BinaryExpr{
  2752. OP: ast.EQ,
  2753. LHS: &ast.FieldRef{
  2754. StreamName: "src1",
  2755. Name: "id",
  2756. },
  2757. RHS: &ast.FieldRef{
  2758. StreamName: "table1",
  2759. Name: "id",
  2760. },
  2761. },
  2762. LHS: &ast.BinaryExpr{
  2763. OP: ast.AND,
  2764. LHS: &ast.BinaryExpr{
  2765. OP: ast.GT,
  2766. LHS: &ast.FieldRef{
  2767. StreamName: "table1",
  2768. Name: "b",
  2769. },
  2770. RHS: &ast.IntegerLiteral{Val: 20},
  2771. },
  2772. RHS: &ast.BinaryExpr{
  2773. OP: ast.LT,
  2774. LHS: &ast.FieldRef{
  2775. StreamName: "src1",
  2776. Name: "c",
  2777. },
  2778. RHS: &ast.IntegerLiteral{Val: 40},
  2779. },
  2780. },
  2781. },
  2782. },
  2783. keys: []string{"id"},
  2784. valvars: []ast.Expr{
  2785. &ast.FieldRef{
  2786. StreamName: "src1",
  2787. Name: "id",
  2788. },
  2789. },
  2790. options: &ast.Options{
  2791. DATASOURCE: "table1",
  2792. TYPE: "sql",
  2793. KIND: "lookup",
  2794. },
  2795. conditions: &ast.BinaryExpr{
  2796. OP: ast.AND,
  2797. LHS: &ast.BinaryExpr{
  2798. OP: ast.GT,
  2799. LHS: &ast.FieldRef{
  2800. StreamName: "table1",
  2801. Name: "b",
  2802. },
  2803. RHS: &ast.IntegerLiteral{Val: 20},
  2804. },
  2805. RHS: &ast.BinaryExpr{
  2806. OP: ast.LT,
  2807. LHS: &ast.FieldRef{
  2808. StreamName: "src1",
  2809. Name: "c",
  2810. },
  2811. RHS: &ast.IntegerLiteral{Val: 40},
  2812. },
  2813. },
  2814. }.Init(),
  2815. },
  2816. },
  2817. condition: &ast.BinaryExpr{
  2818. OP: ast.GT,
  2819. LHS: &ast.FieldRef{
  2820. StreamName: "table1",
  2821. Name: "b",
  2822. },
  2823. RHS: &ast.IntegerLiteral{Val: 20},
  2824. },
  2825. }.Init(),
  2826. },
  2827. },
  2828. fields: []ast.Field{
  2829. {
  2830. Expr: &ast.FieldRef{
  2831. StreamName: "src1",
  2832. Name: "a",
  2833. },
  2834. Name: "a",
  2835. AName: "",
  2836. },
  2837. {
  2838. Expr: &ast.FieldRef{
  2839. StreamName: "table1",
  2840. Name: "*",
  2841. },
  2842. Name: "*",
  2843. AName: "",
  2844. },
  2845. },
  2846. isAggregate: false,
  2847. sendMeta: false,
  2848. }.Init(),
  2849. },
  2850. { // 2
  2851. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2852. p: ProjectPlan{
  2853. baseLogicalPlan: baseLogicalPlan{
  2854. children: []LogicalPlan{
  2855. LookupPlan{
  2856. baseLogicalPlan: baseLogicalPlan{
  2857. children: []LogicalPlan{
  2858. LookupPlan{
  2859. baseLogicalPlan: baseLogicalPlan{
  2860. children: []LogicalPlan{
  2861. DataSourcePlan{
  2862. baseLogicalPlan: baseLogicalPlan{},
  2863. name: "src1",
  2864. streamFields: map[string]*ast.JsonStreamField{
  2865. "a": nil,
  2866. },
  2867. isSchemaless: true,
  2868. streamStmt: streams["src1"],
  2869. metaFields: []string{},
  2870. }.Init(),
  2871. },
  2872. },
  2873. joinExpr: ast.Join{
  2874. Name: "table1",
  2875. Alias: "",
  2876. JoinType: ast.INNER_JOIN,
  2877. Expr: &ast.BinaryExpr{
  2878. OP: ast.EQ,
  2879. LHS: &ast.FieldRef{
  2880. StreamName: "src1",
  2881. Name: "id",
  2882. },
  2883. RHS: &ast.FieldRef{
  2884. StreamName: "table1",
  2885. Name: "id",
  2886. },
  2887. },
  2888. },
  2889. keys: []string{"id"},
  2890. fields: []string{"b"},
  2891. valvars: []ast.Expr{
  2892. &ast.FieldRef{
  2893. StreamName: "src1",
  2894. Name: "id",
  2895. },
  2896. },
  2897. options: &ast.Options{
  2898. DATASOURCE: "table1",
  2899. TYPE: "sql",
  2900. KIND: "lookup",
  2901. },
  2902. conditions: nil,
  2903. }.Init(),
  2904. },
  2905. },
  2906. joinExpr: ast.Join{
  2907. Name: "table2",
  2908. Alias: "",
  2909. JoinType: ast.INNER_JOIN,
  2910. Expr: &ast.BinaryExpr{
  2911. OP: ast.EQ,
  2912. LHS: &ast.FieldRef{
  2913. StreamName: "table1",
  2914. Name: "id",
  2915. },
  2916. RHS: &ast.FieldRef{
  2917. StreamName: "table2",
  2918. Name: "id",
  2919. },
  2920. },
  2921. },
  2922. keys: []string{"id"},
  2923. fields: []string{"c"},
  2924. valvars: []ast.Expr{
  2925. &ast.FieldRef{
  2926. StreamName: "table1",
  2927. Name: "id",
  2928. },
  2929. },
  2930. options: &ast.Options{
  2931. DATASOURCE: "table2",
  2932. TYPE: "sql",
  2933. KIND: "lookup",
  2934. },
  2935. }.Init(),
  2936. },
  2937. },
  2938. fields: []ast.Field{
  2939. {
  2940. Expr: &ast.FieldRef{
  2941. StreamName: "src1",
  2942. Name: "a",
  2943. },
  2944. Name: "a",
  2945. AName: "",
  2946. },
  2947. {
  2948. Expr: &ast.FieldRef{
  2949. StreamName: "table1",
  2950. Name: "b",
  2951. },
  2952. Name: "b",
  2953. AName: "",
  2954. },
  2955. {
  2956. Expr: &ast.FieldRef{
  2957. StreamName: "table2",
  2958. Name: "c",
  2959. },
  2960. Name: "c",
  2961. AName: "",
  2962. },
  2963. },
  2964. isAggregate: false,
  2965. sendMeta: false,
  2966. }.Init(),
  2967. },
  2968. { // 3
  2969. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2970. p: ProjectPlan{
  2971. baseLogicalPlan: baseLogicalPlan{
  2972. children: []LogicalPlan{
  2973. LookupPlan{
  2974. baseLogicalPlan: baseLogicalPlan{
  2975. children: []LogicalPlan{
  2976. WindowPlan{
  2977. baseLogicalPlan: baseLogicalPlan{
  2978. children: []LogicalPlan{
  2979. DataSourcePlan{
  2980. baseLogicalPlan: baseLogicalPlan{},
  2981. name: "src1",
  2982. streamStmt: streams["src1"],
  2983. streamFields: map[string]*ast.JsonStreamField{},
  2984. metaFields: []string{},
  2985. isWildCard: true,
  2986. isSchemaless: true,
  2987. }.Init(),
  2988. },
  2989. },
  2990. condition: nil,
  2991. wtype: ast.TUMBLING_WINDOW,
  2992. length: 10000,
  2993. interval: 0,
  2994. limit: 0,
  2995. }.Init(),
  2996. },
  2997. },
  2998. joinExpr: ast.Join{
  2999. Name: "table1",
  3000. Alias: "",
  3001. JoinType: ast.INNER_JOIN,
  3002. Expr: &ast.BinaryExpr{
  3003. OP: ast.EQ,
  3004. LHS: &ast.FieldRef{
  3005. StreamName: "src1",
  3006. Name: "id",
  3007. },
  3008. RHS: &ast.FieldRef{
  3009. StreamName: "table1",
  3010. Name: "id",
  3011. },
  3012. },
  3013. },
  3014. keys: []string{"id"},
  3015. valvars: []ast.Expr{
  3016. &ast.FieldRef{
  3017. StreamName: "src1",
  3018. Name: "id",
  3019. },
  3020. },
  3021. options: &ast.Options{
  3022. DATASOURCE: "table1",
  3023. TYPE: "sql",
  3024. KIND: "lookup",
  3025. },
  3026. conditions: nil,
  3027. }.Init(),
  3028. },
  3029. },
  3030. fields: []ast.Field{
  3031. {
  3032. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  3033. Name: "*",
  3034. AName: "",
  3035. },
  3036. },
  3037. isAggregate: false,
  3038. sendMeta: false,
  3039. }.Init(),
  3040. },
  3041. }
  3042. for i, tt := range tests {
  3043. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  3044. if err != nil {
  3045. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  3046. continue
  3047. }
  3048. p, err := createLogicalPlan(stmt, &api.RuleOption{
  3049. IsEventTime: false,
  3050. LateTol: 0,
  3051. Concurrency: 0,
  3052. BufferLength: 0,
  3053. SendMetaToSink: false,
  3054. Qos: 0,
  3055. CheckpointInterval: 0,
  3056. SendError: true,
  3057. }, store)
  3058. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  3059. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  3060. } else if !reflect.DeepEqual(tt.p, p) {
  3061. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  3062. }
  3063. }
  3064. }