planner_test.go 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  // init is run by the Go runtime when this test package is initialized, before
  // any test function executes; its only action is to call testx.InitEnv().
  // NOTE(review): presumably InitEnv prepares the environment the tests below
  // rely on (e.g. for store.GetKV) — confirm against the testx package.
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: []interface{}{
  102. &ast.StreamField{
  103. Name: "myarray",
  104. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  105. },
  106. &ast.StreamField{
  107. Name: "temp",
  108. FieldType: &ast.BasicType{Type: ast.BIGINT},
  109. },
  110. },
  111. streamStmt: streams["src1"],
  112. metaFields: []string{},
  113. }.Init(),
  114. },
  115. },
  116. fields: []ast.Field{
  117. {
  118. Expr: &ast.BinaryExpr{
  119. OP: ast.SUBSET,
  120. LHS: &ast.FieldRef{
  121. StreamName: "src1",
  122. Name: "myarray",
  123. },
  124. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  125. StreamName: "src1",
  126. Name: "temp",
  127. }},
  128. },
  129. Name: "kuiper_field_0",
  130. AName: "",
  131. },
  132. },
  133. isAggregate: false,
  134. sendMeta: false,
  135. }.Init(),
  136. }, { // 1 optimize where to data source
  137. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  138. p: ProjectPlan{
  139. baseLogicalPlan: baseLogicalPlan{
  140. children: []LogicalPlan{
  141. WindowPlan{
  142. baseLogicalPlan: baseLogicalPlan{
  143. children: []LogicalPlan{
  144. FilterPlan{
  145. baseLogicalPlan: baseLogicalPlan{
  146. children: []LogicalPlan{
  147. DataSourcePlan{
  148. name: "src1",
  149. streamFields: []interface{}{
  150. &ast.StreamField{
  151. Name: "name",
  152. FieldType: &ast.BasicType{Type: ast.STRINGS},
  153. },
  154. &ast.StreamField{
  155. Name: "temp",
  156. FieldType: &ast.BasicType{Type: ast.BIGINT},
  157. },
  158. },
  159. streamStmt: streams["src1"],
  160. metaFields: []string{},
  161. }.Init(),
  162. },
  163. },
  164. condition: &ast.BinaryExpr{
  165. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  166. OP: ast.EQ,
  167. RHS: &ast.StringLiteral{Val: "v1"},
  168. },
  169. }.Init(),
  170. },
  171. },
  172. condition: nil,
  173. wtype: ast.TUMBLING_WINDOW,
  174. length: 10000,
  175. interval: 0,
  176. limit: 0,
  177. }.Init(),
  178. },
  179. },
  180. fields: []ast.Field{
  181. {
  182. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  183. Name: "temp",
  184. AName: ""},
  185. },
  186. isAggregate: false,
  187. sendMeta: false,
  188. }.Init(),
  189. }, { // 2 condition that cannot be optimized
  190. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  191. p: ProjectPlan{
  192. baseLogicalPlan: baseLogicalPlan{
  193. children: []LogicalPlan{
  194. JoinPlan{
  195. baseLogicalPlan: baseLogicalPlan{
  196. children: []LogicalPlan{
  197. WindowPlan{
  198. baseLogicalPlan: baseLogicalPlan{
  199. children: []LogicalPlan{
  200. DataSourcePlan{
  201. name: "src1",
  202. streamFields: []interface{}{
  203. &ast.StreamField{
  204. Name: "id1",
  205. FieldType: &ast.BasicType{Type: ast.BIGINT},
  206. },
  207. &ast.StreamField{
  208. Name: "temp",
  209. FieldType: &ast.BasicType{Type: ast.BIGINT},
  210. },
  211. },
  212. streamStmt: streams["src1"],
  213. metaFields: []string{},
  214. }.Init(),
  215. DataSourcePlan{
  216. name: "src2",
  217. streamFields: []interface{}{
  218. &ast.StreamField{
  219. Name: "hum",
  220. FieldType: &ast.BasicType{Type: ast.BIGINT},
  221. },
  222. &ast.StreamField{
  223. Name: "id2",
  224. FieldType: &ast.BasicType{Type: ast.BIGINT},
  225. },
  226. },
  227. streamStmt: streams["src2"],
  228. metaFields: []string{},
  229. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  230. }.Init(),
  231. },
  232. },
  233. condition: nil,
  234. wtype: ast.TUMBLING_WINDOW,
  235. length: 10000,
  236. interval: 0,
  237. limit: 0,
  238. }.Init(),
  239. },
  240. },
  241. from: &ast.Table{Name: "src1"},
  242. joins: ast.Joins{ast.Join{
  243. Name: "src2",
  244. JoinType: ast.INNER_JOIN,
  245. Expr: &ast.BinaryExpr{
  246. OP: ast.AND,
  247. LHS: &ast.BinaryExpr{
  248. LHS: &ast.BinaryExpr{
  249. OP: ast.GT,
  250. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  251. RHS: &ast.IntegerLiteral{Val: 20},
  252. },
  253. OP: ast.OR,
  254. RHS: &ast.BinaryExpr{
  255. OP: ast.GT,
  256. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  257. RHS: &ast.IntegerLiteral{Val: 60},
  258. },
  259. },
  260. RHS: &ast.BinaryExpr{
  261. OP: ast.EQ,
  262. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  263. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  264. },
  265. },
  266. }},
  267. }.Init(),
  268. },
  269. },
  270. fields: []ast.Field{
  271. {
  272. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  273. Name: "id1",
  274. AName: ""},
  275. },
  276. isAggregate: false,
  277. sendMeta: false,
  278. }.Init(),
  279. }, { // 3 optimize window filter
  280. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  281. p: ProjectPlan{
  282. baseLogicalPlan: baseLogicalPlan{
  283. children: []LogicalPlan{
  284. WindowPlan{
  285. baseLogicalPlan: baseLogicalPlan{
  286. children: []LogicalPlan{
  287. FilterPlan{
  288. baseLogicalPlan: baseLogicalPlan{
  289. children: []LogicalPlan{
  290. DataSourcePlan{
  291. name: "src1",
  292. streamFields: []interface{}{
  293. &ast.StreamField{
  294. Name: "id1",
  295. FieldType: &ast.BasicType{Type: ast.BIGINT},
  296. },
  297. &ast.StreamField{
  298. Name: "name",
  299. FieldType: &ast.BasicType{Type: ast.STRINGS},
  300. },
  301. &ast.StreamField{
  302. Name: "temp",
  303. FieldType: &ast.BasicType{Type: ast.BIGINT},
  304. },
  305. },
  306. streamStmt: streams["src1"],
  307. metaFields: []string{},
  308. }.Init(),
  309. },
  310. },
  311. condition: &ast.BinaryExpr{
  312. OP: ast.AND,
  313. LHS: &ast.BinaryExpr{
  314. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  315. OP: ast.EQ,
  316. RHS: &ast.StringLiteral{Val: "v1"},
  317. },
  318. RHS: &ast.BinaryExpr{
  319. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  320. OP: ast.GT,
  321. RHS: &ast.IntegerLiteral{Val: 2},
  322. },
  323. },
  324. }.Init(),
  325. },
  326. },
  327. condition: nil,
  328. wtype: ast.TUMBLING_WINDOW,
  329. length: 10000,
  330. interval: 0,
  331. limit: 0,
  332. }.Init(),
  333. },
  334. },
  335. fields: []ast.Field{
  336. {
  337. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  338. Name: "id1",
  339. AName: ""},
  340. },
  341. isAggregate: false,
  342. sendMeta: false,
  343. }.Init(),
  344. }, { // 4. do not optimize count window
  345. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  346. p: ProjectPlan{
  347. baseLogicalPlan: baseLogicalPlan{
  348. children: []LogicalPlan{
  349. HavingPlan{
  350. baseLogicalPlan: baseLogicalPlan{
  351. children: []LogicalPlan{
  352. FilterPlan{
  353. baseLogicalPlan: baseLogicalPlan{
  354. children: []LogicalPlan{
  355. WindowPlan{
  356. baseLogicalPlan: baseLogicalPlan{
  357. children: []LogicalPlan{
  358. DataSourcePlan{
  359. name: "src1",
  360. isWildCard: true,
  361. streamFields: []interface{}{
  362. &ast.StreamField{
  363. Name: "id1",
  364. FieldType: &ast.BasicType{Type: ast.BIGINT},
  365. },
  366. &ast.StreamField{
  367. Name: "temp",
  368. FieldType: &ast.BasicType{Type: ast.BIGINT},
  369. },
  370. &ast.StreamField{
  371. Name: "name",
  372. FieldType: &ast.BasicType{Type: ast.STRINGS},
  373. },
  374. &ast.StreamField{
  375. Name: "myarray",
  376. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  377. },
  378. },
  379. streamStmt: streams["src1"],
  380. metaFields: []string{},
  381. }.Init(),
  382. },
  383. },
  384. condition: nil,
  385. wtype: ast.COUNT_WINDOW,
  386. length: 5,
  387. interval: 1,
  388. limit: 0,
  389. }.Init(),
  390. },
  391. },
  392. condition: &ast.BinaryExpr{
  393. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  394. OP: ast.GT,
  395. RHS: &ast.IntegerLiteral{Val: 20},
  396. },
  397. }.Init(),
  398. },
  399. },
  400. condition: &ast.BinaryExpr{
  401. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  402. Token: ast.ASTERISK,
  403. }}, FuncType: ast.FuncTypeAgg},
  404. OP: ast.GT,
  405. RHS: &ast.IntegerLiteral{Val: 2},
  406. },
  407. }.Init(),
  408. },
  409. },
  410. fields: []ast.Field{
  411. {
  412. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  413. Name: "*",
  414. AName: ""},
  415. },
  416. isAggregate: false,
  417. sendMeta: false,
  418. }.Init(),
  419. }, { // 5. optimize join on
  420. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  421. p: ProjectPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. JoinPlan{
  425. baseLogicalPlan: baseLogicalPlan{
  426. children: []LogicalPlan{
  427. WindowPlan{
  428. baseLogicalPlan: baseLogicalPlan{
  429. children: []LogicalPlan{
  430. FilterPlan{
  431. baseLogicalPlan: baseLogicalPlan{
  432. children: []LogicalPlan{
  433. DataSourcePlan{
  434. name: "src1",
  435. streamFields: []interface{}{
  436. &ast.StreamField{
  437. Name: "id1",
  438. FieldType: &ast.BasicType{Type: ast.BIGINT},
  439. },
  440. &ast.StreamField{
  441. Name: "temp",
  442. FieldType: &ast.BasicType{Type: ast.BIGINT},
  443. },
  444. },
  445. streamStmt: streams["src1"],
  446. metaFields: []string{},
  447. }.Init(),
  448. },
  449. },
  450. condition: &ast.BinaryExpr{
  451. RHS: &ast.BinaryExpr{
  452. OP: ast.GT,
  453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  454. RHS: &ast.IntegerLiteral{Val: 20},
  455. },
  456. OP: ast.AND,
  457. LHS: &ast.BinaryExpr{
  458. OP: ast.GT,
  459. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  460. RHS: &ast.IntegerLiteral{Val: 111},
  461. },
  462. },
  463. }.Init(),
  464. FilterPlan{
  465. baseLogicalPlan: baseLogicalPlan{
  466. children: []LogicalPlan{
  467. DataSourcePlan{
  468. name: "src2",
  469. streamFields: []interface{}{
  470. &ast.StreamField{
  471. Name: "hum",
  472. FieldType: &ast.BasicType{Type: ast.BIGINT},
  473. },
  474. &ast.StreamField{
  475. Name: "id2",
  476. FieldType: &ast.BasicType{Type: ast.BIGINT},
  477. },
  478. },
  479. streamStmt: streams["src2"],
  480. metaFields: []string{},
  481. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  482. }.Init(),
  483. },
  484. },
  485. condition: &ast.BinaryExpr{
  486. OP: ast.LT,
  487. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  488. RHS: &ast.IntegerLiteral{Val: 60},
  489. },
  490. }.Init(),
  491. },
  492. },
  493. condition: nil,
  494. wtype: ast.TUMBLING_WINDOW,
  495. length: 10000,
  496. interval: 0,
  497. limit: 0,
  498. }.Init(),
  499. },
  500. },
  501. from: &ast.Table{
  502. Name: "src1",
  503. },
  504. joins: []ast.Join{
  505. {
  506. Name: "src2",
  507. Alias: "",
  508. JoinType: ast.INNER_JOIN,
  509. Expr: &ast.BinaryExpr{
  510. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  511. OP: ast.EQ,
  512. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  513. },
  514. },
  515. },
  516. }.Init(),
  517. },
  518. },
  519. fields: []ast.Field{
  520. {
  521. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  522. Name: "id1",
  523. AName: ""},
  524. },
  525. isAggregate: false,
  526. sendMeta: false,
  527. }.Init(),
  528. }, { // 6. optimize outer join on
  529. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  530. p: ProjectPlan{
  531. baseLogicalPlan: baseLogicalPlan{
  532. children: []LogicalPlan{
  533. JoinPlan{
  534. baseLogicalPlan: baseLogicalPlan{
  535. children: []LogicalPlan{
  536. WindowPlan{
  537. baseLogicalPlan: baseLogicalPlan{
  538. children: []LogicalPlan{
  539. FilterPlan{
  540. baseLogicalPlan: baseLogicalPlan{
  541. children: []LogicalPlan{
  542. DataSourcePlan{
  543. name: "src1",
  544. streamFields: []interface{}{
  545. &ast.StreamField{
  546. Name: "id1",
  547. FieldType: &ast.BasicType{Type: ast.BIGINT},
  548. },
  549. &ast.StreamField{
  550. Name: "temp",
  551. FieldType: &ast.BasicType{Type: ast.BIGINT},
  552. },
  553. },
  554. streamStmt: streams["src1"],
  555. metaFields: []string{},
  556. }.Init(),
  557. },
  558. },
  559. condition: &ast.BinaryExpr{
  560. OP: ast.GT,
  561. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  562. RHS: &ast.IntegerLiteral{Val: 111},
  563. },
  564. }.Init(),
  565. DataSourcePlan{
  566. name: "src2",
  567. streamFields: []interface{}{
  568. &ast.StreamField{
  569. Name: "hum",
  570. FieldType: &ast.BasicType{Type: ast.BIGINT},
  571. },
  572. &ast.StreamField{
  573. Name: "id2",
  574. FieldType: &ast.BasicType{Type: ast.BIGINT},
  575. },
  576. },
  577. streamStmt: streams["src2"],
  578. metaFields: []string{},
  579. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  580. }.Init(),
  581. },
  582. },
  583. condition: nil,
  584. wtype: ast.TUMBLING_WINDOW,
  585. length: 10000,
  586. interval: 0,
  587. limit: 0,
  588. }.Init(),
  589. },
  590. },
  591. from: &ast.Table{
  592. Name: "src1",
  593. },
  594. joins: []ast.Join{
  595. {
  596. Name: "src2",
  597. Alias: "",
  598. JoinType: ast.FULL_JOIN,
  599. Expr: &ast.BinaryExpr{
  600. OP: ast.AND,
  601. LHS: &ast.BinaryExpr{
  602. OP: ast.AND,
  603. LHS: &ast.BinaryExpr{
  604. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  605. OP: ast.EQ,
  606. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  607. },
  608. RHS: &ast.BinaryExpr{
  609. OP: ast.GT,
  610. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  611. RHS: &ast.IntegerLiteral{Val: 20},
  612. },
  613. },
  614. RHS: &ast.BinaryExpr{
  615. OP: ast.LT,
  616. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  617. RHS: &ast.IntegerLiteral{Val: 60},
  618. },
  619. },
  620. },
  621. },
  622. }.Init(),
  623. },
  624. },
  625. fields: []ast.Field{
  626. {
  627. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  628. Name: "id1",
  629. AName: ""},
  630. },
  631. isAggregate: false,
  632. sendMeta: false,
  633. }.Init(),
  634. }, { // 7 window error for table
  635. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  636. p: nil,
  637. err: "cannot run window for TABLE sources",
  638. }, { // 8 join table without window
  639. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  640. p: ProjectPlan{
  641. baseLogicalPlan: baseLogicalPlan{
  642. children: []LogicalPlan{
  643. JoinPlan{
  644. baseLogicalPlan: baseLogicalPlan{
  645. children: []LogicalPlan{
  646. JoinAlignPlan{
  647. baseLogicalPlan: baseLogicalPlan{
  648. children: []LogicalPlan{
  649. FilterPlan{
  650. baseLogicalPlan: baseLogicalPlan{
  651. children: []LogicalPlan{
  652. DataSourcePlan{
  653. name: "src1",
  654. streamFields: []interface{}{
  655. &ast.StreamField{
  656. Name: "id1",
  657. FieldType: &ast.BasicType{Type: ast.BIGINT},
  658. },
  659. &ast.StreamField{
  660. Name: "temp",
  661. FieldType: &ast.BasicType{Type: ast.BIGINT},
  662. },
  663. },
  664. streamStmt: streams["src1"],
  665. metaFields: []string{},
  666. }.Init(),
  667. },
  668. },
  669. condition: &ast.BinaryExpr{
  670. RHS: &ast.BinaryExpr{
  671. OP: ast.GT,
  672. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  673. RHS: &ast.IntegerLiteral{Val: 20},
  674. },
  675. OP: ast.AND,
  676. LHS: &ast.BinaryExpr{
  677. OP: ast.GT,
  678. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  679. RHS: &ast.IntegerLiteral{Val: 111},
  680. },
  681. },
  682. }.Init(),
  683. DataSourcePlan{
  684. name: "tableInPlanner",
  685. streamFields: []interface{}{
  686. &ast.StreamField{
  687. Name: "hum",
  688. FieldType: &ast.BasicType{Type: ast.BIGINT},
  689. },
  690. &ast.StreamField{
  691. Name: "id",
  692. FieldType: &ast.BasicType{Type: ast.BIGINT},
  693. },
  694. },
  695. streamStmt: streams["tableInPlanner"],
  696. metaFields: []string{},
  697. }.Init(),
  698. },
  699. },
  700. Emitters: []string{"tableInPlanner"},
  701. }.Init(),
  702. },
  703. },
  704. from: &ast.Table{
  705. Name: "src1",
  706. },
  707. joins: []ast.Join{
  708. {
  709. Name: "tableInPlanner",
  710. Alias: "",
  711. JoinType: ast.INNER_JOIN,
  712. Expr: &ast.BinaryExpr{
  713. OP: ast.AND,
  714. LHS: &ast.BinaryExpr{
  715. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  716. OP: ast.EQ,
  717. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  718. },
  719. RHS: &ast.BinaryExpr{
  720. OP: ast.LT,
  721. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  722. RHS: &ast.IntegerLiteral{Val: 60},
  723. },
  724. },
  725. },
  726. },
  727. }.Init(),
  728. },
  729. },
  730. fields: []ast.Field{
  731. {
  732. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  733. Name: "id1",
  734. AName: ""},
  735. },
  736. isAggregate: false,
  737. sendMeta: false,
  738. }.Init(),
  739. }, { // 9 join table with window
  740. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  741. p: ProjectPlan{
  742. baseLogicalPlan: baseLogicalPlan{
  743. children: []LogicalPlan{
  744. JoinPlan{
  745. baseLogicalPlan: baseLogicalPlan{
  746. children: []LogicalPlan{
  747. JoinAlignPlan{
  748. baseLogicalPlan: baseLogicalPlan{
  749. children: []LogicalPlan{
  750. WindowPlan{
  751. baseLogicalPlan: baseLogicalPlan{
  752. children: []LogicalPlan{
  753. DataSourcePlan{
  754. name: "src1",
  755. streamFields: []interface{}{
  756. &ast.StreamField{
  757. Name: "id1",
  758. FieldType: &ast.BasicType{Type: ast.BIGINT},
  759. },
  760. &ast.StreamField{
  761. Name: "temp",
  762. FieldType: &ast.BasicType{Type: ast.BIGINT},
  763. },
  764. },
  765. streamStmt: streams["src1"],
  766. metaFields: []string{},
  767. }.Init(),
  768. },
  769. },
  770. condition: nil,
  771. wtype: ast.TUMBLING_WINDOW,
  772. length: 10000,
  773. interval: 0,
  774. limit: 0,
  775. }.Init(),
  776. DataSourcePlan{
  777. name: "tableInPlanner",
  778. streamFields: []interface{}{
  779. &ast.StreamField{
  780. Name: "hum",
  781. FieldType: &ast.BasicType{Type: ast.BIGINT},
  782. },
  783. &ast.StreamField{
  784. Name: "id",
  785. FieldType: &ast.BasicType{Type: ast.BIGINT},
  786. },
  787. },
  788. streamStmt: streams["tableInPlanner"],
  789. metaFields: []string{},
  790. }.Init(),
  791. },
  792. },
  793. Emitters: []string{"tableInPlanner"},
  794. }.Init(),
  795. },
  796. },
  797. from: &ast.Table{
  798. Name: "src1",
  799. },
  800. joins: []ast.Join{
  801. {
  802. Name: "tableInPlanner",
  803. Alias: "",
  804. JoinType: ast.INNER_JOIN,
  805. Expr: &ast.BinaryExpr{
  806. OP: ast.AND,
  807. LHS: &ast.BinaryExpr{
  808. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  809. OP: ast.EQ,
  810. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  811. },
  812. RHS: &ast.BinaryExpr{
  813. RHS: &ast.BinaryExpr{
  814. OP: ast.AND,
  815. LHS: &ast.BinaryExpr{
  816. OP: ast.GT,
  817. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  818. RHS: &ast.IntegerLiteral{Val: 20},
  819. },
  820. RHS: &ast.BinaryExpr{
  821. OP: ast.LT,
  822. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  823. RHS: &ast.IntegerLiteral{Val: 60},
  824. },
  825. },
  826. OP: ast.AND,
  827. LHS: &ast.BinaryExpr{
  828. OP: ast.GT,
  829. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  830. RHS: &ast.IntegerLiteral{Val: 111},
  831. },
  832. },
  833. },
  834. },
  835. },
  836. }.Init(),
  837. },
  838. },
  839. fields: []ast.Field{
  840. {
  841. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  842. Name: "id1",
  843. AName: ""},
  844. },
  845. isAggregate: false,
  846. sendMeta: false,
  847. }.Init(),
  848. }, { // 10 meta
  849. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  850. p: ProjectPlan{
  851. baseLogicalPlan: baseLogicalPlan{
  852. children: []LogicalPlan{
  853. FilterPlan{
  854. baseLogicalPlan: baseLogicalPlan{
  855. children: []LogicalPlan{
  856. DataSourcePlan{
  857. name: "src1",
  858. streamFields: []interface{}{
  859. &ast.StreamField{
  860. Name: "temp",
  861. FieldType: &ast.BasicType{Type: ast.BIGINT},
  862. },
  863. },
  864. streamStmt: streams["src1"],
  865. metaFields: []string{"Humidity", "device", "id"},
  866. }.Init(),
  867. },
  868. },
  869. condition: &ast.BinaryExpr{
  870. LHS: &ast.Call{
  871. Name: "meta",
  872. FuncId: 2,
  873. Args: []ast.Expr{&ast.MetaRef{
  874. Name: "device",
  875. StreamName: ast.DefaultStream,
  876. }},
  877. },
  878. OP: ast.EQ,
  879. RHS: &ast.StringLiteral{
  880. Val: "demo2",
  881. },
  882. },
  883. }.Init(),
  884. },
  885. },
  886. fields: []ast.Field{
  887. {
  888. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  889. Name: "temp",
  890. AName: "",
  891. }, {
  892. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  893. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  894. Name: "id",
  895. StreamName: ast.DefaultStream,
  896. }}},
  897. []ast.StreamName{},
  898. nil,
  899. )},
  900. Name: "meta",
  901. AName: "eid",
  902. }, {
  903. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  904. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  905. &ast.BinaryExpr{
  906. OP: ast.ARROW,
  907. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  908. RHS: &ast.JsonFieldRef{Name: "Device"},
  909. },
  910. }},
  911. []ast.StreamName{},
  912. nil,
  913. )},
  914. Name: "meta",
  915. AName: "hdevice",
  916. },
  917. },
  918. isAggregate: false,
  919. sendMeta: false,
  920. }.Init(),
  921. }, { // 11 join with same name field and aliased
  922. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  923. p: ProjectPlan{
  924. baseLogicalPlan: baseLogicalPlan{
  925. children: []LogicalPlan{
  926. JoinPlan{
  927. baseLogicalPlan: baseLogicalPlan{
  928. children: []LogicalPlan{
  929. JoinAlignPlan{
  930. baseLogicalPlan: baseLogicalPlan{
  931. children: []LogicalPlan{
  932. DataSourcePlan{
  933. name: "src2",
  934. streamFields: []interface{}{
  935. &ast.StreamField{
  936. Name: "hum",
  937. FieldType: &ast.BasicType{Type: ast.BIGINT},
  938. },
  939. &ast.StreamField{
  940. Name: "id2",
  941. FieldType: &ast.BasicType{Type: ast.BIGINT},
  942. },
  943. },
  944. streamStmt: streams["src2"],
  945. metaFields: []string{},
  946. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  947. }.Init(),
  948. DataSourcePlan{
  949. name: "tableInPlanner",
  950. streamFields: []interface{}{
  951. &ast.StreamField{
  952. Name: "hum",
  953. FieldType: &ast.BasicType{Type: ast.BIGINT},
  954. },
  955. &ast.StreamField{
  956. Name: "id",
  957. FieldType: &ast.BasicType{Type: ast.BIGINT},
  958. },
  959. },
  960. streamStmt: streams["tableInPlanner"],
  961. metaFields: []string{},
  962. }.Init(),
  963. },
  964. },
  965. Emitters: []string{"tableInPlanner"},
  966. }.Init(),
  967. },
  968. },
  969. from: &ast.Table{
  970. Name: "src2",
  971. },
  972. joins: []ast.Join{
  973. {
  974. Name: "tableInPlanner",
  975. Alias: "",
  976. JoinType: ast.INNER_JOIN,
  977. Expr: &ast.BinaryExpr{
  978. RHS: &ast.BinaryExpr{
  979. OP: ast.EQ,
  980. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  981. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  982. },
  983. OP: ast.AND,
  984. LHS: &ast.BinaryExpr{
  985. OP: ast.GT,
  986. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  987. &ast.FieldRef{
  988. Name: "hum",
  989. StreamName: "src2",
  990. },
  991. []ast.StreamName{"src2"},
  992. &boolFalse,
  993. )},
  994. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  995. &ast.FieldRef{
  996. Name: "hum",
  997. StreamName: "tableInPlanner",
  998. },
  999. []ast.StreamName{"tableInPlanner"},
  1000. &boolFalse,
  1001. )},
  1002. },
  1003. },
  1004. },
  1005. },
  1006. }.Init(),
  1007. },
  1008. },
  1009. fields: []ast.Field{
  1010. {
  1011. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1012. &ast.FieldRef{
  1013. Name: "hum",
  1014. StreamName: "src2",
  1015. },
  1016. []ast.StreamName{"src2"},
  1017. &boolFalse,
  1018. )},
  1019. Name: "hum",
  1020. AName: "hum1",
  1021. }, {
  1022. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1023. &ast.FieldRef{
  1024. Name: "hum",
  1025. StreamName: "tableInPlanner",
  1026. },
  1027. []ast.StreamName{"tableInPlanner"},
  1028. &boolFalse,
  1029. )},
  1030. Name: "hum",
  1031. AName: "hum2",
  1032. },
  1033. },
  1034. isAggregate: false,
  1035. sendMeta: false,
  1036. }.Init(),
  1037. }, { // 12 meta with more fields
  1038. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1039. p: ProjectPlan{
  1040. baseLogicalPlan: baseLogicalPlan{
  1041. children: []LogicalPlan{
  1042. FilterPlan{
  1043. baseLogicalPlan: baseLogicalPlan{
  1044. children: []LogicalPlan{
  1045. DataSourcePlan{
  1046. name: "src1",
  1047. streamFields: []interface{}{
  1048. &ast.StreamField{
  1049. Name: "temp",
  1050. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1051. },
  1052. },
  1053. streamStmt: streams["src1"],
  1054. metaFields: []string{},
  1055. allMeta: true,
  1056. }.Init(),
  1057. },
  1058. },
  1059. condition: &ast.BinaryExpr{
  1060. LHS: &ast.Call{
  1061. Name: "meta",
  1062. FuncId: 1,
  1063. Args: []ast.Expr{&ast.MetaRef{
  1064. Name: "device",
  1065. StreamName: ast.DefaultStream,
  1066. }},
  1067. },
  1068. OP: ast.EQ,
  1069. RHS: &ast.StringLiteral{
  1070. Val: "demo2",
  1071. },
  1072. },
  1073. }.Init(),
  1074. },
  1075. },
  1076. fields: []ast.Field{
  1077. {
  1078. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1079. Name: "temp",
  1080. AName: "",
  1081. }, {
  1082. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1083. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1084. Name: "*",
  1085. StreamName: ast.DefaultStream,
  1086. }}},
  1087. []ast.StreamName{},
  1088. nil,
  1089. )},
  1090. Name: "meta",
  1091. AName: "m",
  1092. },
  1093. },
  1094. isAggregate: false,
  1095. sendMeta: false,
  1096. }.Init(),
  1097. }, { // 13 analytic function plan
  1098. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1099. p: ProjectPlan{
  1100. baseLogicalPlan: baseLogicalPlan{
  1101. children: []LogicalPlan{
  1102. FilterPlan{
  1103. baseLogicalPlan: baseLogicalPlan{
  1104. children: []LogicalPlan{
  1105. AnalyticFuncsPlan{
  1106. baseLogicalPlan: baseLogicalPlan{
  1107. children: []LogicalPlan{
  1108. DataSourcePlan{
  1109. name: "src1",
  1110. streamFields: []interface{}{
  1111. &ast.StreamField{
  1112. Name: "id1",
  1113. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1114. },
  1115. &ast.StreamField{
  1116. Name: "name",
  1117. FieldType: &ast.BasicType{Type: ast.STRINGS},
  1118. },
  1119. &ast.StreamField{
  1120. Name: "temp",
  1121. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1122. },
  1123. },
  1124. streamStmt: streams["src1"],
  1125. metaFields: []string{},
  1126. }.Init(),
  1127. },
  1128. },
  1129. funcs: []*ast.Call{
  1130. {
  1131. Name: "lag",
  1132. FuncId: 2,
  1133. CachedField: "$$a_lag_2",
  1134. Args: []ast.Expr{&ast.FieldRef{
  1135. Name: "temp",
  1136. StreamName: "src1",
  1137. }},
  1138. },
  1139. {
  1140. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1141. },
  1142. {
  1143. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1144. },
  1145. },
  1146. }.Init(),
  1147. },
  1148. },
  1149. condition: &ast.BinaryExpr{
  1150. LHS: &ast.Call{
  1151. Name: "lag",
  1152. FuncId: 2,
  1153. Args: []ast.Expr{&ast.FieldRef{
  1154. Name: "temp",
  1155. StreamName: "src1",
  1156. }},
  1157. CachedField: "$$a_lag_2",
  1158. Cached: true,
  1159. },
  1160. OP: ast.GT,
  1161. RHS: &ast.FieldRef{
  1162. Name: "temp",
  1163. StreamName: "src1",
  1164. },
  1165. },
  1166. }.Init(),
  1167. },
  1168. },
  1169. fields: []ast.Field{
  1170. {
  1171. Expr: &ast.Call{
  1172. Name: "latest",
  1173. FuncId: 1,
  1174. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1175. CachedField: "$$a_latest_1",
  1176. Cached: true,
  1177. },
  1178. Name: "latest",
  1179. }, {
  1180. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1181. Name: "id1",
  1182. },
  1183. },
  1184. isAggregate: false,
  1185. sendMeta: false,
  1186. }.Init(),
  1187. },
  1188. { // 14
  1189. sql: `SELECT name, *, meta(device) FROM src1`,
  1190. p: ProjectPlan{
  1191. baseLogicalPlan: baseLogicalPlan{
  1192. children: []LogicalPlan{
  1193. DataSourcePlan{
  1194. baseLogicalPlan: baseLogicalPlan{},
  1195. name: "src1",
  1196. streamFields: []interface{}{
  1197. &ast.StreamField{
  1198. Name: "id1",
  1199. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1200. },
  1201. &ast.StreamField{
  1202. Name: "temp",
  1203. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1204. },
  1205. &ast.StreamField{
  1206. Name: "name",
  1207. FieldType: &ast.BasicType{Type: ast.STRINGS},
  1208. },
  1209. &ast.StreamField{
  1210. Name: "myarray",
  1211. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  1212. },
  1213. },
  1214. streamStmt: streams["src1"],
  1215. metaFields: []string{"device"},
  1216. isWildCard: true,
  1217. }.Init(),
  1218. },
  1219. },
  1220. fields: []ast.Field{
  1221. {
  1222. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1223. Name: "name",
  1224. AName: "",
  1225. },
  1226. {
  1227. Name: "*",
  1228. Expr: &ast.Wildcard{
  1229. Token: ast.ASTERISK,
  1230. },
  1231. },
  1232. {
  1233. Name: "meta",
  1234. Expr: &ast.Call{
  1235. Name: "meta",
  1236. Args: []ast.Expr{
  1237. &ast.MetaRef{
  1238. StreamName: ast.DefaultStream,
  1239. Name: "device",
  1240. },
  1241. },
  1242. },
  1243. },
  1244. },
  1245. isAggregate: false,
  1246. allWildcard: true,
  1247. sendMeta: false,
  1248. }.Init(),
  1249. },
  1250. }
  1251. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1252. for i, tt := range tests {
  1253. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1254. if err != nil {
  1255. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1256. continue
  1257. }
  1258. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1259. IsEventTime: false,
  1260. LateTol: 0,
  1261. Concurrency: 0,
  1262. BufferLength: 0,
  1263. SendMetaToSink: false,
  1264. Qos: 0,
  1265. CheckpointInterval: 0,
  1266. SendError: true,
  1267. }, store)
  1268. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1269. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1270. } else if !reflect.DeepEqual(tt.p, p) {
  1271. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1272. }
  1273. }
  1274. }
  1275. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1276. err, store := store.GetKV("stream")
  1277. if err != nil {
  1278. t.Error(err)
  1279. return
  1280. }
  1281. streamSqls := map[string]string{
  1282. "src1": `CREATE STREAM src1 (
  1283. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1284. "src2": `CREATE STREAM src2 (
  1285. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1286. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1287. id BIGINT,
  1288. name STRING,
  1289. value STRING,
  1290. hum BIGINT
  1291. ) WITH (TYPE="file");`,
  1292. }
  1293. types := map[string]ast.StreamType{
  1294. "src1": ast.TypeStream,
  1295. "src2": ast.TypeStream,
  1296. "tableInPlanner": ast.TypeTable,
  1297. }
  1298. for name, sql := range streamSqls {
  1299. s, err := json.Marshal(&xsql.StreamInfo{
  1300. StreamType: types[name],
  1301. Statement: sql,
  1302. })
  1303. if err != nil {
  1304. t.Error(err)
  1305. t.Fail()
  1306. }
  1307. err = store.Set(name, string(s))
  1308. if err != nil {
  1309. t.Error(err)
  1310. t.Fail()
  1311. }
  1312. }
  1313. streams := make(map[string]*ast.StreamStmt)
  1314. for n := range streamSqls {
  1315. streamStmt, err := xsql.GetDataSource(store, n)
  1316. if err != nil {
  1317. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1318. return
  1319. }
  1320. streams[n] = streamStmt
  1321. }
  1322. var (
  1323. //boolTrue = true
  1324. boolFalse = false
  1325. )
  1326. var tests = []struct {
  1327. sql string
  1328. p LogicalPlan
  1329. err string
  1330. }{
  1331. { // 0
  1332. sql: `SELECT name FROM src1`,
  1333. p: ProjectPlan{
  1334. baseLogicalPlan: baseLogicalPlan{
  1335. children: []LogicalPlan{
  1336. DataSourcePlan{
  1337. baseLogicalPlan: baseLogicalPlan{},
  1338. name: "src1",
  1339. streamFields: []interface{}{
  1340. "name",
  1341. },
  1342. streamStmt: streams["src1"],
  1343. metaFields: []string{},
  1344. }.Init(),
  1345. },
  1346. },
  1347. fields: []ast.Field{
  1348. {
  1349. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1350. Name: "name",
  1351. AName: "",
  1352. },
  1353. },
  1354. isAggregate: false,
  1355. sendMeta: false,
  1356. }.Init(),
  1357. }, { // 1 optimize where to data source
  1358. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1359. p: ProjectPlan{
  1360. baseLogicalPlan: baseLogicalPlan{
  1361. children: []LogicalPlan{
  1362. WindowPlan{
  1363. baseLogicalPlan: baseLogicalPlan{
  1364. children: []LogicalPlan{
  1365. FilterPlan{
  1366. baseLogicalPlan: baseLogicalPlan{
  1367. children: []LogicalPlan{
  1368. DataSourcePlan{
  1369. name: "src1",
  1370. streamFields: []interface{}{
  1371. "name", "temp",
  1372. },
  1373. streamStmt: streams["src1"],
  1374. metaFields: []string{},
  1375. }.Init(),
  1376. },
  1377. },
  1378. condition: &ast.BinaryExpr{
  1379. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1380. OP: ast.EQ,
  1381. RHS: &ast.StringLiteral{Val: "v1"},
  1382. },
  1383. }.Init(),
  1384. },
  1385. },
  1386. condition: nil,
  1387. wtype: ast.TUMBLING_WINDOW,
  1388. length: 10000,
  1389. interval: 0,
  1390. limit: 0,
  1391. }.Init(),
  1392. },
  1393. },
  1394. fields: []ast.Field{
  1395. {
  1396. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1397. Name: "temp",
  1398. AName: ""},
  1399. },
  1400. isAggregate: false,
  1401. sendMeta: false,
  1402. }.Init(),
  1403. }, { // 2 condition that cannot be optimized
  1404. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1405. p: ProjectPlan{
  1406. baseLogicalPlan: baseLogicalPlan{
  1407. children: []LogicalPlan{
  1408. JoinPlan{
  1409. baseLogicalPlan: baseLogicalPlan{
  1410. children: []LogicalPlan{
  1411. WindowPlan{
  1412. baseLogicalPlan: baseLogicalPlan{
  1413. children: []LogicalPlan{
  1414. DataSourcePlan{
  1415. name: "src1",
  1416. streamFields: []interface{}{
  1417. "id1", "temp",
  1418. },
  1419. streamStmt: streams["src1"],
  1420. metaFields: []string{},
  1421. }.Init(),
  1422. DataSourcePlan{
  1423. name: "src2",
  1424. streamFields: []interface{}{ // cannot determine which stream id1 belongs to
  1425. "hum", "id1", "id2",
  1426. },
  1427. streamStmt: streams["src2"],
  1428. metaFields: []string{},
  1429. }.Init(),
  1430. },
  1431. },
  1432. condition: nil,
  1433. wtype: ast.TUMBLING_WINDOW,
  1434. length: 10000,
  1435. interval: 0,
  1436. limit: 0,
  1437. }.Init(),
  1438. },
  1439. },
  1440. from: &ast.Table{Name: "src1"},
  1441. joins: ast.Joins{ast.Join{
  1442. Name: "src2",
  1443. JoinType: ast.INNER_JOIN,
  1444. Expr: &ast.BinaryExpr{
  1445. OP: ast.AND,
  1446. LHS: &ast.BinaryExpr{
  1447. LHS: &ast.BinaryExpr{
  1448. OP: ast.GT,
  1449. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1450. RHS: &ast.IntegerLiteral{Val: 20},
  1451. },
  1452. OP: ast.OR,
  1453. RHS: &ast.BinaryExpr{
  1454. OP: ast.GT,
  1455. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1456. RHS: &ast.IntegerLiteral{Val: 60},
  1457. },
  1458. },
  1459. RHS: &ast.BinaryExpr{
  1460. OP: ast.EQ,
  1461. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1462. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1463. },
  1464. },
  1465. }},
  1466. }.Init(),
  1467. },
  1468. },
  1469. fields: []ast.Field{
  1470. {
  1471. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1472. Name: "id1",
  1473. AName: ""},
  1474. },
  1475. isAggregate: false,
  1476. sendMeta: false,
  1477. }.Init(),
  1478. }, { // 3 optimize window filter
  1479. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1480. p: ProjectPlan{
  1481. baseLogicalPlan: baseLogicalPlan{
  1482. children: []LogicalPlan{
  1483. WindowPlan{
  1484. baseLogicalPlan: baseLogicalPlan{
  1485. children: []LogicalPlan{
  1486. FilterPlan{
  1487. baseLogicalPlan: baseLogicalPlan{
  1488. children: []LogicalPlan{
  1489. DataSourcePlan{
  1490. name: "src1",
  1491. streamFields: []interface{}{
  1492. "id1", "name", "temp",
  1493. },
  1494. streamStmt: streams["src1"],
  1495. metaFields: []string{},
  1496. }.Init(),
  1497. },
  1498. },
  1499. condition: &ast.BinaryExpr{
  1500. OP: ast.AND,
  1501. LHS: &ast.BinaryExpr{
  1502. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1503. OP: ast.EQ,
  1504. RHS: &ast.StringLiteral{Val: "v1"},
  1505. },
  1506. RHS: &ast.BinaryExpr{
  1507. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1508. OP: ast.GT,
  1509. RHS: &ast.IntegerLiteral{Val: 2},
  1510. },
  1511. },
  1512. }.Init(),
  1513. },
  1514. },
  1515. condition: nil,
  1516. wtype: ast.TUMBLING_WINDOW,
  1517. length: 10000,
  1518. interval: 0,
  1519. limit: 0,
  1520. }.Init(),
  1521. },
  1522. },
  1523. fields: []ast.Field{
  1524. {
  1525. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1526. Name: "id1",
  1527. AName: ""},
  1528. },
  1529. isAggregate: false,
  1530. sendMeta: false,
  1531. }.Init(),
  1532. }, { // 4. do not optimize count window
  1533. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1534. p: ProjectPlan{
  1535. baseLogicalPlan: baseLogicalPlan{
  1536. children: []LogicalPlan{
  1537. HavingPlan{
  1538. baseLogicalPlan: baseLogicalPlan{
  1539. children: []LogicalPlan{
  1540. FilterPlan{
  1541. baseLogicalPlan: baseLogicalPlan{
  1542. children: []LogicalPlan{
  1543. WindowPlan{
  1544. baseLogicalPlan: baseLogicalPlan{
  1545. children: []LogicalPlan{
  1546. DataSourcePlan{
  1547. name: "src1",
  1548. isWildCard: true,
  1549. streamFields: nil,
  1550. streamStmt: streams["src1"],
  1551. metaFields: []string{},
  1552. }.Init(),
  1553. },
  1554. },
  1555. condition: nil,
  1556. wtype: ast.COUNT_WINDOW,
  1557. length: 5,
  1558. interval: 1,
  1559. limit: 0,
  1560. }.Init(),
  1561. },
  1562. },
  1563. condition: &ast.BinaryExpr{
  1564. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1565. OP: ast.GT,
  1566. RHS: &ast.IntegerLiteral{Val: 20},
  1567. },
  1568. }.Init(),
  1569. },
  1570. },
  1571. condition: &ast.BinaryExpr{
  1572. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1573. Token: ast.ASTERISK,
  1574. }}, FuncType: ast.FuncTypeAgg},
  1575. OP: ast.GT,
  1576. RHS: &ast.IntegerLiteral{Val: 2},
  1577. },
  1578. }.Init(),
  1579. },
  1580. },
  1581. fields: []ast.Field{
  1582. {
  1583. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1584. Name: "*",
  1585. AName: ""},
  1586. },
  1587. isAggregate: false,
  1588. sendMeta: false,
  1589. }.Init(),
  1590. }, { // 5. optimize join on
  1591. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1592. p: ProjectPlan{
  1593. baseLogicalPlan: baseLogicalPlan{
  1594. children: []LogicalPlan{
  1595. JoinPlan{
  1596. baseLogicalPlan: baseLogicalPlan{
  1597. children: []LogicalPlan{
  1598. WindowPlan{
  1599. baseLogicalPlan: baseLogicalPlan{
  1600. children: []LogicalPlan{
  1601. FilterPlan{
  1602. baseLogicalPlan: baseLogicalPlan{
  1603. children: []LogicalPlan{
  1604. DataSourcePlan{
  1605. name: "src1",
  1606. streamFields: []interface{}{
  1607. "id1", "temp",
  1608. },
  1609. streamStmt: streams["src1"],
  1610. metaFields: []string{},
  1611. }.Init(),
  1612. },
  1613. },
  1614. condition: &ast.BinaryExpr{
  1615. RHS: &ast.BinaryExpr{
  1616. OP: ast.GT,
  1617. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1618. RHS: &ast.IntegerLiteral{Val: 20},
  1619. },
  1620. OP: ast.AND,
  1621. LHS: &ast.BinaryExpr{
  1622. OP: ast.GT,
  1623. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1624. RHS: &ast.IntegerLiteral{Val: 111},
  1625. },
  1626. },
  1627. }.Init(),
  1628. FilterPlan{
  1629. baseLogicalPlan: baseLogicalPlan{
  1630. children: []LogicalPlan{
  1631. DataSourcePlan{
  1632. name: "src2",
  1633. streamFields: []interface{}{
  1634. "hum", "id1", "id2",
  1635. },
  1636. streamStmt: streams["src2"],
  1637. metaFields: []string{},
  1638. }.Init(),
  1639. },
  1640. },
  1641. condition: &ast.BinaryExpr{
  1642. OP: ast.LT,
  1643. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1644. RHS: &ast.IntegerLiteral{Val: 60},
  1645. },
  1646. }.Init(),
  1647. },
  1648. },
  1649. condition: nil,
  1650. wtype: ast.TUMBLING_WINDOW,
  1651. length: 10000,
  1652. interval: 0,
  1653. limit: 0,
  1654. }.Init(),
  1655. },
  1656. },
  1657. from: &ast.Table{
  1658. Name: "src1",
  1659. },
  1660. joins: []ast.Join{
  1661. {
  1662. Name: "src2",
  1663. Alias: "",
  1664. JoinType: ast.INNER_JOIN,
  1665. Expr: &ast.BinaryExpr{
  1666. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1667. OP: ast.EQ,
  1668. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1669. },
  1670. },
  1671. },
  1672. }.Init(),
  1673. },
  1674. },
  1675. fields: []ast.Field{
  1676. {
  1677. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1678. Name: "id1",
  1679. AName: ""},
  1680. },
  1681. isAggregate: false,
  1682. sendMeta: false,
  1683. }.Init(),
  1684. }, { // 6. optimize outer join on
  1685. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1686. p: ProjectPlan{
  1687. baseLogicalPlan: baseLogicalPlan{
  1688. children: []LogicalPlan{
  1689. JoinPlan{
  1690. baseLogicalPlan: baseLogicalPlan{
  1691. children: []LogicalPlan{
  1692. WindowPlan{
  1693. baseLogicalPlan: baseLogicalPlan{
  1694. children: []LogicalPlan{
  1695. FilterPlan{
  1696. baseLogicalPlan: baseLogicalPlan{
  1697. children: []LogicalPlan{
  1698. DataSourcePlan{
  1699. name: "src1",
  1700. streamFields: []interface{}{
  1701. "id1", "temp",
  1702. },
  1703. streamStmt: streams["src1"],
  1704. metaFields: []string{},
  1705. }.Init(),
  1706. },
  1707. },
  1708. condition: &ast.BinaryExpr{
  1709. OP: ast.GT,
  1710. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1711. RHS: &ast.IntegerLiteral{Val: 111},
  1712. },
  1713. }.Init(),
  1714. DataSourcePlan{
  1715. name: "src2",
  1716. streamFields: []interface{}{
  1717. "hum", "id1", "id2",
  1718. },
  1719. streamStmt: streams["src2"],
  1720. metaFields: []string{},
  1721. }.Init(),
  1722. },
  1723. },
  1724. condition: nil,
  1725. wtype: ast.TUMBLING_WINDOW,
  1726. length: 10000,
  1727. interval: 0,
  1728. limit: 0,
  1729. }.Init(),
  1730. },
  1731. },
  1732. from: &ast.Table{
  1733. Name: "src1",
  1734. },
  1735. joins: []ast.Join{
  1736. {
  1737. Name: "src2",
  1738. Alias: "",
  1739. JoinType: ast.FULL_JOIN,
  1740. Expr: &ast.BinaryExpr{
  1741. OP: ast.AND,
  1742. LHS: &ast.BinaryExpr{
  1743. OP: ast.AND,
  1744. LHS: &ast.BinaryExpr{
  1745. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1746. OP: ast.EQ,
  1747. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1748. },
  1749. RHS: &ast.BinaryExpr{
  1750. OP: ast.GT,
  1751. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1752. RHS: &ast.IntegerLiteral{Val: 20},
  1753. },
  1754. },
  1755. RHS: &ast.BinaryExpr{
  1756. OP: ast.LT,
  1757. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1758. RHS: &ast.IntegerLiteral{Val: 60},
  1759. },
  1760. },
  1761. },
  1762. },
  1763. }.Init(),
  1764. },
  1765. },
  1766. fields: []ast.Field{
  1767. {
  1768. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1769. Name: "id1",
  1770. AName: ""},
  1771. },
  1772. isAggregate: false,
  1773. sendMeta: false,
  1774. }.Init(),
  1775. }, { // 7 window error for table
  1776. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1777. p: nil,
  1778. err: "cannot run window for TABLE sources",
  1779. }, { // 8 join table without window
  1780. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1781. p: ProjectPlan{
  1782. baseLogicalPlan: baseLogicalPlan{
  1783. children: []LogicalPlan{
  1784. JoinPlan{
  1785. baseLogicalPlan: baseLogicalPlan{
  1786. children: []LogicalPlan{
  1787. JoinAlignPlan{
  1788. baseLogicalPlan: baseLogicalPlan{
  1789. children: []LogicalPlan{
  1790. FilterPlan{
  1791. baseLogicalPlan: baseLogicalPlan{
  1792. children: []LogicalPlan{
  1793. DataSourcePlan{
  1794. name: "src1",
  1795. streamFields: []interface{}{
  1796. "hum", "id1", "temp",
  1797. },
  1798. streamStmt: streams["src1"],
  1799. metaFields: []string{},
  1800. }.Init(),
  1801. },
  1802. },
  1803. condition: &ast.BinaryExpr{
  1804. RHS: &ast.BinaryExpr{
  1805. OP: ast.GT,
  1806. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1807. RHS: &ast.IntegerLiteral{Val: 20},
  1808. },
  1809. OP: ast.AND,
  1810. LHS: &ast.BinaryExpr{
  1811. OP: ast.GT,
  1812. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1813. RHS: &ast.IntegerLiteral{Val: 111},
  1814. },
  1815. },
  1816. }.Init(),
  1817. DataSourcePlan{
  1818. name: "tableInPlanner",
  1819. streamFields: []interface{}{
  1820. &ast.StreamField{
  1821. Name: "hum",
  1822. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1823. },
  1824. &ast.StreamField{
  1825. Name: "id",
  1826. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1827. },
  1828. },
  1829. streamStmt: streams["tableInPlanner"],
  1830. metaFields: []string{},
  1831. }.Init(),
  1832. },
  1833. },
  1834. Emitters: []string{"tableInPlanner"},
  1835. }.Init(),
  1836. },
  1837. },
  1838. from: &ast.Table{
  1839. Name: "src1",
  1840. },
  1841. joins: []ast.Join{
  1842. {
  1843. Name: "tableInPlanner",
  1844. Alias: "",
  1845. JoinType: ast.INNER_JOIN,
  1846. Expr: &ast.BinaryExpr{
  1847. OP: ast.AND,
  1848. LHS: &ast.BinaryExpr{
  1849. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1850. OP: ast.EQ,
  1851. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1852. },
  1853. RHS: &ast.BinaryExpr{
  1854. OP: ast.LT,
  1855. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1856. RHS: &ast.IntegerLiteral{Val: 60},
  1857. },
  1858. },
  1859. },
  1860. },
  1861. }.Init(),
  1862. },
  1863. },
  1864. fields: []ast.Field{
  1865. {
  1866. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1867. Name: "id1",
  1868. AName: ""},
  1869. },
  1870. isAggregate: false,
  1871. sendMeta: false,
  1872. }.Init(),
  1873. }, { // 9 join table with window
  1874. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1875. p: ProjectPlan{
  1876. baseLogicalPlan: baseLogicalPlan{
  1877. children: []LogicalPlan{
  1878. JoinPlan{
  1879. baseLogicalPlan: baseLogicalPlan{
  1880. children: []LogicalPlan{
  1881. JoinAlignPlan{
  1882. baseLogicalPlan: baseLogicalPlan{
  1883. children: []LogicalPlan{
  1884. WindowPlan{
  1885. baseLogicalPlan: baseLogicalPlan{
  1886. children: []LogicalPlan{
  1887. DataSourcePlan{
  1888. name: "src1",
  1889. streamFields: []interface{}{
  1890. "id1", "temp",
  1891. },
  1892. streamStmt: streams["src1"],
  1893. metaFields: []string{},
  1894. }.Init(),
  1895. },
  1896. },
  1897. condition: nil,
  1898. wtype: ast.TUMBLING_WINDOW,
  1899. length: 10000,
  1900. interval: 0,
  1901. limit: 0,
  1902. }.Init(),
  1903. DataSourcePlan{
  1904. name: "tableInPlanner",
  1905. streamFields: []interface{}{
  1906. &ast.StreamField{
  1907. Name: "hum",
  1908. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1909. },
  1910. &ast.StreamField{
  1911. Name: "id",
  1912. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1913. },
  1914. },
  1915. streamStmt: streams["tableInPlanner"],
  1916. metaFields: []string{},
  1917. }.Init(),
  1918. },
  1919. },
  1920. Emitters: []string{"tableInPlanner"},
  1921. }.Init(),
  1922. },
  1923. },
  1924. from: &ast.Table{
  1925. Name: "src1",
  1926. },
  1927. joins: []ast.Join{
  1928. {
  1929. Name: "tableInPlanner",
  1930. Alias: "",
  1931. JoinType: ast.INNER_JOIN,
  1932. Expr: &ast.BinaryExpr{
  1933. OP: ast.AND,
  1934. LHS: &ast.BinaryExpr{
  1935. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1936. OP: ast.EQ,
  1937. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1938. },
  1939. RHS: &ast.BinaryExpr{
  1940. RHS: &ast.BinaryExpr{
  1941. OP: ast.AND,
  1942. LHS: &ast.BinaryExpr{
  1943. OP: ast.GT,
  1944. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1945. RHS: &ast.IntegerLiteral{Val: 20},
  1946. },
  1947. RHS: &ast.BinaryExpr{
  1948. OP: ast.LT,
  1949. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1950. RHS: &ast.IntegerLiteral{Val: 60},
  1951. },
  1952. },
  1953. OP: ast.AND,
  1954. LHS: &ast.BinaryExpr{
  1955. OP: ast.GT,
  1956. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1957. RHS: &ast.IntegerLiteral{Val: 111},
  1958. },
  1959. },
  1960. },
  1961. },
  1962. },
  1963. }.Init(),
  1964. },
  1965. },
  1966. fields: []ast.Field{
  1967. {
  1968. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1969. Name: "id1",
  1970. AName: ""},
  1971. },
  1972. isAggregate: false,
  1973. sendMeta: false,
  1974. }.Init(),
  1975. }, { // 10 meta
  1976. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1977. p: ProjectPlan{
  1978. baseLogicalPlan: baseLogicalPlan{
  1979. children: []LogicalPlan{
  1980. FilterPlan{
  1981. baseLogicalPlan: baseLogicalPlan{
  1982. children: []LogicalPlan{
  1983. DataSourcePlan{
  1984. name: "src1",
  1985. streamFields: []interface{}{
  1986. "temp",
  1987. },
  1988. streamStmt: streams["src1"],
  1989. metaFields: []string{"Humidity", "device", "id"},
  1990. }.Init(),
  1991. },
  1992. },
  1993. condition: &ast.BinaryExpr{
  1994. LHS: &ast.Call{
  1995. Name: "meta",
  1996. FuncId: 2,
  1997. Args: []ast.Expr{&ast.MetaRef{
  1998. Name: "device",
  1999. StreamName: ast.DefaultStream,
  2000. }},
  2001. },
  2002. OP: ast.EQ,
  2003. RHS: &ast.StringLiteral{
  2004. Val: "demo2",
  2005. },
  2006. },
  2007. }.Init(),
  2008. },
  2009. },
  2010. fields: []ast.Field{
  2011. {
  2012. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2013. Name: "temp",
  2014. AName: "",
  2015. }, {
  2016. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2017. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  2018. Name: "id",
  2019. StreamName: ast.DefaultStream,
  2020. }}},
  2021. []ast.StreamName{},
  2022. nil,
  2023. )},
  2024. Name: "meta",
  2025. AName: "eid",
  2026. }, {
  2027. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2028. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  2029. &ast.BinaryExpr{
  2030. OP: ast.ARROW,
  2031. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  2032. RHS: &ast.JsonFieldRef{Name: "Device"},
  2033. },
  2034. }},
  2035. []ast.StreamName{},
  2036. nil,
  2037. )},
  2038. Name: "meta",
  2039. AName: "hdevice",
  2040. },
  2041. },
  2042. isAggregate: false,
  2043. sendMeta: false,
  2044. }.Init(),
  2045. }, { // 11 join with same name field and aliased
  2046. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  2047. p: ProjectPlan{
  2048. baseLogicalPlan: baseLogicalPlan{
  2049. children: []LogicalPlan{
  2050. JoinPlan{
  2051. baseLogicalPlan: baseLogicalPlan{
  2052. children: []LogicalPlan{
  2053. JoinAlignPlan{
  2054. baseLogicalPlan: baseLogicalPlan{
  2055. children: []LogicalPlan{
  2056. DataSourcePlan{
  2057. name: "src2",
  2058. streamFields: []interface{}{
  2059. "hum", "id", "id2",
  2060. },
  2061. streamStmt: streams["src2"],
  2062. metaFields: []string{},
  2063. }.Init(),
  2064. DataSourcePlan{
  2065. name: "tableInPlanner",
  2066. streamFields: []interface{}{
  2067. &ast.StreamField{
  2068. Name: "hum",
  2069. FieldType: &ast.BasicType{Type: ast.BIGINT},
  2070. },
  2071. &ast.StreamField{
  2072. Name: "id",
  2073. FieldType: &ast.BasicType{Type: ast.BIGINT},
  2074. },
  2075. },
  2076. streamStmt: streams["tableInPlanner"],
  2077. metaFields: []string{},
  2078. }.Init(),
  2079. },
  2080. },
  2081. Emitters: []string{"tableInPlanner"},
  2082. }.Init(),
  2083. },
  2084. },
  2085. from: &ast.Table{
  2086. Name: "src2",
  2087. },
  2088. joins: []ast.Join{
  2089. {
  2090. Name: "tableInPlanner",
  2091. Alias: "",
  2092. JoinType: ast.INNER_JOIN,
  2093. Expr: &ast.BinaryExpr{
  2094. RHS: &ast.BinaryExpr{
  2095. OP: ast.EQ,
  2096. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2097. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2098. },
  2099. OP: ast.AND,
  2100. LHS: &ast.BinaryExpr{
  2101. OP: ast.GT,
  2102. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2103. &ast.FieldRef{
  2104. Name: "hum",
  2105. StreamName: "src2",
  2106. },
  2107. []ast.StreamName{"src2"},
  2108. &boolFalse,
  2109. )},
  2110. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2111. &ast.FieldRef{
  2112. Name: "hum",
  2113. StreamName: "tableInPlanner",
  2114. },
  2115. []ast.StreamName{"tableInPlanner"},
  2116. &boolFalse,
  2117. )},
  2118. },
  2119. },
  2120. },
  2121. },
  2122. }.Init(),
  2123. },
  2124. },
  2125. fields: []ast.Field{
  2126. {
  2127. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2128. &ast.FieldRef{
  2129. Name: "hum",
  2130. StreamName: "src2",
  2131. },
  2132. []ast.StreamName{"src2"},
  2133. &boolFalse,
  2134. )},
  2135. Name: "hum",
  2136. AName: "hum1",
  2137. }, {
  2138. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2139. &ast.FieldRef{
  2140. Name: "hum",
  2141. StreamName: "tableInPlanner",
  2142. },
  2143. []ast.StreamName{"tableInPlanner"},
  2144. &boolFalse,
  2145. )},
  2146. Name: "hum",
  2147. AName: "hum2",
  2148. },
  2149. },
  2150. isAggregate: false,
  2151. sendMeta: false,
  2152. }.Init(),
  2153. }, { // 12
  2154. sql: `SELECT name->first, name->last FROM src1`,
  2155. p: ProjectPlan{
  2156. baseLogicalPlan: baseLogicalPlan{
  2157. children: []LogicalPlan{
  2158. DataSourcePlan{
  2159. baseLogicalPlan: baseLogicalPlan{},
  2160. name: "src1",
  2161. streamFields: []interface{}{
  2162. "name",
  2163. },
  2164. streamStmt: streams["src1"],
  2165. metaFields: []string{},
  2166. }.Init(),
  2167. },
  2168. },
  2169. fields: []ast.Field{
  2170. {
  2171. Expr: &ast.BinaryExpr{
  2172. OP: ast.ARROW,
  2173. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2174. RHS: &ast.JsonFieldRef{Name: "first"},
  2175. },
  2176. Name: "kuiper_field_0",
  2177. AName: "",
  2178. }, {
  2179. Expr: &ast.BinaryExpr{
  2180. OP: ast.ARROW,
  2181. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2182. RHS: &ast.JsonFieldRef{Name: "last"},
  2183. },
  2184. Name: "kuiper_field_1",
  2185. AName: "",
  2186. },
  2187. },
  2188. isAggregate: false,
  2189. sendMeta: false,
  2190. }.Init(),
  2191. },
  2192. }
  2193. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2194. for i, tt := range tests {
  2195. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2196. if err != nil {
  2197. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2198. continue
  2199. }
  2200. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2201. IsEventTime: false,
  2202. LateTol: 0,
  2203. Concurrency: 0,
  2204. BufferLength: 0,
  2205. SendMetaToSink: false,
  2206. Qos: 0,
  2207. CheckpointInterval: 0,
  2208. SendError: true,
  2209. }, store)
  2210. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2211. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2212. } else if !reflect.DeepEqual(tt.p, p) {
  2213. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2214. }
  2215. }
  2216. }
  2217. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2218. err, store := store.GetKV("stream")
  2219. if err != nil {
  2220. t.Error(err)
  2221. return
  2222. }
  2223. streamSqls := map[string]string{
  2224. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2225. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2226. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2227. }
  2228. types := map[string]ast.StreamType{
  2229. "src1": ast.TypeStream,
  2230. "table1": ast.TypeTable,
  2231. "table2": ast.TypeTable,
  2232. }
  2233. for name, sql := range streamSqls {
  2234. s, err := json.Marshal(&xsql.StreamInfo{
  2235. StreamType: types[name],
  2236. Statement: sql,
  2237. })
  2238. if err != nil {
  2239. t.Error(err)
  2240. t.Fail()
  2241. }
  2242. err = store.Set(name, string(s))
  2243. if err != nil {
  2244. t.Error(err)
  2245. t.Fail()
  2246. }
  2247. }
  2248. streams := make(map[string]*ast.StreamStmt)
  2249. for n := range streamSqls {
  2250. streamStmt, err := xsql.GetDataSource(store, n)
  2251. if err != nil {
  2252. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2253. return
  2254. }
  2255. streams[n] = streamStmt
  2256. }
  2257. var tests = []struct {
  2258. sql string
  2259. p LogicalPlan
  2260. err string
  2261. }{
  2262. { // 0
  2263. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2264. p: ProjectPlan{
  2265. baseLogicalPlan: baseLogicalPlan{
  2266. children: []LogicalPlan{
  2267. LookupPlan{
  2268. baseLogicalPlan: baseLogicalPlan{
  2269. children: []LogicalPlan{
  2270. DataSourcePlan{
  2271. baseLogicalPlan: baseLogicalPlan{},
  2272. name: "src1",
  2273. streamFields: []interface{}{
  2274. "a",
  2275. },
  2276. streamStmt: streams["src1"],
  2277. metaFields: []string{},
  2278. }.Init(),
  2279. },
  2280. },
  2281. joinExpr: ast.Join{
  2282. Name: "table1",
  2283. Alias: "",
  2284. JoinType: ast.INNER_JOIN,
  2285. Expr: &ast.BinaryExpr{
  2286. OP: ast.EQ,
  2287. LHS: &ast.FieldRef{
  2288. StreamName: "src1",
  2289. Name: "id",
  2290. },
  2291. RHS: &ast.FieldRef{
  2292. StreamName: "table1",
  2293. Name: "id",
  2294. },
  2295. },
  2296. },
  2297. keys: []string{"id"},
  2298. fields: []string{"b"},
  2299. valvars: []ast.Expr{
  2300. &ast.FieldRef{
  2301. StreamName: "src1",
  2302. Name: "id",
  2303. },
  2304. },
  2305. options: &ast.Options{
  2306. DATASOURCE: "table1",
  2307. TYPE: "sql",
  2308. STRICT_VALIDATION: true,
  2309. KIND: "lookup",
  2310. },
  2311. conditions: nil,
  2312. }.Init(),
  2313. },
  2314. },
  2315. fields: []ast.Field{
  2316. {
  2317. Expr: &ast.FieldRef{
  2318. StreamName: "src1",
  2319. Name: "a",
  2320. },
  2321. Name: "a",
  2322. AName: "",
  2323. },
  2324. {
  2325. Expr: &ast.FieldRef{
  2326. StreamName: "table1",
  2327. Name: "b",
  2328. },
  2329. Name: "b",
  2330. AName: "",
  2331. },
  2332. },
  2333. isAggregate: false,
  2334. sendMeta: false,
  2335. }.Init(),
  2336. },
  2337. { // 1
  2338. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2339. p: ProjectPlan{
  2340. baseLogicalPlan: baseLogicalPlan{
  2341. children: []LogicalPlan{
  2342. FilterPlan{
  2343. baseLogicalPlan: baseLogicalPlan{
  2344. children: []LogicalPlan{
  2345. LookupPlan{
  2346. baseLogicalPlan: baseLogicalPlan{
  2347. children: []LogicalPlan{
  2348. FilterPlan{
  2349. baseLogicalPlan: baseLogicalPlan{
  2350. children: []LogicalPlan{
  2351. DataSourcePlan{
  2352. baseLogicalPlan: baseLogicalPlan{},
  2353. name: "src1",
  2354. streamFields: []interface{}{
  2355. "a",
  2356. },
  2357. streamStmt: streams["src1"],
  2358. metaFields: []string{},
  2359. }.Init(),
  2360. },
  2361. },
  2362. condition: &ast.BinaryExpr{
  2363. OP: ast.LT,
  2364. LHS: &ast.FieldRef{
  2365. StreamName: "src1",
  2366. Name: "c",
  2367. },
  2368. RHS: &ast.IntegerLiteral{Val: 40},
  2369. },
  2370. }.Init(),
  2371. },
  2372. },
  2373. joinExpr: ast.Join{
  2374. Name: "table1",
  2375. Alias: "",
  2376. JoinType: ast.INNER_JOIN,
  2377. Expr: &ast.BinaryExpr{
  2378. OP: ast.AND,
  2379. RHS: &ast.BinaryExpr{
  2380. OP: ast.EQ,
  2381. LHS: &ast.FieldRef{
  2382. StreamName: "src1",
  2383. Name: "id",
  2384. },
  2385. RHS: &ast.FieldRef{
  2386. StreamName: "table1",
  2387. Name: "id",
  2388. },
  2389. },
  2390. LHS: &ast.BinaryExpr{
  2391. OP: ast.AND,
  2392. LHS: &ast.BinaryExpr{
  2393. OP: ast.GT,
  2394. LHS: &ast.FieldRef{
  2395. StreamName: "table1",
  2396. Name: "b",
  2397. },
  2398. RHS: &ast.IntegerLiteral{Val: 20},
  2399. },
  2400. RHS: &ast.BinaryExpr{
  2401. OP: ast.LT,
  2402. LHS: &ast.FieldRef{
  2403. StreamName: "src1",
  2404. Name: "c",
  2405. },
  2406. RHS: &ast.IntegerLiteral{Val: 40},
  2407. },
  2408. },
  2409. },
  2410. },
  2411. keys: []string{"id"},
  2412. valvars: []ast.Expr{
  2413. &ast.FieldRef{
  2414. StreamName: "src1",
  2415. Name: "id",
  2416. },
  2417. },
  2418. options: &ast.Options{
  2419. DATASOURCE: "table1",
  2420. TYPE: "sql",
  2421. STRICT_VALIDATION: true,
  2422. KIND: "lookup",
  2423. },
  2424. conditions: &ast.BinaryExpr{
  2425. OP: ast.AND,
  2426. LHS: &ast.BinaryExpr{
  2427. OP: ast.GT,
  2428. LHS: &ast.FieldRef{
  2429. StreamName: "table1",
  2430. Name: "b",
  2431. },
  2432. RHS: &ast.IntegerLiteral{Val: 20},
  2433. },
  2434. RHS: &ast.BinaryExpr{
  2435. OP: ast.LT,
  2436. LHS: &ast.FieldRef{
  2437. StreamName: "src1",
  2438. Name: "c",
  2439. },
  2440. RHS: &ast.IntegerLiteral{Val: 40},
  2441. },
  2442. },
  2443. }.Init(),
  2444. },
  2445. },
  2446. condition: &ast.BinaryExpr{
  2447. OP: ast.GT,
  2448. LHS: &ast.FieldRef{
  2449. StreamName: "table1",
  2450. Name: "b",
  2451. },
  2452. RHS: &ast.IntegerLiteral{Val: 20},
  2453. },
  2454. }.Init(),
  2455. },
  2456. },
  2457. fields: []ast.Field{
  2458. {
  2459. Expr: &ast.FieldRef{
  2460. StreamName: "src1",
  2461. Name: "a",
  2462. },
  2463. Name: "a",
  2464. AName: "",
  2465. },
  2466. {
  2467. Expr: &ast.FieldRef{
  2468. StreamName: "table1",
  2469. Name: "*",
  2470. },
  2471. Name: "*",
  2472. AName: "",
  2473. },
  2474. },
  2475. isAggregate: false,
  2476. sendMeta: false,
  2477. }.Init(),
  2478. },
  2479. { // 2
  2480. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2481. p: ProjectPlan{
  2482. baseLogicalPlan: baseLogicalPlan{
  2483. children: []LogicalPlan{
  2484. LookupPlan{
  2485. baseLogicalPlan: baseLogicalPlan{
  2486. children: []LogicalPlan{
  2487. LookupPlan{
  2488. baseLogicalPlan: baseLogicalPlan{
  2489. children: []LogicalPlan{
  2490. DataSourcePlan{
  2491. baseLogicalPlan: baseLogicalPlan{},
  2492. name: "src1",
  2493. streamFields: []interface{}{
  2494. "a",
  2495. },
  2496. streamStmt: streams["src1"],
  2497. metaFields: []string{},
  2498. }.Init(),
  2499. },
  2500. },
  2501. joinExpr: ast.Join{
  2502. Name: "table1",
  2503. Alias: "",
  2504. JoinType: ast.INNER_JOIN,
  2505. Expr: &ast.BinaryExpr{
  2506. OP: ast.EQ,
  2507. LHS: &ast.FieldRef{
  2508. StreamName: "src1",
  2509. Name: "id",
  2510. },
  2511. RHS: &ast.FieldRef{
  2512. StreamName: "table1",
  2513. Name: "id",
  2514. },
  2515. },
  2516. },
  2517. keys: []string{"id"},
  2518. fields: []string{"b"},
  2519. valvars: []ast.Expr{
  2520. &ast.FieldRef{
  2521. StreamName: "src1",
  2522. Name: "id",
  2523. },
  2524. },
  2525. options: &ast.Options{
  2526. DATASOURCE: "table1",
  2527. TYPE: "sql",
  2528. STRICT_VALIDATION: true,
  2529. KIND: "lookup",
  2530. },
  2531. conditions: nil,
  2532. }.Init(),
  2533. },
  2534. },
  2535. joinExpr: ast.Join{
  2536. Name: "table2",
  2537. Alias: "",
  2538. JoinType: ast.INNER_JOIN,
  2539. Expr: &ast.BinaryExpr{
  2540. OP: ast.EQ,
  2541. LHS: &ast.FieldRef{
  2542. StreamName: "table1",
  2543. Name: "id",
  2544. },
  2545. RHS: &ast.FieldRef{
  2546. StreamName: "table2",
  2547. Name: "id",
  2548. },
  2549. },
  2550. },
  2551. keys: []string{"id"},
  2552. fields: []string{"c"},
  2553. valvars: []ast.Expr{
  2554. &ast.FieldRef{
  2555. StreamName: "table1",
  2556. Name: "id",
  2557. },
  2558. },
  2559. options: &ast.Options{
  2560. DATASOURCE: "table2",
  2561. TYPE: "sql",
  2562. STRICT_VALIDATION: true,
  2563. KIND: "lookup",
  2564. },
  2565. }.Init(),
  2566. },
  2567. },
  2568. fields: []ast.Field{
  2569. {
  2570. Expr: &ast.FieldRef{
  2571. StreamName: "src1",
  2572. Name: "a",
  2573. },
  2574. Name: "a",
  2575. AName: "",
  2576. },
  2577. {
  2578. Expr: &ast.FieldRef{
  2579. StreamName: "table1",
  2580. Name: "b",
  2581. },
  2582. Name: "b",
  2583. AName: "",
  2584. },
  2585. {
  2586. Expr: &ast.FieldRef{
  2587. StreamName: "table2",
  2588. Name: "c",
  2589. },
  2590. Name: "c",
  2591. AName: "",
  2592. },
  2593. },
  2594. isAggregate: false,
  2595. sendMeta: false,
  2596. }.Init(),
  2597. },
  2598. { // 3
  2599. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2600. p: ProjectPlan{
  2601. baseLogicalPlan: baseLogicalPlan{
  2602. children: []LogicalPlan{
  2603. LookupPlan{
  2604. baseLogicalPlan: baseLogicalPlan{
  2605. children: []LogicalPlan{
  2606. WindowPlan{
  2607. baseLogicalPlan: baseLogicalPlan{
  2608. children: []LogicalPlan{
  2609. DataSourcePlan{
  2610. baseLogicalPlan: baseLogicalPlan{},
  2611. name: "src1",
  2612. streamStmt: streams["src1"],
  2613. metaFields: []string{},
  2614. isWildCard: true,
  2615. }.Init(),
  2616. },
  2617. },
  2618. condition: nil,
  2619. wtype: ast.TUMBLING_WINDOW,
  2620. length: 10000,
  2621. interval: 0,
  2622. limit: 0,
  2623. }.Init(),
  2624. },
  2625. },
  2626. joinExpr: ast.Join{
  2627. Name: "table1",
  2628. Alias: "",
  2629. JoinType: ast.INNER_JOIN,
  2630. Expr: &ast.BinaryExpr{
  2631. OP: ast.EQ,
  2632. LHS: &ast.FieldRef{
  2633. StreamName: "src1",
  2634. Name: "id",
  2635. },
  2636. RHS: &ast.FieldRef{
  2637. StreamName: "table1",
  2638. Name: "id",
  2639. },
  2640. },
  2641. },
  2642. keys: []string{"id"},
  2643. valvars: []ast.Expr{
  2644. &ast.FieldRef{
  2645. StreamName: "src1",
  2646. Name: "id",
  2647. },
  2648. },
  2649. options: &ast.Options{
  2650. DATASOURCE: "table1",
  2651. TYPE: "sql",
  2652. STRICT_VALIDATION: true,
  2653. KIND: "lookup",
  2654. },
  2655. conditions: nil,
  2656. }.Init(),
  2657. },
  2658. },
  2659. fields: []ast.Field{
  2660. {
  2661. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2662. Name: "*",
  2663. AName: "",
  2664. },
  2665. },
  2666. isAggregate: false,
  2667. sendMeta: false,
  2668. }.Init(),
  2669. },
  2670. }
  2671. for i, tt := range tests {
  2672. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2673. if err != nil {
  2674. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2675. continue
  2676. }
  2677. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2678. IsEventTime: false,
  2679. LateTol: 0,
  2680. Concurrency: 0,
  2681. BufferLength: 0,
  2682. SendMetaToSink: false,
  2683. Qos: 0,
  2684. CheckpointInterval: 0,
  2685. SendError: true,
  2686. }, store)
  2687. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2688. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2689. } else if !reflect.DeepEqual(tt.p, p) {
  2690. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2691. }
  2692. }
  2693. }