parser.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/golang-collections/collections/stack"
  5. "io"
  6. "strconv"
  7. "strings"
  8. )
  9. type Parser struct {
  10. s *Scanner
  11. i int // buffer index
  12. n int // buffer char count
  13. buf [3]struct {
  14. tok Token
  15. lit string
  16. }
  17. }
  18. func (p *Parser) parseCondition() (Expr, error) {
  19. if tok, _ := p.scanIgnoreWhitespace(); tok != WHERE {
  20. p.unscan()
  21. return nil, nil
  22. }
  23. expr, err := p.ParseExpr()
  24. if err != nil {
  25. return nil, err
  26. }
  27. return expr, nil
  28. }
  29. func (p *Parser) scan() (tok Token, lit string) {
  30. if p.n > 0 {
  31. p.n--
  32. return p.curr()
  33. }
  34. tok, lit = p.s.Scan()
  35. if tok != WS && tok != COMMENT {
  36. p.i = (p.i + 1) % len(p.buf)
  37. buf := &p.buf[p.i]
  38. buf.tok, buf.lit = tok, lit
  39. }
  40. return
  41. }
  42. func (p *Parser) curr() (Token, string) {
  43. i := (p.i - p.n + len(p.buf)) % len(p.buf)
  44. buf := &p.buf[i]
  45. return buf.tok, buf.lit
  46. }
  47. func (p *Parser) scanIgnoreWhitespace() (tok Token, lit string) {
  48. tok, lit = p.scan()
  49. for {
  50. if tok == WS || tok == COMMENT {
  51. tok, lit = p.scan()
  52. } else {
  53. break
  54. }
  55. }
  56. return tok, lit
  57. }
  58. func (p *Parser) unscan() { p.n++ }
  59. func NewParser(r io.Reader) *Parser {
  60. return &Parser{s: NewScanner(r)}
  61. }
  62. func (p *Parser) ParseQueries() (SelectStatements, error) {
  63. var stmts SelectStatements
  64. if stmt, err := p.Parse(); err != nil {
  65. return nil, err
  66. } else {
  67. stmts = append(stmts, *stmt)
  68. }
  69. for {
  70. if tok, _ := p.scanIgnoreWhitespace(); tok == SEMICOLON {
  71. if stmt, err := p.Parse(); err != nil {
  72. return nil, err
  73. } else {
  74. if stmt != nil {
  75. stmts = append(stmts, *stmt)
  76. }
  77. }
  78. } else if tok == EOF {
  79. break
  80. }
  81. }
  82. return stmts, nil
  83. }
  84. func (p *Parser) Parse() (*SelectStatement, error) {
  85. selects := &SelectStatement{}
  86. if tok, lit := p.scanIgnoreWhitespace(); tok == EOF {
  87. return nil, nil
  88. } else if tok != SELECT {
  89. return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
  90. }
  91. if fields, err := p.parseFields(); err != nil {
  92. return nil, err
  93. } else {
  94. selects.Fields = fields
  95. }
  96. if src, err := p.parseSource(); err != nil {
  97. return nil, err
  98. } else {
  99. selects.Sources = src
  100. }
  101. if joins, err := p.parseJoins(); err != nil {
  102. return nil, err
  103. } else {
  104. selects.Joins = joins
  105. }
  106. if exp, err := p.parseCondition(); err != nil {
  107. return nil, err
  108. } else {
  109. if exp != nil {
  110. selects.Condition = exp
  111. }
  112. }
  113. if dims, err := p.parseDimensions(); err != nil {
  114. return nil, err
  115. } else {
  116. selects.Dimensions = dims
  117. }
  118. if having, err := p.parseHaving(); err != nil {
  119. return nil, err
  120. } else {
  121. selects.Having = having
  122. }
  123. if sorts, err := p.parseSorts(); err != nil {
  124. return nil, err
  125. } else {
  126. selects.SortFields = sorts
  127. }
  128. if tok, lit := p.scanIgnoreWhitespace(); tok == SEMICOLON {
  129. p.unscan()
  130. return selects, nil
  131. } else if tok != EOF {
  132. return nil, fmt.Errorf("found %q, expected EOF.", lit)
  133. }
  134. if err := Validate(selects); err != nil {
  135. return nil, err
  136. }
  137. return selects, nil
  138. }
  139. func (p *Parser) parseSource() (Sources, error) {
  140. var sources Sources
  141. if tok, lit := p.scanIgnoreWhitespace(); tok != FROM {
  142. return nil, fmt.Errorf("found %q, expected FROM.", lit)
  143. }
  144. if src, alias, err := p.parseSourceLiteral(); err != nil {
  145. return nil, err
  146. } else {
  147. sources = append(sources, &Table{Name: src, Alias: alias})
  148. }
  149. return sources, nil
  150. }
  151. //TODO Current func has problems when the source includes white space.
  152. func (p *Parser) parseSourceLiteral() (string, string, error) {
  153. var sourceSeg []string
  154. var alias string
  155. for {
  156. //HASH, DIV & ADD token is specially support for MQTT topic name patterns.
  157. if tok, lit := p.scanIgnoreWhitespace(); tok.allowedSourceToken() {
  158. sourceSeg = append(sourceSeg, lit)
  159. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == AS {
  160. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  161. alias = lit2
  162. } else {
  163. return "", "", fmt.Errorf("found %q, expected JOIN key word.", lit)
  164. }
  165. } else if tok1.allowedSourceToken() {
  166. sourceSeg = append(sourceSeg, lit1)
  167. } else {
  168. p.unscan()
  169. break
  170. }
  171. } else {
  172. p.unscan()
  173. break
  174. }
  175. }
  176. return strings.Join(sourceSeg, ""), alias, nil
  177. }
  178. func (p *Parser) parseFieldNameSections() ([]string, error) {
  179. var fieldNameSects []string
  180. for {
  181. if tok, lit := p.scanIgnoreWhitespace(); tok == IDENT || tok == ASTERISK {
  182. fieldNameSects = append(fieldNameSects, lit)
  183. if tok1, _ := p.scanIgnoreWhitespace(); !tok1.allowedSFNToken() {
  184. p.unscan()
  185. break
  186. }
  187. } else {
  188. p.unscan()
  189. break
  190. }
  191. }
  192. if len(fieldNameSects) == 0 {
  193. return nil, fmt.Errorf("Cannot find any field name.\n")
  194. } else if len(fieldNameSects) > 2 {
  195. return nil, fmt.Errorf("Too many field names. Please use -> to reference keys in struct.\n")
  196. }
  197. return fieldNameSects, nil
  198. }
  199. func (p *Parser) parseJoins() (Joins, error) {
  200. var joins Joins
  201. for {
  202. if tok, lit := p.scanIgnoreWhitespace(); tok == INNER || tok == LEFT || tok == RIGHT || tok == FULL || tok == CROSS {
  203. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == JOIN {
  204. var jt JoinType = INNER_JOIN
  205. switch tok {
  206. case INNER:
  207. jt = INNER_JOIN
  208. case LEFT:
  209. jt = LEFT_JOIN
  210. case RIGHT:
  211. jt = RIGHT_JOIN
  212. case FULL:
  213. jt = FULL_JOIN
  214. case CROSS:
  215. jt = CROSS_JOIN
  216. }
  217. if j, err := p.ParseJoin(jt); err != nil {
  218. return nil, err
  219. } else {
  220. joins = append(joins, *j)
  221. }
  222. } else {
  223. return nil, fmt.Errorf("found %q, expected JOIN key word.", lit)
  224. }
  225. } else {
  226. p.unscan()
  227. if len(joins) > 0 {
  228. return joins, nil
  229. }
  230. return nil, nil
  231. }
  232. }
  233. return joins, nil
  234. }
  235. func (p *Parser) ParseJoin(joinType JoinType) (*Join, error) {
  236. var j = &Join{ JoinType : joinType }
  237. if src, alias, err := p.parseSourceLiteral(); err != nil {
  238. return nil, err
  239. } else {
  240. j.Name = src
  241. j.Alias = alias
  242. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ON {
  243. if CROSS_JOIN == joinType {
  244. return nil, fmt.Errorf("On expression is not required for cross join type.\n")
  245. }
  246. if exp, err := p.ParseExpr(); err != nil {
  247. return nil, err
  248. } else {
  249. j.Expr = exp
  250. }
  251. } else {
  252. p.unscan()
  253. }
  254. }
  255. return j, nil
  256. }
  257. func (p *Parser) parseDimensions() (Dimensions, error) {
  258. var ds Dimensions
  259. if t, _ := p.scanIgnoreWhitespace(); t == GROUP {
  260. if t1, l1 := p.scanIgnoreWhitespace(); t1 == BY {
  261. for {
  262. if exp, err := p.ParseExpr(); err != nil {
  263. return nil, err
  264. } else {
  265. d := Dimension{Expr: exp}
  266. ds = append(ds, d)
  267. }
  268. if tok, _ := p.scanIgnoreWhitespace(); tok == COMMA {
  269. continue
  270. } else {
  271. p.unscan()
  272. break
  273. }
  274. }
  275. } else {
  276. return nil, fmt.Errorf("found %q, expected BY statement.", l1)
  277. }
  278. } else {
  279. p.unscan()
  280. }
  281. return ds, nil
  282. }
  283. func (p *Parser) parseHaving() (Expr, error) {
  284. if tok, _ := p.scanIgnoreWhitespace(); tok != HAVING {
  285. p.unscan()
  286. return nil, nil
  287. }
  288. expr, err := p.ParseExpr()
  289. if err != nil {
  290. return nil, err
  291. }
  292. return expr, nil
  293. }
  294. func (p *Parser) parseSorts() (SortFields, error) {
  295. var ss SortFields
  296. if t, _ := p.scanIgnoreWhitespace(); t == ORDER {
  297. if t1, l1 := p.scanIgnoreWhitespace(); t1 == BY {
  298. for {
  299. if t1, l1 = p.scanIgnoreWhitespace(); t1 == IDENT {
  300. s := SortField{Ascending: true}
  301. p.unscan()
  302. if name, err := p.parseFieldNameSections(); err == nil {
  303. s.Name = strings.Join(name, tokens[DOT])
  304. } else {
  305. return nil, err
  306. }
  307. if t2, _ := p.scanIgnoreWhitespace(); t2 == DESC {
  308. s.Ascending = false
  309. ss = append(ss, s)
  310. } else if t2 == ASC {
  311. ss = append(ss, s)
  312. } else {
  313. ss = append(ss, s)
  314. p.unscan()
  315. continue
  316. }
  317. } else if t1 == COMMA {
  318. continue
  319. } else {
  320. p.unscan()
  321. break
  322. }
  323. }
  324. } else {
  325. return nil, fmt.Errorf("found %q, expected BY keyword.", l1)
  326. }
  327. } else {
  328. p.unscan()
  329. }
  330. return ss, nil
  331. }
  332. func (p *Parser) parseFields() (Fields, error) {
  333. var fields Fields
  334. tok, _ := p.scanIgnoreWhitespace()
  335. if tok == ASTERISK {
  336. fields = append(fields, Field{AName: "", Expr: &Wildcard{Token: tok}})
  337. return fields, nil
  338. }
  339. p.unscan()
  340. for {
  341. field, err := p.parseField()
  342. if err != nil {
  343. return nil, err
  344. } else {
  345. fields = append(fields, *field)
  346. }
  347. tok, _ = p.scanIgnoreWhitespace()
  348. if tok != COMMA {
  349. p.unscan()
  350. break
  351. }
  352. }
  353. return fields, nil
  354. }
  355. func (p *Parser) parseField() (*Field, error) {
  356. field := &Field{}
  357. if exp, err := p.ParseExpr(); err != nil {
  358. return nil, err
  359. } else {
  360. if e, ok := exp.(*FieldRef); ok {
  361. field.Name = e.Name
  362. } else if e, ok := exp.(*Call); ok {
  363. field.Name = e.Name
  364. }
  365. field.Expr = exp
  366. }
  367. if alias, err := p.parseAlias(); err != nil {
  368. return nil, err
  369. } else {
  370. if alias != "" {
  371. field.AName = alias
  372. }
  373. }
  374. return field, nil
  375. }
  376. func (p *Parser) parseAlias() (string, error) {
  377. tok, lit := p.scanIgnoreWhitespace()
  378. if tok == AS {
  379. if tok, lit = p.scanIgnoreWhitespace(); tok != IDENT {
  380. return "", fmt.Errorf("found %q, expected as alias.", lit)
  381. } else {
  382. return lit, nil
  383. }
  384. }
  385. p.unscan()
  386. return "", nil
  387. }
  388. func (p *Parser) ParseExpr() (Expr, error) {
  389. var err error
  390. root := &BinaryExpr{}
  391. root.RHS, err = p.parseUnaryExpr()
  392. if err != nil {
  393. return nil, err
  394. }
  395. for {
  396. op, _ := p.scanIgnoreWhitespace()
  397. if !op.isOperator() {
  398. p.unscan()
  399. return root.RHS, nil
  400. } else if op == ASTERISK { //Change the asterisk to Mul token.
  401. op = MUL
  402. } else if op == LBRACKET { //LBRACKET is a special token, need to unscan
  403. op = SUBSET
  404. p.unscan()
  405. }
  406. var rhs Expr
  407. if rhs, err = p.parseUnaryExpr(); err != nil {
  408. return nil, err
  409. }
  410. for node := root; ; {
  411. r, ok := node.RHS.(*BinaryExpr)
  412. if !ok || r.OP.Precedence() >= op.Precedence() {
  413. node.RHS = &BinaryExpr{LHS: node.RHS, RHS: rhs, OP: op}
  414. break
  415. }
  416. node = r
  417. }
  418. }
  419. return nil, nil
  420. }
  421. func (p *Parser) parseUnaryExpr() (Expr, error) {
  422. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == LPAREN {
  423. expr, err := p.ParseExpr()
  424. if err != nil {
  425. return nil, err
  426. }
  427. // Expect an RPAREN at the end.
  428. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
  429. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  430. }
  431. return &ParenExpr{Expr: expr}, nil
  432. } else if tok1 == LBRACKET {
  433. return p.parseBracketExpr()
  434. }
  435. p.unscan()
  436. tok, lit := p.scanIgnoreWhitespace()
  437. if tok == IDENT {
  438. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == LPAREN {
  439. return p.parseCall(lit)
  440. }
  441. p.unscan() //Back the Lparen token
  442. p.unscan() //Back the ident token
  443. if n, err := p.parseFieldNameSections(); err != nil {
  444. return nil, err
  445. } else {
  446. if len(n) == 2 {
  447. return &FieldRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
  448. }
  449. return &FieldRef{StreamName: StreamName(""), Name: n[0]}, nil
  450. }
  451. } else if tok == STRING {
  452. return &StringLiteral{Val: lit}, nil
  453. } else if tok == INTEGER {
  454. val, _ := strconv.Atoi(lit)
  455. return &IntegerLiteral{Val: val}, nil
  456. } else if tok == NUMBER {
  457. if v, err := strconv.ParseFloat(lit, 64); err != nil {
  458. return nil, fmt.Errorf("found %q, invalid number value.", lit)
  459. } else {
  460. return &NumberLiteral{Val: v}, nil
  461. }
  462. } else if tok == TRUE || tok == FALSE {
  463. if v, err := strconv.ParseBool(lit); err != nil {
  464. return nil, fmt.Errorf("found %q, invalid boolean value.", lit)
  465. } else {
  466. return &BooleanLiteral{Val:v}, nil
  467. }
  468. } else if tok.isTimeLiteral() {
  469. return &TimeLiteral{Val:tok}, nil
  470. }
  471. return nil, fmt.Errorf("found %q, expected expression.", lit)
  472. }
  473. func (p *Parser) parseBracketExpr() (Expr, error){
  474. tok2, lit2 := p.scanIgnoreWhitespace()
  475. if tok2 == RBRACKET {
  476. //field[]
  477. return &ColonExpr{Start:0, End:-1}, nil
  478. } else if tok2 == INTEGER {
  479. start, err := strconv.Atoi(lit2)
  480. if err != nil {
  481. return nil, fmt.Errorf("The start index %s is not an int value in bracket expression.", lit2)
  482. }
  483. if tok3, _ := p.scanIgnoreWhitespace(); tok3 == RBRACKET {
  484. //Such as field[2]
  485. return &IndexExpr{Index:start}, nil
  486. } else if tok3 == COLON {
  487. //Such as field[2:] or field[2:4]
  488. return p.parseColonExpr(start)
  489. }
  490. } else if tok2 == COLON {
  491. //Such as field[:3] or [:]
  492. return p.parseColonExpr(0)
  493. }
  494. return nil, fmt.Errorf("Unexpected token %q. when parsing bracket expressions.", lit2)
  495. }
  496. func (p *Parser) parseColonExpr(start int) (Expr, error) {
  497. tok, lit := p.scanIgnoreWhitespace()
  498. if tok == INTEGER {
  499. end, err := strconv.Atoi(lit)
  500. if err != nil {
  501. return nil, fmt.Errorf("The end index %s is not an int value in bracket expression.", lit)
  502. }
  503. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == RBRACKET {
  504. return &ColonExpr{Start:start, End: end}, nil
  505. } else {
  506. return nil, fmt.Errorf("Found %q, expected right bracket.", lit1)
  507. }
  508. } else if tok == RBRACKET {
  509. return &ColonExpr{Start:start, End: -1}, nil
  510. }
  511. return nil, fmt.Errorf("Found %q, expected right bracket.", lit)
  512. }
  513. func (p *Parser) parseAs(f *Field) (*Field, error) {
  514. tok, lit := p.scanIgnoreWhitespace()
  515. if tok != IDENT {
  516. return nil, fmt.Errorf("found %q, expected as alias.", lit)
  517. }
  518. f.AName = lit
  519. return f, nil
  520. }
  521. func (p *Parser) parseCall(name string) (Expr, error) {
  522. var args []Expr
  523. for {
  524. if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
  525. return &Call{Name: name, Args: args}, nil
  526. } else if tok == ASTERISK {
  527. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
  528. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  529. } else {
  530. args = append(args, &StringLiteral{Val:"*"})
  531. return &Call{Name: name, Args: args}, nil
  532. }
  533. } else {
  534. p.unscan()
  535. }
  536. if exp, err := p.ParseExpr(); err != nil {
  537. return nil, err
  538. } else {
  539. args = append(args, exp)
  540. }
  541. if tok, _ := p.scanIgnoreWhitespace(); tok != COMMA {
  542. p.unscan()
  543. break
  544. }
  545. }
  546. if tok, lit := p.scanIgnoreWhitespace(); tok != RPAREN {
  547. return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
  548. }
  549. if wt, error := validateWindows(name, args); wt == NOT_WINDOW {
  550. if valErr := validateFuncs(name, args); valErr != nil {
  551. return nil, valErr
  552. }
  553. return &Call{Name: name, Args: args}, nil
  554. } else {
  555. if error != nil {
  556. return nil, error
  557. }
  558. return p.ConvertToWindows(wt, name, args)
  559. }
  560. }
  561. func validateWindows(name string, args []Expr) (WindowType, error) {
  562. fname := strings.ToLower(name)
  563. switch fname {
  564. case "tumblingwindow":
  565. if err := validateWindow(fname, 2, args); err != nil {
  566. return TUMBLING_WINDOW, err
  567. }
  568. return TUMBLING_WINDOW, nil
  569. case "hoppingwindow":
  570. if err := validateWindow(fname, 3, args); err != nil {
  571. return HOPPING_WINDOW, err
  572. }
  573. return HOPPING_WINDOW, nil
  574. case "sessionwindow":
  575. if err := validateWindow(fname, 3, args); err != nil {
  576. return SESSION_WINDOW, err
  577. }
  578. return SESSION_WINDOW, nil
  579. case "slidingwindow":
  580. if err := validateWindow(fname, 2, args); err != nil {
  581. return SLIDING_WINDOW, err
  582. }
  583. return SLIDING_WINDOW, nil
  584. }
  585. return NOT_WINDOW, nil
  586. }
  587. func validateWindow(funcName string, expectLen int, args []Expr) (error) {
  588. if len(args) != expectLen {
  589. return fmt.Errorf("The arguments for %s should be %d.\n", funcName, expectLen)
  590. }
  591. if _, ok := args[0].(*TimeLiteral); !ok {
  592. return fmt.Errorf("The 1st argument for %s is expecting timer literal expression. One value of [dd|hh|mi|ss|ms].\n", funcName)
  593. }
  594. for i := 1; i< len(args); i++ {
  595. if _, ok := args[i].(*IntegerLiteral); !ok {
  596. return fmt.Errorf("The %d argument for %s is expecting interger literal expression. \n", i, funcName)
  597. }
  598. }
  599. return nil
  600. }
  601. func (p *Parser) ConvertToWindows(wtype WindowType, name string, args []Expr) (*Window, error) {
  602. win := &Window{WindowType: wtype}
  603. var unit = 1
  604. v := args[0].(*TimeLiteral).Val
  605. switch v{
  606. case DD:
  607. unit = 24 * 3600 * 1000
  608. case HH:
  609. unit = 3600 * 1000
  610. case MI:
  611. unit = 60 * 1000
  612. case SS:
  613. unit = 1000
  614. case MS:
  615. unit = 1
  616. default:
  617. return nil, fmt.Errorf("Invalid timeliteral %s", v)
  618. }
  619. win.Length = &IntegerLiteral{Val : args[1].(*IntegerLiteral).Val * unit}
  620. if len(args) > 2{
  621. win.Interval = &IntegerLiteral{Val : args[2].(*IntegerLiteral).Val * unit}
  622. }else{
  623. win.Interval = &IntegerLiteral{Val: 0}
  624. }
  625. return win, nil
  626. }
  627. func (p *Parser) ParseCreateStreamStmt() (*StreamStmt, error) {
  628. stmt := &StreamStmt{}
  629. if tok, _ := p.scanIgnoreWhitespace(); tok == CREATE {
  630. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == STREAM {
  631. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  632. stmt.Name = StreamName(lit2)
  633. if fields, err := p.parseStreamFields(); err != nil {
  634. return nil, err
  635. } else {
  636. stmt.StreamFields = fields
  637. }
  638. if opts, err := p.parseStreamOptions(); err != nil {
  639. return nil, err
  640. } else {
  641. stmt.Options = opts
  642. }
  643. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == SEMICOLON {
  644. p.unscan()
  645. return stmt, nil
  646. } else if tok3 == EOF {
  647. //Finish parsing create stream statement.
  648. return stmt, nil
  649. } else {
  650. return nil, fmt.Errorf("found %q, expected semicolon or EOF.", lit3)
  651. }
  652. } else {
  653. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  654. }
  655. } else {
  656. return nil, fmt.Errorf("found %q, expected keyword stream.", lit1)
  657. }
  658. } else {
  659. p.unscan()
  660. return nil, nil
  661. }
  662. return stmt, nil
  663. }
  664. func (p *Parser) parseShowStreamsStmt() (*ShowStreamsStatement, error) {
  665. ss := &ShowStreamsStatement{}
  666. if tok, _ := p.scanIgnoreWhitespace(); tok == SHOW {
  667. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == STREAMS {
  668. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EOF || tok2 == SEMICOLON {
  669. return ss, nil
  670. } else {
  671. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  672. }
  673. } else {
  674. return nil, fmt.Errorf("found %q, expected keyword streams.", lit1)
  675. }
  676. } else {
  677. p.unscan()
  678. return nil, nil
  679. }
  680. return ss, nil
  681. }
  682. func (p *Parser) parseDescribeStreamStmt() (*DescribeStreamStatement, error) {
  683. dss := &DescribeStreamStatement{}
  684. if tok, _ := p.scanIgnoreWhitespace(); tok == DESCRIBE {
  685. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == STREAM {
  686. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  687. dss.Name = lit2
  688. return dss, nil
  689. } else {
  690. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  691. }
  692. } else {
  693. return nil, fmt.Errorf("found %q, expected keyword stream.", lit1)
  694. }
  695. } else {
  696. p.unscan()
  697. return nil, nil
  698. }
  699. }
  700. func (p *Parser) parseExplainStreamsStmt() (*ExplainStreamStatement, error) {
  701. ess := &ExplainStreamStatement{}
  702. if tok, _ := p.scanIgnoreWhitespace(); tok == EXPLAIN {
  703. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == STREAM {
  704. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  705. ess.Name = lit2
  706. return ess, nil
  707. } else {
  708. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  709. }
  710. } else {
  711. return nil, fmt.Errorf("found %q, expected keyword stream.", lit1)
  712. }
  713. } else {
  714. p.unscan()
  715. return nil, nil
  716. }
  717. }
  718. func (p *Parser) parseDropStreamsStmt() (*DropStreamStatement, error) {
  719. ess := &DropStreamStatement{}
  720. if tok, _ := p.scanIgnoreWhitespace(); tok == DROP {
  721. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == STREAM {
  722. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  723. ess.Name = lit2
  724. return ess, nil
  725. } else {
  726. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  727. }
  728. } else {
  729. return nil, fmt.Errorf("found %q, expected keyword stream.", lit1)
  730. }
  731. } else {
  732. p.unscan()
  733. return nil, nil
  734. }
  735. }
  736. func (p *Parser) parseStreamFields() (StreamFields, error) {
  737. lStack := &stack.Stack{}
  738. var fields StreamFields
  739. if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
  740. lStack.Push(lit)
  741. for {
  742. if f, err := p.parseStreamField(); err != nil {
  743. return nil, err
  744. } else {
  745. fields = append(fields, *f)
  746. }
  747. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == RPAREN {
  748. lStack.Pop()
  749. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == WITH {
  750. //Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
  751. if lStack.Len() > 0 {
  752. return nil, fmt.Errorf("Parenthesis is not matched.")
  753. }
  754. break
  755. } else if tok2 == COMMA {
  756. if lStack.Len() > 0 {
  757. return nil, fmt.Errorf("Parenthesis is in create record type not matched.")
  758. }
  759. p.unscan()
  760. break
  761. } else if tok2 == RPAREN { //The nested type definition of ARRAY and Struct, such as "field ARRAY(STRUCT(f BIGINT))"
  762. if lStack.Len() > 0 {
  763. return nil, fmt.Errorf("Parenthesis is not matched.")
  764. }
  765. p.unscan()
  766. break
  767. } else {
  768. if lStack.Len() == 0 {
  769. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  770. }
  771. p.unscan()
  772. }
  773. } else {
  774. p.unscan()
  775. }
  776. }
  777. } else {
  778. return nil, fmt.Errorf("found %q, expected lparen after stream name.", lit)
  779. }
  780. return fields, nil
  781. }
  782. func (p *Parser) parseStreamField() (*StreamField, error) {
  783. field := &StreamField{}
  784. if tok, lit := p.scanIgnoreWhitespace(); tok == IDENT {
  785. field.Name = lit
  786. tok1, lit1 := p.scanIgnoreWhitespace()
  787. if t := getDataType(tok1); t != UNKNOWN && t.isSimpleType() {
  788. field.FieldType = &BasicType{Type: t}
  789. } else if t == ARRAY {
  790. if f, e := p.parseStreamArrayType(); e != nil {
  791. return nil, e
  792. } else {
  793. field.FieldType = f
  794. }
  795. } else if t == STRUCT {
  796. if f, e := p.parseStreamStructType(); e != nil {
  797. return nil, e
  798. } else {
  799. field.FieldType = f
  800. }
  801. } else if t == UNKNOWN {
  802. return nil, fmt.Errorf("found %q, expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | ARRAY | STRUCT).", lit1)
  803. }
  804. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == COMMA {
  805. //Just consume the comma.
  806. } else if tok2 == RPAREN {
  807. p.unscan()
  808. } else {
  809. return nil, fmt.Errorf("found %q, expect comma or rparen.", lit2)
  810. }
  811. } else {
  812. return nil, fmt.Errorf("found %q, expect stream field name.", lit)
  813. }
  814. return field, nil
  815. }
  816. func (p *Parser) parseStreamArrayType() (FieldType, error) {
  817. lStack := &stack.Stack{}
  818. if tok, _ := p.scanIgnoreWhitespace(); tok == LPAREN {
  819. lStack.Push(LPAREN)
  820. tok1, lit1 := p.scanIgnoreWhitespace()
  821. if t := getDataType(tok1); t != UNKNOWN && t.isSimpleType() {
  822. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == RPAREN {
  823. lStack.Pop()
  824. if lStack.Len() > 0 {
  825. return nil, fmt.Errorf("Parenthesis is in array type not matched.")
  826. }
  827. return &ArrayType{Type: t}, nil
  828. } else {
  829. return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
  830. }
  831. } else if tok1 == XSTRUCT {
  832. if f, err := p.parseStreamStructType(); err != nil {
  833. return nil, err
  834. } else {
  835. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == RPAREN {
  836. lStack.Pop()
  837. if lStack.Len() > 0 {
  838. return nil, fmt.Errorf("Parenthesis is in struct of array type %q not matched.", tok1)
  839. }
  840. return &ArrayType{Type: STRUCT, FieldType: f}, nil
  841. } else {
  842. return nil, fmt.Errorf("found %q, expect rparen in struct of array type definition.", lit2)
  843. }
  844. }
  845. } else if tok1 == COMMA {
  846. p.unscan()
  847. } else {
  848. return nil, fmt.Errorf("found %q, expect stream data types.", lit1)
  849. }
  850. } else {
  851. }
  852. return nil, nil
  853. }
  854. func (p *Parser) parseStreamStructType() (FieldType, error) {
  855. rf := &RecType{}
  856. if sfs, err := p.parseStreamFields(); err != nil {
  857. return nil, err
  858. } else {
  859. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == COMMA {
  860. rf.StreamFields = sfs
  861. } else if tok2 == RPAREN {
  862. rf.StreamFields = sfs
  863. p.unscan()
  864. } else {
  865. return nil, fmt.Errorf("found %q, expect comma in create stream record statement.", lit2)
  866. }
  867. }
  868. return rf, nil
  869. }
  870. func (p *Parser) parseStreamOptions() (map[string]string, error) {
  871. var opts map[string]string = make(map[string]string)
  872. lStack := &stack.Stack{}
  873. if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
  874. lStack.Push(LPAREN)
  875. for {
  876. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == DATASOURCE || tok1 == FORMAT || tok1 == KEY || tok1 == CONF_KEY || tok1 == STRICT_VALIDATION || tok1 == TYPE || tok1 == TIMESTAMP || tok1 == TIMESTAMP_FORMAT {
  877. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EQ {
  878. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == STRING {
  879. if tok1 == STRICT_VALIDATION {
  880. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  881. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  882. }
  883. }
  884. opts[lit1] = lit3
  885. } else {
  886. return nil, fmt.Errorf("found %q, expect string value in option.", lit3)
  887. }
  888. } else {
  889. return nil, fmt.Errorf("found %q, expect equals(=) in options.", lit2)
  890. }
  891. } else if tok1 == COMMA {
  892. continue
  893. } else if tok1 == RPAREN {
  894. if lStack.Pop(); lStack.Len() == 0 {
  895. break
  896. } else {
  897. return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
  898. }
  899. } else {
  900. return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).", lit1)
  901. }
  902. }
  903. } else {
  904. return nil, fmt.Errorf("found %q, expect stream options.", lit)
  905. }
  906. return opts, nil
  907. }