planner_test.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/testx"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/kv"
  24. "path"
  25. "reflect"
  26. "strings"
  27. "testing"
  28. )
  29. var (
  30. DbDir = testx.GetDbDir()
  31. )
  32. func Test_createLogicalPlan(t *testing.T) {
  33. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  34. err := store.Open()
  35. if err != nil {
  36. t.Error(err)
  37. return
  38. }
  39. defer store.Close()
  40. streamSqls := map[string]string{
  41. "src1": `CREATE STREAM src1 (
  42. id1 BIGINT,
  43. temp BIGINT,
  44. name string
  45. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  46. "src2": `CREATE STREAM src2 (
  47. id2 BIGINT,
  48. hum BIGINT
  49. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  50. "tableInPlanner": `CREATE TABLE tableInPlanner (
  51. id BIGINT,
  52. name STRING,
  53. value STRING,
  54. hum BIGINT
  55. ) WITH (TYPE="file");`,
  56. }
  57. types := map[string]ast.StreamType{
  58. "src1": ast.TypeStream,
  59. "src2": ast.TypeStream,
  60. "tableInPlanner": ast.TypeTable,
  61. }
  62. for name, sql := range streamSqls {
  63. s, err := json.Marshal(&xsql.StreamInfo{
  64. StreamType: types[name],
  65. Statement: sql,
  66. })
  67. if err != nil {
  68. t.Error(err)
  69. t.Fail()
  70. }
  71. store.Set(name, string(s))
  72. }
  73. streams := make(map[string]*ast.StreamStmt)
  74. for n := range streamSqls {
  75. streamStmt, err := xsql.GetDataSource(store, n)
  76. if err != nil {
  77. t.Errorf("fail to get stream %s, please check if stream is created", n)
  78. return
  79. }
  80. streams[n] = streamStmt
  81. }
  82. var (
  83. //boolTrue = true
  84. boolFalse = false
  85. )
  86. var tests = []struct {
  87. sql string
  88. p LogicalPlan
  89. err string
  90. }{
  91. { // 0
  92. sql: `SELECT name FROM src1`,
  93. p: ProjectPlan{
  94. baseLogicalPlan: baseLogicalPlan{
  95. children: []LogicalPlan{
  96. DataSourcePlan{
  97. baseLogicalPlan: baseLogicalPlan{},
  98. name: "src1",
  99. streamFields: []interface{}{
  100. &ast.StreamField{
  101. Name: "name",
  102. FieldType: &ast.BasicType{Type: ast.STRINGS},
  103. },
  104. },
  105. streamStmt: streams["src1"],
  106. metaFields: []string{},
  107. }.Init(),
  108. },
  109. },
  110. fields: []ast.Field{
  111. {
  112. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  113. Name: "name",
  114. AName: "",
  115. },
  116. },
  117. isAggregate: false,
  118. sendMeta: false,
  119. }.Init(),
  120. }, { // 1 optimize where to data source
  121. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  122. p: ProjectPlan{
  123. baseLogicalPlan: baseLogicalPlan{
  124. children: []LogicalPlan{
  125. WindowPlan{
  126. baseLogicalPlan: baseLogicalPlan{
  127. children: []LogicalPlan{
  128. FilterPlan{
  129. baseLogicalPlan: baseLogicalPlan{
  130. children: []LogicalPlan{
  131. DataSourcePlan{
  132. name: "src1",
  133. streamFields: []interface{}{
  134. &ast.StreamField{
  135. Name: "name",
  136. FieldType: &ast.BasicType{Type: ast.STRINGS},
  137. },
  138. &ast.StreamField{
  139. Name: "temp",
  140. FieldType: &ast.BasicType{Type: ast.BIGINT},
  141. },
  142. },
  143. streamStmt: streams["src1"],
  144. metaFields: []string{},
  145. }.Init(),
  146. },
  147. },
  148. condition: &ast.BinaryExpr{
  149. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  150. OP: ast.EQ,
  151. RHS: &ast.StringLiteral{Val: "v1"},
  152. },
  153. }.Init(),
  154. },
  155. },
  156. condition: nil,
  157. wtype: ast.TUMBLING_WINDOW,
  158. length: 10000,
  159. interval: 0,
  160. limit: 0,
  161. }.Init(),
  162. },
  163. },
  164. fields: []ast.Field{
  165. {
  166. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  167. Name: "temp",
  168. AName: ""},
  169. },
  170. isAggregate: false,
  171. sendMeta: false,
  172. }.Init(),
  173. }, { // 2 condition that cannot be optimized
  174. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  175. p: ProjectPlan{
  176. baseLogicalPlan: baseLogicalPlan{
  177. children: []LogicalPlan{
  178. JoinPlan{
  179. baseLogicalPlan: baseLogicalPlan{
  180. children: []LogicalPlan{
  181. WindowPlan{
  182. baseLogicalPlan: baseLogicalPlan{
  183. children: []LogicalPlan{
  184. DataSourcePlan{
  185. name: "src1",
  186. streamFields: []interface{}{
  187. &ast.StreamField{
  188. Name: "id1",
  189. FieldType: &ast.BasicType{Type: ast.BIGINT},
  190. },
  191. &ast.StreamField{
  192. Name: "temp",
  193. FieldType: &ast.BasicType{Type: ast.BIGINT},
  194. },
  195. },
  196. streamStmt: streams["src1"],
  197. metaFields: []string{},
  198. }.Init(),
  199. DataSourcePlan{
  200. name: "src2",
  201. streamFields: []interface{}{
  202. &ast.StreamField{
  203. Name: "hum",
  204. FieldType: &ast.BasicType{Type: ast.BIGINT},
  205. },
  206. &ast.StreamField{
  207. Name: "id2",
  208. FieldType: &ast.BasicType{Type: ast.BIGINT},
  209. },
  210. },
  211. streamStmt: streams["src2"],
  212. metaFields: []string{},
  213. }.Init(),
  214. },
  215. },
  216. condition: nil,
  217. wtype: ast.TUMBLING_WINDOW,
  218. length: 10000,
  219. interval: 0,
  220. limit: 0,
  221. }.Init(),
  222. },
  223. },
  224. from: &ast.Table{Name: "src1"},
  225. joins: ast.Joins{ast.Join{
  226. Name: "src2",
  227. JoinType: ast.INNER_JOIN,
  228. Expr: &ast.BinaryExpr{
  229. OP: ast.AND,
  230. LHS: &ast.BinaryExpr{
  231. LHS: &ast.BinaryExpr{
  232. OP: ast.GT,
  233. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  234. RHS: &ast.IntegerLiteral{Val: 20},
  235. },
  236. OP: ast.OR,
  237. RHS: &ast.BinaryExpr{
  238. OP: ast.GT,
  239. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  240. RHS: &ast.IntegerLiteral{Val: 60},
  241. },
  242. },
  243. RHS: &ast.BinaryExpr{
  244. OP: ast.EQ,
  245. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  246. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  247. },
  248. },
  249. }},
  250. }.Init(),
  251. },
  252. },
  253. fields: []ast.Field{
  254. {
  255. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  256. Name: "id1",
  257. AName: ""},
  258. },
  259. isAggregate: false,
  260. sendMeta: false,
  261. }.Init(),
  262. }, { // 3 optimize window filter
  263. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  264. p: ProjectPlan{
  265. baseLogicalPlan: baseLogicalPlan{
  266. children: []LogicalPlan{
  267. WindowPlan{
  268. baseLogicalPlan: baseLogicalPlan{
  269. children: []LogicalPlan{
  270. FilterPlan{
  271. baseLogicalPlan: baseLogicalPlan{
  272. children: []LogicalPlan{
  273. DataSourcePlan{
  274. name: "src1",
  275. streamFields: []interface{}{
  276. &ast.StreamField{
  277. Name: "id1",
  278. FieldType: &ast.BasicType{Type: ast.BIGINT},
  279. },
  280. &ast.StreamField{
  281. Name: "name",
  282. FieldType: &ast.BasicType{Type: ast.STRINGS},
  283. },
  284. &ast.StreamField{
  285. Name: "temp",
  286. FieldType: &ast.BasicType{Type: ast.BIGINT},
  287. },
  288. },
  289. streamStmt: streams["src1"],
  290. metaFields: []string{},
  291. }.Init(),
  292. },
  293. },
  294. condition: &ast.BinaryExpr{
  295. OP: ast.AND,
  296. LHS: &ast.BinaryExpr{
  297. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  298. OP: ast.EQ,
  299. RHS: &ast.StringLiteral{Val: "v1"},
  300. },
  301. RHS: &ast.BinaryExpr{
  302. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  303. OP: ast.GT,
  304. RHS: &ast.IntegerLiteral{Val: 2},
  305. },
  306. },
  307. }.Init(),
  308. },
  309. },
  310. condition: nil,
  311. wtype: ast.TUMBLING_WINDOW,
  312. length: 10000,
  313. interval: 0,
  314. limit: 0,
  315. }.Init(),
  316. },
  317. },
  318. fields: []ast.Field{
  319. {
  320. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  321. Name: "id1",
  322. AName: ""},
  323. },
  324. isAggregate: false,
  325. sendMeta: false,
  326. }.Init(),
  327. }, { // 4. do not optimize count window
  328. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  329. p: ProjectPlan{
  330. baseLogicalPlan: baseLogicalPlan{
  331. children: []LogicalPlan{
  332. HavingPlan{
  333. baseLogicalPlan: baseLogicalPlan{
  334. children: []LogicalPlan{
  335. FilterPlan{
  336. baseLogicalPlan: baseLogicalPlan{
  337. children: []LogicalPlan{
  338. WindowPlan{
  339. baseLogicalPlan: baseLogicalPlan{
  340. children: []LogicalPlan{
  341. DataSourcePlan{
  342. name: "src1",
  343. isWildCard: true,
  344. streamFields: []interface{}{
  345. &ast.StreamField{
  346. Name: "id1",
  347. FieldType: &ast.BasicType{Type: ast.BIGINT},
  348. },
  349. &ast.StreamField{
  350. Name: "temp",
  351. FieldType: &ast.BasicType{Type: ast.BIGINT},
  352. },
  353. &ast.StreamField{
  354. Name: "name",
  355. FieldType: &ast.BasicType{Type: ast.STRINGS},
  356. },
  357. },
  358. streamStmt: streams["src1"],
  359. metaFields: []string{},
  360. }.Init(),
  361. },
  362. },
  363. condition: nil,
  364. wtype: ast.COUNT_WINDOW,
  365. length: 5,
  366. interval: 1,
  367. limit: 0,
  368. }.Init(),
  369. },
  370. },
  371. condition: &ast.BinaryExpr{
  372. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  373. OP: ast.GT,
  374. RHS: &ast.IntegerLiteral{Val: 20},
  375. },
  376. }.Init(),
  377. },
  378. },
  379. condition: &ast.BinaryExpr{
  380. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  381. Token: ast.ASTERISK,
  382. }}},
  383. OP: ast.GT,
  384. RHS: &ast.IntegerLiteral{Val: 2},
  385. },
  386. }.Init(),
  387. },
  388. },
  389. fields: []ast.Field{
  390. {
  391. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  392. Name: "kuiper_field_0",
  393. AName: ""},
  394. },
  395. isAggregate: false,
  396. sendMeta: false,
  397. }.Init(),
  398. }, { // 5. optimize join on
  399. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  400. p: ProjectPlan{
  401. baseLogicalPlan: baseLogicalPlan{
  402. children: []LogicalPlan{
  403. JoinPlan{
  404. baseLogicalPlan: baseLogicalPlan{
  405. children: []LogicalPlan{
  406. WindowPlan{
  407. baseLogicalPlan: baseLogicalPlan{
  408. children: []LogicalPlan{
  409. FilterPlan{
  410. baseLogicalPlan: baseLogicalPlan{
  411. children: []LogicalPlan{
  412. DataSourcePlan{
  413. name: "src1",
  414. streamFields: []interface{}{
  415. &ast.StreamField{
  416. Name: "id1",
  417. FieldType: &ast.BasicType{Type: ast.BIGINT},
  418. },
  419. &ast.StreamField{
  420. Name: "temp",
  421. FieldType: &ast.BasicType{Type: ast.BIGINT},
  422. },
  423. },
  424. streamStmt: streams["src1"],
  425. metaFields: []string{},
  426. }.Init(),
  427. },
  428. },
  429. condition: &ast.BinaryExpr{
  430. RHS: &ast.BinaryExpr{
  431. OP: ast.GT,
  432. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  433. RHS: &ast.IntegerLiteral{Val: 20},
  434. },
  435. OP: ast.AND,
  436. LHS: &ast.BinaryExpr{
  437. OP: ast.GT,
  438. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  439. RHS: &ast.IntegerLiteral{Val: 111},
  440. },
  441. },
  442. }.Init(),
  443. FilterPlan{
  444. baseLogicalPlan: baseLogicalPlan{
  445. children: []LogicalPlan{
  446. DataSourcePlan{
  447. name: "src2",
  448. streamFields: []interface{}{
  449. &ast.StreamField{
  450. Name: "hum",
  451. FieldType: &ast.BasicType{Type: ast.BIGINT},
  452. },
  453. &ast.StreamField{
  454. Name: "id2",
  455. FieldType: &ast.BasicType{Type: ast.BIGINT},
  456. },
  457. },
  458. streamStmt: streams["src2"],
  459. metaFields: []string{},
  460. }.Init(),
  461. },
  462. },
  463. condition: &ast.BinaryExpr{
  464. OP: ast.LT,
  465. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  466. RHS: &ast.IntegerLiteral{Val: 60},
  467. },
  468. }.Init(),
  469. },
  470. },
  471. condition: nil,
  472. wtype: ast.TUMBLING_WINDOW,
  473. length: 10000,
  474. interval: 0,
  475. limit: 0,
  476. }.Init(),
  477. },
  478. },
  479. from: &ast.Table{
  480. Name: "src1",
  481. },
  482. joins: []ast.Join{
  483. {
  484. Name: "src2",
  485. Alias: "",
  486. JoinType: ast.INNER_JOIN,
  487. Expr: &ast.BinaryExpr{
  488. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  489. OP: ast.EQ,
  490. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  491. },
  492. },
  493. },
  494. }.Init(),
  495. },
  496. },
  497. fields: []ast.Field{
  498. {
  499. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  500. Name: "id1",
  501. AName: ""},
  502. },
  503. isAggregate: false,
  504. sendMeta: false,
  505. }.Init(),
  506. }, { // 6. optimize outter join on
  507. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  508. p: ProjectPlan{
  509. baseLogicalPlan: baseLogicalPlan{
  510. children: []LogicalPlan{
  511. JoinPlan{
  512. baseLogicalPlan: baseLogicalPlan{
  513. children: []LogicalPlan{
  514. WindowPlan{
  515. baseLogicalPlan: baseLogicalPlan{
  516. children: []LogicalPlan{
  517. FilterPlan{
  518. baseLogicalPlan: baseLogicalPlan{
  519. children: []LogicalPlan{
  520. DataSourcePlan{
  521. name: "src1",
  522. streamFields: []interface{}{
  523. &ast.StreamField{
  524. Name: "id1",
  525. FieldType: &ast.BasicType{Type: ast.BIGINT},
  526. },
  527. &ast.StreamField{
  528. Name: "temp",
  529. FieldType: &ast.BasicType{Type: ast.BIGINT},
  530. },
  531. },
  532. streamStmt: streams["src1"],
  533. metaFields: []string{},
  534. }.Init(),
  535. },
  536. },
  537. condition: &ast.BinaryExpr{
  538. OP: ast.GT,
  539. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  540. RHS: &ast.IntegerLiteral{Val: 111},
  541. },
  542. }.Init(),
  543. DataSourcePlan{
  544. name: "src2",
  545. streamFields: []interface{}{
  546. &ast.StreamField{
  547. Name: "hum",
  548. FieldType: &ast.BasicType{Type: ast.BIGINT},
  549. },
  550. &ast.StreamField{
  551. Name: "id2",
  552. FieldType: &ast.BasicType{Type: ast.BIGINT},
  553. },
  554. },
  555. streamStmt: streams["src2"],
  556. metaFields: []string{},
  557. }.Init(),
  558. },
  559. },
  560. condition: nil,
  561. wtype: ast.TUMBLING_WINDOW,
  562. length: 10000,
  563. interval: 0,
  564. limit: 0,
  565. }.Init(),
  566. },
  567. },
  568. from: &ast.Table{
  569. Name: "src1",
  570. },
  571. joins: []ast.Join{
  572. {
  573. Name: "src2",
  574. Alias: "",
  575. JoinType: ast.FULL_JOIN,
  576. Expr: &ast.BinaryExpr{
  577. OP: ast.AND,
  578. LHS: &ast.BinaryExpr{
  579. OP: ast.AND,
  580. LHS: &ast.BinaryExpr{
  581. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  582. OP: ast.EQ,
  583. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  584. },
  585. RHS: &ast.BinaryExpr{
  586. OP: ast.GT,
  587. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  588. RHS: &ast.IntegerLiteral{Val: 20},
  589. },
  590. },
  591. RHS: &ast.BinaryExpr{
  592. OP: ast.LT,
  593. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  594. RHS: &ast.IntegerLiteral{Val: 60},
  595. },
  596. },
  597. },
  598. },
  599. }.Init(),
  600. },
  601. },
  602. fields: []ast.Field{
  603. {
  604. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  605. Name: "id1",
  606. AName: ""},
  607. },
  608. isAggregate: false,
  609. sendMeta: false,
  610. }.Init(),
  611. }, { // 7 window error for table
  612. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  613. p: nil,
  614. err: "cannot run window for TABLE sources",
  615. }, { // 8 join table without window
  616. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  617. p: ProjectPlan{
  618. baseLogicalPlan: baseLogicalPlan{
  619. children: []LogicalPlan{
  620. JoinPlan{
  621. baseLogicalPlan: baseLogicalPlan{
  622. children: []LogicalPlan{
  623. JoinAlignPlan{
  624. baseLogicalPlan: baseLogicalPlan{
  625. children: []LogicalPlan{
  626. FilterPlan{
  627. baseLogicalPlan: baseLogicalPlan{
  628. children: []LogicalPlan{
  629. DataSourcePlan{
  630. name: "src1",
  631. streamFields: []interface{}{
  632. &ast.StreamField{
  633. Name: "id1",
  634. FieldType: &ast.BasicType{Type: ast.BIGINT},
  635. },
  636. &ast.StreamField{
  637. Name: "temp",
  638. FieldType: &ast.BasicType{Type: ast.BIGINT},
  639. },
  640. },
  641. streamStmt: streams["src1"],
  642. metaFields: []string{},
  643. }.Init(),
  644. },
  645. },
  646. condition: &ast.BinaryExpr{
  647. RHS: &ast.BinaryExpr{
  648. OP: ast.GT,
  649. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  650. RHS: &ast.IntegerLiteral{Val: 20},
  651. },
  652. OP: ast.AND,
  653. LHS: &ast.BinaryExpr{
  654. OP: ast.GT,
  655. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  656. RHS: &ast.IntegerLiteral{Val: 111},
  657. },
  658. },
  659. }.Init(),
  660. FilterPlan{
  661. baseLogicalPlan: baseLogicalPlan{
  662. children: []LogicalPlan{
  663. DataSourcePlan{
  664. name: "tableInPlanner",
  665. streamFields: []interface{}{
  666. &ast.StreamField{
  667. Name: "hum",
  668. FieldType: &ast.BasicType{Type: ast.BIGINT},
  669. },
  670. &ast.StreamField{
  671. Name: "id",
  672. FieldType: &ast.BasicType{Type: ast.BIGINT},
  673. },
  674. },
  675. streamStmt: streams["tableInPlanner"],
  676. metaFields: []string{},
  677. }.Init(),
  678. },
  679. },
  680. condition: &ast.BinaryExpr{
  681. OP: ast.LT,
  682. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  683. RHS: &ast.IntegerLiteral{Val: 60},
  684. },
  685. }.Init(),
  686. },
  687. },
  688. Emitters: []string{"tableInPlanner"},
  689. }.Init(),
  690. },
  691. },
  692. from: &ast.Table{
  693. Name: "src1",
  694. },
  695. joins: []ast.Join{
  696. {
  697. Name: "tableInPlanner",
  698. Alias: "",
  699. JoinType: ast.INNER_JOIN,
  700. Expr: &ast.BinaryExpr{
  701. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  702. OP: ast.EQ,
  703. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  704. },
  705. },
  706. },
  707. }.Init(),
  708. },
  709. },
  710. fields: []ast.Field{
  711. {
  712. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  713. Name: "id1",
  714. AName: ""},
  715. },
  716. isAggregate: false,
  717. sendMeta: false,
  718. }.Init(),
  719. }, { // 9 join table with window
  720. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  721. p: ProjectPlan{
  722. baseLogicalPlan: baseLogicalPlan{
  723. children: []LogicalPlan{
  724. JoinPlan{
  725. baseLogicalPlan: baseLogicalPlan{
  726. children: []LogicalPlan{
  727. JoinAlignPlan{
  728. baseLogicalPlan: baseLogicalPlan{
  729. children: []LogicalPlan{
  730. WindowPlan{
  731. baseLogicalPlan: baseLogicalPlan{
  732. children: []LogicalPlan{
  733. FilterPlan{
  734. baseLogicalPlan: baseLogicalPlan{
  735. children: []LogicalPlan{
  736. DataSourcePlan{
  737. name: "src1",
  738. streamFields: []interface{}{
  739. &ast.StreamField{
  740. Name: "id1",
  741. FieldType: &ast.BasicType{Type: ast.BIGINT},
  742. },
  743. &ast.StreamField{
  744. Name: "temp",
  745. FieldType: &ast.BasicType{Type: ast.BIGINT},
  746. },
  747. },
  748. streamStmt: streams["src1"],
  749. metaFields: []string{},
  750. }.Init(),
  751. },
  752. },
  753. condition: &ast.BinaryExpr{
  754. RHS: &ast.BinaryExpr{
  755. OP: ast.GT,
  756. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  757. RHS: &ast.IntegerLiteral{Val: 20},
  758. },
  759. OP: ast.AND,
  760. LHS: &ast.BinaryExpr{
  761. OP: ast.GT,
  762. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  763. RHS: &ast.IntegerLiteral{Val: 111},
  764. },
  765. },
  766. }.Init(),
  767. },
  768. },
  769. condition: nil,
  770. wtype: ast.TUMBLING_WINDOW,
  771. length: 10000,
  772. interval: 0,
  773. limit: 0,
  774. }.Init(),
  775. FilterPlan{
  776. baseLogicalPlan: baseLogicalPlan{
  777. children: []LogicalPlan{
  778. DataSourcePlan{
  779. name: "tableInPlanner",
  780. streamFields: []interface{}{
  781. &ast.StreamField{
  782. Name: "hum",
  783. FieldType: &ast.BasicType{Type: ast.BIGINT},
  784. },
  785. &ast.StreamField{
  786. Name: "id",
  787. FieldType: &ast.BasicType{Type: ast.BIGINT},
  788. },
  789. },
  790. streamStmt: streams["tableInPlanner"],
  791. metaFields: []string{},
  792. }.Init(),
  793. },
  794. },
  795. condition: &ast.BinaryExpr{
  796. OP: ast.LT,
  797. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  798. RHS: &ast.IntegerLiteral{Val: 60},
  799. },
  800. }.Init(),
  801. },
  802. },
  803. Emitters: []string{"tableInPlanner"},
  804. }.Init(),
  805. },
  806. },
  807. from: &ast.Table{
  808. Name: "src1",
  809. },
  810. joins: []ast.Join{
  811. {
  812. Name: "tableInPlanner",
  813. Alias: "",
  814. JoinType: ast.INNER_JOIN,
  815. Expr: &ast.BinaryExpr{
  816. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  817. OP: ast.EQ,
  818. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  819. },
  820. },
  821. },
  822. }.Init(),
  823. },
  824. },
  825. fields: []ast.Field{
  826. {
  827. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  828. Name: "id1",
  829. AName: ""},
  830. },
  831. isAggregate: false,
  832. sendMeta: false,
  833. }.Init(),
  834. }, { // 10 meta
  835. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  836. p: ProjectPlan{
  837. baseLogicalPlan: baseLogicalPlan{
  838. children: []LogicalPlan{
  839. FilterPlan{
  840. baseLogicalPlan: baseLogicalPlan{
  841. children: []LogicalPlan{
  842. DataSourcePlan{
  843. name: "src1",
  844. streamFields: []interface{}{
  845. &ast.StreamField{
  846. Name: "temp",
  847. FieldType: &ast.BasicType{Type: ast.BIGINT},
  848. },
  849. },
  850. streamStmt: streams["src1"],
  851. metaFields: []string{"Humidity", "device", "id"},
  852. }.Init(),
  853. },
  854. },
  855. condition: &ast.BinaryExpr{
  856. LHS: &ast.Call{
  857. Name: "meta",
  858. Args: []ast.Expr{&ast.MetaRef{
  859. Name: "device",
  860. StreamName: ast.DefaultStream,
  861. }},
  862. },
  863. OP: ast.EQ,
  864. RHS: &ast.StringLiteral{
  865. Val: "demo2",
  866. },
  867. },
  868. }.Init(),
  869. },
  870. },
  871. fields: []ast.Field{
  872. {
  873. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  874. Name: "temp",
  875. AName: "",
  876. }, {
  877. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  878. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  879. Name: "id",
  880. StreamName: ast.DefaultStream,
  881. }}},
  882. []ast.StreamName{},
  883. nil,
  884. )},
  885. Name: "meta",
  886. AName: "eid",
  887. }, {
  888. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  889. &ast.Call{Name: "meta", Args: []ast.Expr{
  890. &ast.BinaryExpr{
  891. OP: ast.ARROW,
  892. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  893. RHS: &ast.MetaRef{Name: "Device"},
  894. },
  895. }},
  896. []ast.StreamName{},
  897. nil,
  898. )},
  899. Name: "meta",
  900. AName: "hdevice",
  901. },
  902. },
  903. isAggregate: false,
  904. sendMeta: false,
  905. }.Init(),
  906. }, { // 11 join with same name field and aliased
  907. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  908. p: ProjectPlan{
  909. baseLogicalPlan: baseLogicalPlan{
  910. children: []LogicalPlan{
  911. JoinPlan{
  912. baseLogicalPlan: baseLogicalPlan{
  913. children: []LogicalPlan{
  914. JoinAlignPlan{
  915. baseLogicalPlan: baseLogicalPlan{
  916. children: []LogicalPlan{
  917. DataSourcePlan{
  918. name: "src2",
  919. streamFields: []interface{}{
  920. &ast.StreamField{
  921. Name: "hum",
  922. FieldType: &ast.BasicType{Type: ast.BIGINT},
  923. },
  924. &ast.StreamField{
  925. Name: "id2",
  926. FieldType: &ast.BasicType{Type: ast.BIGINT},
  927. },
  928. },
  929. streamStmt: streams["src2"],
  930. metaFields: []string{},
  931. }.Init(),
  932. DataSourcePlan{
  933. name: "tableInPlanner",
  934. streamFields: []interface{}{
  935. &ast.StreamField{
  936. Name: "hum",
  937. FieldType: &ast.BasicType{Type: ast.BIGINT},
  938. },
  939. &ast.StreamField{
  940. Name: "id",
  941. FieldType: &ast.BasicType{Type: ast.BIGINT},
  942. },
  943. },
  944. streamStmt: streams["tableInPlanner"],
  945. metaFields: []string{},
  946. }.Init(),
  947. },
  948. },
  949. Emitters: []string{"tableInPlanner"},
  950. }.Init(),
  951. },
  952. },
  953. from: &ast.Table{
  954. Name: "src2",
  955. },
  956. joins: []ast.Join{
  957. {
  958. Name: "tableInPlanner",
  959. Alias: "",
  960. JoinType: ast.INNER_JOIN,
  961. Expr: &ast.BinaryExpr{
  962. RHS: &ast.BinaryExpr{
  963. OP: ast.EQ,
  964. LHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  965. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  966. },
  967. OP: ast.AND,
  968. LHS: &ast.BinaryExpr{
  969. OP: ast.GT,
  970. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  971. &ast.FieldRef{
  972. Name: "hum",
  973. StreamName: "src2",
  974. },
  975. []ast.StreamName{"src2"},
  976. &boolFalse,
  977. )},
  978. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  979. &ast.FieldRef{
  980. Name: "hum",
  981. StreamName: "tableInPlanner",
  982. },
  983. []ast.StreamName{"tableInPlanner"},
  984. &boolFalse,
  985. )},
  986. },
  987. },
  988. },
  989. },
  990. }.Init(),
  991. },
  992. },
  993. fields: []ast.Field{
  994. {
  995. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  996. &ast.FieldRef{
  997. Name: "hum",
  998. StreamName: "src2",
  999. },
  1000. []ast.StreamName{"src2"},
  1001. &boolFalse,
  1002. )},
  1003. Name: "hum",
  1004. AName: "hum1",
  1005. }, {
  1006. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1007. &ast.FieldRef{
  1008. Name: "hum",
  1009. StreamName: "tableInPlanner",
  1010. },
  1011. []ast.StreamName{"tableInPlanner"},
  1012. &boolFalse,
  1013. )},
  1014. Name: "hum",
  1015. AName: "hum2",
  1016. },
  1017. },
  1018. isAggregate: false,
  1019. sendMeta: false,
  1020. }.Init(),
  1021. },
  1022. }
  1023. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1024. for i, tt := range tests {
  1025. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1026. if err != nil {
  1027. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1028. continue
  1029. }
  1030. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1031. IsEventTime: false,
  1032. LateTol: 0,
  1033. Concurrency: 0,
  1034. BufferLength: 0,
  1035. SendMetaToSink: false,
  1036. Qos: 0,
  1037. CheckpointInterval: 0,
  1038. SendError: true,
  1039. }, store)
  1040. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1041. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1042. } else if !reflect.DeepEqual(tt.p, p) {
  1043. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1044. }
  1045. }
  1046. }
  1047. func Test_createLogicalPlanSchemaless(t *testing.T) {
  1048. store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
  1049. err := store.Open()
  1050. if err != nil {
  1051. t.Error(err)
  1052. return
  1053. }
  1054. defer store.Close()
  1055. streamSqls := map[string]string{
  1056. "src1": `CREATE STREAM src1 (
  1057. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  1058. "src2": `CREATE STREAM src2 (
  1059. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  1060. "tableInPlanner": `CREATE TABLE tableInPlanner (
  1061. id BIGINT,
  1062. name STRING,
  1063. value STRING,
  1064. hum BIGINT
  1065. ) WITH (TYPE="file");`,
  1066. }
  1067. types := map[string]ast.StreamType{
  1068. "src1": ast.TypeStream,
  1069. "src2": ast.TypeStream,
  1070. "tableInPlanner": ast.TypeTable,
  1071. }
  1072. for name, sql := range streamSqls {
  1073. s, err := json.Marshal(&xsql.StreamInfo{
  1074. StreamType: types[name],
  1075. Statement: sql,
  1076. })
  1077. if err != nil {
  1078. t.Error(err)
  1079. t.Fail()
  1080. }
  1081. store.Set(name, string(s))
  1082. }
  1083. streams := make(map[string]*ast.StreamStmt)
  1084. for n := range streamSqls {
  1085. streamStmt, err := xsql.GetDataSource(store, n)
  1086. if err != nil {
  1087. t.Errorf("fail to get stream %s, please check if stream is created", n)
  1088. return
  1089. }
  1090. streams[n] = streamStmt
  1091. }
  1092. var (
  1093. //boolTrue = true
  1094. boolFalse = false
  1095. )
  1096. var tests = []struct {
  1097. sql string
  1098. p LogicalPlan
  1099. err string
  1100. }{
  1101. { // 0
  1102. sql: `SELECT name FROM src1`,
  1103. p: ProjectPlan{
  1104. baseLogicalPlan: baseLogicalPlan{
  1105. children: []LogicalPlan{
  1106. DataSourcePlan{
  1107. baseLogicalPlan: baseLogicalPlan{},
  1108. name: "src1",
  1109. streamFields: []interface{}{
  1110. "name",
  1111. },
  1112. streamStmt: streams["src1"],
  1113. metaFields: []string{},
  1114. }.Init(),
  1115. },
  1116. },
  1117. fields: []ast.Field{
  1118. {
  1119. Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1120. Name: "name",
  1121. AName: "",
  1122. },
  1123. },
  1124. isAggregate: false,
  1125. sendMeta: false,
  1126. }.Init(),
  1127. }, { // 1 optimize where to data source
  1128. sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1129. p: ProjectPlan{
  1130. baseLogicalPlan: baseLogicalPlan{
  1131. children: []LogicalPlan{
  1132. WindowPlan{
  1133. baseLogicalPlan: baseLogicalPlan{
  1134. children: []LogicalPlan{
  1135. FilterPlan{
  1136. baseLogicalPlan: baseLogicalPlan{
  1137. children: []LogicalPlan{
  1138. DataSourcePlan{
  1139. name: "src1",
  1140. streamFields: []interface{}{
  1141. "name", "temp",
  1142. },
  1143. streamStmt: streams["src1"],
  1144. metaFields: []string{},
  1145. }.Init(),
  1146. },
  1147. },
  1148. condition: &ast.BinaryExpr{
  1149. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1150. OP: ast.EQ,
  1151. RHS: &ast.StringLiteral{Val: "v1"},
  1152. },
  1153. }.Init(),
  1154. },
  1155. },
  1156. condition: nil,
  1157. wtype: ast.TUMBLING_WINDOW,
  1158. length: 10000,
  1159. interval: 0,
  1160. limit: 0,
  1161. }.Init(),
  1162. },
  1163. },
  1164. fields: []ast.Field{
  1165. {
  1166. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1167. Name: "temp",
  1168. AName: ""},
  1169. },
  1170. isAggregate: false,
  1171. sendMeta: false,
  1172. }.Init(),
  1173. }, { // 2 condition that cannot be optimized
  1174. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1175. p: ProjectPlan{
  1176. baseLogicalPlan: baseLogicalPlan{
  1177. children: []LogicalPlan{
  1178. JoinPlan{
  1179. baseLogicalPlan: baseLogicalPlan{
  1180. children: []LogicalPlan{
  1181. WindowPlan{
  1182. baseLogicalPlan: baseLogicalPlan{
  1183. children: []LogicalPlan{
  1184. DataSourcePlan{
  1185. name: "src1",
  1186. streamFields: []interface{}{
  1187. "id1", "temp",
  1188. },
  1189. streamStmt: streams["src1"],
  1190. metaFields: []string{},
  1191. }.Init(),
  1192. DataSourcePlan{
  1193. name: "src2",
  1194. streamFields: []interface{}{ // can't determine where is id1 belonged to
  1195. "hum", "id1", "id2",
  1196. },
  1197. streamStmt: streams["src2"],
  1198. metaFields: []string{},
  1199. }.Init(),
  1200. },
  1201. },
  1202. condition: nil,
  1203. wtype: ast.TUMBLING_WINDOW,
  1204. length: 10000,
  1205. interval: 0,
  1206. limit: 0,
  1207. }.Init(),
  1208. },
  1209. },
  1210. from: &ast.Table{Name: "src1"},
  1211. joins: ast.Joins{ast.Join{
  1212. Name: "src2",
  1213. JoinType: ast.INNER_JOIN,
  1214. Expr: &ast.BinaryExpr{
  1215. OP: ast.AND,
  1216. LHS: &ast.BinaryExpr{
  1217. LHS: &ast.BinaryExpr{
  1218. OP: ast.GT,
  1219. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1220. RHS: &ast.IntegerLiteral{Val: 20},
  1221. },
  1222. OP: ast.OR,
  1223. RHS: &ast.BinaryExpr{
  1224. OP: ast.GT,
  1225. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1226. RHS: &ast.IntegerLiteral{Val: 60},
  1227. },
  1228. },
  1229. RHS: &ast.BinaryExpr{
  1230. OP: ast.EQ,
  1231. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1232. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1233. },
  1234. },
  1235. }},
  1236. }.Init(),
  1237. },
  1238. },
  1239. fields: []ast.Field{
  1240. {
  1241. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1242. Name: "id1",
  1243. AName: ""},
  1244. },
  1245. isAggregate: false,
  1246. sendMeta: false,
  1247. }.Init(),
  1248. }, { // 3 optimize window filter
  1249. sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
  1250. p: ProjectPlan{
  1251. baseLogicalPlan: baseLogicalPlan{
  1252. children: []LogicalPlan{
  1253. WindowPlan{
  1254. baseLogicalPlan: baseLogicalPlan{
  1255. children: []LogicalPlan{
  1256. FilterPlan{
  1257. baseLogicalPlan: baseLogicalPlan{
  1258. children: []LogicalPlan{
  1259. DataSourcePlan{
  1260. name: "src1",
  1261. streamFields: []interface{}{
  1262. "id1", "name", "temp",
  1263. },
  1264. streamStmt: streams["src1"],
  1265. metaFields: []string{},
  1266. }.Init(),
  1267. },
  1268. },
  1269. condition: &ast.BinaryExpr{
  1270. OP: ast.AND,
  1271. LHS: &ast.BinaryExpr{
  1272. LHS: &ast.FieldRef{Name: "name", StreamName: "src1"},
  1273. OP: ast.EQ,
  1274. RHS: &ast.StringLiteral{Val: "v1"},
  1275. },
  1276. RHS: &ast.BinaryExpr{
  1277. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1278. OP: ast.GT,
  1279. RHS: &ast.IntegerLiteral{Val: 2},
  1280. },
  1281. },
  1282. }.Init(),
  1283. },
  1284. },
  1285. condition: nil,
  1286. wtype: ast.TUMBLING_WINDOW,
  1287. length: 10000,
  1288. interval: 0,
  1289. limit: 0,
  1290. }.Init(),
  1291. },
  1292. },
  1293. fields: []ast.Field{
  1294. {
  1295. Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1296. Name: "id1",
  1297. AName: ""},
  1298. },
  1299. isAggregate: false,
  1300. sendMeta: false,
  1301. }.Init(),
  1302. }, { // 4. do not optimize count window
  1303. sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  1304. p: ProjectPlan{
  1305. baseLogicalPlan: baseLogicalPlan{
  1306. children: []LogicalPlan{
  1307. HavingPlan{
  1308. baseLogicalPlan: baseLogicalPlan{
  1309. children: []LogicalPlan{
  1310. FilterPlan{
  1311. baseLogicalPlan: baseLogicalPlan{
  1312. children: []LogicalPlan{
  1313. WindowPlan{
  1314. baseLogicalPlan: baseLogicalPlan{
  1315. children: []LogicalPlan{
  1316. DataSourcePlan{
  1317. name: "src1",
  1318. isWildCard: true,
  1319. streamFields: nil,
  1320. streamStmt: streams["src1"],
  1321. metaFields: []string{},
  1322. }.Init(),
  1323. },
  1324. },
  1325. condition: nil,
  1326. wtype: ast.COUNT_WINDOW,
  1327. length: 5,
  1328. interval: 1,
  1329. limit: 0,
  1330. }.Init(),
  1331. },
  1332. },
  1333. condition: &ast.BinaryExpr{
  1334. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1335. OP: ast.GT,
  1336. RHS: &ast.IntegerLiteral{Val: 20},
  1337. },
  1338. }.Init(),
  1339. },
  1340. },
  1341. condition: &ast.BinaryExpr{
  1342. LHS: &ast.Call{Name: "COUNT", Args: []ast.Expr{&ast.Wildcard{
  1343. Token: ast.ASTERISK,
  1344. }}},
  1345. OP: ast.GT,
  1346. RHS: &ast.IntegerLiteral{Val: 2},
  1347. },
  1348. }.Init(),
  1349. },
  1350. },
  1351. fields: []ast.Field{
  1352. {
  1353. Expr: &ast.Wildcard{Token: ast.ASTERISK},
  1354. Name: "kuiper_field_0",
  1355. AName: ""},
  1356. },
  1357. isAggregate: false,
  1358. sendMeta: false,
  1359. }.Init(),
  1360. }, { // 5. optimize join on
  1361. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1362. p: ProjectPlan{
  1363. baseLogicalPlan: baseLogicalPlan{
  1364. children: []LogicalPlan{
  1365. JoinPlan{
  1366. baseLogicalPlan: baseLogicalPlan{
  1367. children: []LogicalPlan{
  1368. WindowPlan{
  1369. baseLogicalPlan: baseLogicalPlan{
  1370. children: []LogicalPlan{
  1371. FilterPlan{
  1372. baseLogicalPlan: baseLogicalPlan{
  1373. children: []LogicalPlan{
  1374. DataSourcePlan{
  1375. name: "src1",
  1376. streamFields: []interface{}{
  1377. "id1", "temp",
  1378. },
  1379. streamStmt: streams["src1"],
  1380. metaFields: []string{},
  1381. }.Init(),
  1382. },
  1383. },
  1384. condition: &ast.BinaryExpr{
  1385. RHS: &ast.BinaryExpr{
  1386. OP: ast.GT,
  1387. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1388. RHS: &ast.IntegerLiteral{Val: 20},
  1389. },
  1390. OP: ast.AND,
  1391. LHS: &ast.BinaryExpr{
  1392. OP: ast.GT,
  1393. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1394. RHS: &ast.IntegerLiteral{Val: 111},
  1395. },
  1396. },
  1397. }.Init(),
  1398. FilterPlan{
  1399. baseLogicalPlan: baseLogicalPlan{
  1400. children: []LogicalPlan{
  1401. DataSourcePlan{
  1402. name: "src2",
  1403. streamFields: []interface{}{
  1404. "hum", "id1", "id2",
  1405. },
  1406. streamStmt: streams["src2"],
  1407. metaFields: []string{},
  1408. }.Init(),
  1409. },
  1410. },
  1411. condition: &ast.BinaryExpr{
  1412. OP: ast.LT,
  1413. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1414. RHS: &ast.IntegerLiteral{Val: 60},
  1415. },
  1416. }.Init(),
  1417. },
  1418. },
  1419. condition: nil,
  1420. wtype: ast.TUMBLING_WINDOW,
  1421. length: 10000,
  1422. interval: 0,
  1423. limit: 0,
  1424. }.Init(),
  1425. },
  1426. },
  1427. from: &ast.Table{
  1428. Name: "src1",
  1429. },
  1430. joins: []ast.Join{
  1431. {
  1432. Name: "src2",
  1433. Alias: "",
  1434. JoinType: ast.INNER_JOIN,
  1435. Expr: &ast.BinaryExpr{
  1436. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1437. OP: ast.EQ,
  1438. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1439. },
  1440. },
  1441. },
  1442. }.Init(),
  1443. },
  1444. },
  1445. fields: []ast.Field{
  1446. {
  1447. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1448. Name: "id1",
  1449. AName: ""},
  1450. },
  1451. isAggregate: false,
  1452. sendMeta: false,
  1453. }.Init(),
  1454. }, { // 6. optimize outter join on
  1455. sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1456. p: ProjectPlan{
  1457. baseLogicalPlan: baseLogicalPlan{
  1458. children: []LogicalPlan{
  1459. JoinPlan{
  1460. baseLogicalPlan: baseLogicalPlan{
  1461. children: []LogicalPlan{
  1462. WindowPlan{
  1463. baseLogicalPlan: baseLogicalPlan{
  1464. children: []LogicalPlan{
  1465. FilterPlan{
  1466. baseLogicalPlan: baseLogicalPlan{
  1467. children: []LogicalPlan{
  1468. DataSourcePlan{
  1469. name: "src1",
  1470. streamFields: []interface{}{
  1471. "id1", "temp",
  1472. },
  1473. streamStmt: streams["src1"],
  1474. metaFields: []string{},
  1475. }.Init(),
  1476. },
  1477. },
  1478. condition: &ast.BinaryExpr{
  1479. OP: ast.GT,
  1480. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1481. RHS: &ast.IntegerLiteral{Val: 111},
  1482. },
  1483. }.Init(),
  1484. DataSourcePlan{
  1485. name: "src2",
  1486. streamFields: []interface{}{
  1487. "hum", "id1", "id2",
  1488. },
  1489. streamStmt: streams["src2"],
  1490. metaFields: []string{},
  1491. }.Init(),
  1492. },
  1493. },
  1494. condition: nil,
  1495. wtype: ast.TUMBLING_WINDOW,
  1496. length: 10000,
  1497. interval: 0,
  1498. limit: 0,
  1499. }.Init(),
  1500. },
  1501. },
  1502. from: &ast.Table{
  1503. Name: "src1",
  1504. },
  1505. joins: []ast.Join{
  1506. {
  1507. Name: "src2",
  1508. Alias: "",
  1509. JoinType: ast.FULL_JOIN,
  1510. Expr: &ast.BinaryExpr{
  1511. OP: ast.AND,
  1512. LHS: &ast.BinaryExpr{
  1513. OP: ast.AND,
  1514. LHS: &ast.BinaryExpr{
  1515. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1516. OP: ast.EQ,
  1517. RHS: &ast.FieldRef{Name: "id2", StreamName: "src2"},
  1518. },
  1519. RHS: &ast.BinaryExpr{
  1520. OP: ast.GT,
  1521. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1522. RHS: &ast.IntegerLiteral{Val: 20},
  1523. },
  1524. },
  1525. RHS: &ast.BinaryExpr{
  1526. OP: ast.LT,
  1527. LHS: &ast.FieldRef{Name: "hum", StreamName: "src2"},
  1528. RHS: &ast.IntegerLiteral{Val: 60},
  1529. },
  1530. },
  1531. },
  1532. },
  1533. }.Init(),
  1534. },
  1535. },
  1536. fields: []ast.Field{
  1537. {
  1538. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1539. Name: "id1",
  1540. AName: ""},
  1541. },
  1542. isAggregate: false,
  1543. sendMeta: false,
  1544. }.Init(),
  1545. }, { // 7 window error for table
  1546. sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1547. p: nil,
  1548. err: "cannot run window for TABLE sources",
  1549. }, { // 8 join table without window
  1550. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
  1551. p: ProjectPlan{
  1552. baseLogicalPlan: baseLogicalPlan{
  1553. children: []LogicalPlan{
  1554. JoinPlan{
  1555. baseLogicalPlan: baseLogicalPlan{
  1556. children: []LogicalPlan{
  1557. JoinAlignPlan{
  1558. baseLogicalPlan: baseLogicalPlan{
  1559. children: []LogicalPlan{
  1560. FilterPlan{
  1561. baseLogicalPlan: baseLogicalPlan{
  1562. children: []LogicalPlan{
  1563. DataSourcePlan{
  1564. name: "src1",
  1565. streamFields: []interface{}{
  1566. "hum", "id1", "temp",
  1567. },
  1568. streamStmt: streams["src1"],
  1569. metaFields: []string{},
  1570. }.Init(),
  1571. },
  1572. },
  1573. condition: &ast.BinaryExpr{
  1574. RHS: &ast.BinaryExpr{
  1575. OP: ast.GT,
  1576. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1577. RHS: &ast.IntegerLiteral{Val: 20},
  1578. },
  1579. OP: ast.AND,
  1580. LHS: &ast.BinaryExpr{
  1581. OP: ast.GT,
  1582. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1583. RHS: &ast.IntegerLiteral{Val: 111},
  1584. },
  1585. },
  1586. }.Init(),
  1587. DataSourcePlan{
  1588. name: "tableInPlanner",
  1589. streamFields: []interface{}{
  1590. &ast.StreamField{
  1591. Name: "hum",
  1592. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1593. },
  1594. &ast.StreamField{
  1595. Name: "id",
  1596. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1597. },
  1598. },
  1599. streamStmt: streams["tableInPlanner"],
  1600. metaFields: []string{},
  1601. }.Init(),
  1602. },
  1603. },
  1604. Emitters: []string{"tableInPlanner"},
  1605. }.Init(),
  1606. },
  1607. },
  1608. from: &ast.Table{
  1609. Name: "src1",
  1610. },
  1611. joins: []ast.Join{
  1612. {
  1613. Name: "tableInPlanner",
  1614. Alias: "",
  1615. JoinType: ast.INNER_JOIN,
  1616. Expr: &ast.BinaryExpr{
  1617. OP: ast.AND,
  1618. LHS: &ast.BinaryExpr{
  1619. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1620. OP: ast.EQ,
  1621. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1622. },
  1623. RHS: &ast.BinaryExpr{
  1624. OP: ast.LT,
  1625. LHS: &ast.FieldRef{Name: "hum", StreamName: ast.DefaultStream},
  1626. RHS: &ast.IntegerLiteral{Val: 60},
  1627. },
  1628. },
  1629. },
  1630. },
  1631. }.Init(),
  1632. },
  1633. },
  1634. fields: []ast.Field{
  1635. {
  1636. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1637. Name: "id1",
  1638. AName: ""},
  1639. },
  1640. isAggregate: false,
  1641. sendMeta: false,
  1642. }.Init(),
  1643. }, { // 9 join table with window
  1644. sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  1645. p: ProjectPlan{
  1646. baseLogicalPlan: baseLogicalPlan{
  1647. children: []LogicalPlan{
  1648. JoinPlan{
  1649. baseLogicalPlan: baseLogicalPlan{
  1650. children: []LogicalPlan{
  1651. JoinAlignPlan{
  1652. baseLogicalPlan: baseLogicalPlan{
  1653. children: []LogicalPlan{
  1654. WindowPlan{
  1655. baseLogicalPlan: baseLogicalPlan{
  1656. children: []LogicalPlan{
  1657. FilterPlan{
  1658. baseLogicalPlan: baseLogicalPlan{
  1659. children: []LogicalPlan{
  1660. DataSourcePlan{
  1661. name: "src1",
  1662. streamFields: []interface{}{
  1663. "id1", "temp",
  1664. },
  1665. streamStmt: streams["src1"],
  1666. metaFields: []string{},
  1667. }.Init(),
  1668. },
  1669. },
  1670. condition: &ast.BinaryExpr{
  1671. RHS: &ast.BinaryExpr{
  1672. OP: ast.GT,
  1673. LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1674. RHS: &ast.IntegerLiteral{Val: 20},
  1675. },
  1676. OP: ast.AND,
  1677. LHS: &ast.BinaryExpr{
  1678. OP: ast.GT,
  1679. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1680. RHS: &ast.IntegerLiteral{Val: 111},
  1681. },
  1682. },
  1683. }.Init(),
  1684. },
  1685. },
  1686. condition: nil,
  1687. wtype: ast.TUMBLING_WINDOW,
  1688. length: 10000,
  1689. interval: 0,
  1690. limit: 0,
  1691. }.Init(),
  1692. FilterPlan{
  1693. baseLogicalPlan: baseLogicalPlan{
  1694. children: []LogicalPlan{
  1695. DataSourcePlan{
  1696. name: "tableInPlanner",
  1697. streamFields: []interface{}{
  1698. &ast.StreamField{
  1699. Name: "hum",
  1700. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1701. },
  1702. &ast.StreamField{
  1703. Name: "id",
  1704. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1705. },
  1706. },
  1707. streamStmt: streams["tableInPlanner"],
  1708. metaFields: []string{},
  1709. }.Init(),
  1710. },
  1711. },
  1712. condition: &ast.BinaryExpr{
  1713. OP: ast.LT,
  1714. LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
  1715. RHS: &ast.IntegerLiteral{Val: 60},
  1716. },
  1717. }.Init(),
  1718. },
  1719. },
  1720. Emitters: []string{"tableInPlanner"},
  1721. }.Init(),
  1722. },
  1723. },
  1724. from: &ast.Table{
  1725. Name: "src1",
  1726. },
  1727. joins: []ast.Join{
  1728. {
  1729. Name: "tableInPlanner",
  1730. Alias: "",
  1731. JoinType: ast.INNER_JOIN,
  1732. Expr: &ast.BinaryExpr{
  1733. LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
  1734. OP: ast.EQ,
  1735. RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
  1736. },
  1737. },
  1738. },
  1739. }.Init(),
  1740. },
  1741. },
  1742. fields: []ast.Field{
  1743. {
  1744. Expr: &ast.FieldRef{Name: "id1", StreamName: ast.DefaultStream},
  1745. Name: "id1",
  1746. AName: ""},
  1747. },
  1748. isAggregate: false,
  1749. sendMeta: false,
  1750. }.Init(),
  1751. }, { // 10 meta
  1752. sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
  1753. p: ProjectPlan{
  1754. baseLogicalPlan: baseLogicalPlan{
  1755. children: []LogicalPlan{
  1756. FilterPlan{
  1757. baseLogicalPlan: baseLogicalPlan{
  1758. children: []LogicalPlan{
  1759. DataSourcePlan{
  1760. name: "src1",
  1761. streamFields: []interface{}{
  1762. "temp",
  1763. },
  1764. streamStmt: streams["src1"],
  1765. metaFields: []string{"Humidity", "device", "id"},
  1766. }.Init(),
  1767. },
  1768. },
  1769. condition: &ast.BinaryExpr{
  1770. LHS: &ast.Call{
  1771. Name: "meta",
  1772. Args: []ast.Expr{&ast.MetaRef{
  1773. Name: "device",
  1774. StreamName: ast.DefaultStream,
  1775. }},
  1776. },
  1777. OP: ast.EQ,
  1778. RHS: &ast.StringLiteral{
  1779. Val: "demo2",
  1780. },
  1781. },
  1782. }.Init(),
  1783. },
  1784. },
  1785. fields: []ast.Field{
  1786. {
  1787. Expr: &ast.FieldRef{Name: "temp", StreamName: "src1"},
  1788. Name: "temp",
  1789. AName: "",
  1790. }, {
  1791. Expr: &ast.FieldRef{Name: "eid", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1792. &ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
  1793. Name: "id",
  1794. StreamName: ast.DefaultStream,
  1795. }}},
  1796. []ast.StreamName{},
  1797. nil,
  1798. )},
  1799. Name: "meta",
  1800. AName: "eid",
  1801. }, {
  1802. Expr: &ast.FieldRef{Name: "hdevice", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1803. &ast.Call{Name: "meta", Args: []ast.Expr{
  1804. &ast.BinaryExpr{
  1805. OP: ast.ARROW,
  1806. LHS: &ast.MetaRef{Name: "Humidity", StreamName: ast.DefaultStream},
  1807. RHS: &ast.MetaRef{Name: "Device"},
  1808. },
  1809. }},
  1810. []ast.StreamName{},
  1811. nil,
  1812. )},
  1813. Name: "meta",
  1814. AName: "hdevice",
  1815. },
  1816. },
  1817. isAggregate: false,
  1818. sendMeta: false,
  1819. }.Init(),
  1820. }, { // 11 join with same name field and aliased
  1821. sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
  1822. p: ProjectPlan{
  1823. baseLogicalPlan: baseLogicalPlan{
  1824. children: []LogicalPlan{
  1825. JoinPlan{
  1826. baseLogicalPlan: baseLogicalPlan{
  1827. children: []LogicalPlan{
  1828. JoinAlignPlan{
  1829. baseLogicalPlan: baseLogicalPlan{
  1830. children: []LogicalPlan{
  1831. DataSourcePlan{
  1832. name: "src2",
  1833. streamFields: []interface{}{
  1834. "hum", "id", "id2",
  1835. },
  1836. streamStmt: streams["src2"],
  1837. metaFields: []string{},
  1838. }.Init(),
  1839. DataSourcePlan{
  1840. name: "tableInPlanner",
  1841. streamFields: []interface{}{
  1842. &ast.StreamField{
  1843. Name: "hum",
  1844. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1845. },
  1846. &ast.StreamField{
  1847. Name: "id",
  1848. FieldType: &ast.BasicType{Type: ast.BIGINT},
  1849. },
  1850. },
  1851. streamStmt: streams["tableInPlanner"],
  1852. metaFields: []string{},
  1853. }.Init(),
  1854. },
  1855. },
  1856. Emitters: []string{"tableInPlanner"},
  1857. }.Init(),
  1858. },
  1859. },
  1860. from: &ast.Table{
  1861. Name: "src2",
  1862. },
  1863. joins: []ast.Join{
  1864. {
  1865. Name: "tableInPlanner",
  1866. Alias: "",
  1867. JoinType: ast.INNER_JOIN,
  1868. Expr: &ast.BinaryExpr{
  1869. RHS: &ast.BinaryExpr{
  1870. OP: ast.EQ,
  1871. LHS: &ast.FieldRef{Name: "id2", StreamName: ast.DefaultStream},
  1872. RHS: &ast.FieldRef{Name: "id", StreamName: ast.DefaultStream},
  1873. },
  1874. OP: ast.AND,
  1875. LHS: &ast.BinaryExpr{
  1876. OP: ast.GT,
  1877. LHS: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1878. &ast.FieldRef{
  1879. Name: "hum",
  1880. StreamName: "src2",
  1881. },
  1882. []ast.StreamName{"src2"},
  1883. &boolFalse,
  1884. )},
  1885. RHS: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1886. &ast.FieldRef{
  1887. Name: "hum",
  1888. StreamName: "tableInPlanner",
  1889. },
  1890. []ast.StreamName{"tableInPlanner"},
  1891. &boolFalse,
  1892. )},
  1893. },
  1894. },
  1895. },
  1896. },
  1897. }.Init(),
  1898. },
  1899. },
  1900. fields: []ast.Field{
  1901. {
  1902. Expr: &ast.FieldRef{Name: "hum1", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1903. &ast.FieldRef{
  1904. Name: "hum",
  1905. StreamName: "src2",
  1906. },
  1907. []ast.StreamName{"src2"},
  1908. &boolFalse,
  1909. )},
  1910. Name: "hum",
  1911. AName: "hum1",
  1912. }, {
  1913. Expr: &ast.FieldRef{Name: "hum2", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
  1914. &ast.FieldRef{
  1915. Name: "hum",
  1916. StreamName: "tableInPlanner",
  1917. },
  1918. []ast.StreamName{"tableInPlanner"},
  1919. &boolFalse,
  1920. )},
  1921. Name: "hum",
  1922. AName: "hum2",
  1923. },
  1924. },
  1925. isAggregate: false,
  1926. sendMeta: false,
  1927. }.Init(),
  1928. },
  1929. }
  1930. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1931. for i, tt := range tests {
  1932. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1933. if err != nil {
  1934. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  1935. continue
  1936. }
  1937. p, err := createLogicalPlan(stmt, &api.RuleOption{
  1938. IsEventTime: false,
  1939. LateTol: 0,
  1940. Concurrency: 0,
  1941. BufferLength: 0,
  1942. SendMetaToSink: false,
  1943. Qos: 0,
  1944. CheckpointInterval: 0,
  1945. SendError: true,
  1946. }, store)
  1947. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  1948. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
  1949. } else if !reflect.DeepEqual(tt.p, p) {
  1950. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  1951. }
  1952. }
  1953. }