planner_test.go 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  // init runs once during package initialization, before any test function in
  // this package, and calls testx.InitEnv().
  // NOTE(review): presumably this prepares the environment the tests below rely
  // on (e.g. the KV store opened via store.GetKV); what InitEnv actually
  // configures is not visible in this file — confirm in the testx package.
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. store, err := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: map[string]*ast.JsonStreamField{
  102. "myarray": {
  103. Type: "array",
  104. Items: &ast.JsonStreamField{
  105. Type: "string",
  106. },
  107. },
  108. "temp": {
  109. Type: "bigint",
  110. },
  111. },
  112. streamStmt: streams["src1"],
  113. metaFields: []string{},
  114. }.Init(),
  115. },
  116. },
  117. fields: []ast.Field{
  118. {
  119. Expr: &ast.BinaryExpr{
  120. OP: ast.SUBSET,
  121. LHS: &ast.FieldRef{
  122. StreamName: "src1",
  123. Name: "myarray",
  124. },
  125. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  126. StreamName: "src1",
  127. Name: "temp",
  128. }},
  129. },
  130. Name: "kuiper_field_0",
  131. AName: "",
  132. },
  133. },
  134. isAggregate: false,
  135. sendMeta: false,
  136. }.Init(),
  137. }, { // 1 optimize where to data source
  138. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  139. p: ProjectPlan{
  140. baseLogicalPlan: baseLogicalPlan{
  141. children: []LogicalPlan{
  142. WindowPlan{
  143. baseLogicalPlan: baseLogicalPlan{
  144. children: []LogicalPlan{
  145. FilterPlan{
  146. baseLogicalPlan: baseLogicalPlan{
  147. children: []LogicalPlan{
  148. DataSourcePlan{
  149. name: "src1",
  150. streamFields: map[string]*ast.JsonStreamField{
  151. "name": {
  152. Type: "string",
  153. },
  154. "temp": {
  155. Type: "bigint",
  156. },
  157. },
  158. streamStmt: streams["src1"],
  159. metaFields: []string{},
  160. }.Init(),
  161. },
  162. },
  163. condition: &ast.BinaryExpr{
  164. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  165. OP: ast.EQ,
  166. RHS: &ast.StringLiteral{Val: "v1"},
  167. },
  168. }.Init(),
  169. },
  170. },
  171. condition: nil,
  172. wtype: ast.TUMBLING_WINDOW,
  173. length: 10000,
  174. interval: 0,
  175. limit: 0,
  176. }.Init(),
  177. },
  178. },
  179. fields: []ast.Field{
  180. {
  181. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  182. Name: "temp",
  183. AName: ""},
  184. },
  185. isAggregate: false,
  186. sendMeta: false,
  187. }.Init(),
  188. }, { // 2 condition that cannot be optimized
  189. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  190. p: ProjectPlan{
  191. baseLogicalPlan: baseLogicalPlan{
  192. children: []LogicalPlan{
  193. JoinPlan{
  194. baseLogicalPlan: baseLogicalPlan{
  195. children: []LogicalPlan{
  196. WindowPlan{
  197. baseLogicalPlan: baseLogicalPlan{
  198. children: []LogicalPlan{
  199. DataSourcePlan{
  200. name: "src1",
  201. streamFields: map[string]*ast.JsonStreamField{
  202. "id1": {
  203. Type: "bigint",
  204. },
  205. "temp": {
  206. Type: "bigint",
  207. },
  208. },
  209. streamStmt: streams["src1"],
  210. metaFields: []string{},
  211. }.Init(),
  212. DataSourcePlan{
  213. name: "src2",
  214. streamFields: map[string]*ast.JsonStreamField{
  215. "hum": {
  216. Type: "bigint",
  217. },
  218. "id2": {
  219. Type: "bigint",
  220. },
  221. },
  222. streamStmt: streams["src2"],
  223. metaFields: []string{},
  224. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  225. }.Init(),
  226. },
  227. },
  228. condition: nil,
  229. wtype: ast.TUMBLING_WINDOW,
  230. length: 10000,
  231. interval: 0,
  232. limit: 0,
  233. }.Init(),
  234. },
  235. },
  236. from: &ast.Table{Name: "src1"},
  237. joins: ast.Joins{ast.Join{
  238. Name: "src2",
  239. JoinType: ast.INNER_JOIN,
  240. Expr: &ast.BinaryExpr{
  241. OP: ast.AND,
  242. LHS: &ast.BinaryExpr{
  243. LHS: &ast.BinaryExpr{
  244. OP: ast.GT,
  245. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  246. RHS: &ast.IntegerLiteral{Val: 20},
  247. },
  248. OP: ast.OR,
  249. RHS: &ast.BinaryExpr{
  250. OP: ast.GT,
  251. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  252. RHS: &ast.IntegerLiteral{Val: 60},
  253. },
  254. },
  255. RHS: &ast.BinaryExpr{
  256. OP: ast.EQ,
  257. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  258. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  259. },
  260. },
  261. }},
  262. }.Init(),
  263. },
  264. },
  265. fields: []ast.Field{
  266. {
  267. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  268. Name: "id1",
  269. AName: ""},
  270. },
  271. isAggregate: false,
  272. sendMeta: false,
  273. }.Init(),
  274. }, { // 3 optimize window filter
  275. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  276. p: ProjectPlan{
  277. baseLogicalPlan: baseLogicalPlan{
  278. children: []LogicalPlan{
  279. WindowPlan{
  280. baseLogicalPlan: baseLogicalPlan{
  281. children: []LogicalPlan{
  282. FilterPlan{
  283. baseLogicalPlan: baseLogicalPlan{
  284. children: []LogicalPlan{
  285. DataSourcePlan{
  286. name: "src1",
  287. streamFields: map[string]*ast.JsonStreamField{
  288. "id1": {
  289. Type: "bigint",
  290. },
  291. "name": {
  292. Type: "string",
  293. },
  294. "temp": {
  295. Type: "bigint",
  296. },
  297. },
  298. streamStmt: streams["src1"],
  299. metaFields: []string{},
  300. }.Init(),
  301. },
  302. },
  303. condition: &ast.BinaryExpr{
  304. OP: ast.AND,
  305. LHS: &ast.BinaryExpr{
  306. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  307. OP: ast.EQ,
  308. RHS: &ast.StringLiteral{Val: "v1"},
  309. },
  310. RHS: &ast.BinaryExpr{
  311. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  312. OP: ast.GT,
  313. RHS: &ast.IntegerLiteral{Val: 2},
  314. },
  315. },
  316. }.Init(),
  317. },
  318. },
  319. condition: nil,
  320. wtype: ast.TUMBLING_WINDOW,
  321. length: 10000,
  322. interval: 0,
  323. limit: 0,
  324. }.Init(),
  325. },
  326. },
  327. fields: []ast.Field{
  328. {
  329. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  330. Name: "id1",
  331. AName: ""},
  332. },
  333. isAggregate: false,
  334. sendMeta: false,
  335. }.Init(),
  336. }, { // 4. do not optimize count window
  337. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  338. p: ProjectPlan{
  339. baseLogicalPlan: baseLogicalPlan{
  340. children: []LogicalPlan{
  341. HavingPlan{
  342. baseLogicalPlan: baseLogicalPlan{
  343. children: []LogicalPlan{
  344. FilterPlan{
  345. baseLogicalPlan: baseLogicalPlan{
  346. children: []LogicalPlan{
  347. WindowPlan{
  348. baseLogicalPlan: baseLogicalPlan{
  349. children: []LogicalPlan{
  350. DataSourcePlan{
  351. name: "src1",
  352. isWildCard: true,
  353. streamFields: map[string]*ast.JsonStreamField{
  354. "id1": {
  355. Type: "bigint",
  356. },
  357. "temp": {
  358. Type: "bigint",
  359. },
  360. "name": {
  361. Type: "string",
  362. },
  363. "myarray": {
  364. Type: "array",
  365. Items: &ast.JsonStreamField{
  366. Type: "string",
  367. },
  368. },
  369. },
  370. streamStmt: streams["src1"],
  371. metaFields: []string{},
  372. }.Init(),
  373. },
  374. },
  375. condition: nil,
  376. wtype: ast.COUNT_WINDOW,
  377. length: 5,
  378. interval: 1,
  379. limit: 0,
  380. }.Init(),
  381. },
  382. },
  383. condition: &ast.BinaryExpr{
  384. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  385. OP: ast.GT,
  386. RHS: &ast.IntegerLiteral{Val: 20},
  387. },
  388. }.Init(),
  389. },
  390. },
  391. condition: &ast.BinaryExpr{
  392. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  393. Token: ast.ASTERISK,
  394. }}, FuncType: ast.FuncTypeAgg},
  395. OP: ast.GT,
  396. RHS: &ast.IntegerLiteral{Val: 2},
  397. },
  398. }.Init(),
  399. },
  400. },
  401. fields: []ast.Field{
  402. {
  403. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  404. Name: "*",
  405. AName: ""},
  406. },
  407. isAggregate: false,
  408. sendMeta: false,
  409. }.Init(),
  410. }, { // 5. optimize join on
  411. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  412. p: ProjectPlan{
  413. baseLogicalPlan: baseLogicalPlan{
  414. children: []LogicalPlan{
  415. JoinPlan{
  416. baseLogicalPlan: baseLogicalPlan{
  417. children: []LogicalPlan{
  418. WindowPlan{
  419. baseLogicalPlan: baseLogicalPlan{
  420. children: []LogicalPlan{
  421. FilterPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. DataSourcePlan{
  425. name: "src1",
  426. streamFields: map[string]*ast.JsonStreamField{
  427. "id1": {
  428. Type: "bigint",
  429. },
  430. "temp": {
  431. Type: "bigint",
  432. },
  433. },
  434. streamStmt: streams["src1"],
  435. metaFields: []string{},
  436. }.Init(),
  437. },
  438. },
  439. condition: &ast.BinaryExpr{
  440. RHS: &ast.BinaryExpr{
  441. OP: ast.GT,
  442. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  443. RHS: &ast.IntegerLiteral{Val: 20},
  444. },
  445. OP: ast.AND,
  446. LHS: &ast.BinaryExpr{
  447. OP: ast.GT,
  448. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  449. RHS: &ast.IntegerLiteral{Val: 111},
  450. },
  451. },
  452. }.Init(),
  453. FilterPlan{
  454. baseLogicalPlan: baseLogicalPlan{
  455. children: []LogicalPlan{
  456. DataSourcePlan{
  457. name: "src2",
  458. streamFields: map[string]*ast.JsonStreamField{
  459. "hum": {
  460. Type: "bigint",
  461. },
  462. "id2": {
  463. Type: "bigint",
  464. },
  465. },
  466. streamStmt: streams["src2"],
  467. metaFields: []string{},
  468. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  469. }.Init(),
  470. },
  471. },
  472. condition: &ast.BinaryExpr{
  473. OP: ast.LT,
  474. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  475. RHS: &ast.IntegerLiteral{Val: 60},
  476. },
  477. }.Init(),
  478. },
  479. },
  480. condition: nil,
  481. wtype: ast.TUMBLING_WINDOW,
  482. length: 10000,
  483. interval: 0,
  484. limit: 0,
  485. }.Init(),
  486. },
  487. },
  488. from: &ast.Table{
  489. Name: "src1",
  490. },
  491. joins: []ast.Join{
  492. {
  493. Name: "src2",
  494. Alias: "",
  495. JoinType: ast.INNER_JOIN,
  496. Expr: &ast.BinaryExpr{
  497. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  498. OP: ast.EQ,
  499. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  500. },
  501. },
  502. },
  503. }.Init(),
  504. },
  505. },
  506. fields: []ast.Field{
  507. {
  508. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  509. Name: "id1",
  510. AName: ""},
  511. },
  512. isAggregate: false,
  513. sendMeta: false,
  514. }.Init(),
  515. }, { // 6. optimize outer join on
  516. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  517. p: ProjectPlan{
  518. baseLogicalPlan: baseLogicalPlan{
  519. children: []LogicalPlan{
  520. JoinPlan{
  521. baseLogicalPlan: baseLogicalPlan{
  522. children: []LogicalPlan{
  523. WindowPlan{
  524. baseLogicalPlan: baseLogicalPlan{
  525. children: []LogicalPlan{
  526. FilterPlan{
  527. baseLogicalPlan: baseLogicalPlan{
  528. children: []LogicalPlan{
  529. DataSourcePlan{
  530. name: "src1",
  531. streamFields: map[string]*ast.JsonStreamField{
  532. "id1": {
  533. Type: "bigint",
  534. },
  535. "temp": {
  536. Type: "bigint",
  537. },
  538. },
  539. streamStmt: streams["src1"],
  540. metaFields: []string{},
  541. }.Init(),
  542. },
  543. },
  544. condition: &ast.BinaryExpr{
  545. OP: ast.GT,
  546. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  547. RHS: &ast.IntegerLiteral{Val: 111},
  548. },
  549. }.Init(),
  550. DataSourcePlan{
  551. name: "src2",
  552. streamFields: map[string]*ast.JsonStreamField{
  553. "hum": {
  554. Type: "bigint",
  555. },
  556. "id2": {
  557. Type: "bigint",
  558. },
  559. },
  560. streamStmt: streams["src2"],
  561. metaFields: []string{},
  562. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  563. }.Init(),
  564. },
  565. },
  566. condition: nil,
  567. wtype: ast.TUMBLING_WINDOW,
  568. length: 10000,
  569. interval: 0,
  570. limit: 0,
  571. }.Init(),
  572. },
  573. },
  574. from: &ast.Table{
  575. Name: "src1",
  576. },
  577. joins: []ast.Join{
  578. {
  579. Name: "src2",
  580. Alias: "",
  581. JoinType: ast.FULL_JOIN,
  582. Expr: &ast.BinaryExpr{
  583. OP: ast.AND,
  584. LHS: &ast.BinaryExpr{
  585. OP: ast.AND,
  586. LHS: &ast.BinaryExpr{
  587. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  588. OP: ast.EQ,
  589. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  590. },
  591. RHS: &ast.BinaryExpr{
  592. OP: ast.GT,
  593. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  594. RHS: &ast.IntegerLiteral{Val: 20},
  595. },
  596. },
  597. RHS: &ast.BinaryExpr{
  598. OP: ast.LT,
  599. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  600. RHS: &ast.IntegerLiteral{Val: 60},
  601. },
  602. },
  603. },
  604. },
  605. }.Init(),
  606. },
  607. },
  608. fields: []ast.Field{
  609. {
  610. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  611. Name: "id1",
  612. AName: ""},
  613. },
  614. isAggregate: false,
  615. sendMeta: false,
  616. }.Init(),
  617. }, { // 7 window error for table
  618. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  619. p: nil,
  620. err: "cannot run window for TABLE sources",
  621. }, { // 8 join table without window
  622. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  623. p: ProjectPlan{
  624. baseLogicalPlan: baseLogicalPlan{
  625. children: []LogicalPlan{
  626. JoinPlan{
  627. baseLogicalPlan: baseLogicalPlan{
  628. children: []LogicalPlan{
  629. JoinAlignPlan{
  630. baseLogicalPlan: baseLogicalPlan{
  631. children: []LogicalPlan{
  632. FilterPlan{
  633. baseLogicalPlan: baseLogicalPlan{
  634. children: []LogicalPlan{
  635. DataSourcePlan{
  636. name: "src1",
  637. streamFields: map[string]*ast.JsonStreamField{
  638. "id1": {
  639. Type: "bigint",
  640. },
  641. "temp": {
  642. Type: "bigint",
  643. },
  644. },
  645. streamStmt: streams["src1"],
  646. metaFields: []string{},
  647. }.Init(),
  648. },
  649. },
  650. condition: &ast.BinaryExpr{
  651. RHS: &ast.BinaryExpr{
  652. OP: ast.GT,
  653. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  654. RHS: &ast.IntegerLiteral{Val: 20},
  655. },
  656. OP: ast.AND,
  657. LHS: &ast.BinaryExpr{
  658. OP: ast.GT,
  659. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  660. RHS: &ast.IntegerLiteral{Val: 111},
  661. },
  662. },
  663. }.Init(),
  664. DataSourcePlan{
  665. name: "tableInPlanner",
  666. streamFields: map[string]*ast.JsonStreamField{
  667. "hum": {
  668. Type: "bigint",
  669. },
  670. "id": {
  671. Type: "bigint",
  672. },
  673. },
  674. streamStmt: streams["tableInPlanner"],
  675. metaFields: []string{},
  676. }.Init(),
  677. },
  678. },
  679. Emitters: []string{"tableInPlanner"},
  680. }.Init(),
  681. },
  682. },
  683. from: &ast.Table{
  684. Name: "src1",
  685. },
  686. joins: []ast.Join{
  687. {
  688. Name: "tableInPlanner",
  689. Alias: "",
  690. JoinType: ast.INNER_JOIN,
  691. Expr: &ast.BinaryExpr{
  692. OP: ast.AND,
  693. LHS: &ast.BinaryExpr{
  694. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  695. OP: ast.EQ,
  696. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  697. },
  698. RHS: &ast.BinaryExpr{
  699. OP: ast.LT,
  700. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  701. RHS: &ast.IntegerLiteral{Val: 60},
  702. },
  703. },
  704. },
  705. },
  706. }.Init(),
  707. },
  708. },
  709. fields: []ast.Field{
  710. {
  711. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  712. Name: "id1",
  713. AName: ""},
  714. },
  715. isAggregate: false,
  716. sendMeta: false,
  717. }.Init(),
  718. }, { // 9 join table with window
  719. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  720. p: ProjectPlan{
  721. baseLogicalPlan: baseLogicalPlan{
  722. children: []LogicalPlan{
  723. JoinPlan{
  724. baseLogicalPlan: baseLogicalPlan{
  725. children: []LogicalPlan{
  726. JoinAlignPlan{
  727. baseLogicalPlan: baseLogicalPlan{
  728. children: []LogicalPlan{
  729. WindowPlan{
  730. baseLogicalPlan: baseLogicalPlan{
  731. children: []LogicalPlan{
  732. DataSourcePlan{
  733. name: "src1",
  734. streamFields: map[string]*ast.JsonStreamField{
  735. "id1": {
  736. Type: "bigint",
  737. },
  738. "temp": {
  739. Type: "bigint",
  740. },
  741. },
  742. streamStmt: streams["src1"],
  743. metaFields: []string{},
  744. }.Init(),
  745. },
  746. },
  747. condition: nil,
  748. wtype: ast.TUMBLING_WINDOW,
  749. length: 10000,
  750. interval: 0,
  751. limit: 0,
  752. }.Init(),
  753. DataSourcePlan{
  754. name: "tableInPlanner",
  755. streamFields: map[string]*ast.JsonStreamField{
  756. "hum": {
  757. Type: "bigint",
  758. },
  759. "id": {
  760. Type: "bigint",
  761. },
  762. },
  763. streamStmt: streams["tableInPlanner"],
  764. metaFields: []string{},
  765. }.Init(),
  766. },
  767. },
  768. Emitters: []string{"tableInPlanner"},
  769. }.Init(),
  770. },
  771. },
  772. from: &ast.Table{
  773. Name: "src1",
  774. },
  775. joins: []ast.Join{
  776. {
  777. Name: "tableInPlanner",
  778. Alias: "",
  779. JoinType: ast.INNER_JOIN,
  780. Expr: &ast.BinaryExpr{
  781. OP: ast.AND,
  782. LHS: &ast.BinaryExpr{
  783. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  784. OP: ast.EQ,
  785. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  786. },
  787. RHS: &ast.BinaryExpr{
  788. RHS: &ast.BinaryExpr{
  789. OP: ast.AND,
  790. LHS: &ast.BinaryExpr{
  791. OP: ast.GT,
  792. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  793. RHS: &ast.IntegerLiteral{Val: 20},
  794. },
  795. RHS: &ast.BinaryExpr{
  796. OP: ast.LT,
  797. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  798. RHS: &ast.IntegerLiteral{Val: 60},
  799. },
  800. },
  801. OP: ast.AND,
  802. LHS: &ast.BinaryExpr{
  803. OP: ast.GT,
  804. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  805. RHS: &ast.IntegerLiteral{Val: 111},
  806. },
  807. },
  808. },
  809. },
  810. },
  811. }.Init(),
  812. },
  813. },
  814. fields: []ast.Field{
  815. {
  816. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  817. Name: "id1",
  818. AName: ""},
  819. },
  820. isAggregate: false,
  821. sendMeta: false,
  822. }.Init(),
  823. }, { // 10 meta
  824. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  825. p: ProjectPlan{
  826. baseLogicalPlan: baseLogicalPlan{
  827. children: []LogicalPlan{
  828. FilterPlan{
  829. baseLogicalPlan: baseLogicalPlan{
  830. children: []LogicalPlan{
  831. DataSourcePlan{
  832. name: "src1",
  833. streamFields: map[string]*ast.JsonStreamField{
  834. "temp": {
  835. Type: "bigint",
  836. },
  837. },
  838. streamStmt: streams["src1"],
  839. metaFields: []string{"Humidity", "device", "id"},
  840. }.Init(),
  841. },
  842. },
  843. condition: &ast.BinaryExpr{
  844. LHS: &ast.Call{
  845. Name: "meta",
  846. FuncId: 2,
  847. Args: []ast.Expr{&ast.MetaRef{
  848. Name: "device",
  849. StreamName: ast.DefaultStream,
  850. }},
  851. },
  852. OP: ast.EQ,
  853. RHS: &ast.StringLiteral{
  854. Val: "demo2",
  855. },
  856. },
  857. }.Init(),
  858. },
  859. },
  860. fields: []ast.Field{
  861. {
  862. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  863. Name: "temp",
  864. AName: "",
  865. }, {
  866. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  867. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  868. Name: "id",
  869. StreamName: ast.DefaultStream,
  870. }}},
  871. []ast.StreamName{},
  872. nil,
  873. )},
  874. Name: "meta",
  875. AName: "eid",
  876. }, {
  877. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  878. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  879. &ast.BinaryExpr{
  880. OP: ast.ARROW,
  881. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  882. RHS: &ast.JsonFieldRef{Name: "Device"},
  883. },
  884. }},
  885. []ast.StreamName{},
  886. nil,
  887. )},
  888. Name: "meta",
  889. AName: "hdevice",
  890. },
  891. },
  892. isAggregate: false,
  893. sendMeta: false,
  894. }.Init(),
  895. }, { // 11 join with same name field and aliased
  896. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  897. p: ProjectPlan{
  898. baseLogicalPlan: baseLogicalPlan{
  899. children: []LogicalPlan{
  900. JoinPlan{
  901. baseLogicalPlan: baseLogicalPlan{
  902. children: []LogicalPlan{
  903. JoinAlignPlan{
  904. baseLogicalPlan: baseLogicalPlan{
  905. children: []LogicalPlan{
  906. DataSourcePlan{
  907. name: "src2",
  908. streamFields: map[string]*ast.JsonStreamField{
  909. "hum": {
  910. Type: "bigint",
  911. },
  912. "id2": {
  913. Type: "bigint",
  914. },
  915. },
  916. streamStmt: streams["src2"],
  917. metaFields: []string{},
  918. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  919. }.Init(),
  920. DataSourcePlan{
  921. name: "tableInPlanner",
  922. streamFields: map[string]*ast.JsonStreamField{
  923. "hum": {
  924. Type: "bigint",
  925. },
  926. "id": {
  927. Type: "bigint",
  928. },
  929. },
  930. streamStmt: streams["tableInPlanner"],
  931. metaFields: []string{},
  932. }.Init(),
  933. },
  934. },
  935. Emitters: []string{"tableInPlanner"},
  936. }.Init(),
  937. },
  938. },
  939. from: &ast.Table{
  940. Name: "src2",
  941. },
  942. joins: []ast.Join{
  943. {
  944. Name: "tableInPlanner",
  945. Alias: "",
  946. JoinType: ast.INNER_JOIN,
  947. Expr: &ast.BinaryExpr{
  948. RHS: &ast.BinaryExpr{
  949. OP: ast.EQ,
  950. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  951. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  952. },
  953. OP: ast.AND,
  954. LHS: &ast.BinaryExpr{
  955. OP: ast.GT,
  956. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  957. &ast.FieldRef{
  958. Name: "hum",
  959. StreamName: "src2",
  960. },
  961. []ast.StreamName{"src2"},
  962. &boolFalse,
  963. )},
  964. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  965. &ast.FieldRef{
  966. Name: "hum",
  967. StreamName: "tableInPlanner",
  968. },
  969. []ast.StreamName{"tableInPlanner"},
  970. &boolFalse,
  971. )},
  972. },
  973. },
  974. },
  975. },
  976. }.Init(),
  977. },
  978. },
  979. fields: []ast.Field{
  980. {
  981. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  982. &ast.FieldRef{
  983. Name: "hum",
  984. StreamName: "src2",
  985. },
  986. []ast.StreamName{"src2"},
  987. &boolFalse,
  988. )},
  989. Name: "hum",
  990. AName: "hum1",
  991. }, {
  992. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  993. &ast.FieldRef{
  994. Name: "hum",
  995. StreamName: "tableInPlanner",
  996. },
  997. []ast.StreamName{"tableInPlanner"},
  998. &boolFalse,
  999. )},
  1000. Name: "hum",
  1001. AName: "hum2",
  1002. },
  1003. },
  1004. isAggregate: false,
  1005. sendMeta: false,
  1006. }.Init(),
  1007. }, { // 12 meta with more fields
  1008. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1009. p: ProjectPlan{
  1010. baseLogicalPlan: baseLogicalPlan{
  1011. children: []LogicalPlan{
  1012. FilterPlan{
  1013. baseLogicalPlan: baseLogicalPlan{
  1014. children: []LogicalPlan{
  1015. DataSourcePlan{
  1016. name: "src1",
  1017. streamFields: map[string]*ast.JsonStreamField{
  1018. "temp": {
  1019. Type: "bigint",
  1020. },
  1021. },
  1022. streamStmt: streams["src1"],
  1023. metaFields: []string{},
  1024. allMeta: true,
  1025. }.Init(),
  1026. },
  1027. },
  1028. condition: &ast.BinaryExpr{
  1029. LHS: &ast.Call{
  1030. Name: "meta",
  1031. FuncId: 1,
  1032. Args: []ast.Expr{&ast.MetaRef{
  1033. Name: "device",
  1034. StreamName: ast.DefaultStream,
  1035. }},
  1036. },
  1037. OP: ast.EQ,
  1038. RHS: &ast.StringLiteral{
  1039. Val: "demo2",
  1040. },
  1041. },
  1042. }.Init(),
  1043. },
  1044. },
  1045. fields: []ast.Field{
  1046. {
  1047. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1048. Name: "temp",
  1049. AName: "",
  1050. }, {
  1051. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1052. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1053. Name: "*",
  1054. StreamName: ast.DefaultStream,
  1055. }}},
  1056. []ast.StreamName{},
  1057. nil,
  1058. )},
  1059. Name: "meta",
  1060. AName: "m",
  1061. },
  1062. },
  1063. isAggregate: false,
  1064. sendMeta: false,
  1065. }.Init(),
  1066. }, { // 13 analytic function plan
  1067. sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
  1068. p: ProjectPlan{
  1069. baseLogicalPlan: baseLogicalPlan{
  1070. children: []LogicalPlan{
  1071. FilterPlan{
  1072. baseLogicalPlan: baseLogicalPlan{
  1073. children: []LogicalPlan{
  1074. AnalyticFuncsPlan{
  1075. baseLogicalPlan: baseLogicalPlan{
  1076. children: []LogicalPlan{
  1077. DataSourcePlan{
  1078. name: "src1",
  1079. streamFields: map[string]*ast.JsonStreamField{
  1080. "id1": {
  1081. Type: "bigint",
  1082. },
  1083. "name": {
  1084. Type: "string",
  1085. },
  1086. "temp": {
  1087. Type: "bigint",
  1088. },
  1089. },
  1090. streamStmt: streams["src1"],
  1091. metaFields: []string{},
  1092. }.Init(),
  1093. },
  1094. },
  1095. funcs: []*ast.Call{
  1096. {
  1097. Name: "lag",
  1098. FuncId: 2,
  1099. CachedField: "$$a_lag_2",
  1100. Args: []ast.Expr{&ast.FieldRef{
  1101. Name: "temp",
  1102. StreamName: "src1",
  1103. }},
  1104. },
  1105. {
  1106. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1107. },
  1108. {
  1109. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1110. },
  1111. },
  1112. }.Init(),
  1113. },
  1114. },
  1115. condition: &ast.BinaryExpr{
  1116. LHS: &ast.Call{
  1117. Name: "lag",
  1118. FuncId: 2,
  1119. Args: []ast.Expr{&ast.FieldRef{
  1120. Name: "temp",
  1121. StreamName: "src1",
  1122. }},
  1123. CachedField: "$$a_lag_2",
  1124. Cached: true,
  1125. },
  1126. OP: ast.GT,
  1127. RHS: &ast.FieldRef{
  1128. Name: "temp",
  1129. StreamName: "src1",
  1130. },
  1131. },
  1132. }.Init(),
  1133. },
  1134. },
  1135. fields: []ast.Field{
  1136. {
  1137. Expr: &ast.Call{
  1138. Name: "latest",
  1139. FuncId: 1,
  1140. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1141. CachedField: "$$a_latest_1",
  1142. Cached: true,
  1143. },
  1144. Name: "latest",
  1145. }, {
  1146. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1147. Name: "id1",
  1148. },
  1149. },
  1150. isAggregate: false,
  1151. sendMeta: false,
  1152. }.Init(),
  1153. },
  1154. { // 14
  1155. sql: `SELECT name, *, meta(device) FROM src1`,
  1156. p: ProjectPlan{
  1157. baseLogicalPlan: baseLogicalPlan{
  1158. children: []LogicalPlan{
  1159. DataSourcePlan{
  1160. baseLogicalPlan: baseLogicalPlan{},
  1161. name: "src1",
  1162. streamFields: map[string]*ast.JsonStreamField{
  1163. "id1": {
  1164. Type: "bigint",
  1165. },
  1166. "temp": {
  1167. Type: "bigint",
  1168. },
  1169. "name": {
  1170. Type: "string",
  1171. },
  1172. "myarray": {
  1173. Type: "array",
  1174. Items: &ast.JsonStreamField{
  1175. Type: "string",
  1176. },
  1177. },
  1178. },
  1179. streamStmt: streams["src1"],
  1180. metaFields: []string{"device"},
  1181. isWildCard: true,
  1182. }.Init(),
  1183. },
  1184. },
  1185. fields: []ast.Field{
  1186. {
  1187. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1188. Name: "name",
  1189. AName: "",
  1190. },
  1191. {
  1192. Name: "*",
  1193. Expr: &ast.Wildcard{
  1194. Token: ast.ASTERISK,
  1195. },
  1196. },
  1197. {
  1198. Name: "meta",
  1199. Expr: &ast.Call{
  1200. Name: "meta",
  1201. Args: []ast.Expr{
  1202. &ast.MetaRef{
  1203. StreamName: ast.DefaultStream,
  1204. Name: "device",
  1205. },
  1206. },
  1207. },
  1208. },
  1209. },
  1210. isAggregate: false,
  1211. allWildcard: true,
  1212. sendMeta: false,
  1213. }.Init(),
  1214. },
  1215. { // 15 analytic function over partition plan
  1216. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
  1217. p: ProjectPlan{
  1218. baseLogicalPlan: baseLogicalPlan{
  1219. children: []LogicalPlan{
  1220. FilterPlan{
  1221. baseLogicalPlan: baseLogicalPlan{
  1222. children: []LogicalPlan{
  1223. AnalyticFuncsPlan{
  1224. baseLogicalPlan: baseLogicalPlan{
  1225. children: []LogicalPlan{
  1226. DataSourcePlan{
  1227. name: "src1",
  1228. streamFields: map[string]*ast.JsonStreamField{
  1229. "id1": {
  1230. Type: "bigint",
  1231. },
  1232. "name": {
  1233. Type: "string",
  1234. },
  1235. "temp": {
  1236. Type: "bigint",
  1237. },
  1238. },
  1239. streamStmt: streams["src1"],
  1240. metaFields: []string{},
  1241. }.Init(),
  1242. },
  1243. },
  1244. funcs: []*ast.Call{
  1245. {
  1246. Name: "lag",
  1247. FuncId: 2,
  1248. CachedField: "$$a_lag_2",
  1249. Args: []ast.Expr{&ast.FieldRef{
  1250. Name: "temp",
  1251. StreamName: "src1",
  1252. }},
  1253. },
  1254. {
  1255. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}},
  1256. },
  1257. {
  1258. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1259. },
  1260. },
  1261. }.Init(),
  1262. },
  1263. },
  1264. condition: &ast.BinaryExpr{
  1265. LHS: &ast.Call{
  1266. Name: "lag",
  1267. FuncId: 2,
  1268. Args: []ast.Expr{&ast.FieldRef{
  1269. Name: "temp",
  1270. StreamName: "src1",
  1271. }},
  1272. CachedField: "$$a_lag_2",
  1273. Cached: true,
  1274. },
  1275. OP: ast.GT,
  1276. RHS: &ast.FieldRef{
  1277. Name: "temp",
  1278. StreamName: "src1",
  1279. },
  1280. },
  1281. }.Init(),
  1282. },
  1283. },
  1284. fields: []ast.Field{
  1285. {
  1286. Expr: &ast.Call{
  1287. Name: "latest",
  1288. FuncId: 1,
  1289. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1290. CachedField: "$$a_latest_1",
  1291. Cached: true,
  1292. Partition: &ast.PartitionExpr{
  1293. Exprs: []ast.Expr{
  1294. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1295. },
  1296. },
  1297. },
  1298. Name: "latest",
  1299. }, {
  1300. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1301. Name: "id1",
  1302. },
  1303. },
  1304. isAggregate: false,
  1305. sendMeta: false,
  1306. }.Init(),
  1307. },
  1308. { // 16 analytic function over partition when plan
  1309. sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp WHEN temp > 12), id1 FROM src1 WHERE lag(temp) > temp`,
  1310. p: ProjectPlan{
  1311. baseLogicalPlan: baseLogicalPlan{
  1312. children: []LogicalPlan{
  1313. FilterPlan{
  1314. baseLogicalPlan: baseLogicalPlan{
  1315. children: []LogicalPlan{
  1316. AnalyticFuncsPlan{
  1317. baseLogicalPlan: baseLogicalPlan{
  1318. children: []LogicalPlan{
  1319. DataSourcePlan{
  1320. name: "src1",
  1321. streamFields: map[string]*ast.JsonStreamField{
  1322. "id1": {
  1323. Type: "bigint",
  1324. },
  1325. "name": {
  1326. Type: "string",
  1327. },
  1328. "temp": {
  1329. Type: "bigint",
  1330. },
  1331. },
  1332. streamStmt: streams["src1"],
  1333. metaFields: []string{},
  1334. }.Init(),
  1335. },
  1336. },
  1337. funcs: []*ast.Call{
  1338. {
  1339. Name: "lag",
  1340. FuncId: 2,
  1341. CachedField: "$$a_lag_2",
  1342. Args: []ast.Expr{&ast.FieldRef{
  1343. Name: "temp",
  1344. StreamName: "src1",
  1345. }},
  1346. },
  1347. {
  1348. Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}}, Partition: &ast.PartitionExpr{Exprs: []ast.Expr{&ast.FieldRef{Name: "temp", StreamName: "src1"}}}, WhenExpr: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"}, OP: ast.GT, RHS: &ast.IntegerLiteral{Val: 12}},
  1349. },
  1350. {
  1351. Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
  1352. },
  1353. },
  1354. }.Init(),
  1355. },
  1356. },
  1357. condition: &ast.BinaryExpr{
  1358. LHS: &ast.Call{
  1359. Name: "lag",
  1360. FuncId: 2,
  1361. Args: []ast.Expr{&ast.FieldRef{
  1362. Name: "temp",
  1363. StreamName: "src1",
  1364. }},
  1365. CachedField: "$$a_lag_2",
  1366. Cached: true,
  1367. },
  1368. OP: ast.GT,
  1369. RHS: &ast.FieldRef{
  1370. Name: "temp",
  1371. StreamName: "src1",
  1372. },
  1373. },
  1374. }.Init(),
  1375. },
  1376. },
  1377. fields: []ast.Field{
  1378. {
  1379. Expr: &ast.Call{
  1380. Name: "latest",
  1381. FuncId: 1,
  1382. Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
  1383. CachedField: "$$a_latest_1",
  1384. Cached: true,
  1385. Partition: &ast.PartitionExpr{
  1386. Exprs: []ast.Expr{
  1387. &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1388. },
  1389. },
  1390. WhenExpr: &ast.BinaryExpr{
  1391. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1392. OP: ast.GT,
  1393. RHS: &ast.IntegerLiteral{Val: 12},
  1394. },
  1395. },
  1396. Name: "latest",
  1397. }, {
  1398. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1399. Name: "id1",
  1400. },
  1401. },
  1402. isAggregate: false,
  1403. sendMeta: false,
  1404. }.Init(),
  1405. }, { // 17. do not optimize sliding window
  1406. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
  1407. p: ProjectPlan{
  1408. baseLogicalPlan: baseLogicalPlan{
  1409. children: []LogicalPlan{
  1410. HavingPlan{
  1411. baseLogicalPlan: baseLogicalPlan{
  1412. children: []LogicalPlan{
  1413. FilterPlan{
  1414. baseLogicalPlan: baseLogicalPlan{
  1415. children: []LogicalPlan{
  1416. WindowPlan{
  1417. baseLogicalPlan: baseLogicalPlan{
  1418. children: []LogicalPlan{
  1419. DataSourcePlan{
  1420. name: "src1",
  1421. isWildCard: true,
  1422. streamFields: map[string]*ast.JsonStreamField{
  1423. "id1": {
  1424. Type: "bigint",
  1425. },
  1426. "temp": {
  1427. Type: "bigint",
  1428. },
  1429. "name": {
  1430. Type: "string",
  1431. },
  1432. "myarray": {
  1433. Type: "array",
  1434. Items: &ast.JsonStreamField{
  1435. Type: "string",
  1436. },
  1437. },
  1438. },
  1439. streamStmt: streams["src1"],
  1440. metaFields: []string{},
  1441. }.Init(),
  1442. },
  1443. },
  1444. condition: nil,
  1445. wtype: ast.SLIDING_WINDOW,
  1446. length: 10000,
  1447. interval: 0,
  1448. limit: 0,
  1449. }.Init(),
  1450. },
  1451. },
  1452. condition: &ast.BinaryExpr{
  1453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1454. OP: ast.GT,
  1455. RHS: &ast.IntegerLiteral{Val: 20},
  1456. },
  1457. }.Init(),
  1458. },
  1459. },
  1460. condition: &ast.BinaryExpr{
  1461. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1462. Token: ast.ASTERISK,
  1463. }}, FuncType: ast.FuncTypeAgg},
  1464. OP: ast.GT,
  1465. RHS: &ast.IntegerLiteral{Val: 2},
  1466. },
  1467. }.Init(),
  1468. },
  1469. },
  1470. fields: []ast.Field{
  1471. {
  1472. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1473. Name: "*",
  1474. AName: ""},
  1475. },
  1476. isAggregate: false,
  1477. sendMeta: false,
  1478. }.Init(),
  1479. },
  1480. }
  1481. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1482. for i, tt := range tests {
  1483. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1484. if err != nil {
  1485. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1486. continue
  1487. }
  1488. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1489. IsEventTime: false,
  1490. LateTol: 0,
  1491. Concurrency: 0,
  1492. BufferLength: 0,
  1493. SendMetaToSink: false,
  1494. Qos: 0,
  1495. CheckpointInterval: 0,
  1496. SendError: true,
  1497. }, store)
  1498. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1499. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1500. } else if !reflect.DeepEqual(tt.p, p) {
  1501. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1502. }
  1503. }
  1504. }
  1505. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1506. store, err := store.GetKV("stream")
  1507. if err != nil {
  1508. t.Error(err)
  1509. return
  1510. }
  1511. streamSqls := map[string]string{
  1512. "src1": `CREATE STREAM src1 (
  1513. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1514. "src2": `CREATE STREAM src2 (
  1515. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1516. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1517. id BIGINT,
  1518. name STRING,
  1519. value STRING,
  1520. hum BIGINT
  1521. ) WITH (TYPE="file");`,
  1522. }
  1523. types := map[string]ast.StreamType{
  1524. "src1": ast.TypeStream,
  1525. "src2": ast.TypeStream,
  1526. "tableInPlanner": ast.TypeTable,
  1527. }
  1528. for name, sql := range streamSqls {
  1529. s, err := json.Marshal(&xsql.StreamInfo{
  1530. StreamType: types[name],
  1531. Statement: sql,
  1532. })
  1533. if err != nil {
  1534. t.Error(err)
  1535. t.Fail()
  1536. }
  1537. err = store.Set(name, string(s))
  1538. if err != nil {
  1539. t.Error(err)
  1540. t.Fail()
  1541. }
  1542. }
  1543. streams := make(map[string]*ast.StreamStmt)
  1544. for n := range streamSqls {
  1545. streamStmt, err := xsql.GetDataSource(store, n)
  1546. if err != nil {
  1547. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1548. return
  1549. }
  1550. streams[n] = streamStmt
  1551. }
  1552. var (
  1553. //boolTrue = true
  1554. boolFalse = false
  1555. )
  1556. var tests = []struct {
  1557. sql string
  1558. p LogicalPlan
  1559. err string
  1560. }{
  1561. { // 0
  1562. sql: `SELECT name FROM src1`,
  1563. p: ProjectPlan{
  1564. baseLogicalPlan: baseLogicalPlan{
  1565. children: []LogicalPlan{
  1566. DataSourcePlan{
  1567. baseLogicalPlan: baseLogicalPlan{},
  1568. name: "src1",
  1569. streamFields: map[string]*ast.JsonStreamField{
  1570. "name": nil,
  1571. },
  1572. streamStmt: streams["src1"],
  1573. isSchemaless: true,
  1574. metaFields: []string{},
  1575. }.Init(),
  1576. },
  1577. },
  1578. fields: []ast.Field{
  1579. {
  1580. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1581. Name: "name",
  1582. AName: "",
  1583. },
  1584. },
  1585. isAggregate: false,
  1586. sendMeta: false,
  1587. }.Init(),
  1588. }, { // 1 optimize where to data source
  1589. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1590. p: ProjectPlan{
  1591. baseLogicalPlan: baseLogicalPlan{
  1592. children: []LogicalPlan{
  1593. WindowPlan{
  1594. baseLogicalPlan: baseLogicalPlan{
  1595. children: []LogicalPlan{
  1596. FilterPlan{
  1597. baseLogicalPlan: baseLogicalPlan{
  1598. children: []LogicalPlan{
  1599. DataSourcePlan{
  1600. name: "src1",
  1601. streamFields: map[string]*ast.JsonStreamField{
  1602. "name": nil,
  1603. "temp": nil,
  1604. },
  1605. streamStmt: streams["src1"],
  1606. metaFields: []string{},
  1607. isSchemaless: true,
  1608. }.Init(),
  1609. },
  1610. },
  1611. condition: &ast.BinaryExpr{
  1612. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1613. OP: ast.EQ,
  1614. RHS: &ast.StringLiteral{Val: "v1"},
  1615. },
  1616. }.Init(),
  1617. },
  1618. },
  1619. condition: nil,
  1620. wtype: ast.TUMBLING_WINDOW,
  1621. length: 10000,
  1622. interval: 0,
  1623. limit: 0,
  1624. }.Init(),
  1625. },
  1626. },
  1627. fields: []ast.Field{
  1628. {
  1629. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1630. Name: "temp",
  1631. AName: ""},
  1632. },
  1633. isAggregate: false,
  1634. sendMeta: false,
  1635. }.Init(),
  1636. }, { // 2 condition that cannot be optimized
  1637. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1638. p: ProjectPlan{
  1639. baseLogicalPlan: baseLogicalPlan{
  1640. children: []LogicalPlan{
  1641. JoinPlan{
  1642. baseLogicalPlan: baseLogicalPlan{
  1643. children: []LogicalPlan{
  1644. WindowPlan{
  1645. baseLogicalPlan: baseLogicalPlan{
  1646. children: []LogicalPlan{
  1647. DataSourcePlan{
  1648. name: "src1",
  1649. streamFields: map[string]*ast.JsonStreamField{
  1650. "id1": nil,
  1651. "temp": nil,
  1652. },
  1653. streamStmt: streams["src1"],
  1654. metaFields: []string{},
  1655. isSchemaless: true,
  1656. }.Init(),
  1657. DataSourcePlan{
  1658. name: "src2",
  1659. streamFields: map[string]*ast.JsonStreamField{ // can't determine which stream id1 belongs to
  1660. "hum": nil,
  1661. "id1": nil,
  1662. "id2": nil,
  1663. },
  1664. isSchemaless: true,
  1665. streamStmt: streams["src2"],
  1666. metaFields: []string{},
  1667. }.Init(),
  1668. },
  1669. },
  1670. condition: nil,
  1671. wtype: ast.TUMBLING_WINDOW,
  1672. length: 10000,
  1673. interval: 0,
  1674. limit: 0,
  1675. }.Init(),
  1676. },
  1677. },
  1678. from: &ast.Table{Name: "src1"},
  1679. joins: ast.Joins{ast.Join{
  1680. Name: "src2",
  1681. JoinType: ast.INNER_JOIN,
  1682. Expr: &ast.BinaryExpr{
  1683. OP: ast.AND,
  1684. LHS: &ast.BinaryExpr{
  1685. LHS: &ast.BinaryExpr{
  1686. OP: ast.GT,
  1687. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1688. RHS: &ast.IntegerLiteral{Val: 20},
  1689. },
  1690. OP: ast.OR,
  1691. RHS: &ast.BinaryExpr{
  1692. OP: ast.GT,
  1693. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1694. RHS: &ast.IntegerLiteral{Val: 60},
  1695. },
  1696. },
  1697. RHS: &ast.BinaryExpr{
  1698. OP: ast.EQ,
  1699. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1700. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1701. },
  1702. },
  1703. }},
  1704. }.Init(),
  1705. },
  1706. },
  1707. fields: []ast.Field{
  1708. {
  1709. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1710. Name: "id1",
  1711. AName: ""},
  1712. },
  1713. isAggregate: false,
  1714. sendMeta: false,
  1715. }.Init(),
  1716. }, { // 3 optimize window filter
  1717. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1718. p: ProjectPlan{
  1719. baseLogicalPlan: baseLogicalPlan{
  1720. children: []LogicalPlan{
  1721. WindowPlan{
  1722. baseLogicalPlan: baseLogicalPlan{
  1723. children: []LogicalPlan{
  1724. FilterPlan{
  1725. baseLogicalPlan: baseLogicalPlan{
  1726. children: []LogicalPlan{
  1727. DataSourcePlan{
  1728. name: "src1",
  1729. streamFields: map[string]*ast.JsonStreamField{
  1730. "id1": nil,
  1731. "name": nil,
  1732. "temp": nil,
  1733. },
  1734. isSchemaless: true,
  1735. streamStmt: streams["src1"],
  1736. metaFields: []string{},
  1737. }.Init(),
  1738. },
  1739. },
  1740. condition: &ast.BinaryExpr{
  1741. OP: ast.AND,
  1742. LHS: &ast.BinaryExpr{
  1743. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1744. OP: ast.EQ,
  1745. RHS: &ast.StringLiteral{Val: "v1"},
  1746. },
  1747. RHS: &ast.BinaryExpr{
  1748. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1749. OP: ast.GT,
  1750. RHS: &ast.IntegerLiteral{Val: 2},
  1751. },
  1752. },
  1753. }.Init(),
  1754. },
  1755. },
  1756. condition: nil,
  1757. wtype: ast.TUMBLING_WINDOW,
  1758. length: 10000,
  1759. interval: 0,
  1760. limit: 0,
  1761. }.Init(),
  1762. },
  1763. },
  1764. fields: []ast.Field{
  1765. {
  1766. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1767. Name: "id1",
  1768. AName: ""},
  1769. },
  1770. isAggregate: false,
  1771. sendMeta: false,
  1772. }.Init(),
  1773. }, { // 4. do not optimize count window
  1774. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1775. p: ProjectPlan{
  1776. baseLogicalPlan: baseLogicalPlan{
  1777. children: []LogicalPlan{
  1778. HavingPlan{
  1779. baseLogicalPlan: baseLogicalPlan{
  1780. children: []LogicalPlan{
  1781. FilterPlan{
  1782. baseLogicalPlan: baseLogicalPlan{
  1783. children: []LogicalPlan{
  1784. WindowPlan{
  1785. baseLogicalPlan: baseLogicalPlan{
  1786. children: []LogicalPlan{
  1787. DataSourcePlan{
  1788. name: "src1",
  1789. isWildCard: true,
  1790. streamFields: map[string]*ast.JsonStreamField{},
  1791. streamStmt: streams["src1"],
  1792. metaFields: []string{},
  1793. isSchemaless: true,
  1794. }.Init(),
  1795. },
  1796. },
  1797. condition: nil,
  1798. wtype: ast.COUNT_WINDOW,
  1799. length: 5,
  1800. interval: 1,
  1801. limit: 0,
  1802. }.Init(),
  1803. },
  1804. },
  1805. condition: &ast.BinaryExpr{
  1806. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1807. OP: ast.GT,
  1808. RHS: &ast.IntegerLiteral{Val: 20},
  1809. },
  1810. }.Init(),
  1811. },
  1812. },
  1813. condition: &ast.BinaryExpr{
  1814. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1815. Token: ast.ASTERISK,
  1816. }}, FuncType: ast.FuncTypeAgg},
  1817. OP: ast.GT,
  1818. RHS: &ast.IntegerLiteral{Val: 2},
  1819. },
  1820. }.Init(),
  1821. },
  1822. },
  1823. fields: []ast.Field{
  1824. {
  1825. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1826. Name: "*",
  1827. AName: ""},
  1828. },
  1829. isAggregate: false,
  1830. sendMeta: false,
  1831. }.Init(),
  1832. }, { // 5. optimize join on
  1833. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1834. p: ProjectPlan{
  1835. baseLogicalPlan: baseLogicalPlan{
  1836. children: []LogicalPlan{
  1837. JoinPlan{
  1838. baseLogicalPlan: baseLogicalPlan{
  1839. children: []LogicalPlan{
  1840. WindowPlan{
  1841. baseLogicalPlan: baseLogicalPlan{
  1842. children: []LogicalPlan{
  1843. FilterPlan{
  1844. baseLogicalPlan: baseLogicalPlan{
  1845. children: []LogicalPlan{
  1846. DataSourcePlan{
  1847. name: "src1",
  1848. streamFields: map[string]*ast.JsonStreamField{
  1849. "id1": nil,
  1850. "temp": nil,
  1851. },
  1852. isSchemaless: true,
  1853. streamStmt: streams["src1"],
  1854. metaFields: []string{},
  1855. }.Init(),
  1856. },
  1857. },
  1858. condition: &ast.BinaryExpr{
  1859. RHS: &ast.BinaryExpr{
  1860. OP: ast.GT,
  1861. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1862. RHS: &ast.IntegerLiteral{Val: 20},
  1863. },
  1864. OP: ast.AND,
  1865. LHS: &ast.BinaryExpr{
  1866. OP: ast.GT,
  1867. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1868. RHS: &ast.IntegerLiteral{Val: 111},
  1869. },
  1870. },
  1871. }.Init(),
  1872. FilterPlan{
  1873. baseLogicalPlan: baseLogicalPlan{
  1874. children: []LogicalPlan{
  1875. DataSourcePlan{
  1876. name: "src2",
  1877. streamFields: map[string]*ast.JsonStreamField{
  1878. "hum": nil,
  1879. "id1": nil,
  1880. "id2": nil,
  1881. },
  1882. isSchemaless: true,
  1883. streamStmt: streams["src2"],
  1884. metaFields: []string{},
  1885. }.Init(),
  1886. },
  1887. },
  1888. condition: &ast.BinaryExpr{
  1889. OP: ast.LT,
  1890. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1891. RHS: &ast.IntegerLiteral{Val: 60},
  1892. },
  1893. }.Init(),
  1894. },
  1895. },
  1896. condition: nil,
  1897. wtype: ast.TUMBLING_WINDOW,
  1898. length: 10000,
  1899. interval: 0,
  1900. limit: 0,
  1901. }.Init(),
  1902. },
  1903. },
  1904. from: &ast.Table{
  1905. Name: "src1",
  1906. },
  1907. joins: []ast.Join{
  1908. {
  1909. Name: "src2",
  1910. Alias: "",
  1911. JoinType: ast.INNER_JOIN,
  1912. Expr: &ast.BinaryExpr{
  1913. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1914. OP: ast.EQ,
  1915. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1916. },
  1917. },
  1918. },
  1919. }.Init(),
  1920. },
  1921. },
  1922. fields: []ast.Field{
  1923. {
  1924. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1925. Name: "id1",
  1926. AName: ""},
  1927. },
  1928. isAggregate: false,
  1929. sendMeta: false,
  1930. }.Init(),
  1931. }, { // 6. optimize outer join on
  1932. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1933. p: ProjectPlan{
  1934. baseLogicalPlan: baseLogicalPlan{
  1935. children: []LogicalPlan{
  1936. JoinPlan{
  1937. baseLogicalPlan: baseLogicalPlan{
  1938. children: []LogicalPlan{
  1939. WindowPlan{
  1940. baseLogicalPlan: baseLogicalPlan{
  1941. children: []LogicalPlan{
  1942. FilterPlan{
  1943. baseLogicalPlan: baseLogicalPlan{
  1944. children: []LogicalPlan{
  1945. DataSourcePlan{
  1946. name: "src1",
  1947. streamFields: map[string]*ast.JsonStreamField{
  1948. "id1": nil,
  1949. "temp": nil,
  1950. },
  1951. isSchemaless: true,
  1952. streamStmt: streams["src1"],
  1953. metaFields: []string{},
  1954. }.Init(),
  1955. },
  1956. },
  1957. condition: &ast.BinaryExpr{
  1958. OP: ast.GT,
  1959. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1960. RHS: &ast.IntegerLiteral{Val: 111},
  1961. },
  1962. }.Init(),
  1963. DataSourcePlan{
  1964. name: "src2",
  1965. streamFields: map[string]*ast.JsonStreamField{
  1966. "hum": nil,
  1967. "id1": nil,
  1968. "id2": nil,
  1969. },
  1970. isSchemaless: true,
  1971. streamStmt: streams["src2"],
  1972. metaFields: []string{},
  1973. }.Init(),
  1974. },
  1975. },
  1976. condition: nil,
  1977. wtype: ast.TUMBLING_WINDOW,
  1978. length: 10000,
  1979. interval: 0,
  1980. limit: 0,
  1981. }.Init(),
  1982. },
  1983. },
  1984. from: &ast.Table{
  1985. Name: "src1",
  1986. },
  1987. joins: []ast.Join{
  1988. {
  1989. Name: "src2",
  1990. Alias: "",
  1991. JoinType: ast.FULL_JOIN,
  1992. Expr: &ast.BinaryExpr{
  1993. OP: ast.AND,
  1994. LHS: &ast.BinaryExpr{
  1995. OP: ast.AND,
  1996. LHS: &ast.BinaryExpr{
  1997. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1998. OP: ast.EQ,
  1999. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  2000. },
  2001. RHS: &ast.BinaryExpr{
  2002. OP: ast.GT,
  2003. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2004. RHS: &ast.IntegerLiteral{Val: 20},
  2005. },
  2006. },
  2007. RHS: &ast.BinaryExpr{
  2008. OP: ast.LT,
  2009. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  2010. RHS: &ast.IntegerLiteral{Val: 60},
  2011. },
  2012. },
  2013. },
  2014. },
  2015. }.Init(),
  2016. },
  2017. },
  2018. fields: []ast.Field{
  2019. {
  2020. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2021. Name: "id1",
  2022. AName: ""},
  2023. },
  2024. isAggregate: false,
  2025. sendMeta: false,
  2026. }.Init(),
  2027. }, { // 7 window error for table
  2028. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2029. p: nil,
  2030. err: "cannot run window for TABLE sources",
  2031. }, { // 8 join table without window
  2032. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  2033. p: ProjectPlan{
  2034. baseLogicalPlan: baseLogicalPlan{
  2035. children: []LogicalPlan{
  2036. JoinPlan{
  2037. baseLogicalPlan: baseLogicalPlan{
  2038. children: []LogicalPlan{
  2039. JoinAlignPlan{
  2040. baseLogicalPlan: baseLogicalPlan{
  2041. children: []LogicalPlan{
  2042. FilterPlan{
  2043. baseLogicalPlan: baseLogicalPlan{
  2044. children: []LogicalPlan{
  2045. DataSourcePlan{
  2046. name: "src1",
  2047. streamFields: map[string]*ast.JsonStreamField{
  2048. "hum": nil,
  2049. "id1": nil,
  2050. "temp": nil,
  2051. },
  2052. isSchemaless: true,
  2053. streamStmt: streams["src1"],
  2054. metaFields: []string{},
  2055. }.Init(),
  2056. },
  2057. },
  2058. condition: &ast.BinaryExpr{
  2059. RHS: &ast.BinaryExpr{
  2060. OP: ast.GT,
  2061. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2062. RHS: &ast.IntegerLiteral{Val: 20},
  2063. },
  2064. OP: ast.AND,
  2065. LHS: &ast.BinaryExpr{
  2066. OP: ast.GT,
  2067. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2068. RHS: &ast.IntegerLiteral{Val: 111},
  2069. },
  2070. },
  2071. }.Init(),
  2072. DataSourcePlan{
  2073. name: "tableInPlanner",
  2074. streamFields: map[string]*ast.JsonStreamField{
  2075. "hum": {
  2076. Type: "bigint",
  2077. },
  2078. "id": {
  2079. Type: "bigint",
  2080. },
  2081. },
  2082. streamStmt: streams["tableInPlanner"],
  2083. metaFields: []string{},
  2084. }.Init(),
  2085. },
  2086. },
  2087. Emitters: []string{"tableInPlanner"},
  2088. }.Init(),
  2089. },
  2090. },
  2091. from: &ast.Table{
  2092. Name: "src1",
  2093. },
  2094. joins: []ast.Join{
  2095. {
  2096. Name: "tableInPlanner",
  2097. Alias: "",
  2098. JoinType: ast.INNER_JOIN,
  2099. Expr: &ast.BinaryExpr{
  2100. OP: ast.AND,
  2101. LHS: &ast.BinaryExpr{
  2102. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2103. OP: ast.EQ,
  2104. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2105. },
  2106. RHS: &ast.BinaryExpr{
  2107. OP: ast.LT,
  2108. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  2109. RHS: &ast.IntegerLiteral{Val: 60},
  2110. },
  2111. },
  2112. },
  2113. },
  2114. }.Init(),
  2115. },
  2116. },
  2117. fields: []ast.Field{
  2118. {
  2119. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2120. Name: "id1",
  2121. AName: ""},
  2122. },
  2123. isAggregate: false,
  2124. sendMeta: false,
  2125. }.Init(),
  2126. }, { // 9 join table with window
  2127. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2128. p: ProjectPlan{
  2129. baseLogicalPlan: baseLogicalPlan{
  2130. children: []LogicalPlan{
  2131. JoinPlan{
  2132. baseLogicalPlan: baseLogicalPlan{
  2133. children: []LogicalPlan{
  2134. JoinAlignPlan{
  2135. baseLogicalPlan: baseLogicalPlan{
  2136. children: []LogicalPlan{
  2137. WindowPlan{
  2138. baseLogicalPlan: baseLogicalPlan{
  2139. children: []LogicalPlan{
  2140. DataSourcePlan{
  2141. name: "src1",
  2142. streamFields: map[string]*ast.JsonStreamField{
  2143. "id1": nil,
  2144. "temp": nil,
  2145. },
  2146. isSchemaless: true,
  2147. streamStmt: streams["src1"],
  2148. metaFields: []string{},
  2149. }.Init(),
  2150. },
  2151. },
  2152. condition: nil,
  2153. wtype: ast.TUMBLING_WINDOW,
  2154. length: 10000,
  2155. interval: 0,
  2156. limit: 0,
  2157. }.Init(),
  2158. DataSourcePlan{
  2159. name: "tableInPlanner",
  2160. streamFields: map[string]*ast.JsonStreamField{
  2161. "hum": {
  2162. Type: "bigint",
  2163. },
  2164. "id": {
  2165. Type: "bigint",
  2166. },
  2167. },
  2168. streamStmt: streams["tableInPlanner"],
  2169. metaFields: []string{},
  2170. }.Init(),
  2171. },
  2172. },
  2173. Emitters: []string{"tableInPlanner"},
  2174. }.Init(),
  2175. },
  2176. },
  2177. from: &ast.Table{
  2178. Name: "src1",
  2179. },
  2180. joins: []ast.Join{
  2181. {
  2182. Name: "tableInPlanner",
  2183. Alias: "",
  2184. JoinType: ast.INNER_JOIN,
  2185. Expr: &ast.BinaryExpr{
  2186. OP: ast.AND,
  2187. LHS: &ast.BinaryExpr{
  2188. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2189. OP: ast.EQ,
  2190. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  2191. },
  2192. RHS: &ast.BinaryExpr{
  2193. RHS: &ast.BinaryExpr{
  2194. OP: ast.AND,
  2195. LHS: &ast.BinaryExpr{
  2196. OP: ast.GT,
  2197. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2198. RHS: &ast.IntegerLiteral{Val: 20},
  2199. },
  2200. RHS: &ast.BinaryExpr{
  2201. OP: ast.LT,
  2202. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  2203. RHS: &ast.IntegerLiteral{Val: 60},
  2204. },
  2205. },
  2206. OP: ast.AND,
  2207. LHS: &ast.BinaryExpr{
  2208. OP: ast.GT,
  2209. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  2210. RHS: &ast.IntegerLiteral{Val: 111},
  2211. },
  2212. },
  2213. },
  2214. },
  2215. },
  2216. }.Init(),
  2217. },
  2218. },
  2219. fields: []ast.Field{
  2220. {
  2221. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  2222. Name: "id1",
  2223. AName: ""},
  2224. },
  2225. isAggregate: false,
  2226. sendMeta: false,
  2227. }.Init(),
  2228. }, { // 10 meta
  2229. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  2230. p: ProjectPlan{
  2231. baseLogicalPlan: baseLogicalPlan{
  2232. children: []LogicalPlan{
  2233. FilterPlan{
  2234. baseLogicalPlan: baseLogicalPlan{
  2235. children: []LogicalPlan{
  2236. DataSourcePlan{
  2237. name: "src1",
  2238. streamFields: map[string]*ast.JsonStreamField{
  2239. "temp": nil,
  2240. },
  2241. isSchemaless: true,
  2242. streamStmt: streams["src1"],
  2243. metaFields: []string{"Humidity", "device", "id"},
  2244. }.Init(),
  2245. },
  2246. },
  2247. condition: &ast.BinaryExpr{
  2248. LHS: &ast.Call{
  2249. Name: "meta",
  2250. FuncId: 2,
  2251. Args: []ast.Expr{&ast.MetaRef{
  2252. Name: "device",
  2253. StreamName: ast.DefaultStream,
  2254. }},
  2255. },
  2256. OP: ast.EQ,
  2257. RHS: &ast.StringLiteral{
  2258. Val: "demo2",
  2259. },
  2260. },
  2261. }.Init(),
  2262. },
  2263. },
  2264. fields: []ast.Field{
  2265. {
  2266. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  2267. Name: "temp",
  2268. AName: "",
  2269. }, {
  2270. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2271. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  2272. Name: "id",
  2273. StreamName: ast.DefaultStream,
  2274. }}},
  2275. []ast.StreamName{},
  2276. nil,
  2277. )},
  2278. Name: "meta",
  2279. AName: "eid",
  2280. }, {
  2281. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2282. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  2283. &ast.BinaryExpr{
  2284. OP: ast.ARROW,
  2285. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  2286. RHS: &ast.JsonFieldRef{Name: "Device"},
  2287. },
  2288. }},
  2289. []ast.StreamName{},
  2290. nil,
  2291. )},
  2292. Name: "meta",
  2293. AName: "hdevice",
  2294. },
  2295. },
  2296. isAggregate: false,
  2297. sendMeta: false,
  2298. }.Init(),
  2299. }, { // 11 join with same name field and aliased
  2300. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  2301. p: ProjectPlan{
  2302. baseLogicalPlan: baseLogicalPlan{
  2303. children: []LogicalPlan{
  2304. JoinPlan{
  2305. baseLogicalPlan: baseLogicalPlan{
  2306. children: []LogicalPlan{
  2307. JoinAlignPlan{
  2308. baseLogicalPlan: baseLogicalPlan{
  2309. children: []LogicalPlan{
  2310. DataSourcePlan{
  2311. name: "src2",
  2312. streamFields: map[string]*ast.JsonStreamField{
  2313. "hum": nil,
  2314. "id": nil,
  2315. "id2": nil,
  2316. },
  2317. isSchemaless: true,
  2318. streamStmt: streams["src2"],
  2319. metaFields: []string{},
  2320. }.Init(),
  2321. DataSourcePlan{
  2322. name: "tableInPlanner",
  2323. streamFields: map[string]*ast.JsonStreamField{
  2324. "hum": {
  2325. Type: "bigint",
  2326. },
  2327. "id": {
  2328. Type: "bigint",
  2329. },
  2330. },
  2331. streamStmt: streams["tableInPlanner"],
  2332. metaFields: []string{},
  2333. }.Init(),
  2334. },
  2335. },
  2336. Emitters: []string{"tableInPlanner"},
  2337. }.Init(),
  2338. },
  2339. },
  2340. from: &ast.Table{
  2341. Name: "src2",
  2342. },
  2343. joins: []ast.Join{
  2344. {
  2345. Name: "tableInPlanner",
  2346. Alias: "",
  2347. JoinType: ast.INNER_JOIN,
  2348. Expr: &ast.BinaryExpr{
  2349. RHS: &ast.BinaryExpr{
  2350. OP: ast.EQ,
  2351. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  2352. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  2353. },
  2354. OP: ast.AND,
  2355. LHS: &ast.BinaryExpr{
  2356. OP: ast.GT,
  2357. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2358. &ast.FieldRef{
  2359. Name: "hum",
  2360. StreamName: "src2",
  2361. },
  2362. []ast.StreamName{"src2"},
  2363. &boolFalse,
  2364. )},
  2365. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2366. &ast.FieldRef{
  2367. Name: "hum",
  2368. StreamName: "tableInPlanner",
  2369. },
  2370. []ast.StreamName{"tableInPlanner"},
  2371. &boolFalse,
  2372. )},
  2373. },
  2374. },
  2375. },
  2376. },
  2377. }.Init(),
  2378. },
  2379. },
  2380. fields: []ast.Field{
  2381. {
  2382. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2383. &ast.FieldRef{
  2384. Name: "hum",
  2385. StreamName: "src2",
  2386. },
  2387. []ast.StreamName{"src2"},
  2388. &boolFalse,
  2389. )},
  2390. Name: "hum",
  2391. AName: "hum1",
  2392. }, {
  2393. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  2394. &ast.FieldRef{
  2395. Name: "hum",
  2396. StreamName: "tableInPlanner",
  2397. },
  2398. []ast.StreamName{"tableInPlanner"},
  2399. &boolFalse,
  2400. )},
  2401. Name: "hum",
  2402. AName: "hum2",
  2403. },
  2404. },
  2405. isAggregate: false,
  2406. sendMeta: false,
  2407. }.Init(),
  2408. }, { // 12
  2409. sql: `SELECT name->first, name->last FROM src1`,
  2410. p: ProjectPlan{
  2411. baseLogicalPlan: baseLogicalPlan{
  2412. children: []LogicalPlan{
  2413. DataSourcePlan{
  2414. baseLogicalPlan: baseLogicalPlan{},
  2415. name: "src1",
  2416. streamFields: map[string]*ast.JsonStreamField{
  2417. "name": nil,
  2418. },
  2419. isSchemaless: true,
  2420. streamStmt: streams["src1"],
  2421. metaFields: []string{},
  2422. }.Init(),
  2423. },
  2424. },
  2425. fields: []ast.Field{
  2426. {
  2427. Expr: &ast.BinaryExpr{
  2428. OP: ast.ARROW,
  2429. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2430. RHS: &ast.JsonFieldRef{Name: "first"},
  2431. },
  2432. Name: "kuiper_field_0",
  2433. AName: "",
  2434. }, {
  2435. Expr: &ast.BinaryExpr{
  2436. OP: ast.ARROW,
  2437. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2438. RHS: &ast.JsonFieldRef{Name: "last"},
  2439. },
  2440. Name: "kuiper_field_1",
  2441. AName: "",
  2442. },
  2443. },
  2444. isAggregate: false,
  2445. sendMeta: false,
  2446. }.Init(),
  2447. },
  2448. }
  2449. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2450. for i, tt := range tests {
  2451. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2452. if err != nil {
  2453. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2454. continue
  2455. }
  2456. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2457. IsEventTime: false,
  2458. LateTol: 0,
  2459. Concurrency: 0,
  2460. BufferLength: 0,
  2461. SendMetaToSink: false,
  2462. Qos: 0,
  2463. CheckpointInterval: 0,
  2464. SendError: true,
  2465. }, store)
  2466. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2467. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2468. } else if !reflect.DeepEqual(tt.p, p) {
  2469. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2470. }
  2471. }
  2472. }
  2473. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2474. store, err := store.GetKV("stream")
  2475. if err != nil {
  2476. t.Error(err)
  2477. return
  2478. }
  2479. streamSqls := map[string]string{
  2480. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2481. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2482. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2483. }
  2484. types := map[string]ast.StreamType{
  2485. "src1": ast.TypeStream,
  2486. "table1": ast.TypeTable,
  2487. "table2": ast.TypeTable,
  2488. }
  2489. for name, sql := range streamSqls {
  2490. s, err := json.Marshal(&xsql.StreamInfo{
  2491. StreamType: types[name],
  2492. Statement: sql,
  2493. })
  2494. if err != nil {
  2495. t.Error(err)
  2496. t.Fail()
  2497. }
  2498. err = store.Set(name, string(s))
  2499. if err != nil {
  2500. t.Error(err)
  2501. t.Fail()
  2502. }
  2503. }
  2504. streams := make(map[string]*ast.StreamStmt)
  2505. for n := range streamSqls {
  2506. streamStmt, err := xsql.GetDataSource(store, n)
  2507. if err != nil {
  2508. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2509. return
  2510. }
  2511. streams[n] = streamStmt
  2512. }
  2513. var tests = []struct {
  2514. sql string
  2515. p LogicalPlan
  2516. err string
  2517. }{
  2518. { // 0
  2519. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2520. p: ProjectPlan{
  2521. baseLogicalPlan: baseLogicalPlan{
  2522. children: []LogicalPlan{
  2523. LookupPlan{
  2524. baseLogicalPlan: baseLogicalPlan{
  2525. children: []LogicalPlan{
  2526. DataSourcePlan{
  2527. baseLogicalPlan: baseLogicalPlan{},
  2528. name: "src1",
  2529. streamFields: map[string]*ast.JsonStreamField{
  2530. "a": nil,
  2531. },
  2532. isSchemaless: true,
  2533. streamStmt: streams["src1"],
  2534. metaFields: []string{},
  2535. }.Init(),
  2536. },
  2537. },
  2538. joinExpr: ast.Join{
  2539. Name: "table1",
  2540. Alias: "",
  2541. JoinType: ast.INNER_JOIN,
  2542. Expr: &ast.BinaryExpr{
  2543. OP: ast.EQ,
  2544. LHS: &ast.FieldRef{
  2545. StreamName: "src1",
  2546. Name: "id",
  2547. },
  2548. RHS: &ast.FieldRef{
  2549. StreamName: "table1",
  2550. Name: "id",
  2551. },
  2552. },
  2553. },
  2554. keys: []string{"id"},
  2555. fields: []string{"b"},
  2556. valvars: []ast.Expr{
  2557. &ast.FieldRef{
  2558. StreamName: "src1",
  2559. Name: "id",
  2560. },
  2561. },
  2562. options: &ast.Options{
  2563. DATASOURCE: "table1",
  2564. TYPE: "sql",
  2565. KIND: "lookup",
  2566. },
  2567. conditions: nil,
  2568. }.Init(),
  2569. },
  2570. },
  2571. fields: []ast.Field{
  2572. {
  2573. Expr: &ast.FieldRef{
  2574. StreamName: "src1",
  2575. Name: "a",
  2576. },
  2577. Name: "a",
  2578. AName: "",
  2579. },
  2580. {
  2581. Expr: &ast.FieldRef{
  2582. StreamName: "table1",
  2583. Name: "b",
  2584. },
  2585. Name: "b",
  2586. AName: "",
  2587. },
  2588. },
  2589. isAggregate: false,
  2590. sendMeta: false,
  2591. }.Init(),
  2592. },
  2593. { // 1
  2594. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2595. p: ProjectPlan{
  2596. baseLogicalPlan: baseLogicalPlan{
  2597. children: []LogicalPlan{
  2598. FilterPlan{
  2599. baseLogicalPlan: baseLogicalPlan{
  2600. children: []LogicalPlan{
  2601. LookupPlan{
  2602. baseLogicalPlan: baseLogicalPlan{
  2603. children: []LogicalPlan{
  2604. FilterPlan{
  2605. baseLogicalPlan: baseLogicalPlan{
  2606. children: []LogicalPlan{
  2607. DataSourcePlan{
  2608. baseLogicalPlan: baseLogicalPlan{},
  2609. name: "src1",
  2610. streamFields: map[string]*ast.JsonStreamField{
  2611. "a": nil,
  2612. },
  2613. isSchemaless: true,
  2614. streamStmt: streams["src1"],
  2615. metaFields: []string{},
  2616. }.Init(),
  2617. },
  2618. },
  2619. condition: &ast.BinaryExpr{
  2620. OP: ast.LT,
  2621. LHS: &ast.FieldRef{
  2622. StreamName: "src1",
  2623. Name: "c",
  2624. },
  2625. RHS: &ast.IntegerLiteral{Val: 40},
  2626. },
  2627. }.Init(),
  2628. },
  2629. },
  2630. joinExpr: ast.Join{
  2631. Name: "table1",
  2632. Alias: "",
  2633. JoinType: ast.INNER_JOIN,
  2634. Expr: &ast.BinaryExpr{
  2635. OP: ast.AND,
  2636. RHS: &ast.BinaryExpr{
  2637. OP: ast.EQ,
  2638. LHS: &ast.FieldRef{
  2639. StreamName: "src1",
  2640. Name: "id",
  2641. },
  2642. RHS: &ast.FieldRef{
  2643. StreamName: "table1",
  2644. Name: "id",
  2645. },
  2646. },
  2647. LHS: &ast.BinaryExpr{
  2648. OP: ast.AND,
  2649. LHS: &ast.BinaryExpr{
  2650. OP: ast.GT,
  2651. LHS: &ast.FieldRef{
  2652. StreamName: "table1",
  2653. Name: "b",
  2654. },
  2655. RHS: &ast.IntegerLiteral{Val: 20},
  2656. },
  2657. RHS: &ast.BinaryExpr{
  2658. OP: ast.LT,
  2659. LHS: &ast.FieldRef{
  2660. StreamName: "src1",
  2661. Name: "c",
  2662. },
  2663. RHS: &ast.IntegerLiteral{Val: 40},
  2664. },
  2665. },
  2666. },
  2667. },
  2668. keys: []string{"id"},
  2669. valvars: []ast.Expr{
  2670. &ast.FieldRef{
  2671. StreamName: "src1",
  2672. Name: "id",
  2673. },
  2674. },
  2675. options: &ast.Options{
  2676. DATASOURCE: "table1",
  2677. TYPE: "sql",
  2678. KIND: "lookup",
  2679. },
  2680. conditions: &ast.BinaryExpr{
  2681. OP: ast.AND,
  2682. LHS: &ast.BinaryExpr{
  2683. OP: ast.GT,
  2684. LHS: &ast.FieldRef{
  2685. StreamName: "table1",
  2686. Name: "b",
  2687. },
  2688. RHS: &ast.IntegerLiteral{Val: 20},
  2689. },
  2690. RHS: &ast.BinaryExpr{
  2691. OP: ast.LT,
  2692. LHS: &ast.FieldRef{
  2693. StreamName: "src1",
  2694. Name: "c",
  2695. },
  2696. RHS: &ast.IntegerLiteral{Val: 40},
  2697. },
  2698. },
  2699. }.Init(),
  2700. },
  2701. },
  2702. condition: &ast.BinaryExpr{
  2703. OP: ast.GT,
  2704. LHS: &ast.FieldRef{
  2705. StreamName: "table1",
  2706. Name: "b",
  2707. },
  2708. RHS: &ast.IntegerLiteral{Val: 20},
  2709. },
  2710. }.Init(),
  2711. },
  2712. },
  2713. fields: []ast.Field{
  2714. {
  2715. Expr: &ast.FieldRef{
  2716. StreamName: "src1",
  2717. Name: "a",
  2718. },
  2719. Name: "a",
  2720. AName: "",
  2721. },
  2722. {
  2723. Expr: &ast.FieldRef{
  2724. StreamName: "table1",
  2725. Name: "*",
  2726. },
  2727. Name: "*",
  2728. AName: "",
  2729. },
  2730. },
  2731. isAggregate: false,
  2732. sendMeta: false,
  2733. }.Init(),
  2734. },
  2735. { // 2
  2736. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2737. p: ProjectPlan{
  2738. baseLogicalPlan: baseLogicalPlan{
  2739. children: []LogicalPlan{
  2740. LookupPlan{
  2741. baseLogicalPlan: baseLogicalPlan{
  2742. children: []LogicalPlan{
  2743. LookupPlan{
  2744. baseLogicalPlan: baseLogicalPlan{
  2745. children: []LogicalPlan{
  2746. DataSourcePlan{
  2747. baseLogicalPlan: baseLogicalPlan{},
  2748. name: "src1",
  2749. streamFields: map[string]*ast.JsonStreamField{
  2750. "a": nil,
  2751. },
  2752. isSchemaless: true,
  2753. streamStmt: streams["src1"],
  2754. metaFields: []string{},
  2755. }.Init(),
  2756. },
  2757. },
  2758. joinExpr: ast.Join{
  2759. Name: "table1",
  2760. Alias: "",
  2761. JoinType: ast.INNER_JOIN,
  2762. Expr: &ast.BinaryExpr{
  2763. OP: ast.EQ,
  2764. LHS: &ast.FieldRef{
  2765. StreamName: "src1",
  2766. Name: "id",
  2767. },
  2768. RHS: &ast.FieldRef{
  2769. StreamName: "table1",
  2770. Name: "id",
  2771. },
  2772. },
  2773. },
  2774. keys: []string{"id"},
  2775. fields: []string{"b"},
  2776. valvars: []ast.Expr{
  2777. &ast.FieldRef{
  2778. StreamName: "src1",
  2779. Name: "id",
  2780. },
  2781. },
  2782. options: &ast.Options{
  2783. DATASOURCE: "table1",
  2784. TYPE: "sql",
  2785. KIND: "lookup",
  2786. },
  2787. conditions: nil,
  2788. }.Init(),
  2789. },
  2790. },
  2791. joinExpr: ast.Join{
  2792. Name: "table2",
  2793. Alias: "",
  2794. JoinType: ast.INNER_JOIN,
  2795. Expr: &ast.BinaryExpr{
  2796. OP: ast.EQ,
  2797. LHS: &ast.FieldRef{
  2798. StreamName: "table1",
  2799. Name: "id",
  2800. },
  2801. RHS: &ast.FieldRef{
  2802. StreamName: "table2",
  2803. Name: "id",
  2804. },
  2805. },
  2806. },
  2807. keys: []string{"id"},
  2808. fields: []string{"c"},
  2809. valvars: []ast.Expr{
  2810. &ast.FieldRef{
  2811. StreamName: "table1",
  2812. Name: "id",
  2813. },
  2814. },
  2815. options: &ast.Options{
  2816. DATASOURCE: "table2",
  2817. TYPE: "sql",
  2818. KIND: "lookup",
  2819. },
  2820. }.Init(),
  2821. },
  2822. },
  2823. fields: []ast.Field{
  2824. {
  2825. Expr: &ast.FieldRef{
  2826. StreamName: "src1",
  2827. Name: "a",
  2828. },
  2829. Name: "a",
  2830. AName: "",
  2831. },
  2832. {
  2833. Expr: &ast.FieldRef{
  2834. StreamName: "table1",
  2835. Name: "b",
  2836. },
  2837. Name: "b",
  2838. AName: "",
  2839. },
  2840. {
  2841. Expr: &ast.FieldRef{
  2842. StreamName: "table2",
  2843. Name: "c",
  2844. },
  2845. Name: "c",
  2846. AName: "",
  2847. },
  2848. },
  2849. isAggregate: false,
  2850. sendMeta: false,
  2851. }.Init(),
  2852. },
  2853. { // 3
  2854. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2855. p: ProjectPlan{
  2856. baseLogicalPlan: baseLogicalPlan{
  2857. children: []LogicalPlan{
  2858. LookupPlan{
  2859. baseLogicalPlan: baseLogicalPlan{
  2860. children: []LogicalPlan{
  2861. WindowPlan{
  2862. baseLogicalPlan: baseLogicalPlan{
  2863. children: []LogicalPlan{
  2864. DataSourcePlan{
  2865. baseLogicalPlan: baseLogicalPlan{},
  2866. name: "src1",
  2867. streamStmt: streams["src1"],
  2868. streamFields: map[string]*ast.JsonStreamField{},
  2869. metaFields: []string{},
  2870. isWildCard: true,
  2871. isSchemaless: true,
  2872. }.Init(),
  2873. },
  2874. },
  2875. condition: nil,
  2876. wtype: ast.TUMBLING_WINDOW,
  2877. length: 10000,
  2878. interval: 0,
  2879. limit: 0,
  2880. }.Init(),
  2881. },
  2882. },
  2883. joinExpr: ast.Join{
  2884. Name: "table1",
  2885. Alias: "",
  2886. JoinType: ast.INNER_JOIN,
  2887. Expr: &ast.BinaryExpr{
  2888. OP: ast.EQ,
  2889. LHS: &ast.FieldRef{
  2890. StreamName: "src1",
  2891. Name: "id",
  2892. },
  2893. RHS: &ast.FieldRef{
  2894. StreamName: "table1",
  2895. Name: "id",
  2896. },
  2897. },
  2898. },
  2899. keys: []string{"id"},
  2900. valvars: []ast.Expr{
  2901. &ast.FieldRef{
  2902. StreamName: "src1",
  2903. Name: "id",
  2904. },
  2905. },
  2906. options: &ast.Options{
  2907. DATASOURCE: "table1",
  2908. TYPE: "sql",
  2909. KIND: "lookup",
  2910. },
  2911. conditions: nil,
  2912. }.Init(),
  2913. },
  2914. },
  2915. fields: []ast.Field{
  2916. {
  2917. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2918. Name: "*",
  2919. AName: "",
  2920. },
  2921. },
  2922. isAggregate: false,
  2923. sendMeta: false,
  2924. }.Init(),
  2925. },
  2926. }
  2927. for i, tt := range tests {
  2928. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2929. if err != nil {
  2930. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2931. continue
  2932. }
  2933. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2934. IsEventTime: false,
  2935. LateTol: 0,
  2936. Concurrency: 0,
  2937. BufferLength: 0,
  2938. SendMetaToSink: false,
  2939. Qos: 0,
  2940. CheckpointInterval: 0,
  2941. SendError: true,
  2942. }, store)
  2943. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2944. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2945. } else if !reflect.DeepEqual(tt.p, p) {
  2946. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2947. }
  2948. }
  2949. }