planner_test.go 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  // init is run automatically by the Go runtime during package initialization,
  // i.e. before any test function in this package executes. Its only action is
  // to call testx.InitEnv().
  // NOTE(review): what InitEnv sets up is not visible in this file — presumably
  // the shared test environment that store.GetKV relies on; confirm in internal/testx.
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: []interface{}{
  102. &ast.StreamField{
  103. Name: "myarray",
  104. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  105. },
  106. &ast.StreamField{
  107. Name: "temp",
  108. FieldType: &ast.BasicType{Type: ast.BIGINT},
  109. },
  110. },
  111. streamStmt: streams["src1"],
  112. metaFields: []string{},
  113. }.Init(),
  114. },
  115. },
  116. fields: []ast.Field{
  117. {
  118. Expr: &ast.BinaryExpr{
  119. OP: ast.SUBSET,
  120. LHS: &ast.FieldRef{
  121. StreamName: "src1",
  122. Name: "myarray",
  123. },
  124. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  125. StreamName: "src1",
  126. Name: "temp",
  127. }},
  128. },
  129. Name: "kuiper_field_0",
  130. AName: "",
  131. },
  132. },
  133. isAggregate: false,
  134. sendMeta: false,
  135. }.Init(),
  136. }, { // 1 optimize where to data source
  137. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  138. p: ProjectPlan{
  139. baseLogicalPlan: baseLogicalPlan{
  140. children: []LogicalPlan{
  141. WindowPlan{
  142. baseLogicalPlan: baseLogicalPlan{
  143. children: []LogicalPlan{
  144. FilterPlan{
  145. baseLogicalPlan: baseLogicalPlan{
  146. children: []LogicalPlan{
  147. DataSourcePlan{
  148. name: "src1",
  149. streamFields: []interface{}{
  150. &ast.StreamField{
  151. Name: "name",
  152. FieldType: &ast.BasicType{Type: ast.STRINGS},
  153. },
  154. &ast.StreamField{
  155. Name: "temp",
  156. FieldType: &ast.BasicType{Type: ast.BIGINT},
  157. },
  158. },
  159. streamStmt: streams["src1"],
  160. metaFields: []string{},
  161. }.Init(),
  162. },
  163. },
  164. condition: &ast.BinaryExpr{
  165. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  166. OP: ast.EQ,
  167. RHS: &ast.StringLiteral{Val: "v1"},
  168. },
  169. }.Init(),
  170. },
  171. },
  172. condition: nil,
  173. wtype: ast.TUMBLING_WINDOW,
  174. length: 10000,
  175. interval: 0,
  176. limit: 0,
  177. }.Init(),
  178. },
  179. },
  180. fields: []ast.Field{
  181. {
  182. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  183. Name: "temp",
  184. AName: ""},
  185. },
  186. isAggregate: false,
  187. sendMeta: false,
  188. }.Init(),
  189. }, { // 2 condition that cannot be optimized
  190. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  191. p: ProjectPlan{
  192. baseLogicalPlan: baseLogicalPlan{
  193. children: []LogicalPlan{
  194. JoinPlan{
  195. baseLogicalPlan: baseLogicalPlan{
  196. children: []LogicalPlan{
  197. WindowPlan{
  198. baseLogicalPlan: baseLogicalPlan{
  199. children: []LogicalPlan{
  200. DataSourcePlan{
  201. name: "src1",
  202. streamFields: []interface{}{
  203. &ast.StreamField{
  204. Name: "id1",
  205. FieldType: &ast.BasicType{Type: ast.BIGINT},
  206. },
  207. &ast.StreamField{
  208. Name: "temp",
  209. FieldType: &ast.BasicType{Type: ast.BIGINT},
  210. },
  211. },
  212. streamStmt: streams["src1"],
  213. metaFields: []string{},
  214. }.Init(),
  215. DataSourcePlan{
  216. name: "src2",
  217. streamFields: []interface{}{
  218. &ast.StreamField{
  219. Name: "hum",
  220. FieldType: &ast.BasicType{Type: ast.BIGINT},
  221. },
  222. &ast.StreamField{
  223. Name: "id2",
  224. FieldType: &ast.BasicType{Type: ast.BIGINT},
  225. },
  226. },
  227. streamStmt: streams["src2"],
  228. metaFields: []string{},
  229. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  230. }.Init(),
  231. },
  232. },
  233. condition: nil,
  234. wtype: ast.TUMBLING_WINDOW,
  235. length: 10000,
  236. interval: 0,
  237. limit: 0,
  238. }.Init(),
  239. },
  240. },
  241. from: &ast.Table{Name: "src1"},
  242. joins: ast.Joins{ast.Join{
  243. Name: "src2",
  244. JoinType: ast.INNER_JOIN,
  245. Expr: &ast.BinaryExpr{
  246. OP: ast.AND,
  247. LHS: &ast.BinaryExpr{
  248. LHS: &ast.BinaryExpr{
  249. OP: ast.GT,
  250. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  251. RHS: &ast.IntegerLiteral{Val: 20},
  252. },
  253. OP: ast.OR,
  254. RHS: &ast.BinaryExpr{
  255. OP: ast.GT,
  256. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  257. RHS: &ast.IntegerLiteral{Val: 60},
  258. },
  259. },
  260. RHS: &ast.BinaryExpr{
  261. OP: ast.EQ,
  262. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  263. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  264. },
  265. },
  266. }},
  267. }.Init(),
  268. },
  269. },
  270. fields: []ast.Field{
  271. {
  272. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  273. Name: "id1",
  274. AName: ""},
  275. },
  276. isAggregate: false,
  277. sendMeta: false,
  278. }.Init(),
  279. }, { // 3 optimize window filter
  280. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  281. p: ProjectPlan{
  282. baseLogicalPlan: baseLogicalPlan{
  283. children: []LogicalPlan{
  284. WindowPlan{
  285. baseLogicalPlan: baseLogicalPlan{
  286. children: []LogicalPlan{
  287. FilterPlan{
  288. baseLogicalPlan: baseLogicalPlan{
  289. children: []LogicalPlan{
  290. DataSourcePlan{
  291. name: "src1",
  292. streamFields: []interface{}{
  293. &ast.StreamField{
  294. Name: "id1",
  295. FieldType: &ast.BasicType{Type: ast.BIGINT},
  296. },
  297. &ast.StreamField{
  298. Name: "name",
  299. FieldType: &ast.BasicType{Type: ast.STRINGS},
  300. },
  301. &ast.StreamField{
  302. Name: "temp",
  303. FieldType: &ast.BasicType{Type: ast.BIGINT},
  304. },
  305. },
  306. streamStmt: streams["src1"],
  307. metaFields: []string{},
  308. }.Init(),
  309. },
  310. },
  311. condition: &ast.BinaryExpr{
  312. OP: ast.AND,
  313. LHS: &ast.BinaryExpr{
  314. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  315. OP: ast.EQ,
  316. RHS: &ast.StringLiteral{Val: "v1"},
  317. },
  318. RHS: &ast.BinaryExpr{
  319. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  320. OP: ast.GT,
  321. RHS: &ast.IntegerLiteral{Val: 2},
  322. },
  323. },
  324. }.Init(),
  325. },
  326. },
  327. condition: nil,
  328. wtype: ast.TUMBLING_WINDOW,
  329. length: 10000,
  330. interval: 0,
  331. limit: 0,
  332. }.Init(),
  333. },
  334. },
  335. fields: []ast.Field{
  336. {
  337. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  338. Name: "id1",
  339. AName: ""},
  340. },
  341. isAggregate: false,
  342. sendMeta: false,
  343. }.Init(),
  344. }, { // 4. do not optimize count window
  345. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  346. p: ProjectPlan{
  347. baseLogicalPlan: baseLogicalPlan{
  348. children: []LogicalPlan{
  349. HavingPlan{
  350. baseLogicalPlan: baseLogicalPlan{
  351. children: []LogicalPlan{
  352. FilterPlan{
  353. baseLogicalPlan: baseLogicalPlan{
  354. children: []LogicalPlan{
  355. WindowPlan{
  356. baseLogicalPlan: baseLogicalPlan{
  357. children: []LogicalPlan{
  358. DataSourcePlan{
  359. name: "src1",
  360. isWildCard: true,
  361. streamFields: []interface{}{
  362. &ast.StreamField{
  363. Name: "id1",
  364. FieldType: &ast.BasicType{Type: ast.BIGINT},
  365. },
  366. &ast.StreamField{
  367. Name: "temp",
  368. FieldType: &ast.BasicType{Type: ast.BIGINT},
  369. },
  370. &ast.StreamField{
  371. Name: "name",
  372. FieldType: &ast.BasicType{Type: ast.STRINGS},
  373. },
  374. &ast.StreamField{
  375. Name: "myarray",
  376. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  377. },
  378. },
  379. streamStmt: streams["src1"],
  380. metaFields: []string{},
  381. }.Init(),
  382. },
  383. },
  384. condition: nil,
  385. wtype: ast.COUNT_WINDOW,
  386. length: 5,
  387. interval: 1,
  388. limit: 0,
  389. }.Init(),
  390. },
  391. },
  392. condition: &ast.BinaryExpr{
  393. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  394. OP: ast.GT,
  395. RHS: &ast.IntegerLiteral{Val: 20},
  396. },
  397. }.Init(),
  398. },
  399. },
  400. condition: &ast.BinaryExpr{
  401. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  402. Token: ast.ASTERISK,
  403. }}, FuncType: ast.FuncTypeAgg},
  404. OP: ast.GT,
  405. RHS: &ast.IntegerLiteral{Val: 2},
  406. },
  407. }.Init(),
  408. },
  409. },
  410. fields: []ast.Field{
  411. {
  412. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  413. Name: "",
  414. AName: ""},
  415. },
  416. isAggregate: false,
  417. sendMeta: false,
  418. }.Init(),
  419. }, { // 5. optimize join on
  420. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  421. p: ProjectPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. JoinPlan{
  425. baseLogicalPlan: baseLogicalPlan{
  426. children: []LogicalPlan{
  427. WindowPlan{
  428. baseLogicalPlan: baseLogicalPlan{
  429. children: []LogicalPlan{
  430. FilterPlan{
  431. baseLogicalPlan: baseLogicalPlan{
  432. children: []LogicalPlan{
  433. DataSourcePlan{
  434. name: "src1",
  435. streamFields: []interface{}{
  436. &ast.StreamField{
  437. Name: "id1",
  438. FieldType: &ast.BasicType{Type: ast.BIGINT},
  439. },
  440. &ast.StreamField{
  441. Name: "temp",
  442. FieldType: &ast.BasicType{Type: ast.BIGINT},
  443. },
  444. },
  445. streamStmt: streams["src1"],
  446. metaFields: []string{},
  447. }.Init(),
  448. },
  449. },
  450. condition: &ast.BinaryExpr{
  451. RHS: &ast.BinaryExpr{
  452. OP: ast.GT,
  453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  454. RHS: &ast.IntegerLiteral{Val: 20},
  455. },
  456. OP: ast.AND,
  457. LHS: &ast.BinaryExpr{
  458. OP: ast.GT,
  459. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  460. RHS: &ast.IntegerLiteral{Val: 111},
  461. },
  462. },
  463. }.Init(),
  464. FilterPlan{
  465. baseLogicalPlan: baseLogicalPlan{
  466. children: []LogicalPlan{
  467. DataSourcePlan{
  468. name: "src2",
  469. streamFields: []interface{}{
  470. &ast.StreamField{
  471. Name: "hum",
  472. FieldType: &ast.BasicType{Type: ast.BIGINT},
  473. },
  474. &ast.StreamField{
  475. Name: "id2",
  476. FieldType: &ast.BasicType{Type: ast.BIGINT},
  477. },
  478. },
  479. streamStmt: streams["src2"],
  480. metaFields: []string{},
  481. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  482. }.Init(),
  483. },
  484. },
  485. condition: &ast.BinaryExpr{
  486. OP: ast.LT,
  487. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  488. RHS: &ast.IntegerLiteral{Val: 60},
  489. },
  490. }.Init(),
  491. },
  492. },
  493. condition: nil,
  494. wtype: ast.TUMBLING_WINDOW,
  495. length: 10000,
  496. interval: 0,
  497. limit: 0,
  498. }.Init(),
  499. },
  500. },
  501. from: &ast.Table{
  502. Name: "src1",
  503. },
  504. joins: []ast.Join{
  505. {
  506. Name: "src2",
  507. Alias: "",
  508. JoinType: ast.INNER_JOIN,
  509. Expr: &ast.BinaryExpr{
  510. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  511. OP: ast.EQ,
  512. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  513. },
  514. },
  515. },
  516. }.Init(),
  517. },
  518. },
  519. fields: []ast.Field{
  520. {
  521. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  522. Name: "id1",
  523. AName: ""},
  524. },
  525. isAggregate: false,
  526. sendMeta: false,
  527. }.Init(),
  528. }, { // 6. optimize outer join on
  529. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  530. p: ProjectPlan{
  531. baseLogicalPlan: baseLogicalPlan{
  532. children: []LogicalPlan{
  533. JoinPlan{
  534. baseLogicalPlan: baseLogicalPlan{
  535. children: []LogicalPlan{
  536. WindowPlan{
  537. baseLogicalPlan: baseLogicalPlan{
  538. children: []LogicalPlan{
  539. FilterPlan{
  540. baseLogicalPlan: baseLogicalPlan{
  541. children: []LogicalPlan{
  542. DataSourcePlan{
  543. name: "src1",
  544. streamFields: []interface{}{
  545. &ast.StreamField{
  546. Name: "id1",
  547. FieldType: &ast.BasicType{Type: ast.BIGINT},
  548. },
  549. &ast.StreamField{
  550. Name: "temp",
  551. FieldType: &ast.BasicType{Type: ast.BIGINT},
  552. },
  553. },
  554. streamStmt: streams["src1"],
  555. metaFields: []string{},
  556. }.Init(),
  557. },
  558. },
  559. condition: &ast.BinaryExpr{
  560. OP: ast.GT,
  561. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  562. RHS: &ast.IntegerLiteral{Val: 111},
  563. },
  564. }.Init(),
  565. DataSourcePlan{
  566. name: "src2",
  567. streamFields: []interface{}{
  568. &ast.StreamField{
  569. Name: "hum",
  570. FieldType: &ast.BasicType{Type: ast.BIGINT},
  571. },
  572. &ast.StreamField{
  573. Name: "id2",
  574. FieldType: &ast.BasicType{Type: ast.BIGINT},
  575. },
  576. },
  577. streamStmt: streams["src2"],
  578. metaFields: []string{},
  579. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  580. }.Init(),
  581. },
  582. },
  583. condition: nil,
  584. wtype: ast.TUMBLING_WINDOW,
  585. length: 10000,
  586. interval: 0,
  587. limit: 0,
  588. }.Init(),
  589. },
  590. },
  591. from: &ast.Table{
  592. Name: "src1",
  593. },
  594. joins: []ast.Join{
  595. {
  596. Name: "src2",
  597. Alias: "",
  598. JoinType: ast.FULL_JOIN,
  599. Expr: &ast.BinaryExpr{
  600. OP: ast.AND,
  601. LHS: &ast.BinaryExpr{
  602. OP: ast.AND,
  603. LHS: &ast.BinaryExpr{
  604. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  605. OP: ast.EQ,
  606. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  607. },
  608. RHS: &ast.BinaryExpr{
  609. OP: ast.GT,
  610. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  611. RHS: &ast.IntegerLiteral{Val: 20},
  612. },
  613. },
  614. RHS: &ast.BinaryExpr{
  615. OP: ast.LT,
  616. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  617. RHS: &ast.IntegerLiteral{Val: 60},
  618. },
  619. },
  620. },
  621. },
  622. }.Init(),
  623. },
  624. },
  625. fields: []ast.Field{
  626. {
  627. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  628. Name: "id1",
  629. AName: ""},
  630. },
  631. isAggregate: false,
  632. sendMeta: false,
  633. }.Init(),
  634. }, { // 7 window error for table
  635. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  636. p: nil,
  637. err: "cannot run window for TABLE sources",
  638. }, { // 8 join table without window
  639. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  640. p: ProjectPlan{
  641. baseLogicalPlan: baseLogicalPlan{
  642. children: []LogicalPlan{
  643. JoinPlan{
  644. baseLogicalPlan: baseLogicalPlan{
  645. children: []LogicalPlan{
  646. JoinAlignPlan{
  647. baseLogicalPlan: baseLogicalPlan{
  648. children: []LogicalPlan{
  649. FilterPlan{
  650. baseLogicalPlan: baseLogicalPlan{
  651. children: []LogicalPlan{
  652. DataSourcePlan{
  653. name: "src1",
  654. streamFields: []interface{}{
  655. &ast.StreamField{
  656. Name: "id1",
  657. FieldType: &ast.BasicType{Type: ast.BIGINT},
  658. },
  659. &ast.StreamField{
  660. Name: "temp",
  661. FieldType: &ast.BasicType{Type: ast.BIGINT},
  662. },
  663. },
  664. streamStmt: streams["src1"],
  665. metaFields: []string{},
  666. }.Init(),
  667. },
  668. },
  669. condition: &ast.BinaryExpr{
  670. RHS: &ast.BinaryExpr{
  671. OP: ast.GT,
  672. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  673. RHS: &ast.IntegerLiteral{Val: 20},
  674. },
  675. OP: ast.AND,
  676. LHS: &ast.BinaryExpr{
  677. OP: ast.GT,
  678. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  679. RHS: &ast.IntegerLiteral{Val: 111},
  680. },
  681. },
  682. }.Init(),
  683. DataSourcePlan{
  684. name: "tableInPlanner",
  685. streamFields: []interface{}{
  686. &ast.StreamField{
  687. Name: "hum",
  688. FieldType: &ast.BasicType{Type: ast.BIGINT},
  689. },
  690. &ast.StreamField{
  691. Name: "id",
  692. FieldType: &ast.BasicType{Type: ast.BIGINT},
  693. },
  694. },
  695. streamStmt: streams["tableInPlanner"],
  696. metaFields: []string{},
  697. }.Init(),
  698. },
  699. },
  700. Emitters: []string{"tableInPlanner"},
  701. }.Init(),
  702. },
  703. },
  704. from: &ast.Table{
  705. Name: "src1",
  706. },
  707. joins: []ast.Join{
  708. {
  709. Name: "tableInPlanner",
  710. Alias: "",
  711. JoinType: ast.INNER_JOIN,
  712. Expr: &ast.BinaryExpr{
  713. OP: ast.AND,
  714. LHS: &ast.BinaryExpr{
  715. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  716. OP: ast.EQ,
  717. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  718. },
  719. RHS: &ast.BinaryExpr{
  720. OP: ast.LT,
  721. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  722. RHS: &ast.IntegerLiteral{Val: 60},
  723. },
  724. },
  725. },
  726. },
  727. }.Init(),
  728. },
  729. },
  730. fields: []ast.Field{
  731. {
  732. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  733. Name: "id1",
  734. AName: ""},
  735. },
  736. isAggregate: false,
  737. sendMeta: false,
  738. }.Init(),
  739. }, { // 9 join table with window
  740. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  741. p: ProjectPlan{
  742. baseLogicalPlan: baseLogicalPlan{
  743. children: []LogicalPlan{
  744. JoinPlan{
  745. baseLogicalPlan: baseLogicalPlan{
  746. children: []LogicalPlan{
  747. JoinAlignPlan{
  748. baseLogicalPlan: baseLogicalPlan{
  749. children: []LogicalPlan{
  750. WindowPlan{
  751. baseLogicalPlan: baseLogicalPlan{
  752. children: []LogicalPlan{
  753. DataSourcePlan{
  754. name: "src1",
  755. streamFields: []interface{}{
  756. &ast.StreamField{
  757. Name: "id1",
  758. FieldType: &ast.BasicType{Type: ast.BIGINT},
  759. },
  760. &ast.StreamField{
  761. Name: "temp",
  762. FieldType: &ast.BasicType{Type: ast.BIGINT},
  763. },
  764. },
  765. streamStmt: streams["src1"],
  766. metaFields: []string{},
  767. }.Init(),
  768. },
  769. },
  770. condition: nil,
  771. wtype: ast.TUMBLING_WINDOW,
  772. length: 10000,
  773. interval: 0,
  774. limit: 0,
  775. }.Init(),
  776. DataSourcePlan{
  777. name: "tableInPlanner",
  778. streamFields: []interface{}{
  779. &ast.StreamField{
  780. Name: "hum",
  781. FieldType: &ast.BasicType{Type: ast.BIGINT},
  782. },
  783. &ast.StreamField{
  784. Name: "id",
  785. FieldType: &ast.BasicType{Type: ast.BIGINT},
  786. },
  787. },
  788. streamStmt: streams["tableInPlanner"],
  789. metaFields: []string{},
  790. }.Init(),
  791. },
  792. },
  793. Emitters: []string{"tableInPlanner"},
  794. }.Init(),
  795. },
  796. },
  797. from: &ast.Table{
  798. Name: "src1",
  799. },
  800. joins: []ast.Join{
  801. {
  802. Name: "tableInPlanner",
  803. Alias: "",
  804. JoinType: ast.INNER_JOIN,
  805. Expr: &ast.BinaryExpr{
  806. OP: ast.AND,
  807. LHS: &ast.BinaryExpr{
  808. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  809. OP: ast.EQ,
  810. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  811. },
  812. RHS: &ast.BinaryExpr{
  813. RHS: &ast.BinaryExpr{
  814. OP: ast.AND,
  815. LHS: &ast.BinaryExpr{
  816. OP: ast.GT,
  817. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  818. RHS: &ast.IntegerLiteral{Val: 20},
  819. },
  820. RHS: &ast.BinaryExpr{
  821. OP: ast.LT,
  822. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  823. RHS: &ast.IntegerLiteral{Val: 60},
  824. },
  825. },
  826. OP: ast.AND,
  827. LHS: &ast.BinaryExpr{
  828. OP: ast.GT,
  829. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  830. RHS: &ast.IntegerLiteral{Val: 111},
  831. },
  832. },
  833. },
  834. },
  835. },
  836. }.Init(),
  837. },
  838. },
  839. fields: []ast.Field{
  840. {
  841. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  842. Name: "id1",
  843. AName: ""},
  844. },
  845. isAggregate: false,
  846. sendMeta: false,
  847. }.Init(),
  848. }, { // 10 meta
  849. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  850. p: ProjectPlan{
  851. baseLogicalPlan: baseLogicalPlan{
  852. children: []LogicalPlan{
  853. FilterPlan{
  854. baseLogicalPlan: baseLogicalPlan{
  855. children: []LogicalPlan{
  856. DataSourcePlan{
  857. name: "src1",
  858. streamFields: []interface{}{
  859. &ast.StreamField{
  860. Name: "temp",
  861. FieldType: &ast.BasicType{Type: ast.BIGINT},
  862. },
  863. },
  864. streamStmt: streams["src1"],
  865. metaFields: []string{"Humidity", "device", "id"},
  866. }.Init(),
  867. },
  868. },
  869. condition: &ast.BinaryExpr{
  870. LHS: &ast.Call{
  871. Name: "meta",
  872. FuncId: 2,
  873. Args: []ast.Expr{&ast.MetaRef{
  874. Name: "device",
  875. StreamName: ast.DefaultStream,
  876. }},
  877. },
  878. OP: ast.EQ,
  879. RHS: &ast.StringLiteral{
  880. Val: "demo2",
  881. },
  882. },
  883. }.Init(),
  884. },
  885. },
  886. fields: []ast.Field{
  887. {
  888. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  889. Name: "temp",
  890. AName: "",
  891. }, {
  892. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  893. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  894. Name: "id",
  895. StreamName: ast.DefaultStream,
  896. }}},
  897. []ast.StreamName{},
  898. nil,
  899. )},
  900. Name: "meta",
  901. AName: "eid",
  902. }, {
  903. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  904. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  905. &ast.BinaryExpr{
  906. OP: ast.ARROW,
  907. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  908. RHS: &ast.JsonFieldRef{Name: "Device"},
  909. },
  910. }},
  911. []ast.StreamName{},
  912. nil,
  913. )},
  914. Name: "meta",
  915. AName: "hdevice",
  916. },
  917. },
  918. isAggregate: false,
  919. sendMeta: false,
  920. }.Init(),
  921. }, { // 11 join with same name field and aliased
  922. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  923. p: ProjectPlan{
  924. baseLogicalPlan: baseLogicalPlan{
  925. children: []LogicalPlan{
  926. JoinPlan{
  927. baseLogicalPlan: baseLogicalPlan{
  928. children: []LogicalPlan{
  929. JoinAlignPlan{
  930. baseLogicalPlan: baseLogicalPlan{
  931. children: []LogicalPlan{
  932. DataSourcePlan{
  933. name: "src2",
  934. streamFields: []interface{}{
  935. &ast.StreamField{
  936. Name: "hum",
  937. FieldType: &ast.BasicType{Type: ast.BIGINT},
  938. },
  939. &ast.StreamField{
  940. Name: "id2",
  941. FieldType: &ast.BasicType{Type: ast.BIGINT},
  942. },
  943. },
  944. streamStmt: streams["src2"],
  945. metaFields: []string{},
  946. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  947. }.Init(),
  948. DataSourcePlan{
  949. name: "tableInPlanner",
  950. streamFields: []interface{}{
  951. &ast.StreamField{
  952. Name: "hum",
  953. FieldType: &ast.BasicType{Type: ast.BIGINT},
  954. },
  955. &ast.StreamField{
  956. Name: "id",
  957. FieldType: &ast.BasicType{Type: ast.BIGINT},
  958. },
  959. },
  960. streamStmt: streams["tableInPlanner"],
  961. metaFields: []string{},
  962. }.Init(),
  963. },
  964. },
  965. Emitters: []string{"tableInPlanner"},
  966. }.Init(),
  967. },
  968. },
  969. from: &ast.Table{
  970. Name: "src2",
  971. },
  972. joins: []ast.Join{
  973. {
  974. Name: "tableInPlanner",
  975. Alias: "",
  976. JoinType: ast.INNER_JOIN,
  977. Expr: &ast.BinaryExpr{
  978. RHS: &ast.BinaryExpr{
  979. OP: ast.EQ,
  980. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  981. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  982. },
  983. OP: ast.AND,
  984. LHS: &ast.BinaryExpr{
  985. OP: ast.GT,
  986. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  987. &ast.FieldRef{
  988. Name: "hum",
  989. StreamName: "src2",
  990. },
  991. []ast.StreamName{"src2"},
  992. &boolFalse,
  993. )},
  994. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  995. &ast.FieldRef{
  996. Name: "hum",
  997. StreamName: "tableInPlanner",
  998. },
  999. []ast.StreamName{"tableInPlanner"},
  1000. &boolFalse,
  1001. )},
  1002. },
  1003. },
  1004. },
  1005. },
  1006. }.Init(),
  1007. },
  1008. },
  1009. fields: []ast.Field{
  1010. {
  1011. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1012. &ast.FieldRef{
  1013. Name: "hum",
  1014. StreamName: "src2",
  1015. },
  1016. []ast.StreamName{"src2"},
  1017. &boolFalse,
  1018. )},
  1019. Name: "hum",
  1020. AName: "hum1",
  1021. }, {
  1022. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1023. &ast.FieldRef{
  1024. Name: "hum",
  1025. StreamName: "tableInPlanner",
  1026. },
  1027. []ast.StreamName{"tableInPlanner"},
  1028. &boolFalse,
  1029. )},
  1030. Name: "hum",
  1031. AName: "hum2",
  1032. },
  1033. },
  1034. isAggregate: false,
  1035. sendMeta: false,
  1036. }.Init(),
  1037. }, { // 12 meta with more fields
  1038. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1039. p: ProjectPlan{
  1040. baseLogicalPlan: baseLogicalPlan{
  1041. children: []LogicalPlan{
  1042. FilterPlan{
  1043. baseLogicalPlan: baseLogicalPlan{
  1044. children: []LogicalPlan{
  1045. DataSourcePlan{
  1046. name: "src1",
  1047. streamFields: []interface{}{
  1048. &ast.StreamField{
  1049. Name: "temp",
  1050. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1051. },
  1052. },
  1053. streamStmt: streams["src1"],
  1054. metaFields: []string{},
  1055. allMeta: true,
  1056. }.Init(),
  1057. },
  1058. },
  1059. condition: &ast.BinaryExpr{
  1060. LHS: &ast.Call{
  1061. Name: "meta",
  1062. FuncId: 1,
  1063. Args: []ast.Expr{&ast.MetaRef{
  1064. Name: "device",
  1065. StreamName: ast.DefaultStream,
  1066. }},
  1067. },
  1068. OP: ast.EQ,
  1069. RHS: &ast.StringLiteral{
  1070. Val: "demo2",
  1071. },
  1072. },
  1073. }.Init(),
  1074. },
  1075. },
  1076. fields: []ast.Field{
  1077. {
  1078. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1079. Name: "temp",
  1080. AName: "",
  1081. }, {
  1082. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1083. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1084. Name: "*",
  1085. StreamName: ast.DefaultStream,
  1086. }}},
  1087. []ast.StreamName{},
  1088. nil,
  1089. )},
  1090. Name: "meta",
  1091. AName: "m",
  1092. },
  1093. },
  1094. isAggregate: false,
  1095. sendMeta: false,
  1096. }.Init(),
  1097. }, { // 13 analytic function plan
  1098. sql: `SELECT lag(name), id1 FROM src1 WHERE lag(temp) > temp`,
  1099. p: ProjectPlan{
  1100. baseLogicalPlan: baseLogicalPlan{
  1101. children: []LogicalPlan{
  1102. FilterPlan{
  1103. baseLogicalPlan: baseLogicalPlan{
  1104. children: []LogicalPlan{
  1105. AnalyticFuncsPlan{
  1106. baseLogicalPlan: baseLogicalPlan{
  1107. children: []LogicalPlan{
  1108. DataSourcePlan{
  1109. name: "src1",
  1110. streamFields: []interface{}{
  1111. &ast.StreamField{
  1112. Name: "id1",
  1113. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1114. },
  1115. &ast.StreamField{
  1116. Name: "name",
  1117. FieldType: &ast.BasicType{Type: ast.STRINGS},
  1118. },
  1119. &ast.StreamField{
  1120. Name: "temp",
  1121. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1122. },
  1123. },
  1124. streamStmt: streams["src1"],
  1125. metaFields: []string{},
  1126. }.Init(),
  1127. },
  1128. },
  1129. funcs: []*ast.Call{
  1130. {
  1131. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1132. }, {
  1133. Name: "lag",
  1134. FuncId: 1,
  1135. CachedField: "$$a_lag_1",
  1136. Args: []ast.Expr{&ast.FieldRef{
  1137. Name: "temp",
  1138. StreamName: "src1",
  1139. }},
  1140. },
  1141. },
  1142. }.Init(),
  1143. },
  1144. },
  1145. condition: &ast.BinaryExpr{
  1146. LHS: &ast.Call{
  1147. Name: "lag",
  1148. FuncId: 1,
  1149. Args: []ast.Expr{&ast.FieldRef{
  1150. Name: "temp",
  1151. StreamName: "src1",
  1152. }},
  1153. CachedField: "$$a_lag_1",
  1154. Cached: true,
  1155. },
  1156. OP: ast.GT,
  1157. RHS: &ast.FieldRef{
  1158. Name: "temp",
  1159. StreamName: "src1",
  1160. },
  1161. },
  1162. }.Init(),
  1163. },
  1164. },
  1165. fields: []ast.Field{
  1166. {
  1167. Expr: &ast.Call{Name: "lag", FuncId: 0, FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}, CachedField: "$$a_lag_0", Cached: true},
  1168. Name: "lag",
  1169. }, {
  1170. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1171. Name: "id1",
  1172. },
  1173. },
  1174. isAggregate: false,
  1175. sendMeta: false,
  1176. }.Init(),
  1177. },
  1178. }
  1179. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1180. for i, tt := range tests {
  1181. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1182. if err != nil {
  1183. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1184. continue
  1185. }
  1186. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1187. IsEventTime: false,
  1188. LateTol: 0,
  1189. Concurrency: 0,
  1190. BufferLength: 0,
  1191. SendMetaToSink: false,
  1192. Qos: 0,
  1193. CheckpointInterval: 0,
  1194. SendError: true,
  1195. }, store)
  1196. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1197. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1198. } else if !reflect.DeepEqual(tt.p, p) {
  1199. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1200. }
  1201. }
  1202. }
  1203. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1204. err, store := store.GetKV("stream")
  1205. if err != nil {
  1206. t.Error(err)
  1207. return
  1208. }
  1209. streamSqls := map[string]string{
  1210. "src1": `CREATE STREAM src1 (
  1211. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1212. "src2": `CREATE STREAM src2 (
  1213. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1214. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1215. id BIGINT,
  1216. name STRING,
  1217. value STRING,
  1218. hum BIGINT
  1219. ) WITH (TYPE="file");`,
  1220. }
  1221. types := map[string]ast.StreamType{
  1222. "src1": ast.TypeStream,
  1223. "src2": ast.TypeStream,
  1224. "tableInPlanner": ast.TypeTable,
  1225. }
  1226. for name, sql := range streamSqls {
  1227. s, err := json.Marshal(&xsql.StreamInfo{
  1228. StreamType: types[name],
  1229. Statement: sql,
  1230. })
  1231. if err != nil {
  1232. t.Error(err)
  1233. t.Fail()
  1234. }
  1235. err = store.Set(name, string(s))
  1236. if err != nil {
  1237. t.Error(err)
  1238. t.Fail()
  1239. }
  1240. }
  1241. streams := make(map[string]*ast.StreamStmt)
  1242. for n := range streamSqls {
  1243. streamStmt, err := xsql.GetDataSource(store, n)
  1244. if err != nil {
  1245. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1246. return
  1247. }
  1248. streams[n] = streamStmt
  1249. }
  1250. var (
  1251. //boolTrue = true
  1252. boolFalse = false
  1253. )
  1254. var tests = []struct {
  1255. sql string
  1256. p LogicalPlan
  1257. err string
  1258. }{
  1259. { // 0
  1260. sql: `SELECT name FROM src1`,
  1261. p: ProjectPlan{
  1262. baseLogicalPlan: baseLogicalPlan{
  1263. children: []LogicalPlan{
  1264. DataSourcePlan{
  1265. baseLogicalPlan: baseLogicalPlan{},
  1266. name: "src1",
  1267. streamFields: []interface{}{
  1268. "name",
  1269. },
  1270. streamStmt: streams["src1"],
  1271. metaFields: []string{},
  1272. }.Init(),
  1273. },
  1274. },
  1275. fields: []ast.Field{
  1276. {
  1277. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1278. Name: "name",
  1279. AName: "",
  1280. },
  1281. },
  1282. isAggregate: false,
  1283. sendMeta: false,
  1284. }.Init(),
  1285. }, { // 1 optimize where to data source
  1286. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1287. p: ProjectPlan{
  1288. baseLogicalPlan: baseLogicalPlan{
  1289. children: []LogicalPlan{
  1290. WindowPlan{
  1291. baseLogicalPlan: baseLogicalPlan{
  1292. children: []LogicalPlan{
  1293. FilterPlan{
  1294. baseLogicalPlan: baseLogicalPlan{
  1295. children: []LogicalPlan{
  1296. DataSourcePlan{
  1297. name: "src1",
  1298. streamFields: []interface{}{
  1299. "name", "temp",
  1300. },
  1301. streamStmt: streams["src1"],
  1302. metaFields: []string{},
  1303. }.Init(),
  1304. },
  1305. },
  1306. condition: &ast.BinaryExpr{
  1307. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1308. OP: ast.EQ,
  1309. RHS: &ast.StringLiteral{Val: "v1"},
  1310. },
  1311. }.Init(),
  1312. },
  1313. },
  1314. condition: nil,
  1315. wtype: ast.TUMBLING_WINDOW,
  1316. length: 10000,
  1317. interval: 0,
  1318. limit: 0,
  1319. }.Init(),
  1320. },
  1321. },
  1322. fields: []ast.Field{
  1323. {
  1324. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1325. Name: "temp",
  1326. AName: ""},
  1327. },
  1328. isAggregate: false,
  1329. sendMeta: false,
  1330. }.Init(),
  1331. }, { // 2 condition that cannot be optimized
  1332. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1333. p: ProjectPlan{
  1334. baseLogicalPlan: baseLogicalPlan{
  1335. children: []LogicalPlan{
  1336. JoinPlan{
  1337. baseLogicalPlan: baseLogicalPlan{
  1338. children: []LogicalPlan{
  1339. WindowPlan{
  1340. baseLogicalPlan: baseLogicalPlan{
  1341. children: []LogicalPlan{
  1342. DataSourcePlan{
  1343. name: "src1",
  1344. streamFields: []interface{}{
  1345. "id1", "temp",
  1346. },
  1347. streamStmt: streams["src1"],
  1348. metaFields: []string{},
  1349. }.Init(),
  1350. DataSourcePlan{
  1351. name: "src2",
  1352. streamFields: []interface{}{ // cannot determine which stream the unqualified id1 belongs to
  1353. "hum", "id1", "id2",
  1354. },
  1355. streamStmt: streams["src2"],
  1356. metaFields: []string{},
  1357. }.Init(),
  1358. },
  1359. },
  1360. condition: nil,
  1361. wtype: ast.TUMBLING_WINDOW,
  1362. length: 10000,
  1363. interval: 0,
  1364. limit: 0,
  1365. }.Init(),
  1366. },
  1367. },
  1368. from: &ast.Table{Name: "src1"},
  1369. joins: ast.Joins{ast.Join{
  1370. Name: "src2",
  1371. JoinType: ast.INNER_JOIN,
  1372. Expr: &ast.BinaryExpr{
  1373. OP: ast.AND,
  1374. LHS: &ast.BinaryExpr{
  1375. LHS: &ast.BinaryExpr{
  1376. OP: ast.GT,
  1377. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1378. RHS: &ast.IntegerLiteral{Val: 20},
  1379. },
  1380. OP: ast.OR,
  1381. RHS: &ast.BinaryExpr{
  1382. OP: ast.GT,
  1383. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1384. RHS: &ast.IntegerLiteral{Val: 60},
  1385. },
  1386. },
  1387. RHS: &ast.BinaryExpr{
  1388. OP: ast.EQ,
  1389. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1390. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1391. },
  1392. },
  1393. }},
  1394. }.Init(),
  1395. },
  1396. },
  1397. fields: []ast.Field{
  1398. {
  1399. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1400. Name: "id1",
  1401. AName: ""},
  1402. },
  1403. isAggregate: false,
  1404. sendMeta: false,
  1405. }.Init(),
  1406. }, { // 3 optimize window filter
  1407. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1408. p: ProjectPlan{
  1409. baseLogicalPlan: baseLogicalPlan{
  1410. children: []LogicalPlan{
  1411. WindowPlan{
  1412. baseLogicalPlan: baseLogicalPlan{
  1413. children: []LogicalPlan{
  1414. FilterPlan{
  1415. baseLogicalPlan: baseLogicalPlan{
  1416. children: []LogicalPlan{
  1417. DataSourcePlan{
  1418. name: "src1",
  1419. streamFields: []interface{}{
  1420. "id1", "name", "temp",
  1421. },
  1422. streamStmt: streams["src1"],
  1423. metaFields: []string{},
  1424. }.Init(),
  1425. },
  1426. },
  1427. condition: &ast.BinaryExpr{
  1428. OP: ast.AND,
  1429. LHS: &ast.BinaryExpr{
  1430. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1431. OP: ast.EQ,
  1432. RHS: &ast.StringLiteral{Val: "v1"},
  1433. },
  1434. RHS: &ast.BinaryExpr{
  1435. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1436. OP: ast.GT,
  1437. RHS: &ast.IntegerLiteral{Val: 2},
  1438. },
  1439. },
  1440. }.Init(),
  1441. },
  1442. },
  1443. condition: nil,
  1444. wtype: ast.TUMBLING_WINDOW,
  1445. length: 10000,
  1446. interval: 0,
  1447. limit: 0,
  1448. }.Init(),
  1449. },
  1450. },
  1451. fields: []ast.Field{
  1452. {
  1453. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1454. Name: "id1",
  1455. AName: ""},
  1456. },
  1457. isAggregate: false,
  1458. sendMeta: false,
  1459. }.Init(),
  1460. }, { // 4. do not optimize count window
  1461. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1462. p: ProjectPlan{
  1463. baseLogicalPlan: baseLogicalPlan{
  1464. children: []LogicalPlan{
  1465. HavingPlan{
  1466. baseLogicalPlan: baseLogicalPlan{
  1467. children: []LogicalPlan{
  1468. FilterPlan{
  1469. baseLogicalPlan: baseLogicalPlan{
  1470. children: []LogicalPlan{
  1471. WindowPlan{
  1472. baseLogicalPlan: baseLogicalPlan{
  1473. children: []LogicalPlan{
  1474. DataSourcePlan{
  1475. name: "src1",
  1476. isWildCard: true,
  1477. streamFields: nil,
  1478. streamStmt: streams["src1"],
  1479. metaFields: []string{},
  1480. }.Init(),
  1481. },
  1482. },
  1483. condition: nil,
  1484. wtype: ast.COUNT_WINDOW,
  1485. length: 5,
  1486. interval: 1,
  1487. limit: 0,
  1488. }.Init(),
  1489. },
  1490. },
  1491. condition: &ast.BinaryExpr{
  1492. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1493. OP: ast.GT,
  1494. RHS: &ast.IntegerLiteral{Val: 20},
  1495. },
  1496. }.Init(),
  1497. },
  1498. },
  1499. condition: &ast.BinaryExpr{
  1500. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1501. Token: ast.ASTERISK,
  1502. }}, FuncType: ast.FuncTypeAgg},
  1503. OP: ast.GT,
  1504. RHS: &ast.IntegerLiteral{Val: 2},
  1505. },
  1506. }.Init(),
  1507. },
  1508. },
  1509. fields: []ast.Field{
  1510. {
  1511. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1512. Name: "",
  1513. AName: ""},
  1514. },
  1515. isAggregate: false,
  1516. sendMeta: false,
  1517. }.Init(),
  1518. }, { // 5. optimize join on
  1519. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1520. p: ProjectPlan{
  1521. baseLogicalPlan: baseLogicalPlan{
  1522. children: []LogicalPlan{
  1523. JoinPlan{
  1524. baseLogicalPlan: baseLogicalPlan{
  1525. children: []LogicalPlan{
  1526. WindowPlan{
  1527. baseLogicalPlan: baseLogicalPlan{
  1528. children: []LogicalPlan{
  1529. FilterPlan{
  1530. baseLogicalPlan: baseLogicalPlan{
  1531. children: []LogicalPlan{
  1532. DataSourcePlan{
  1533. name: "src1",
  1534. streamFields: []interface{}{
  1535. "id1", "temp",
  1536. },
  1537. streamStmt: streams["src1"],
  1538. metaFields: []string{},
  1539. }.Init(),
  1540. },
  1541. },
  1542. condition: &ast.BinaryExpr{
  1543. RHS: &ast.BinaryExpr{
  1544. OP: ast.GT,
  1545. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1546. RHS: &ast.IntegerLiteral{Val: 20},
  1547. },
  1548. OP: ast.AND,
  1549. LHS: &ast.BinaryExpr{
  1550. OP: ast.GT,
  1551. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1552. RHS: &ast.IntegerLiteral{Val: 111},
  1553. },
  1554. },
  1555. }.Init(),
  1556. FilterPlan{
  1557. baseLogicalPlan: baseLogicalPlan{
  1558. children: []LogicalPlan{
  1559. DataSourcePlan{
  1560. name: "src2",
  1561. streamFields: []interface{}{
  1562. "hum", "id1", "id2",
  1563. },
  1564. streamStmt: streams["src2"],
  1565. metaFields: []string{},
  1566. }.Init(),
  1567. },
  1568. },
  1569. condition: &ast.BinaryExpr{
  1570. OP: ast.LT,
  1571. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1572. RHS: &ast.IntegerLiteral{Val: 60},
  1573. },
  1574. }.Init(),
  1575. },
  1576. },
  1577. condition: nil,
  1578. wtype: ast.TUMBLING_WINDOW,
  1579. length: 10000,
  1580. interval: 0,
  1581. limit: 0,
  1582. }.Init(),
  1583. },
  1584. },
  1585. from: &ast.Table{
  1586. Name: "src1",
  1587. },
  1588. joins: []ast.Join{
  1589. {
  1590. Name: "src2",
  1591. Alias: "",
  1592. JoinType: ast.INNER_JOIN,
  1593. Expr: &ast.BinaryExpr{
  1594. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1595. OP: ast.EQ,
  1596. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1597. },
  1598. },
  1599. },
  1600. }.Init(),
  1601. },
  1602. },
  1603. fields: []ast.Field{
  1604. {
  1605. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1606. Name: "id1",
  1607. AName: ""},
  1608. },
  1609. isAggregate: false,
  1610. sendMeta: false,
  1611. }.Init(),
  1612. }, { // 6. optimize outer join on
  1613. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1614. p: ProjectPlan{
  1615. baseLogicalPlan: baseLogicalPlan{
  1616. children: []LogicalPlan{
  1617. JoinPlan{
  1618. baseLogicalPlan: baseLogicalPlan{
  1619. children: []LogicalPlan{
  1620. WindowPlan{
  1621. baseLogicalPlan: baseLogicalPlan{
  1622. children: []LogicalPlan{
  1623. FilterPlan{
  1624. baseLogicalPlan: baseLogicalPlan{
  1625. children: []LogicalPlan{
  1626. DataSourcePlan{
  1627. name: "src1",
  1628. streamFields: []interface{}{
  1629. "id1", "temp",
  1630. },
  1631. streamStmt: streams["src1"],
  1632. metaFields: []string{},
  1633. }.Init(),
  1634. },
  1635. },
  1636. condition: &ast.BinaryExpr{
  1637. OP: ast.GT,
  1638. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1639. RHS: &ast.IntegerLiteral{Val: 111},
  1640. },
  1641. }.Init(),
  1642. DataSourcePlan{
  1643. name: "src2",
  1644. streamFields: []interface{}{
  1645. "hum", "id1", "id2",
  1646. },
  1647. streamStmt: streams["src2"],
  1648. metaFields: []string{},
  1649. }.Init(),
  1650. },
  1651. },
  1652. condition: nil,
  1653. wtype: ast.TUMBLING_WINDOW,
  1654. length: 10000,
  1655. interval: 0,
  1656. limit: 0,
  1657. }.Init(),
  1658. },
  1659. },
  1660. from: &ast.Table{
  1661. Name: "src1",
  1662. },
  1663. joins: []ast.Join{
  1664. {
  1665. Name: "src2",
  1666. Alias: "",
  1667. JoinType: ast.FULL_JOIN,
  1668. Expr: &ast.BinaryExpr{
  1669. OP: ast.AND,
  1670. LHS: &ast.BinaryExpr{
  1671. OP: ast.AND,
  1672. LHS: &ast.BinaryExpr{
  1673. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1674. OP: ast.EQ,
  1675. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1676. },
  1677. RHS: &ast.BinaryExpr{
  1678. OP: ast.GT,
  1679. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1680. RHS: &ast.IntegerLiteral{Val: 20},
  1681. },
  1682. },
  1683. RHS: &ast.BinaryExpr{
  1684. OP: ast.LT,
  1685. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1686. RHS: &ast.IntegerLiteral{Val: 60},
  1687. },
  1688. },
  1689. },
  1690. },
  1691. }.Init(),
  1692. },
  1693. },
  1694. fields: []ast.Field{
  1695. {
  1696. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1697. Name: "id1",
  1698. AName: ""},
  1699. },
  1700. isAggregate: false,
  1701. sendMeta: false,
  1702. }.Init(),
  1703. }, { // 7 window error for table
  1704. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1705. p: nil,
  1706. err: "cannot run window for TABLE sources",
  1707. }, { // 8 join table without window
  1708. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1709. p: ProjectPlan{
  1710. baseLogicalPlan: baseLogicalPlan{
  1711. children: []LogicalPlan{
  1712. JoinPlan{
  1713. baseLogicalPlan: baseLogicalPlan{
  1714. children: []LogicalPlan{
  1715. JoinAlignPlan{
  1716. baseLogicalPlan: baseLogicalPlan{
  1717. children: []LogicalPlan{
  1718. FilterPlan{
  1719. baseLogicalPlan: baseLogicalPlan{
  1720. children: []LogicalPlan{
  1721. DataSourcePlan{
  1722. name: "src1",
  1723. streamFields: []interface{}{
  1724. "hum", "id1", "temp",
  1725. },
  1726. streamStmt: streams["src1"],
  1727. metaFields: []string{},
  1728. }.Init(),
  1729. },
  1730. },
  1731. condition: &ast.BinaryExpr{
  1732. RHS: &ast.BinaryExpr{
  1733. OP: ast.GT,
  1734. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1735. RHS: &ast.IntegerLiteral{Val: 20},
  1736. },
  1737. OP: ast.AND,
  1738. LHS: &ast.BinaryExpr{
  1739. OP: ast.GT,
  1740. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1741. RHS: &ast.IntegerLiteral{Val: 111},
  1742. },
  1743. },
  1744. }.Init(),
  1745. DataSourcePlan{
  1746. name: "tableInPlanner",
  1747. streamFields: []interface{}{
  1748. &ast.StreamField{
  1749. Name: "hum",
  1750. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1751. },
  1752. &ast.StreamField{
  1753. Name: "id",
  1754. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1755. },
  1756. },
  1757. streamStmt: streams["tableInPlanner"],
  1758. metaFields: []string{},
  1759. }.Init(),
  1760. },
  1761. },
  1762. Emitters: []string{"tableInPlanner"},
  1763. }.Init(),
  1764. },
  1765. },
  1766. from: &ast.Table{
  1767. Name: "src1",
  1768. },
  1769. joins: []ast.Join{
  1770. {
  1771. Name: "tableInPlanner",
  1772. Alias: "",
  1773. JoinType: ast.INNER_JOIN,
  1774. Expr: &ast.BinaryExpr{
  1775. OP: ast.AND,
  1776. LHS: &ast.BinaryExpr{
  1777. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1778. OP: ast.EQ,
  1779. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1780. },
  1781. RHS: &ast.BinaryExpr{
  1782. OP: ast.LT,
  1783. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1784. RHS: &ast.IntegerLiteral{Val: 60},
  1785. },
  1786. },
  1787. },
  1788. },
  1789. }.Init(),
  1790. },
  1791. },
  1792. fields: []ast.Field{
  1793. {
  1794. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1795. Name: "id1",
  1796. AName: ""},
  1797. },
  1798. isAggregate: false,
  1799. sendMeta: false,
  1800. }.Init(),
  1801. }, { // 9 join table with window
  1802. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1803. p: ProjectPlan{
  1804. baseLogicalPlan: baseLogicalPlan{
  1805. children: []LogicalPlan{
  1806. JoinPlan{
  1807. baseLogicalPlan: baseLogicalPlan{
  1808. children: []LogicalPlan{
  1809. JoinAlignPlan{
  1810. baseLogicalPlan: baseLogicalPlan{
  1811. children: []LogicalPlan{
  1812. WindowPlan{
  1813. baseLogicalPlan: baseLogicalPlan{
  1814. children: []LogicalPlan{
  1815. DataSourcePlan{
  1816. name: "src1",
  1817. streamFields: []interface{}{
  1818. "id1", "temp",
  1819. },
  1820. streamStmt: streams["src1"],
  1821. metaFields: []string{},
  1822. }.Init(),
  1823. },
  1824. },
  1825. condition: nil,
  1826. wtype: ast.TUMBLING_WINDOW,
  1827. length: 10000,
  1828. interval: 0,
  1829. limit: 0,
  1830. }.Init(),
  1831. DataSourcePlan{
  1832. name: "tableInPlanner",
  1833. streamFields: []interface{}{
  1834. &ast.StreamField{
  1835. Name: "hum",
  1836. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1837. },
  1838. &ast.StreamField{
  1839. Name: "id",
  1840. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1841. },
  1842. },
  1843. streamStmt: streams["tableInPlanner"],
  1844. metaFields: []string{},
  1845. }.Init(),
  1846. },
  1847. },
  1848. Emitters: []string{"tableInPlanner"},
  1849. }.Init(),
  1850. },
  1851. },
  1852. from: &ast.Table{
  1853. Name: "src1",
  1854. },
  1855. joins: []ast.Join{
  1856. {
  1857. Name: "tableInPlanner",
  1858. Alias: "",
  1859. JoinType: ast.INNER_JOIN,
  1860. Expr: &ast.BinaryExpr{
  1861. OP: ast.AND,
  1862. LHS: &ast.BinaryExpr{
  1863. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1864. OP: ast.EQ,
  1865. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1866. },
  1867. RHS: &ast.BinaryExpr{
  1868. RHS: &ast.BinaryExpr{
  1869. OP: ast.AND,
  1870. LHS: &ast.BinaryExpr{
  1871. OP: ast.GT,
  1872. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1873. RHS: &ast.IntegerLiteral{Val: 20},
  1874. },
  1875. RHS: &ast.BinaryExpr{
  1876. OP: ast.LT,
  1877. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1878. RHS: &ast.IntegerLiteral{Val: 60},
  1879. },
  1880. },
  1881. OP: ast.AND,
  1882. LHS: &ast.BinaryExpr{
  1883. OP: ast.GT,
  1884. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1885. RHS: &ast.IntegerLiteral{Val: 111},
  1886. },
  1887. },
  1888. },
  1889. },
  1890. },
  1891. }.Init(),
  1892. },
  1893. },
  1894. fields: []ast.Field{
  1895. {
  1896. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1897. Name: "id1",
  1898. AName: ""},
  1899. },
  1900. isAggregate: false,
  1901. sendMeta: false,
  1902. }.Init(),
  1903. }, { // 10 meta
  1904. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1905. p: ProjectPlan{
  1906. baseLogicalPlan: baseLogicalPlan{
  1907. children: []LogicalPlan{
  1908. FilterPlan{
  1909. baseLogicalPlan: baseLogicalPlan{
  1910. children: []LogicalPlan{
  1911. DataSourcePlan{
  1912. name: "src1",
  1913. streamFields: []interface{}{
  1914. "temp",
  1915. },
  1916. streamStmt: streams["src1"],
  1917. metaFields: []string{"Humidity", "device", "id"},
  1918. }.Init(),
  1919. },
  1920. },
  1921. condition: &ast.BinaryExpr{
  1922. LHS: &ast.Call{
  1923. Name: "meta",
  1924. FuncId: 2,
  1925. Args: []ast.Expr{&ast.MetaRef{
  1926. Name: "device",
  1927. StreamName: ast.DefaultStream,
  1928. }},
  1929. },
  1930. OP: ast.EQ,
  1931. RHS: &ast.StringLiteral{
  1932. Val: "demo2",
  1933. },
  1934. },
  1935. }.Init(),
  1936. },
  1937. },
  1938. fields: []ast.Field{
  1939. {
  1940. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1941. Name: "temp",
  1942. AName: "",
  1943. }, {
  1944. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1945. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1946. Name: "id",
  1947. StreamName: ast.DefaultStream,
  1948. }}},
  1949. []ast.StreamName{},
  1950. nil,
  1951. )},
  1952. Name: "meta",
  1953. AName: "eid",
  1954. }, {
  1955. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1956. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1957. &ast.BinaryExpr{
  1958. OP: ast.ARROW,
  1959. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1960. RHS: &ast.JsonFieldRef{Name: "Device"},
  1961. },
  1962. }},
  1963. []ast.StreamName{},
  1964. nil,
  1965. )},
  1966. Name: "meta",
  1967. AName: "hdevice",
  1968. },
  1969. },
  1970. isAggregate: false,
  1971. sendMeta: false,
  1972. }.Init(),
  1973. }, { // 11 join with same name field and aliased
  1974. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1975. p: ProjectPlan{
  1976. baseLogicalPlan: baseLogicalPlan{
  1977. children: []LogicalPlan{
  1978. JoinPlan{
  1979. baseLogicalPlan: baseLogicalPlan{
  1980. children: []LogicalPlan{
  1981. JoinAlignPlan{
  1982. baseLogicalPlan: baseLogicalPlan{
  1983. children: []LogicalPlan{
  1984. DataSourcePlan{
  1985. name: "src2",
  1986. streamFields: []interface{}{
  1987. "hum", "id", "id2",
  1988. },
  1989. streamStmt: streams["src2"],
  1990. metaFields: []string{},
  1991. }.Init(),
  1992. DataSourcePlan{
  1993. name: "tableInPlanner",
  1994. streamFields: []interface{}{
  1995. &ast.StreamField{
  1996. Name: "hum",
  1997. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1998. },
  1999. &ast.StreamField{
  2000. Name: "id",
  2001. FieldType: &ast.BasicType{Type: ast.BIGINT},
  2002. },
  2003. },
  2004. streamStmt: streams["tableInPlanner"],
  2005. metaFields: []string{},
  2006. }.Init(),
  2007. },
  2008. },
  2009. Emitters: []string{"tableInPlanner"},
  2010. }.Init(),
  2011. },
  2012. },
  2013. from: &ast.Table{
  2014. Name: "src2",
  2015. },
  2016. joins: []ast.Join{
  2017. {
  2018. Name: "tableInPlanner",
  2019. Alias: "",
  2020. JoinType: ast.INNER_JOIN,
  2021. Expr: &ast.BinaryExpr{
  2022. RHS: &ast.BinaryExpr{
  2023. OP: ast.EQ,
  2024. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2025. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2026. },
  2027. OP: ast.AND,
  2028. LHS: &ast.BinaryExpr{
  2029. OP: ast.GT,
  2030. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2031. &ast.FieldRef{
  2032. Name: "hum",
  2033. StreamName: "src2",
  2034. },
  2035. []ast.StreamName{"src2"},
  2036. &boolFalse,
  2037. )},
  2038. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2039. &ast.FieldRef{
  2040. Name: "hum",
  2041. StreamName: "tableInPlanner",
  2042. },
  2043. []ast.StreamName{"tableInPlanner"},
  2044. &boolFalse,
  2045. )},
  2046. },
  2047. },
  2048. },
  2049. },
  2050. }.Init(),
  2051. },
  2052. },
  2053. fields: []ast.Field{
  2054. {
  2055. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2056. &ast.FieldRef{
  2057. Name: "hum",
  2058. StreamName: "src2",
  2059. },
  2060. []ast.StreamName{"src2"},
  2061. &boolFalse,
  2062. )},
  2063. Name: "hum",
  2064. AName: "hum1",
  2065. }, {
  2066. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2067. &ast.FieldRef{
  2068. Name: "hum",
  2069. StreamName: "tableInPlanner",
  2070. },
  2071. []ast.StreamName{"tableInPlanner"},
  2072. &boolFalse,
  2073. )},
  2074. Name: "hum",
  2075. AName: "hum2",
  2076. },
  2077. },
  2078. isAggregate: false,
  2079. sendMeta: false,
  2080. }.Init(),
  2081. }, { // 12
  2082. sql: `SELECT name->first, name->last FROM src1`,
  2083. p: ProjectPlan{
  2084. baseLogicalPlan: baseLogicalPlan{
  2085. children: []LogicalPlan{
  2086. DataSourcePlan{
  2087. baseLogicalPlan: baseLogicalPlan{},
  2088. name: "src1",
  2089. streamFields: []interface{}{
  2090. "name",
  2091. },
  2092. streamStmt: streams["src1"],
  2093. metaFields: []string{},
  2094. }.Init(),
  2095. },
  2096. },
  2097. fields: []ast.Field{
  2098. {
  2099. Expr: &ast.BinaryExpr{
  2100. OP: ast.ARROW,
  2101. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2102. RHS: &ast.JsonFieldRef{Name: "first"},
  2103. },
  2104. Name: "kuiper_field_0",
  2105. AName: "",
  2106. }, {
  2107. Expr: &ast.BinaryExpr{
  2108. OP: ast.ARROW,
  2109. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2110. RHS: &ast.JsonFieldRef{Name: "last"},
  2111. },
  2112. Name: "kuiper_field_1",
  2113. AName: "",
  2114. },
  2115. },
  2116. isAggregate: false,
  2117. sendMeta: false,
  2118. }.Init(),
  2119. },
  2120. }
  2121. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2122. for i, tt := range tests {
  2123. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2124. if err != nil {
  2125. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2126. continue
  2127. }
  2128. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2129. IsEventTime: false,
  2130. LateTol: 0,
  2131. Concurrency: 0,
  2132. BufferLength: 0,
  2133. SendMetaToSink: false,
  2134. Qos: 0,
  2135. CheckpointInterval: 0,
  2136. SendError: true,
  2137. }, store)
  2138. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2139. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2140. } else if !reflect.DeepEqual(tt.p, p) {
  2141. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2142. }
  2143. }
  2144. }
  2145. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2146. err, store := store.GetKV("stream")
  2147. if err != nil {
  2148. t.Error(err)
  2149. return
  2150. }
  2151. streamSqls := map[string]string{
  2152. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2153. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2154. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2155. }
  2156. types := map[string]ast.StreamType{
  2157. "src1": ast.TypeStream,
  2158. "table1": ast.TypeTable,
  2159. "table2": ast.TypeTable,
  2160. }
  2161. for name, sql := range streamSqls {
  2162. s, err := json.Marshal(&xsql.StreamInfo{
  2163. StreamType: types[name],
  2164. Statement: sql,
  2165. })
  2166. if err != nil {
  2167. t.Error(err)
  2168. t.Fail()
  2169. }
  2170. err = store.Set(name, string(s))
  2171. if err != nil {
  2172. t.Error(err)
  2173. t.Fail()
  2174. }
  2175. }
  2176. streams := make(map[string]*ast.StreamStmt)
  2177. for n := range streamSqls {
  2178. streamStmt, err := xsql.GetDataSource(store, n)
  2179. if err != nil {
  2180. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2181. return
  2182. }
  2183. streams[n] = streamStmt
  2184. }
  2185. var tests = []struct {
  2186. sql string
  2187. p LogicalPlan
  2188. err string
  2189. }{
  2190. { // 0
  2191. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2192. p: ProjectPlan{
  2193. baseLogicalPlan: baseLogicalPlan{
  2194. children: []LogicalPlan{
  2195. LookupPlan{
  2196. baseLogicalPlan: baseLogicalPlan{
  2197. children: []LogicalPlan{
  2198. DataSourcePlan{
  2199. baseLogicalPlan: baseLogicalPlan{},
  2200. name: "src1",
  2201. streamFields: []interface{}{
  2202. "a",
  2203. },
  2204. streamStmt: streams["src1"],
  2205. metaFields: []string{},
  2206. }.Init(),
  2207. },
  2208. },
  2209. joinExpr: ast.Join{
  2210. Name: "table1",
  2211. Alias: "",
  2212. JoinType: ast.INNER_JOIN,
  2213. Expr: &ast.BinaryExpr{
  2214. OP: ast.EQ,
  2215. LHS: &ast.FieldRef{
  2216. StreamName: "src1",
  2217. Name: "id",
  2218. },
  2219. RHS: &ast.FieldRef{
  2220. StreamName: "table1",
  2221. Name: "id",
  2222. },
  2223. },
  2224. },
  2225. keys: []string{"id"},
  2226. fields: []string{"b"},
  2227. valvars: []ast.Expr{
  2228. &ast.FieldRef{
  2229. StreamName: "src1",
  2230. Name: "id",
  2231. },
  2232. },
  2233. options: &ast.Options{
  2234. DATASOURCE: "table1",
  2235. TYPE: "sql",
  2236. STRICT_VALIDATION: true,
  2237. KIND: "lookup",
  2238. },
  2239. conditions: nil,
  2240. }.Init(),
  2241. },
  2242. },
  2243. fields: []ast.Field{
  2244. {
  2245. Expr: &ast.FieldRef{
  2246. StreamName: "src1",
  2247. Name: "a",
  2248. },
  2249. Name: "a",
  2250. AName: "",
  2251. },
  2252. {
  2253. Expr: &ast.FieldRef{
  2254. StreamName: "table1",
  2255. Name: "b",
  2256. },
  2257. Name: "b",
  2258. AName: "",
  2259. },
  2260. },
  2261. isAggregate: false,
  2262. sendMeta: false,
  2263. }.Init(),
  2264. },
  2265. { // 1
  2266. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2267. p: ProjectPlan{
  2268. baseLogicalPlan: baseLogicalPlan{
  2269. children: []LogicalPlan{
  2270. FilterPlan{
  2271. baseLogicalPlan: baseLogicalPlan{
  2272. children: []LogicalPlan{
  2273. LookupPlan{
  2274. baseLogicalPlan: baseLogicalPlan{
  2275. children: []LogicalPlan{
  2276. FilterPlan{
  2277. baseLogicalPlan: baseLogicalPlan{
  2278. children: []LogicalPlan{
  2279. DataSourcePlan{
  2280. baseLogicalPlan: baseLogicalPlan{},
  2281. name: "src1",
  2282. streamFields: []interface{}{
  2283. "a",
  2284. },
  2285. streamStmt: streams["src1"],
  2286. metaFields: []string{},
  2287. }.Init(),
  2288. },
  2289. },
  2290. condition: &ast.BinaryExpr{
  2291. OP: ast.LT,
  2292. LHS: &ast.FieldRef{
  2293. StreamName: "src1",
  2294. Name: "c",
  2295. },
  2296. RHS: &ast.IntegerLiteral{Val: 40},
  2297. },
  2298. }.Init(),
  2299. },
  2300. },
  2301. joinExpr: ast.Join{
  2302. Name: "table1",
  2303. Alias: "",
  2304. JoinType: ast.INNER_JOIN,
  2305. Expr: &ast.BinaryExpr{
  2306. OP: ast.AND,
  2307. RHS: &ast.BinaryExpr{
  2308. OP: ast.EQ,
  2309. LHS: &ast.FieldRef{
  2310. StreamName: "src1",
  2311. Name: "id",
  2312. },
  2313. RHS: &ast.FieldRef{
  2314. StreamName: "table1",
  2315. Name: "id",
  2316. },
  2317. },
  2318. LHS: &ast.BinaryExpr{
  2319. OP: ast.AND,
  2320. LHS: &ast.BinaryExpr{
  2321. OP: ast.GT,
  2322. LHS: &ast.FieldRef{
  2323. StreamName: "table1",
  2324. Name: "b",
  2325. },
  2326. RHS: &ast.IntegerLiteral{Val: 20},
  2327. },
  2328. RHS: &ast.BinaryExpr{
  2329. OP: ast.LT,
  2330. LHS: &ast.FieldRef{
  2331. StreamName: "src1",
  2332. Name: "c",
  2333. },
  2334. RHS: &ast.IntegerLiteral{Val: 40},
  2335. },
  2336. },
  2337. },
  2338. },
  2339. keys: []string{"id"},
  2340. valvars: []ast.Expr{
  2341. &ast.FieldRef{
  2342. StreamName: "src1",
  2343. Name: "id",
  2344. },
  2345. },
  2346. options: &ast.Options{
  2347. DATASOURCE: "table1",
  2348. TYPE: "sql",
  2349. STRICT_VALIDATION: true,
  2350. KIND: "lookup",
  2351. },
  2352. conditions: &ast.BinaryExpr{
  2353. OP: ast.AND,
  2354. LHS: &ast.BinaryExpr{
  2355. OP: ast.GT,
  2356. LHS: &ast.FieldRef{
  2357. StreamName: "table1",
  2358. Name: "b",
  2359. },
  2360. RHS: &ast.IntegerLiteral{Val: 20},
  2361. },
  2362. RHS: &ast.BinaryExpr{
  2363. OP: ast.LT,
  2364. LHS: &ast.FieldRef{
  2365. StreamName: "src1",
  2366. Name: "c",
  2367. },
  2368. RHS: &ast.IntegerLiteral{Val: 40},
  2369. },
  2370. },
  2371. }.Init(),
  2372. },
  2373. },
  2374. condition: &ast.BinaryExpr{
  2375. OP: ast.GT,
  2376. LHS: &ast.FieldRef{
  2377. StreamName: "table1",
  2378. Name: "b",
  2379. },
  2380. RHS: &ast.IntegerLiteral{Val: 20},
  2381. },
  2382. }.Init(),
  2383. },
  2384. },
  2385. fields: []ast.Field{
  2386. {
  2387. Expr: &ast.FieldRef{
  2388. StreamName: "src1",
  2389. Name: "a",
  2390. },
  2391. Name: "a",
  2392. AName: "",
  2393. },
  2394. {
  2395. Expr: &ast.FieldRef{
  2396. StreamName: "table1",
  2397. Name: "*",
  2398. },
  2399. Name: "*",
  2400. AName: "",
  2401. },
  2402. },
  2403. isAggregate: false,
  2404. sendMeta: false,
  2405. }.Init(),
  2406. },
  2407. { // 2
  2408. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2409. p: ProjectPlan{
  2410. baseLogicalPlan: baseLogicalPlan{
  2411. children: []LogicalPlan{
  2412. LookupPlan{
  2413. baseLogicalPlan: baseLogicalPlan{
  2414. children: []LogicalPlan{
  2415. LookupPlan{
  2416. baseLogicalPlan: baseLogicalPlan{
  2417. children: []LogicalPlan{
  2418. DataSourcePlan{
  2419. baseLogicalPlan: baseLogicalPlan{},
  2420. name: "src1",
  2421. streamFields: []interface{}{
  2422. "a",
  2423. },
  2424. streamStmt: streams["src1"],
  2425. metaFields: []string{},
  2426. }.Init(),
  2427. },
  2428. },
  2429. joinExpr: ast.Join{
  2430. Name: "table1",
  2431. Alias: "",
  2432. JoinType: ast.INNER_JOIN,
  2433. Expr: &ast.BinaryExpr{
  2434. OP: ast.EQ,
  2435. LHS: &ast.FieldRef{
  2436. StreamName: "src1",
  2437. Name: "id",
  2438. },
  2439. RHS: &ast.FieldRef{
  2440. StreamName: "table1",
  2441. Name: "id",
  2442. },
  2443. },
  2444. },
  2445. keys: []string{"id"},
  2446. fields: []string{"b"},
  2447. valvars: []ast.Expr{
  2448. &ast.FieldRef{
  2449. StreamName: "src1",
  2450. Name: "id",
  2451. },
  2452. },
  2453. options: &ast.Options{
  2454. DATASOURCE: "table1",
  2455. TYPE: "sql",
  2456. STRICT_VALIDATION: true,
  2457. KIND: "lookup",
  2458. },
  2459. conditions: nil,
  2460. }.Init(),
  2461. },
  2462. },
  2463. joinExpr: ast.Join{
  2464. Name: "table2",
  2465. Alias: "",
  2466. JoinType: ast.INNER_JOIN,
  2467. Expr: &ast.BinaryExpr{
  2468. OP: ast.EQ,
  2469. LHS: &ast.FieldRef{
  2470. StreamName: "table1",
  2471. Name: "id",
  2472. },
  2473. RHS: &ast.FieldRef{
  2474. StreamName: "table2",
  2475. Name: "id",
  2476. },
  2477. },
  2478. },
  2479. keys: []string{"id"},
  2480. fields: []string{"c"},
  2481. valvars: []ast.Expr{
  2482. &ast.FieldRef{
  2483. StreamName: "table1",
  2484. Name: "id",
  2485. },
  2486. },
  2487. options: &ast.Options{
  2488. DATASOURCE: "table2",
  2489. TYPE: "sql",
  2490. STRICT_VALIDATION: true,
  2491. KIND: "lookup",
  2492. },
  2493. }.Init(),
  2494. },
  2495. },
  2496. fields: []ast.Field{
  2497. {
  2498. Expr: &ast.FieldRef{
  2499. StreamName: "src1",
  2500. Name: "a",
  2501. },
  2502. Name: "a",
  2503. AName: "",
  2504. },
  2505. {
  2506. Expr: &ast.FieldRef{
  2507. StreamName: "table1",
  2508. Name: "b",
  2509. },
  2510. Name: "b",
  2511. AName: "",
  2512. },
  2513. {
  2514. Expr: &ast.FieldRef{
  2515. StreamName: "table2",
  2516. Name: "c",
  2517. },
  2518. Name: "c",
  2519. AName: "",
  2520. },
  2521. },
  2522. isAggregate: false,
  2523. sendMeta: false,
  2524. }.Init(),
  2525. },
  2526. { // 3
  2527. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2528. p: ProjectPlan{
  2529. baseLogicalPlan: baseLogicalPlan{
  2530. children: []LogicalPlan{
  2531. LookupPlan{
  2532. baseLogicalPlan: baseLogicalPlan{
  2533. children: []LogicalPlan{
  2534. WindowPlan{
  2535. baseLogicalPlan: baseLogicalPlan{
  2536. children: []LogicalPlan{
  2537. DataSourcePlan{
  2538. baseLogicalPlan: baseLogicalPlan{},
  2539. name: "src1",
  2540. streamStmt: streams["src1"],
  2541. metaFields: []string{},
  2542. isWildCard: true,
  2543. }.Init(),
  2544. },
  2545. },
  2546. condition: nil,
  2547. wtype: ast.TUMBLING_WINDOW,
  2548. length: 10000,
  2549. interval: 0,
  2550. limit: 0,
  2551. }.Init(),
  2552. },
  2553. },
  2554. joinExpr: ast.Join{
  2555. Name: "table1",
  2556. Alias: "",
  2557. JoinType: ast.INNER_JOIN,
  2558. Expr: &ast.BinaryExpr{
  2559. OP: ast.EQ,
  2560. LHS: &ast.FieldRef{
  2561. StreamName: "src1",
  2562. Name: "id",
  2563. },
  2564. RHS: &ast.FieldRef{
  2565. StreamName: "table1",
  2566. Name: "id",
  2567. },
  2568. },
  2569. },
  2570. keys: []string{"id"},
  2571. valvars: []ast.Expr{
  2572. &ast.FieldRef{
  2573. StreamName: "src1",
  2574. Name: "id",
  2575. },
  2576. },
  2577. options: &ast.Options{
  2578. DATASOURCE: "table1",
  2579. TYPE: "sql",
  2580. STRICT_VALIDATION: true,
  2581. KIND: "lookup",
  2582. },
  2583. conditions: nil,
  2584. }.Init(),
  2585. },
  2586. },
  2587. fields: []ast.Field{
  2588. {
  2589. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2590. Name: "",
  2591. AName: "",
  2592. },
  2593. },
  2594. isAggregate: false,
  2595. sendMeta: false,
  2596. }.Init(),
  2597. },
  2598. }
  2599. for i, tt := range tests {
  2600. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2601. if err != nil {
  2602. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2603. continue
  2604. }
  2605. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2606. IsEventTime: false,
  2607. LateTol: 0,
  2608. Concurrency: 0,
  2609. BufferLength: 0,
  2610. SendMetaToSink: false,
  2611. Qos: 0,
  2612. CheckpointInterval: 0,
  2613. SendError: true,
  2614. }, store)
  2615. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2616. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2617. } else if !reflect.DeepEqual(tt.p, p) {
  2618. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2619. }
  2620. }
  2621. }