planner_test.go 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/testx"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. func init() {
  29. testx.InitEnv()
  30. }
  31. func Test_createLogicalPlan(t *testing.T) {
  32. err, store := store.GetKV("stream")
  33. if err != nil {
  34. t.Error(err)
  35. return
  36. }
  37. streamSqls := map[string]string{
  38. "src1": `CREATE STREAM src1 (
  39. id1 BIGINT,
  40. temp BIGINT,
  41. name string,
  42. myarray array(string)
  43. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  44. "src2": `CREATE STREAM src2 (
  45. id2 BIGINT,
  46. hum BIGINT
  47. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
  48. "tableInPlanner": `CREATE TABLE tableInPlanner (
  49. id BIGINT,
  50. name STRING,
  51. value STRING,
  52. hum BIGINT
  53. ) WITH (TYPE="file");`,
  54. }
  55. types := map[string]ast.StreamType{
  56. "src1": ast.TypeStream,
  57. "src2": ast.TypeStream,
  58. "tableInPlanner": ast.TypeTable,
  59. }
  60. for name, sql := range streamSqls {
  61. s, err := json.Marshal(&xsql.StreamInfo{
  62. StreamType: types[name],
  63. Statement: sql,
  64. })
  65. if err != nil {
  66. t.Error(err)
  67. t.Fail()
  68. }
  69. err = store.Set(name, string(s))
  70. if err != nil {
  71. t.Error(err)
  72. t.Fail()
  73. }
  74. }
  75. streams := make(map[string]*ast.StreamStmt)
  76. for n := range streamSqls {
  77. streamStmt, err := xsql.GetDataSource(store, n)
  78. if err != nil {
  79. t.Errorf("fail to get stream %s, please check if stream is created", n)
  80. return
  81. }
  82. streams[n] = streamStmt
  83. }
  84. var (
  85. //boolTrue = true
  86. boolFalse = false
  87. )
  88. var tests = []struct {
  89. sql string
  90. p LogicalPlan
  91. err string
  92. }{
  93. { // 0
  94. sql: `SELECT myarray[temp] FROM src1`,
  95. p: ProjectPlan{
  96. baseLogicalPlan: baseLogicalPlan{
  97. children: []LogicalPlan{
  98. DataSourcePlan{
  99. baseLogicalPlan: baseLogicalPlan{},
  100. name: "src1",
  101. streamFields: []interface{}{
  102. &ast.StreamField{
  103. Name: "myarray",
  104. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  105. },
  106. &ast.StreamField{
  107. Name: "temp",
  108. FieldType: &ast.BasicType{Type: ast.BIGINT},
  109. },
  110. },
  111. streamStmt: streams["src1"],
  112. metaFields: []string{},
  113. }.Init(),
  114. },
  115. },
  116. fields: []ast.Field{
  117. {
  118. Expr: &ast.BinaryExpr{
  119. OP: ast.SUBSET,
  120. LHS: &ast.FieldRef{
  121. StreamName: "src1",
  122. Name: "myarray",
  123. },
  124. RHS: &ast.IndexExpr{Index: &ast.FieldRef{
  125. StreamName: "src1",
  126. Name: "temp",
  127. }},
  128. },
  129. Name: "kuiper_field_0",
  130. AName: "",
  131. },
  132. },
  133. isAggregate: false,
  134. sendMeta: false,
  135. }.Init(),
  136. }, { // 1 optimize where to data source
  137. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  138. p: ProjectPlan{
  139. baseLogicalPlan: baseLogicalPlan{
  140. children: []LogicalPlan{
  141. WindowPlan{
  142. baseLogicalPlan: baseLogicalPlan{
  143. children: []LogicalPlan{
  144. FilterPlan{
  145. baseLogicalPlan: baseLogicalPlan{
  146. children: []LogicalPlan{
  147. DataSourcePlan{
  148. name: "src1",
  149. streamFields: []interface{}{
  150. &ast.StreamField{
  151. Name: "name",
  152. FieldType: &ast.BasicType{Type: ast.STRINGS},
  153. },
  154. &ast.StreamField{
  155. Name: "temp",
  156. FieldType: &ast.BasicType{Type: ast.BIGINT},
  157. },
  158. },
  159. streamStmt: streams["src1"],
  160. metaFields: []string{},
  161. }.Init(),
  162. },
  163. },
  164. condition: &ast.BinaryExpr{
  165. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  166. OP: ast.EQ,
  167. RHS: &ast.StringLiteral{Val: "v1"},
  168. },
  169. }.Init(),
  170. },
  171. },
  172. condition: nil,
  173. wtype: ast.TUMBLING_WINDOW,
  174. length: 10000,
  175. interval: 0,
  176. limit: 0,
  177. }.Init(),
  178. },
  179. },
  180. fields: []ast.Field{
  181. {
  182. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  183. Name: "temp",
  184. AName: ""},
  185. },
  186. isAggregate: false,
  187. sendMeta: false,
  188. }.Init(),
  189. }, { // 2 condition that cannot be optimized
  190. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  191. p: ProjectPlan{
  192. baseLogicalPlan: baseLogicalPlan{
  193. children: []LogicalPlan{
  194. JoinPlan{
  195. baseLogicalPlan: baseLogicalPlan{
  196. children: []LogicalPlan{
  197. WindowPlan{
  198. baseLogicalPlan: baseLogicalPlan{
  199. children: []LogicalPlan{
  200. DataSourcePlan{
  201. name: "src1",
  202. streamFields: []interface{}{
  203. &ast.StreamField{
  204. Name: "id1",
  205. FieldType: &ast.BasicType{Type: ast.BIGINT},
  206. },
  207. &ast.StreamField{
  208. Name: "temp",
  209. FieldType: &ast.BasicType{Type: ast.BIGINT},
  210. },
  211. },
  212. streamStmt: streams["src1"],
  213. metaFields: []string{},
  214. }.Init(),
  215. DataSourcePlan{
  216. name: "src2",
  217. streamFields: []interface{}{
  218. &ast.StreamField{
  219. Name: "hum",
  220. FieldType: &ast.BasicType{Type: ast.BIGINT},
  221. },
  222. &ast.StreamField{
  223. Name: "id2",
  224. FieldType: &ast.BasicType{Type: ast.BIGINT},
  225. },
  226. },
  227. streamStmt: streams["src2"],
  228. metaFields: []string{},
  229. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  230. }.Init(),
  231. },
  232. },
  233. condition: nil,
  234. wtype: ast.TUMBLING_WINDOW,
  235. length: 10000,
  236. interval: 0,
  237. limit: 0,
  238. }.Init(),
  239. },
  240. },
  241. from: &ast.Table{Name: "src1"},
  242. joins: ast.Joins{ast.Join{
  243. Name: "src2",
  244. JoinType: ast.INNER_JOIN,
  245. Expr: &ast.BinaryExpr{
  246. OP: ast.AND,
  247. LHS: &ast.BinaryExpr{
  248. LHS: &ast.BinaryExpr{
  249. OP: ast.GT,
  250. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  251. RHS: &ast.IntegerLiteral{Val: 20},
  252. },
  253. OP: ast.OR,
  254. RHS: &ast.BinaryExpr{
  255. OP: ast.GT,
  256. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  257. RHS: &ast.IntegerLiteral{Val: 60},
  258. },
  259. },
  260. RHS: &ast.BinaryExpr{
  261. OP: ast.EQ,
  262. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  263. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  264. },
  265. },
  266. }},
  267. }.Init(),
  268. },
  269. },
  270. fields: []ast.Field{
  271. {
  272. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  273. Name: "id1",
  274. AName: ""},
  275. },
  276. isAggregate: false,
  277. sendMeta: false,
  278. }.Init(),
  279. }, { // 3 optimize window filter
  280. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  281. p: ProjectPlan{
  282. baseLogicalPlan: baseLogicalPlan{
  283. children: []LogicalPlan{
  284. WindowPlan{
  285. baseLogicalPlan: baseLogicalPlan{
  286. children: []LogicalPlan{
  287. FilterPlan{
  288. baseLogicalPlan: baseLogicalPlan{
  289. children: []LogicalPlan{
  290. DataSourcePlan{
  291. name: "src1",
  292. streamFields: []interface{}{
  293. &ast.StreamField{
  294. Name: "id1",
  295. FieldType: &ast.BasicType{Type: ast.BIGINT},
  296. },
  297. &ast.StreamField{
  298. Name: "name",
  299. FieldType: &ast.BasicType{Type: ast.STRINGS},
  300. },
  301. &ast.StreamField{
  302. Name: "temp",
  303. FieldType: &ast.BasicType{Type: ast.BIGINT},
  304. },
  305. },
  306. streamStmt: streams["src1"],
  307. metaFields: []string{},
  308. }.Init(),
  309. },
  310. },
  311. condition: &ast.BinaryExpr{
  312. OP: ast.AND,
  313. LHS: &ast.BinaryExpr{
  314. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  315. OP: ast.EQ,
  316. RHS: &ast.StringLiteral{Val: "v1"},
  317. },
  318. RHS: &ast.BinaryExpr{
  319. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  320. OP: ast.GT,
  321. RHS: &ast.IntegerLiteral{Val: 2},
  322. },
  323. },
  324. }.Init(),
  325. },
  326. },
  327. condition: nil,
  328. wtype: ast.TUMBLING_WINDOW,
  329. length: 10000,
  330. interval: 0,
  331. limit: 0,
  332. }.Init(),
  333. },
  334. },
  335. fields: []ast.Field{
  336. {
  337. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  338. Name: "id1",
  339. AName: ""},
  340. },
  341. isAggregate: false,
  342. sendMeta: false,
  343. }.Init(),
  344. }, { // 4. do not optimize count window
  345. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  346. p: ProjectPlan{
  347. baseLogicalPlan: baseLogicalPlan{
  348. children: []LogicalPlan{
  349. HavingPlan{
  350. baseLogicalPlan: baseLogicalPlan{
  351. children: []LogicalPlan{
  352. FilterPlan{
  353. baseLogicalPlan: baseLogicalPlan{
  354. children: []LogicalPlan{
  355. WindowPlan{
  356. baseLogicalPlan: baseLogicalPlan{
  357. children: []LogicalPlan{
  358. DataSourcePlan{
  359. name: "src1",
  360. isWildCard: true,
  361. streamFields: []interface{}{
  362. &ast.StreamField{
  363. Name: "id1",
  364. FieldType: &ast.BasicType{Type: ast.BIGINT},
  365. },
  366. &ast.StreamField{
  367. Name: "temp",
  368. FieldType: &ast.BasicType{Type: ast.BIGINT},
  369. },
  370. &ast.StreamField{
  371. Name: "name",
  372. FieldType: &ast.BasicType{Type: ast.STRINGS},
  373. },
  374. &ast.StreamField{
  375. Name: "myarray",
  376. FieldType: &ast.ArrayType{Type: ast.STRINGS},
  377. },
  378. },
  379. streamStmt: streams["src1"],
  380. metaFields: []string{},
  381. }.Init(),
  382. },
  383. },
  384. condition: nil,
  385. wtype: ast.COUNT_WINDOW,
  386. length: 5,
  387. interval: 1,
  388. limit: 0,
  389. }.Init(),
  390. },
  391. },
  392. condition: &ast.BinaryExpr{
  393. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  394. OP: ast.GT,
  395. RHS: &ast.IntegerLiteral{Val: 20},
  396. },
  397. }.Init(),
  398. },
  399. },
  400. condition: &ast.BinaryExpr{
  401. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  402. Token: ast.ASTERISK,
  403. }}},
  404. OP: ast.GT,
  405. RHS: &ast.IntegerLiteral{Val: 2},
  406. },
  407. }.Init(),
  408. },
  409. },
  410. fields: []ast.Field{
  411. {
  412. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  413. Name: "",
  414. AName: ""},
  415. },
  416. isAggregate: false,
  417. sendMeta: false,
  418. }.Init(),
  419. }, { // 5. optimize join on
  420. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  421. p: ProjectPlan{
  422. baseLogicalPlan: baseLogicalPlan{
  423. children: []LogicalPlan{
  424. JoinPlan{
  425. baseLogicalPlan: baseLogicalPlan{
  426. children: []LogicalPlan{
  427. WindowPlan{
  428. baseLogicalPlan: baseLogicalPlan{
  429. children: []LogicalPlan{
  430. FilterPlan{
  431. baseLogicalPlan: baseLogicalPlan{
  432. children: []LogicalPlan{
  433. DataSourcePlan{
  434. name: "src1",
  435. streamFields: []interface{}{
  436. &ast.StreamField{
  437. Name: "id1",
  438. FieldType: &ast.BasicType{Type: ast.BIGINT},
  439. },
  440. &ast.StreamField{
  441. Name: "temp",
  442. FieldType: &ast.BasicType{Type: ast.BIGINT},
  443. },
  444. },
  445. streamStmt: streams["src1"],
  446. metaFields: []string{},
  447. }.Init(),
  448. },
  449. },
  450. condition: &ast.BinaryExpr{
  451. RHS: &ast.BinaryExpr{
  452. OP: ast.GT,
  453. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  454. RHS: &ast.IntegerLiteral{Val: 20},
  455. },
  456. OP: ast.AND,
  457. LHS: &ast.BinaryExpr{
  458. OP: ast.GT,
  459. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  460. RHS: &ast.IntegerLiteral{Val: 111},
  461. },
  462. },
  463. }.Init(),
  464. FilterPlan{
  465. baseLogicalPlan: baseLogicalPlan{
  466. children: []LogicalPlan{
  467. DataSourcePlan{
  468. name: "src2",
  469. streamFields: []interface{}{
  470. &ast.StreamField{
  471. Name: "hum",
  472. FieldType: &ast.BasicType{Type: ast.BIGINT},
  473. },
  474. &ast.StreamField{
  475. Name: "id2",
  476. FieldType: &ast.BasicType{Type: ast.BIGINT},
  477. },
  478. },
  479. streamStmt: streams["src2"],
  480. metaFields: []string{},
  481. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  482. }.Init(),
  483. },
  484. },
  485. condition: &ast.BinaryExpr{
  486. OP: ast.LT,
  487. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  488. RHS: &ast.IntegerLiteral{Val: 60},
  489. },
  490. }.Init(),
  491. },
  492. },
  493. condition: nil,
  494. wtype: ast.TUMBLING_WINDOW,
  495. length: 10000,
  496. interval: 0,
  497. limit: 0,
  498. }.Init(),
  499. },
  500. },
  501. from: &ast.Table{
  502. Name: "src1",
  503. },
  504. joins: []ast.Join{
  505. {
  506. Name: "src2",
  507. Alias: "",
  508. JoinType: ast.INNER_JOIN,
  509. Expr: &ast.BinaryExpr{
  510. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  511. OP: ast.EQ,
  512. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  513. },
  514. },
  515. },
  516. }.Init(),
  517. },
  518. },
  519. fields: []ast.Field{
  520. {
  521. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  522. Name: "id1",
  523. AName: ""},
  524. },
  525. isAggregate: false,
  526. sendMeta: false,
  527. }.Init(),
  528. }, { // 6. optimize outer join on
  529. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  530. p: ProjectPlan{
  531. baseLogicalPlan: baseLogicalPlan{
  532. children: []LogicalPlan{
  533. JoinPlan{
  534. baseLogicalPlan: baseLogicalPlan{
  535. children: []LogicalPlan{
  536. WindowPlan{
  537. baseLogicalPlan: baseLogicalPlan{
  538. children: []LogicalPlan{
  539. FilterPlan{
  540. baseLogicalPlan: baseLogicalPlan{
  541. children: []LogicalPlan{
  542. DataSourcePlan{
  543. name: "src1",
  544. streamFields: []interface{}{
  545. &ast.StreamField{
  546. Name: "id1",
  547. FieldType: &ast.BasicType{Type: ast.BIGINT},
  548. },
  549. &ast.StreamField{
  550. Name: "temp",
  551. FieldType: &ast.BasicType{Type: ast.BIGINT},
  552. },
  553. },
  554. streamStmt: streams["src1"],
  555. metaFields: []string{},
  556. }.Init(),
  557. },
  558. },
  559. condition: &ast.BinaryExpr{
  560. OP: ast.GT,
  561. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  562. RHS: &ast.IntegerLiteral{Val: 111},
  563. },
  564. }.Init(),
  565. DataSourcePlan{
  566. name: "src2",
  567. streamFields: []interface{}{
  568. &ast.StreamField{
  569. Name: "hum",
  570. FieldType: &ast.BasicType{Type: ast.BIGINT},
  571. },
  572. &ast.StreamField{
  573. Name: "id2",
  574. FieldType: &ast.BasicType{Type: ast.BIGINT},
  575. },
  576. },
  577. streamStmt: streams["src2"],
  578. metaFields: []string{},
  579. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  580. }.Init(),
  581. },
  582. },
  583. condition: nil,
  584. wtype: ast.TUMBLING_WINDOW,
  585. length: 10000,
  586. interval: 0,
  587. limit: 0,
  588. }.Init(),
  589. },
  590. },
  591. from: &ast.Table{
  592. Name: "src1",
  593. },
  594. joins: []ast.Join{
  595. {
  596. Name: "src2",
  597. Alias: "",
  598. JoinType: ast.FULL_JOIN,
  599. Expr: &ast.BinaryExpr{
  600. OP: ast.AND,
  601. LHS: &ast.BinaryExpr{
  602. OP: ast.AND,
  603. LHS: &ast.BinaryExpr{
  604. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  605. OP: ast.EQ,
  606. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  607. },
  608. RHS: &ast.BinaryExpr{
  609. OP: ast.GT,
  610. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  611. RHS: &ast.IntegerLiteral{Val: 20},
  612. },
  613. },
  614. RHS: &ast.BinaryExpr{
  615. OP: ast.LT,
  616. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  617. RHS: &ast.IntegerLiteral{Val: 60},
  618. },
  619. },
  620. },
  621. },
  622. }.Init(),
  623. },
  624. },
  625. fields: []ast.Field{
  626. {
  627. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  628. Name: "id1",
  629. AName: ""},
  630. },
  631. isAggregate: false,
  632. sendMeta: false,
  633. }.Init(),
  634. }, { // 7 window error for table
  635. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  636. p: nil,
  637. err: "cannot run window for TABLE sources",
  638. }, { // 8 join table without window
  639. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  640. p: ProjectPlan{
  641. baseLogicalPlan: baseLogicalPlan{
  642. children: []LogicalPlan{
  643. JoinPlan{
  644. baseLogicalPlan: baseLogicalPlan{
  645. children: []LogicalPlan{
  646. JoinAlignPlan{
  647. baseLogicalPlan: baseLogicalPlan{
  648. children: []LogicalPlan{
  649. FilterPlan{
  650. baseLogicalPlan: baseLogicalPlan{
  651. children: []LogicalPlan{
  652. DataSourcePlan{
  653. name: "src1",
  654. streamFields: []interface{}{
  655. &ast.StreamField{
  656. Name: "id1",
  657. FieldType: &ast.BasicType{Type: ast.BIGINT},
  658. },
  659. &ast.StreamField{
  660. Name: "temp",
  661. FieldType: &ast.BasicType{Type: ast.BIGINT},
  662. },
  663. },
  664. streamStmt: streams["src1"],
  665. metaFields: []string{},
  666. }.Init(),
  667. },
  668. },
  669. condition: &ast.BinaryExpr{
  670. RHS: &ast.BinaryExpr{
  671. OP: ast.GT,
  672. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  673. RHS: &ast.IntegerLiteral{Val: 20},
  674. },
  675. OP: ast.AND,
  676. LHS: &ast.BinaryExpr{
  677. OP: ast.GT,
  678. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  679. RHS: &ast.IntegerLiteral{Val: 111},
  680. },
  681. },
  682. }.Init(),
  683. DataSourcePlan{
  684. name: "tableInPlanner",
  685. streamFields: []interface{}{
  686. &ast.StreamField{
  687. Name: "hum",
  688. FieldType: &ast.BasicType{Type: ast.BIGINT},
  689. },
  690. &ast.StreamField{
  691. Name: "id",
  692. FieldType: &ast.BasicType{Type: ast.BIGINT},
  693. },
  694. },
  695. streamStmt: streams["tableInPlanner"],
  696. metaFields: []string{},
  697. }.Init(),
  698. },
  699. },
  700. Emitters: []string{"tableInPlanner"},
  701. }.Init(),
  702. },
  703. },
  704. from: &ast.Table{
  705. Name: "src1",
  706. },
  707. joins: []ast.Join{
  708. {
  709. Name: "tableInPlanner",
  710. Alias: "",
  711. JoinType: ast.INNER_JOIN,
  712. Expr: &ast.BinaryExpr{
  713. OP: ast.AND,
  714. LHS: &ast.BinaryExpr{
  715. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  716. OP: ast.EQ,
  717. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  718. },
  719. RHS: &ast.BinaryExpr{
  720. OP: ast.LT,
  721. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  722. RHS: &ast.IntegerLiteral{Val: 60},
  723. },
  724. },
  725. },
  726. },
  727. }.Init(),
  728. },
  729. },
  730. fields: []ast.Field{
  731. {
  732. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  733. Name: "id1",
  734. AName: ""},
  735. },
  736. isAggregate: false,
  737. sendMeta: false,
  738. }.Init(),
  739. }, { // 9 join table with window
  740. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  741. p: ProjectPlan{
  742. baseLogicalPlan: baseLogicalPlan{
  743. children: []LogicalPlan{
  744. JoinPlan{
  745. baseLogicalPlan: baseLogicalPlan{
  746. children: []LogicalPlan{
  747. JoinAlignPlan{
  748. baseLogicalPlan: baseLogicalPlan{
  749. children: []LogicalPlan{
  750. WindowPlan{
  751. baseLogicalPlan: baseLogicalPlan{
  752. children: []LogicalPlan{
  753. DataSourcePlan{
  754. name: "src1",
  755. streamFields: []interface{}{
  756. &ast.StreamField{
  757. Name: "id1",
  758. FieldType: &ast.BasicType{Type: ast.BIGINT},
  759. },
  760. &ast.StreamField{
  761. Name: "temp",
  762. FieldType: &ast.BasicType{Type: ast.BIGINT},
  763. },
  764. },
  765. streamStmt: streams["src1"],
  766. metaFields: []string{},
  767. }.Init(),
  768. },
  769. },
  770. condition: nil,
  771. wtype: ast.TUMBLING_WINDOW,
  772. length: 10000,
  773. interval: 0,
  774. limit: 0,
  775. }.Init(),
  776. DataSourcePlan{
  777. name: "tableInPlanner",
  778. streamFields: []interface{}{
  779. &ast.StreamField{
  780. Name: "hum",
  781. FieldType: &ast.BasicType{Type: ast.BIGINT},
  782. },
  783. &ast.StreamField{
  784. Name: "id",
  785. FieldType: &ast.BasicType{Type: ast.BIGINT},
  786. },
  787. },
  788. streamStmt: streams["tableInPlanner"],
  789. metaFields: []string{},
  790. }.Init(),
  791. },
  792. },
  793. Emitters: []string{"tableInPlanner"},
  794. }.Init(),
  795. },
  796. },
  797. from: &ast.Table{
  798. Name: "src1",
  799. },
  800. joins: []ast.Join{
  801. {
  802. Name: "tableInPlanner",
  803. Alias: "",
  804. JoinType: ast.INNER_JOIN,
  805. Expr: &ast.BinaryExpr{
  806. OP: ast.AND,
  807. LHS: &ast.BinaryExpr{
  808. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  809. OP: ast.EQ,
  810. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  811. },
  812. RHS: &ast.BinaryExpr{
  813. RHS: &ast.BinaryExpr{
  814. OP: ast.AND,
  815. LHS: &ast.BinaryExpr{
  816. OP: ast.GT,
  817. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  818. RHS: &ast.IntegerLiteral{Val: 20},
  819. },
  820. RHS: &ast.BinaryExpr{
  821. OP: ast.LT,
  822. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  823. RHS: &ast.IntegerLiteral{Val: 60},
  824. },
  825. },
  826. OP: ast.AND,
  827. LHS: &ast.BinaryExpr{
  828. OP: ast.GT,
  829. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  830. RHS: &ast.IntegerLiteral{Val: 111},
  831. },
  832. },
  833. },
  834. },
  835. },
  836. }.Init(),
  837. },
  838. },
  839. fields: []ast.Field{
  840. {
  841. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  842. Name: "id1",
  843. AName: ""},
  844. },
  845. isAggregate: false,
  846. sendMeta: false,
  847. }.Init(),
  848. }, { // 10 meta
  849. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  850. p: ProjectPlan{
  851. baseLogicalPlan: baseLogicalPlan{
  852. children: []LogicalPlan{
  853. FilterPlan{
  854. baseLogicalPlan: baseLogicalPlan{
  855. children: []LogicalPlan{
  856. DataSourcePlan{
  857. name: "src1",
  858. streamFields: []interface{}{
  859. &ast.StreamField{
  860. Name: "temp",
  861. FieldType: &ast.BasicType{Type: ast.BIGINT},
  862. },
  863. },
  864. streamStmt: streams["src1"],
  865. metaFields: []string{"Humidity", "device", "id"},
  866. }.Init(),
  867. },
  868. },
  869. condition: &ast.BinaryExpr{
  870. LHS: &ast.Call{
  871. Name: "meta",
  872. FuncId: 2,
  873. Args: []ast.Expr{&ast.MetaRef{
  874. Name: "device",
  875. StreamName: ast.DefaultStream,
  876. }},
  877. },
  878. OP: ast.EQ,
  879. RHS: &ast.StringLiteral{
  880. Val: "demo2",
  881. },
  882. },
  883. }.Init(),
  884. },
  885. },
  886. fields: []ast.Field{
  887. {
  888. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  889. Name: "temp",
  890. AName: "",
  891. }, {
  892. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  893. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  894. Name: "id",
  895. StreamName: ast.DefaultStream,
  896. }}},
  897. []ast.StreamName{},
  898. nil,
  899. )},
  900. Name: "meta",
  901. AName: "eid",
  902. }, {
  903. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  904. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  905. &ast.BinaryExpr{
  906. OP: ast.ARROW,
  907. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  908. RHS: &ast.JsonFieldRef{Name: "Device"},
  909. },
  910. }},
  911. []ast.StreamName{},
  912. nil,
  913. )},
  914. Name: "meta",
  915. AName: "hdevice",
  916. },
  917. },
  918. isAggregate: false,
  919. sendMeta: false,
  920. }.Init(),
  921. }, { // 11 join with same name field and aliased
  922. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  923. p: ProjectPlan{
  924. baseLogicalPlan: baseLogicalPlan{
  925. children: []LogicalPlan{
  926. JoinPlan{
  927. baseLogicalPlan: baseLogicalPlan{
  928. children: []LogicalPlan{
  929. JoinAlignPlan{
  930. baseLogicalPlan: baseLogicalPlan{
  931. children: []LogicalPlan{
  932. DataSourcePlan{
  933. name: "src2",
  934. streamFields: []interface{}{
  935. &ast.StreamField{
  936. Name: "hum",
  937. FieldType: &ast.BasicType{Type: ast.BIGINT},
  938. },
  939. &ast.StreamField{
  940. Name: "id2",
  941. FieldType: &ast.BasicType{Type: ast.BIGINT},
  942. },
  943. },
  944. streamStmt: streams["src2"],
  945. metaFields: []string{},
  946. timestampFormat: "YYYY-MM-dd HH:mm:ss",
  947. }.Init(),
  948. DataSourcePlan{
  949. name: "tableInPlanner",
  950. streamFields: []interface{}{
  951. &ast.StreamField{
  952. Name: "hum",
  953. FieldType: &ast.BasicType{Type: ast.BIGINT},
  954. },
  955. &ast.StreamField{
  956. Name: "id",
  957. FieldType: &ast.BasicType{Type: ast.BIGINT},
  958. },
  959. },
  960. streamStmt: streams["tableInPlanner"],
  961. metaFields: []string{},
  962. }.Init(),
  963. },
  964. },
  965. Emitters: []string{"tableInPlanner"},
  966. }.Init(),
  967. },
  968. },
  969. from: &ast.Table{
  970. Name: "src2",
  971. },
  972. joins: []ast.Join{
  973. {
  974. Name: "tableInPlanner",
  975. Alias: "",
  976. JoinType: ast.INNER_JOIN,
  977. Expr: &ast.BinaryExpr{
  978. RHS: &ast.BinaryExpr{
  979. OP: ast.EQ,
  980. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  981. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  982. },
  983. OP: ast.AND,
  984. LHS: &ast.BinaryExpr{
  985. OP: ast.GT,
  986. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  987. &ast.FieldRef{
  988. Name: "hum",
  989. StreamName: "src2",
  990. },
  991. []ast.StreamName{"src2"},
  992. &boolFalse,
  993. )},
  994. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  995. &ast.FieldRef{
  996. Name: "hum",
  997. StreamName: "tableInPlanner",
  998. },
  999. []ast.StreamName{"tableInPlanner"},
  1000. &boolFalse,
  1001. )},
  1002. },
  1003. },
  1004. },
  1005. },
  1006. }.Init(),
  1007. },
  1008. },
  1009. fields: []ast.Field{
  1010. {
  1011. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1012. &ast.FieldRef{
  1013. Name: "hum",
  1014. StreamName: "src2",
  1015. },
  1016. []ast.StreamName{"src2"},
  1017. &boolFalse,
  1018. )},
  1019. Name: "hum",
  1020. AName: "hum1",
  1021. }, {
  1022. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1023. &ast.FieldRef{
  1024. Name: "hum",
  1025. StreamName: "tableInPlanner",
  1026. },
  1027. []ast.StreamName{"tableInPlanner"},
  1028. &boolFalse,
  1029. )},
  1030. Name: "hum",
  1031. AName: "hum2",
  1032. },
  1033. },
  1034. isAggregate: false,
  1035. sendMeta: false,
  1036. }.Init(),
  1037. }, { // 12 meta with more fields
  1038. sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
  1039. p: ProjectPlan{
  1040. baseLogicalPlan: baseLogicalPlan{
  1041. children: []LogicalPlan{
  1042. FilterPlan{
  1043. baseLogicalPlan: baseLogicalPlan{
  1044. children: []LogicalPlan{
  1045. DataSourcePlan{
  1046. name: "src1",
  1047. streamFields: []interface{}{
  1048. &ast.StreamField{
  1049. Name: "temp",
  1050. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1051. },
  1052. },
  1053. streamStmt: streams["src1"],
  1054. metaFields: []string{},
  1055. allMeta: true,
  1056. }.Init(),
  1057. },
  1058. },
  1059. condition: &ast.BinaryExpr{
  1060. LHS: &ast.Call{
  1061. Name: "meta",
  1062. FuncId: 1,
  1063. Args: []ast.Expr{&ast.MetaRef{
  1064. Name: "device",
  1065. StreamName: ast.DefaultStream,
  1066. }},
  1067. },
  1068. OP: ast.EQ,
  1069. RHS: &ast.StringLiteral{
  1070. Val: "demo2",
  1071. },
  1072. },
  1073. }.Init(),
  1074. },
  1075. },
  1076. fields: []ast.Field{
  1077. {
  1078. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1079. Name: "temp",
  1080. AName: "",
  1081. }, {
  1082. Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1083. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1084. Name: "*",
  1085. StreamName: ast.DefaultStream,
  1086. }}},
  1087. []ast.StreamName{},
  1088. nil,
  1089. )},
  1090. Name: "meta",
  1091. AName: "m",
  1092. },
  1093. },
  1094. isAggregate: false,
  1095. sendMeta: false,
  1096. }.Init(),
  1097. },
  1098. }
  1099. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1100. for i, tt := range tests {
  1101. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1102. if err != nil {
  1103. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1104. continue
  1105. }
  1106. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1107. IsEventTime: false,
  1108. LateTol: 0,
  1109. Concurrency: 0,
  1110. BufferLength: 0,
  1111. SendMetaToSink: false,
  1112. Qos: 0,
  1113. CheckpointInterval: 0,
  1114. SendError: true,
  1115. }, store)
  1116. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1117. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1118. } else if !reflect.DeepEqual(tt.p, p) {
  1119. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1120. }
  1121. }
  1122. }
  1123. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1124. err, store := store.GetKV("stream")
  1125. if err != nil {
  1126. t.Error(err)
  1127. return
  1128. }
  1129. streamSqls := map[string]string{
  1130. "src1": `CREATE STREAM src1 (
  1131. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1132. "src2": `CREATE STREAM src2 (
  1133. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1134. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1135. id BIGINT,
  1136. name STRING,
  1137. value STRING,
  1138. hum BIGINT
  1139. ) WITH (TYPE="file");`,
  1140. }
  1141. types := map[string]ast.StreamType{
  1142. "src1": ast.TypeStream,
  1143. "src2": ast.TypeStream,
  1144. "tableInPlanner": ast.TypeTable,
  1145. }
  1146. for name, sql := range streamSqls {
  1147. s, err := json.Marshal(&xsql.StreamInfo{
  1148. StreamType: types[name],
  1149. Statement: sql,
  1150. })
  1151. if err != nil {
  1152. t.Error(err)
  1153. t.Fail()
  1154. }
  1155. err = store.Set(name, string(s))
  1156. if err != nil {
  1157. t.Error(err)
  1158. t.Fail()
  1159. }
  1160. }
  1161. streams := make(map[string]*ast.StreamStmt)
  1162. for n := range streamSqls {
  1163. streamStmt, err := xsql.GetDataSource(store, n)
  1164. if err != nil {
  1165. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1166. return
  1167. }
  1168. streams[n] = streamStmt
  1169. }
  1170. var (
  1171. //boolTrue = true
  1172. boolFalse = false
  1173. )
  1174. var tests = []struct {
  1175. sql string
  1176. p LogicalPlan
  1177. err string
  1178. }{
  1179. { // 0
  1180. sql: `SELECT name FROM src1`,
  1181. p: ProjectPlan{
  1182. baseLogicalPlan: baseLogicalPlan{
  1183. children: []LogicalPlan{
  1184. DataSourcePlan{
  1185. baseLogicalPlan: baseLogicalPlan{},
  1186. name: "src1",
  1187. streamFields: []interface{}{
  1188. "name",
  1189. },
  1190. streamStmt: streams["src1"],
  1191. metaFields: []string{},
  1192. }.Init(),
  1193. },
  1194. },
  1195. fields: []ast.Field{
  1196. {
  1197. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1198. Name: "name",
  1199. AName: "",
  1200. },
  1201. },
  1202. isAggregate: false,
  1203. sendMeta: false,
  1204. }.Init(),
  1205. }, { // 1 optimize where to data source
  1206. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1207. p: ProjectPlan{
  1208. baseLogicalPlan: baseLogicalPlan{
  1209. children: []LogicalPlan{
  1210. WindowPlan{
  1211. baseLogicalPlan: baseLogicalPlan{
  1212. children: []LogicalPlan{
  1213. FilterPlan{
  1214. baseLogicalPlan: baseLogicalPlan{
  1215. children: []LogicalPlan{
  1216. DataSourcePlan{
  1217. name: "src1",
  1218. streamFields: []interface{}{
  1219. "name", "temp",
  1220. },
  1221. streamStmt: streams["src1"],
  1222. metaFields: []string{},
  1223. }.Init(),
  1224. },
  1225. },
  1226. condition: &ast.BinaryExpr{
  1227. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1228. OP: ast.EQ,
  1229. RHS: &ast.StringLiteral{Val: "v1"},
  1230. },
  1231. }.Init(),
  1232. },
  1233. },
  1234. condition: nil,
  1235. wtype: ast.TUMBLING_WINDOW,
  1236. length: 10000,
  1237. interval: 0,
  1238. limit: 0,
  1239. }.Init(),
  1240. },
  1241. },
  1242. fields: []ast.Field{
  1243. {
  1244. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1245. Name: "temp",
  1246. AName: ""},
  1247. },
  1248. isAggregate: false,
  1249. sendMeta: false,
  1250. }.Init(),
  1251. }, { // 2 condition that cannot be optimized
  1252. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1253. p: ProjectPlan{
  1254. baseLogicalPlan: baseLogicalPlan{
  1255. children: []LogicalPlan{
  1256. JoinPlan{
  1257. baseLogicalPlan: baseLogicalPlan{
  1258. children: []LogicalPlan{
  1259. WindowPlan{
  1260. baseLogicalPlan: baseLogicalPlan{
  1261. children: []LogicalPlan{
  1262. DataSourcePlan{
  1263. name: "src1",
  1264. streamFields: []interface{}{
  1265. "id1", "temp",
  1266. },
  1267. streamStmt: streams["src1"],
  1268. metaFields: []string{},
  1269. }.Init(),
  1270. DataSourcePlan{
  1271. name: "src2",
  1272. streamFields: []interface{}{ // can't determine which stream id1 belongs to
  1273. "hum", "id1", "id2",
  1274. },
  1275. streamStmt: streams["src2"],
  1276. metaFields: []string{},
  1277. }.Init(),
  1278. },
  1279. },
  1280. condition: nil,
  1281. wtype: ast.TUMBLING_WINDOW,
  1282. length: 10000,
  1283. interval: 0,
  1284. limit: 0,
  1285. }.Init(),
  1286. },
  1287. },
  1288. from: &ast.Table{Name: "src1"},
  1289. joins: ast.Joins{ast.Join{
  1290. Name: "src2",
  1291. JoinType: ast.INNER_JOIN,
  1292. Expr: &ast.BinaryExpr{
  1293. OP: ast.AND,
  1294. LHS: &ast.BinaryExpr{
  1295. LHS: &ast.BinaryExpr{
  1296. OP: ast.GT,
  1297. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1298. RHS: &ast.IntegerLiteral{Val: 20},
  1299. },
  1300. OP: ast.OR,
  1301. RHS: &ast.BinaryExpr{
  1302. OP: ast.GT,
  1303. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1304. RHS: &ast.IntegerLiteral{Val: 60},
  1305. },
  1306. },
  1307. RHS: &ast.BinaryExpr{
  1308. OP: ast.EQ,
  1309. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1310. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1311. },
  1312. },
  1313. }},
  1314. }.Init(),
  1315. },
  1316. },
  1317. fields: []ast.Field{
  1318. {
  1319. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1320. Name: "id1",
  1321. AName: ""},
  1322. },
  1323. isAggregate: false,
  1324. sendMeta: false,
  1325. }.Init(),
  1326. }, { // 3 optimize window filter
  1327. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1328. p: ProjectPlan{
  1329. baseLogicalPlan: baseLogicalPlan{
  1330. children: []LogicalPlan{
  1331. WindowPlan{
  1332. baseLogicalPlan: baseLogicalPlan{
  1333. children: []LogicalPlan{
  1334. FilterPlan{
  1335. baseLogicalPlan: baseLogicalPlan{
  1336. children: []LogicalPlan{
  1337. DataSourcePlan{
  1338. name: "src1",
  1339. streamFields: []interface{}{
  1340. "id1", "name", "temp",
  1341. },
  1342. streamStmt: streams["src1"],
  1343. metaFields: []string{},
  1344. }.Init(),
  1345. },
  1346. },
  1347. condition: &ast.BinaryExpr{
  1348. OP: ast.AND,
  1349. LHS: &ast.BinaryExpr{
  1350. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1351. OP: ast.EQ,
  1352. RHS: &ast.StringLiteral{Val: "v1"},
  1353. },
  1354. RHS: &ast.BinaryExpr{
  1355. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1356. OP: ast.GT,
  1357. RHS: &ast.IntegerLiteral{Val: 2},
  1358. },
  1359. },
  1360. }.Init(),
  1361. },
  1362. },
  1363. condition: nil,
  1364. wtype: ast.TUMBLING_WINDOW,
  1365. length: 10000,
  1366. interval: 0,
  1367. limit: 0,
  1368. }.Init(),
  1369. },
  1370. },
  1371. fields: []ast.Field{
  1372. {
  1373. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1374. Name: "id1",
  1375. AName: ""},
  1376. },
  1377. isAggregate: false,
  1378. sendMeta: false,
  1379. }.Init(),
  1380. }, { // 4. do not optimize count window
  1381. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1382. p: ProjectPlan{
  1383. baseLogicalPlan: baseLogicalPlan{
  1384. children: []LogicalPlan{
  1385. HavingPlan{
  1386. baseLogicalPlan: baseLogicalPlan{
  1387. children: []LogicalPlan{
  1388. FilterPlan{
  1389. baseLogicalPlan: baseLogicalPlan{
  1390. children: []LogicalPlan{
  1391. WindowPlan{
  1392. baseLogicalPlan: baseLogicalPlan{
  1393. children: []LogicalPlan{
  1394. DataSourcePlan{
  1395. name: "src1",
  1396. isWildCard: true,
  1397. streamFields: nil,
  1398. streamStmt: streams["src1"],
  1399. metaFields: []string{},
  1400. }.Init(),
  1401. },
  1402. },
  1403. condition: nil,
  1404. wtype: ast.COUNT_WINDOW,
  1405. length: 5,
  1406. interval: 1,
  1407. limit: 0,
  1408. }.Init(),
  1409. },
  1410. },
  1411. condition: &ast.BinaryExpr{
  1412. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1413. OP: ast.GT,
  1414. RHS: &ast.IntegerLiteral{Val: 20},
  1415. },
  1416. }.Init(),
  1417. },
  1418. },
  1419. condition: &ast.BinaryExpr{
  1420. LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
  1421. Token: ast.ASTERISK,
  1422. }}},
  1423. OP: ast.GT,
  1424. RHS: &ast.IntegerLiteral{Val: 2},
  1425. },
  1426. }.Init(),
  1427. },
  1428. },
  1429. fields: []ast.Field{
  1430. {
  1431. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1432. Name: "",
  1433. AName: ""},
  1434. },
  1435. isAggregate: false,
  1436. sendMeta: false,
  1437. }.Init(),
  1438. }, { // 5. optimize join on
  1439. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1440. p: ProjectPlan{
  1441. baseLogicalPlan: baseLogicalPlan{
  1442. children: []LogicalPlan{
  1443. JoinPlan{
  1444. baseLogicalPlan: baseLogicalPlan{
  1445. children: []LogicalPlan{
  1446. WindowPlan{
  1447. baseLogicalPlan: baseLogicalPlan{
  1448. children: []LogicalPlan{
  1449. FilterPlan{
  1450. baseLogicalPlan: baseLogicalPlan{
  1451. children: []LogicalPlan{
  1452. DataSourcePlan{
  1453. name: "src1",
  1454. streamFields: []interface{}{
  1455. "id1", "temp",
  1456. },
  1457. streamStmt: streams["src1"],
  1458. metaFields: []string{},
  1459. }.Init(),
  1460. },
  1461. },
  1462. condition: &ast.BinaryExpr{
  1463. RHS: &ast.BinaryExpr{
  1464. OP: ast.GT,
  1465. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1466. RHS: &ast.IntegerLiteral{Val: 20},
  1467. },
  1468. OP: ast.AND,
  1469. LHS: &ast.BinaryExpr{
  1470. OP: ast.GT,
  1471. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1472. RHS: &ast.IntegerLiteral{Val: 111},
  1473. },
  1474. },
  1475. }.Init(),
  1476. FilterPlan{
  1477. baseLogicalPlan: baseLogicalPlan{
  1478. children: []LogicalPlan{
  1479. DataSourcePlan{
  1480. name: "src2",
  1481. streamFields: []interface{}{
  1482. "hum", "id1", "id2",
  1483. },
  1484. streamStmt: streams["src2"],
  1485. metaFields: []string{},
  1486. }.Init(),
  1487. },
  1488. },
  1489. condition: &ast.BinaryExpr{
  1490. OP: ast.LT,
  1491. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1492. RHS: &ast.IntegerLiteral{Val: 60},
  1493. },
  1494. }.Init(),
  1495. },
  1496. },
  1497. condition: nil,
  1498. wtype: ast.TUMBLING_WINDOW,
  1499. length: 10000,
  1500. interval: 0,
  1501. limit: 0,
  1502. }.Init(),
  1503. },
  1504. },
  1505. from: &ast.Table{
  1506. Name: "src1",
  1507. },
  1508. joins: []ast.Join{
  1509. {
  1510. Name: "src2",
  1511. Alias: "",
  1512. JoinType: ast.INNER_JOIN,
  1513. Expr: &ast.BinaryExpr{
  1514. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1515. OP: ast.EQ,
  1516. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1517. },
  1518. },
  1519. },
  1520. }.Init(),
  1521. },
  1522. },
  1523. fields: []ast.Field{
  1524. {
  1525. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1526. Name: "id1",
  1527. AName: ""},
  1528. },
  1529. isAggregate: false,
  1530. sendMeta: false,
  1531. }.Init(),
  1532. }, { // 6. optimize outer join on
  1533. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1534. p: ProjectPlan{
  1535. baseLogicalPlan: baseLogicalPlan{
  1536. children: []LogicalPlan{
  1537. JoinPlan{
  1538. baseLogicalPlan: baseLogicalPlan{
  1539. children: []LogicalPlan{
  1540. WindowPlan{
  1541. baseLogicalPlan: baseLogicalPlan{
  1542. children: []LogicalPlan{
  1543. FilterPlan{
  1544. baseLogicalPlan: baseLogicalPlan{
  1545. children: []LogicalPlan{
  1546. DataSourcePlan{
  1547. name: "src1",
  1548. streamFields: []interface{}{
  1549. "id1", "temp",
  1550. },
  1551. streamStmt: streams["src1"],
  1552. metaFields: []string{},
  1553. }.Init(),
  1554. },
  1555. },
  1556. condition: &ast.BinaryExpr{
  1557. OP: ast.GT,
  1558. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1559. RHS: &ast.IntegerLiteral{Val: 111},
  1560. },
  1561. }.Init(),
  1562. DataSourcePlan{
  1563. name: "src2",
  1564. streamFields: []interface{}{
  1565. "hum", "id1", "id2",
  1566. },
  1567. streamStmt: streams["src2"],
  1568. metaFields: []string{},
  1569. }.Init(),
  1570. },
  1571. },
  1572. condition: nil,
  1573. wtype: ast.TUMBLING_WINDOW,
  1574. length: 10000,
  1575. interval: 0,
  1576. limit: 0,
  1577. }.Init(),
  1578. },
  1579. },
  1580. from: &ast.Table{
  1581. Name: "src1",
  1582. },
  1583. joins: []ast.Join{
  1584. {
  1585. Name: "src2",
  1586. Alias: "",
  1587. JoinType: ast.FULL_JOIN,
  1588. Expr: &ast.BinaryExpr{
  1589. OP: ast.AND,
  1590. LHS: &ast.BinaryExpr{
  1591. OP: ast.AND,
  1592. LHS: &ast.BinaryExpr{
  1593. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1594. OP: ast.EQ,
  1595. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1596. },
  1597. RHS: &ast.BinaryExpr{
  1598. OP: ast.GT,
  1599. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1600. RHS: &ast.IntegerLiteral{Val: 20},
  1601. },
  1602. },
  1603. RHS: &ast.BinaryExpr{
  1604. OP: ast.LT,
  1605. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1606. RHS: &ast.IntegerLiteral{Val: 60},
  1607. },
  1608. },
  1609. },
  1610. },
  1611. }.Init(),
  1612. },
  1613. },
  1614. fields: []ast.Field{
  1615. {
  1616. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1617. Name: "id1",
  1618. AName: ""},
  1619. },
  1620. isAggregate: false,
  1621. sendMeta: false,
  1622. }.Init(),
  1623. }, { // 7 window error for table
  1624. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1625. p: nil,
  1626. err: "cannot run window for TABLE sources",
  1627. }, { // 8 join table without window
  1628. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1629. p: ProjectPlan{
  1630. baseLogicalPlan: baseLogicalPlan{
  1631. children: []LogicalPlan{
  1632. JoinPlan{
  1633. baseLogicalPlan: baseLogicalPlan{
  1634. children: []LogicalPlan{
  1635. JoinAlignPlan{
  1636. baseLogicalPlan: baseLogicalPlan{
  1637. children: []LogicalPlan{
  1638. FilterPlan{
  1639. baseLogicalPlan: baseLogicalPlan{
  1640. children: []LogicalPlan{
  1641. DataSourcePlan{
  1642. name: "src1",
  1643. streamFields: []interface{}{
  1644. "hum", "id1", "temp",
  1645. },
  1646. streamStmt: streams["src1"],
  1647. metaFields: []string{},
  1648. }.Init(),
  1649. },
  1650. },
  1651. condition: &ast.BinaryExpr{
  1652. RHS: &ast.BinaryExpr{
  1653. OP: ast.GT,
  1654. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1655. RHS: &ast.IntegerLiteral{Val: 20},
  1656. },
  1657. OP: ast.AND,
  1658. LHS: &ast.BinaryExpr{
  1659. OP: ast.GT,
  1660. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1661. RHS: &ast.IntegerLiteral{Val: 111},
  1662. },
  1663. },
  1664. }.Init(),
  1665. DataSourcePlan{
  1666. name: "tableInPlanner",
  1667. streamFields: []interface{}{
  1668. &ast.StreamField{
  1669. Name: "hum",
  1670. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1671. },
  1672. &ast.StreamField{
  1673. Name: "id",
  1674. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1675. },
  1676. },
  1677. streamStmt: streams["tableInPlanner"],
  1678. metaFields: []string{},
  1679. }.Init(),
  1680. },
  1681. },
  1682. Emitters: []string{"tableInPlanner"},
  1683. }.Init(),
  1684. },
  1685. },
  1686. from: &ast.Table{
  1687. Name: "src1",
  1688. },
  1689. joins: []ast.Join{
  1690. {
  1691. Name: "tableInPlanner",
  1692. Alias: "",
  1693. JoinType: ast.INNER_JOIN,
  1694. Expr: &ast.BinaryExpr{
  1695. OP: ast.AND,
  1696. LHS: &ast.BinaryExpr{
  1697. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1698. OP: ast.EQ,
  1699. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1700. },
  1701. RHS: &ast.BinaryExpr{
  1702. OP: ast.LT,
  1703. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1704. RHS: &ast.IntegerLiteral{Val: 60},
  1705. },
  1706. },
  1707. },
  1708. },
  1709. }.Init(),
  1710. },
  1711. },
  1712. fields: []ast.Field{
  1713. {
  1714. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1715. Name: "id1",
  1716. AName: ""},
  1717. },
  1718. isAggregate: false,
  1719. sendMeta: false,
  1720. }.Init(),
  1721. }, { // 9 join table with window
  1722. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1723. p: ProjectPlan{
  1724. baseLogicalPlan: baseLogicalPlan{
  1725. children: []LogicalPlan{
  1726. JoinPlan{
  1727. baseLogicalPlan: baseLogicalPlan{
  1728. children: []LogicalPlan{
  1729. JoinAlignPlan{
  1730. baseLogicalPlan: baseLogicalPlan{
  1731. children: []LogicalPlan{
  1732. WindowPlan{
  1733. baseLogicalPlan: baseLogicalPlan{
  1734. children: []LogicalPlan{
  1735. DataSourcePlan{
  1736. name: "src1",
  1737. streamFields: []interface{}{
  1738. "id1", "temp",
  1739. },
  1740. streamStmt: streams["src1"],
  1741. metaFields: []string{},
  1742. }.Init(),
  1743. },
  1744. },
  1745. condition: nil,
  1746. wtype: ast.TUMBLING_WINDOW,
  1747. length: 10000,
  1748. interval: 0,
  1749. limit: 0,
  1750. }.Init(),
  1751. DataSourcePlan{
  1752. name: "tableInPlanner",
  1753. streamFields: []interface{}{
  1754. &ast.StreamField{
  1755. Name: "hum",
  1756. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1757. },
  1758. &ast.StreamField{
  1759. Name: "id",
  1760. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1761. },
  1762. },
  1763. streamStmt: streams["tableInPlanner"],
  1764. metaFields: []string{},
  1765. }.Init(),
  1766. },
  1767. },
  1768. Emitters: []string{"tableInPlanner"},
  1769. }.Init(),
  1770. },
  1771. },
  1772. from: &ast.Table{
  1773. Name: "src1",
  1774. },
  1775. joins: []ast.Join{
  1776. {
  1777. Name: "tableInPlanner",
  1778. Alias: "",
  1779. JoinType: ast.INNER_JOIN,
  1780. Expr: &ast.BinaryExpr{
  1781. OP: ast.AND,
  1782. LHS: &ast.BinaryExpr{
  1783. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1784. OP: ast.EQ,
  1785. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1786. },
  1787. RHS: &ast.BinaryExpr{
  1788. RHS: &ast.BinaryExpr{
  1789. OP: ast.AND,
  1790. LHS: &ast.BinaryExpr{
  1791. OP: ast.GT,
  1792. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1793. RHS: &ast.IntegerLiteral{Val: 20},
  1794. },
  1795. RHS: &ast.BinaryExpr{
  1796. OP: ast.LT,
  1797. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1798. RHS: &ast.IntegerLiteral{Val: 60},
  1799. },
  1800. },
  1801. OP: ast.AND,
  1802. LHS: &ast.BinaryExpr{
  1803. OP: ast.GT,
  1804. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1805. RHS: &ast.IntegerLiteral{Val: 111},
  1806. },
  1807. },
  1808. },
  1809. },
  1810. },
  1811. }.Init(),
  1812. },
  1813. },
  1814. fields: []ast.Field{
  1815. {
  1816. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1817. Name: "id1",
  1818. AName: ""},
  1819. },
  1820. isAggregate: false,
  1821. sendMeta: false,
  1822. }.Init(),
  1823. }, { // 10 meta
  1824. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1825. p: ProjectPlan{
  1826. baseLogicalPlan: baseLogicalPlan{
  1827. children: []LogicalPlan{
  1828. FilterPlan{
  1829. baseLogicalPlan: baseLogicalPlan{
  1830. children: []LogicalPlan{
  1831. DataSourcePlan{
  1832. name: "src1",
  1833. streamFields: []interface{}{
  1834. "temp",
  1835. },
  1836. streamStmt: streams["src1"],
  1837. metaFields: []string{"Humidity", "device", "id"},
  1838. }.Init(),
  1839. },
  1840. },
  1841. condition: &ast.BinaryExpr{
  1842. LHS: &ast.Call{
  1843. Name: "meta",
  1844. FuncId: 2,
  1845. Args: []ast.Expr{&ast.MetaRef{
  1846. Name: "device",
  1847. StreamName: ast.DefaultStream,
  1848. }},
  1849. },
  1850. OP: ast.EQ,
  1851. RHS: &ast.StringLiteral{
  1852. Val: "demo2",
  1853. },
  1854. },
  1855. }.Init(),
  1856. },
  1857. },
  1858. fields: []ast.Field{
  1859. {
  1860. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1861. Name: "temp",
  1862. AName: "",
  1863. }, {
  1864. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1865. &ast.Call{Name: "meta", FuncId: 0, Args: []ast.Expr{&ast.MetaRef{
  1866. Name: "id",
  1867. StreamName: ast.DefaultStream,
  1868. }}},
  1869. []ast.StreamName{},
  1870. nil,
  1871. )},
  1872. Name: "meta",
  1873. AName: "eid",
  1874. }, {
  1875. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1876. &ast.Call{Name: "meta", FuncId: 1, Args: []ast.Expr{
  1877. &ast.BinaryExpr{
  1878. OP: ast.ARROW,
  1879. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1880. RHS: &ast.JsonFieldRef{Name: "Device"},
  1881. },
  1882. }},
  1883. []ast.StreamName{},
  1884. nil,
  1885. )},
  1886. Name: "meta",
  1887. AName: "hdevice",
  1888. },
  1889. },
  1890. isAggregate: false,
  1891. sendMeta: false,
  1892. }.Init(),
  1893. }, { // 11 join with same name field and aliased
  1894. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1895. p: ProjectPlan{
  1896. baseLogicalPlan: baseLogicalPlan{
  1897. children: []LogicalPlan{
  1898. JoinPlan{
  1899. baseLogicalPlan: baseLogicalPlan{
  1900. children: []LogicalPlan{
  1901. JoinAlignPlan{
  1902. baseLogicalPlan: baseLogicalPlan{
  1903. children: []LogicalPlan{
  1904. DataSourcePlan{
  1905. name: "src2",
  1906. streamFields: []interface{}{
  1907. "hum", "id", "id2",
  1908. },
  1909. streamStmt: streams["src2"],
  1910. metaFields: []string{},
  1911. }.Init(),
  1912. DataSourcePlan{
  1913. name: "tableInPlanner",
  1914. streamFields: []interface{}{
  1915. &ast.StreamField{
  1916. Name: "hum",
  1917. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1918. },
  1919. &ast.StreamField{
  1920. Name: "id",
  1921. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1922. },
  1923. },
  1924. streamStmt: streams["tableInPlanner"],
  1925. metaFields: []string{},
  1926. }.Init(),
  1927. },
  1928. },
  1929. Emitters: []string{"tableInPlanner"},
  1930. }.Init(),
  1931. },
  1932. },
  1933. from: &ast.Table{
  1934. Name: "src2",
  1935. },
  1936. joins: []ast.Join{
  1937. {
  1938. Name: "tableInPlanner",
  1939. Alias: "",
  1940. JoinType: ast.INNER_JOIN,
  1941. Expr: &ast.BinaryExpr{
  1942. RHS: &ast.BinaryExpr{
  1943. OP: ast.EQ,
  1944. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  1945. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  1946. },
  1947. OP: ast.AND,
  1948. LHS: &ast.BinaryExpr{
  1949. OP: ast.GT,
  1950. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1951. &ast.FieldRef{
  1952. Name: "hum",
  1953. StreamName: "src2",
  1954. },
  1955. []ast.StreamName{"src2"},
  1956. &boolFalse,
  1957. )},
  1958. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1959. &ast.FieldRef{
  1960. Name: "hum",
  1961. StreamName: "tableInPlanner",
  1962. },
  1963. []ast.StreamName{"tableInPlanner"},
  1964. &boolFalse,
  1965. )},
  1966. },
  1967. },
  1968. },
  1969. },
  1970. }.Init(),
  1971. },
  1972. },
  1973. fields: []ast.Field{
  1974. {
  1975. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1976. &ast.FieldRef{
  1977. Name: "hum",
  1978. StreamName: "src2",
  1979. },
  1980. []ast.StreamName{"src2"},
  1981. &boolFalse,
  1982. )},
  1983. Name: "hum",
  1984. AName: "hum1",
  1985. }, {
  1986. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1987. &ast.FieldRef{
  1988. Name: "hum",
  1989. StreamName: "tableInPlanner",
  1990. },
  1991. []ast.StreamName{"tableInPlanner"},
  1992. &boolFalse,
  1993. )},
  1994. Name: "hum",
  1995. AName: "hum2",
  1996. },
  1997. },
  1998. isAggregate: false,
  1999. sendMeta: false,
  2000. }.Init(),
  2001. }, { // 12
  2002. sql: `SELECT name->first, name->last FROM src1`,
  2003. p: ProjectPlan{
  2004. baseLogicalPlan: baseLogicalPlan{
  2005. children: []LogicalPlan{
  2006. DataSourcePlan{
  2007. baseLogicalPlan: baseLogicalPlan{},
  2008. name: "src1",
  2009. streamFields: []interface{}{
  2010. "name",
  2011. },
  2012. streamStmt: streams["src1"],
  2013. metaFields: []string{},
  2014. }.Init(),
  2015. },
  2016. },
  2017. fields: []ast.Field{
  2018. {
  2019. Expr: &ast.BinaryExpr{
  2020. OP: ast.ARROW,
  2021. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2022. RHS: &ast.JsonFieldRef{Name: "first"},
  2023. },
  2024. Name: "kuiper_field_0",
  2025. AName: "",
  2026. }, {
  2027. Expr: &ast.BinaryExpr{
  2028. OP: ast.ARROW,
  2029. LHS: &ast.FieldRef{StreamName: "src1", Name: "name"},
  2030. RHS: &ast.JsonFieldRef{Name: "last"},
  2031. },
  2032. Name: "kuiper_field_1",
  2033. AName: "",
  2034. },
  2035. },
  2036. isAggregate: false,
  2037. sendMeta: false,
  2038. }.Init(),
  2039. },
  2040. }
  2041. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2042. for i, tt := range tests {
  2043. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2044. if err != nil {
  2045. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2046. continue
  2047. }
  2048. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2049. IsEventTime: false,
  2050. LateTol: 0,
  2051. Concurrency: 0,
  2052. BufferLength: 0,
  2053. SendMetaToSink: false,
  2054. Qos: 0,
  2055. CheckpointInterval: 0,
  2056. SendError: true,
  2057. }, store)
  2058. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2059. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2060. } else if !reflect.DeepEqual(tt.p, p) {
  2061. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2062. }
  2063. }
  2064. }
  2065. func Test_createLogicalPlan4Lookup(t *testing.T) {
  2066. err, store := store.GetKV("stream")
  2067. if err != nil {
  2068. t.Error(err)
  2069. return
  2070. }
  2071. streamSqls := map[string]string{
  2072. "src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  2073. "table1": `CREATE TABLE table1 () WITH (DATASOURCE="table1",TYPE="sql", KIND="lookup");`,
  2074. "table2": `CREATE TABLE table2 () WITH (DATASOURCE="table2",TYPE="sql", KIND="lookup");`,
  2075. }
  2076. types := map[string]ast.StreamType{
  2077. "src1": ast.TypeStream,
  2078. "table1": ast.TypeTable,
  2079. "table2": ast.TypeTable,
  2080. }
  2081. for name, sql := range streamSqls {
  2082. s, err := json.Marshal(&xsql.StreamInfo{
  2083. StreamType: types[name],
  2084. Statement: sql,
  2085. })
  2086. if err != nil {
  2087. t.Error(err)
  2088. t.Fail()
  2089. }
  2090. err = store.Set(name, string(s))
  2091. if err != nil {
  2092. t.Error(err)
  2093. t.Fail()
  2094. }
  2095. }
  2096. streams := make(map[string]*ast.StreamStmt)
  2097. for n := range streamSqls {
  2098. streamStmt, err := xsql.GetDataSource(store, n)
  2099. if err != nil {
  2100. t.Errorf("fail to get stream %s, please check if stream is created", n)
  2101. return
  2102. }
  2103. streams[n] = streamStmt
  2104. }
  2105. var tests = []struct {
  2106. sql string
  2107. p LogicalPlan
  2108. err string
  2109. }{
  2110. { // 0
  2111. sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id`,
  2112. p: ProjectPlan{
  2113. baseLogicalPlan: baseLogicalPlan{
  2114. children: []LogicalPlan{
  2115. LookupPlan{
  2116. baseLogicalPlan: baseLogicalPlan{
  2117. children: []LogicalPlan{
  2118. DataSourcePlan{
  2119. baseLogicalPlan: baseLogicalPlan{},
  2120. name: "src1",
  2121. streamFields: []interface{}{
  2122. "a",
  2123. },
  2124. streamStmt: streams["src1"],
  2125. metaFields: []string{},
  2126. }.Init(),
  2127. },
  2128. },
  2129. joinExpr: ast.Join{
  2130. Name: "table1",
  2131. Alias: "",
  2132. JoinType: ast.INNER_JOIN,
  2133. Expr: &ast.BinaryExpr{
  2134. OP: ast.EQ,
  2135. LHS: &ast.FieldRef{
  2136. StreamName: "src1",
  2137. Name: "id",
  2138. },
  2139. RHS: &ast.FieldRef{
  2140. StreamName: "table1",
  2141. Name: "id",
  2142. },
  2143. },
  2144. },
  2145. keys: []string{"id"},
  2146. fields: []string{"b"},
  2147. valvars: []ast.Expr{
  2148. &ast.FieldRef{
  2149. StreamName: "src1",
  2150. Name: "id",
  2151. },
  2152. },
  2153. options: &ast.Options{
  2154. DATASOURCE: "table1",
  2155. TYPE: "sql",
  2156. STRICT_VALIDATION: true,
  2157. KIND: "lookup",
  2158. },
  2159. conditions: nil,
  2160. }.Init(),
  2161. },
  2162. },
  2163. fields: []ast.Field{
  2164. {
  2165. Expr: &ast.FieldRef{
  2166. StreamName: "src1",
  2167. Name: "a",
  2168. },
  2169. Name: "a",
  2170. AName: "",
  2171. },
  2172. {
  2173. Expr: &ast.FieldRef{
  2174. StreamName: "table1",
  2175. Name: "b",
  2176. },
  2177. Name: "b",
  2178. AName: "",
  2179. },
  2180. },
  2181. isAggregate: false,
  2182. sendMeta: false,
  2183. }.Init(),
  2184. },
  2185. { // 1
  2186. sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
  2187. p: ProjectPlan{
  2188. baseLogicalPlan: baseLogicalPlan{
  2189. children: []LogicalPlan{
  2190. FilterPlan{
  2191. baseLogicalPlan: baseLogicalPlan{
  2192. children: []LogicalPlan{
  2193. LookupPlan{
  2194. baseLogicalPlan: baseLogicalPlan{
  2195. children: []LogicalPlan{
  2196. FilterPlan{
  2197. baseLogicalPlan: baseLogicalPlan{
  2198. children: []LogicalPlan{
  2199. DataSourcePlan{
  2200. baseLogicalPlan: baseLogicalPlan{},
  2201. name: "src1",
  2202. streamFields: []interface{}{
  2203. "a",
  2204. },
  2205. streamStmt: streams["src1"],
  2206. metaFields: []string{},
  2207. }.Init(),
  2208. },
  2209. },
  2210. condition: &ast.BinaryExpr{
  2211. OP: ast.LT,
  2212. LHS: &ast.FieldRef{
  2213. StreamName: "src1",
  2214. Name: "c",
  2215. },
  2216. RHS: &ast.IntegerLiteral{Val: 40},
  2217. },
  2218. }.Init(),
  2219. },
  2220. },
  2221. joinExpr: ast.Join{
  2222. Name: "table1",
  2223. Alias: "",
  2224. JoinType: ast.INNER_JOIN,
  2225. Expr: &ast.BinaryExpr{
  2226. OP: ast.AND,
  2227. RHS: &ast.BinaryExpr{
  2228. OP: ast.EQ,
  2229. LHS: &ast.FieldRef{
  2230. StreamName: "src1",
  2231. Name: "id",
  2232. },
  2233. RHS: &ast.FieldRef{
  2234. StreamName: "table1",
  2235. Name: "id",
  2236. },
  2237. },
  2238. LHS: &ast.BinaryExpr{
  2239. OP: ast.AND,
  2240. LHS: &ast.BinaryExpr{
  2241. OP: ast.GT,
  2242. LHS: &ast.FieldRef{
  2243. StreamName: "table1",
  2244. Name: "b",
  2245. },
  2246. RHS: &ast.IntegerLiteral{Val: 20},
  2247. },
  2248. RHS: &ast.BinaryExpr{
  2249. OP: ast.LT,
  2250. LHS: &ast.FieldRef{
  2251. StreamName: "src1",
  2252. Name: "c",
  2253. },
  2254. RHS: &ast.IntegerLiteral{Val: 40},
  2255. },
  2256. },
  2257. },
  2258. },
  2259. keys: []string{"id"},
  2260. valvars: []ast.Expr{
  2261. &ast.FieldRef{
  2262. StreamName: "src1",
  2263. Name: "id",
  2264. },
  2265. },
  2266. options: &ast.Options{
  2267. DATASOURCE: "table1",
  2268. TYPE: "sql",
  2269. STRICT_VALIDATION: true,
  2270. KIND: "lookup",
  2271. },
  2272. conditions: &ast.BinaryExpr{
  2273. OP: ast.AND,
  2274. LHS: &ast.BinaryExpr{
  2275. OP: ast.GT,
  2276. LHS: &ast.FieldRef{
  2277. StreamName: "table1",
  2278. Name: "b",
  2279. },
  2280. RHS: &ast.IntegerLiteral{Val: 20},
  2281. },
  2282. RHS: &ast.BinaryExpr{
  2283. OP: ast.LT,
  2284. LHS: &ast.FieldRef{
  2285. StreamName: "src1",
  2286. Name: "c",
  2287. },
  2288. RHS: &ast.IntegerLiteral{Val: 40},
  2289. },
  2290. },
  2291. }.Init(),
  2292. },
  2293. },
  2294. condition: &ast.BinaryExpr{
  2295. OP: ast.GT,
  2296. LHS: &ast.FieldRef{
  2297. StreamName: "table1",
  2298. Name: "b",
  2299. },
  2300. RHS: &ast.IntegerLiteral{Val: 20},
  2301. },
  2302. }.Init(),
  2303. },
  2304. },
  2305. fields: []ast.Field{
  2306. {
  2307. Expr: &ast.FieldRef{
  2308. StreamName: "src1",
  2309. Name: "a",
  2310. },
  2311. Name: "a",
  2312. AName: "",
  2313. },
  2314. {
  2315. Expr: &ast.FieldRef{
  2316. StreamName: "table1",
  2317. Name: "*",
  2318. },
  2319. Name: "*",
  2320. AName: "",
  2321. },
  2322. },
  2323. isAggregate: false,
  2324. sendMeta: false,
  2325. }.Init(),
  2326. },
  2327. { // 2
  2328. sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
  2329. p: ProjectPlan{
  2330. baseLogicalPlan: baseLogicalPlan{
  2331. children: []LogicalPlan{
  2332. LookupPlan{
  2333. baseLogicalPlan: baseLogicalPlan{
  2334. children: []LogicalPlan{
  2335. LookupPlan{
  2336. baseLogicalPlan: baseLogicalPlan{
  2337. children: []LogicalPlan{
  2338. DataSourcePlan{
  2339. baseLogicalPlan: baseLogicalPlan{},
  2340. name: "src1",
  2341. streamFields: []interface{}{
  2342. "a",
  2343. },
  2344. streamStmt: streams["src1"],
  2345. metaFields: []string{},
  2346. }.Init(),
  2347. },
  2348. },
  2349. joinExpr: ast.Join{
  2350. Name: "table1",
  2351. Alias: "",
  2352. JoinType: ast.INNER_JOIN,
  2353. Expr: &ast.BinaryExpr{
  2354. OP: ast.EQ,
  2355. LHS: &ast.FieldRef{
  2356. StreamName: "src1",
  2357. Name: "id",
  2358. },
  2359. RHS: &ast.FieldRef{
  2360. StreamName: "table1",
  2361. Name: "id",
  2362. },
  2363. },
  2364. },
  2365. keys: []string{"id"},
  2366. fields: []string{"b"},
  2367. valvars: []ast.Expr{
  2368. &ast.FieldRef{
  2369. StreamName: "src1",
  2370. Name: "id",
  2371. },
  2372. },
  2373. options: &ast.Options{
  2374. DATASOURCE: "table1",
  2375. TYPE: "sql",
  2376. STRICT_VALIDATION: true,
  2377. KIND: "lookup",
  2378. },
  2379. conditions: nil,
  2380. }.Init(),
  2381. },
  2382. },
  2383. joinExpr: ast.Join{
  2384. Name: "table2",
  2385. Alias: "",
  2386. JoinType: ast.INNER_JOIN,
  2387. Expr: &ast.BinaryExpr{
  2388. OP: ast.EQ,
  2389. LHS: &ast.FieldRef{
  2390. StreamName: "table1",
  2391. Name: "id",
  2392. },
  2393. RHS: &ast.FieldRef{
  2394. StreamName: "table2",
  2395. Name: "id",
  2396. },
  2397. },
  2398. },
  2399. keys: []string{"id"},
  2400. fields: []string{"c"},
  2401. valvars: []ast.Expr{
  2402. &ast.FieldRef{
  2403. StreamName: "table1",
  2404. Name: "id",
  2405. },
  2406. },
  2407. options: &ast.Options{
  2408. DATASOURCE: "table2",
  2409. TYPE: "sql",
  2410. STRICT_VALIDATION: true,
  2411. KIND: "lookup",
  2412. },
  2413. }.Init(),
  2414. },
  2415. },
  2416. fields: []ast.Field{
  2417. {
  2418. Expr: &ast.FieldRef{
  2419. StreamName: "src1",
  2420. Name: "a",
  2421. },
  2422. Name: "a",
  2423. AName: "",
  2424. },
  2425. {
  2426. Expr: &ast.FieldRef{
  2427. StreamName: "table1",
  2428. Name: "b",
  2429. },
  2430. Name: "b",
  2431. AName: "",
  2432. },
  2433. {
  2434. Expr: &ast.FieldRef{
  2435. StreamName: "table2",
  2436. Name: "c",
  2437. },
  2438. Name: "c",
  2439. AName: "",
  2440. },
  2441. },
  2442. isAggregate: false,
  2443. sendMeta: false,
  2444. }.Init(),
  2445. },
  2446. { // 3
  2447. sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
  2448. p: ProjectPlan{
  2449. baseLogicalPlan: baseLogicalPlan{
  2450. children: []LogicalPlan{
  2451. LookupPlan{
  2452. baseLogicalPlan: baseLogicalPlan{
  2453. children: []LogicalPlan{
  2454. WindowPlan{
  2455. baseLogicalPlan: baseLogicalPlan{
  2456. children: []LogicalPlan{
  2457. DataSourcePlan{
  2458. baseLogicalPlan: baseLogicalPlan{},
  2459. name: "src1",
  2460. streamStmt: streams["src1"],
  2461. metaFields: []string{},
  2462. isWildCard: true,
  2463. }.Init(),
  2464. },
  2465. },
  2466. condition: nil,
  2467. wtype: ast.TUMBLING_WINDOW,
  2468. length: 10000,
  2469. interval: 0,
  2470. limit: 0,
  2471. }.Init(),
  2472. },
  2473. },
  2474. joinExpr: ast.Join{
  2475. Name: "table1",
  2476. Alias: "",
  2477. JoinType: ast.INNER_JOIN,
  2478. Expr: &ast.BinaryExpr{
  2479. OP: ast.EQ,
  2480. LHS: &ast.FieldRef{
  2481. StreamName: "src1",
  2482. Name: "id",
  2483. },
  2484. RHS: &ast.FieldRef{
  2485. StreamName: "table1",
  2486. Name: "id",
  2487. },
  2488. },
  2489. },
  2490. keys: []string{"id"},
  2491. valvars: []ast.Expr{
  2492. &ast.FieldRef{
  2493. StreamName: "src1",
  2494. Name: "id",
  2495. },
  2496. },
  2497. options: &ast.Options{
  2498. DATASOURCE: "table1",
  2499. TYPE: "sql",
  2500. STRICT_VALIDATION: true,
  2501. KIND: "lookup",
  2502. },
  2503. conditions: nil,
  2504. }.Init(),
  2505. },
  2506. },
  2507. fields: []ast.Field{
  2508. {
  2509. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  2510. Name: "",
  2511. AName: "",
  2512. },
  2513. },
  2514. isAggregate: false,
  2515. sendMeta: false,
  2516. }.Init(),
  2517. },
  2518. }
  2519. for i, tt := range tests {
  2520. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2521. if err != nil {
  2522. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  2523. continue
  2524. }
  2525. p, err := createLogicalPlan(stmt, &api.RuleOption{
  2526. IsEventTime: false,
  2527. LateTol: 0,
  2528. Concurrency: 0,
  2529. BufferLength: 0,
  2530. SendMetaToSink: false,
  2531. Qos: 0,
  2532. CheckpointInterval: 0,
  2533. SendError: true,
  2534. }, store)
  2535. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  2536. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  2537. } else if !reflect.DeepEqual(tt.p, p) {
  2538. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  2539. }
  2540. }
  2541. }