parser.go 31 KB


  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/golang-collections/collections/stack"
  6. "io"
  7. "math"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. )
  12. const DEFAULT_STREAM = "$default"
  13. const MULTI_STREAM = "$multi"
  14. type Parser struct {
  15. s *Scanner
  16. i int // buffer index
  17. n int // buffer char count
  18. buf [3]struct {
  19. tok Token
  20. lit string
  21. }
  22. inmeta bool
  23. }
  24. func (p *Parser) parseCondition() (Expr, error) {
  25. if tok, _ := p.scanIgnoreWhitespace(); tok != WHERE {
  26. p.unscan()
  27. return nil, nil
  28. }
  29. expr, err := p.ParseExpr()
  30. if err != nil {
  31. return nil, err
  32. }
  33. return expr, nil
  34. }
  35. func (p *Parser) scan() (tok Token, lit string) {
  36. if p.n > 0 {
  37. p.n--
  38. return p.curr()
  39. }
  40. tok, lit = p.s.Scan()
  41. if tok != WS && tok != COMMENT {
  42. p.i = (p.i + 1) % len(p.buf)
  43. buf := &p.buf[p.i]
  44. buf.tok, buf.lit = tok, lit
  45. }
  46. return
  47. }
  48. func (p *Parser) curr() (Token, string) {
  49. i := (p.i - p.n + len(p.buf)) % len(p.buf)
  50. buf := &p.buf[i]
  51. return buf.tok, buf.lit
  52. }
  53. func (p *Parser) scanIgnoreWhitespace() (tok Token, lit string) {
  54. tok, lit = p.scan()
  55. for {
  56. if tok == WS || tok == COMMENT {
  57. tok, lit = p.scan()
  58. } else {
  59. break
  60. }
  61. }
  62. return tok, lit
  63. }
  64. func (p *Parser) unscan() { p.n++ }
  65. func NewParser(r io.Reader) *Parser {
  66. return &Parser{s: NewScanner(r)}
  67. }
  68. func (p *Parser) ParseQueries() (SelectStatements, error) {
  69. var stmts SelectStatements
  70. if stmt, err := p.Parse(); err != nil {
  71. return nil, err
  72. } else {
  73. stmts = append(stmts, *stmt)
  74. }
  75. for {
  76. if tok, _ := p.scanIgnoreWhitespace(); tok == SEMICOLON {
  77. if stmt, err := p.Parse(); err != nil {
  78. return nil, err
  79. } else {
  80. if stmt != nil {
  81. stmts = append(stmts, *stmt)
  82. }
  83. }
  84. } else if tok == EOF {
  85. break
  86. }
  87. }
  88. return stmts, nil
  89. }
  90. func (p *Parser) Parse() (*SelectStatement, error) {
  91. selects := &SelectStatement{}
  92. if tok, lit := p.scanIgnoreWhitespace(); tok == EOF {
  93. return nil, nil
  94. } else if tok != SELECT {
  95. return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
  96. }
  97. if fields, err := p.parseFields(); err != nil {
  98. return nil, err
  99. } else {
  100. selects.Fields = fields
  101. }
  102. if src, err := p.parseSource(); err != nil {
  103. return nil, err
  104. } else {
  105. selects.Sources = src
  106. }
  107. if joins, err := p.parseJoins(); err != nil {
  108. return nil, err
  109. } else {
  110. selects.Joins = joins
  111. }
  112. if exp, err := p.parseCondition(); err != nil {
  113. return nil, err
  114. } else {
  115. if exp != nil {
  116. selects.Condition = exp
  117. }
  118. }
  119. if dims, err := p.parseDimensions(); err != nil {
  120. return nil, err
  121. } else {
  122. selects.Dimensions = dims
  123. }
  124. if having, err := p.parseHaving(); err != nil {
  125. return nil, err
  126. } else {
  127. selects.Having = having
  128. }
  129. if sorts, err := p.parseSorts(); err != nil {
  130. return nil, err
  131. } else {
  132. selects.SortFields = sorts
  133. }
  134. if tok, lit := p.scanIgnoreWhitespace(); tok == SEMICOLON {
  135. p.unscan()
  136. return selects, nil
  137. } else if tok != EOF {
  138. return nil, fmt.Errorf("found %q, expected EOF.", lit)
  139. }
  140. if err := Validate(selects); err != nil {
  141. return nil, err
  142. }
  143. return selects, nil
  144. }
  145. func (p *Parser) parseSource() (Sources, error) {
  146. var sources Sources
  147. if tok, lit := p.scanIgnoreWhitespace(); tok != FROM {
  148. return nil, fmt.Errorf("found %q, expected FROM.", lit)
  149. }
  150. if src, alias, err := p.parseSourceLiteral(); err != nil {
  151. return nil, err
  152. } else {
  153. sources = append(sources, &Table{Name: src, Alias: alias})
  154. }
  155. return sources, nil
  156. }
  157. //TODO Current func has problems when the source includes white space.
  158. func (p *Parser) parseSourceLiteral() (string, string, error) {
  159. var sourceSeg []string
  160. var alias string
  161. for {
  162. //HASH, DIV & ADD token is specially support for MQTT topic name patterns.
  163. if tok, lit := p.scanIgnoreWhitespace(); tok.allowedSourceToken() {
  164. sourceSeg = append(sourceSeg, lit)
  165. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == AS {
  166. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  167. alias = lit2
  168. } else {
  169. return "", "", fmt.Errorf("found %q, expected JOIN key word.", lit)
  170. }
  171. } else if tok1.allowedSourceToken() {
  172. sourceSeg = append(sourceSeg, lit1)
  173. } else {
  174. p.unscan()
  175. break
  176. }
  177. } else {
  178. p.unscan()
  179. break
  180. }
  181. }
  182. return strings.Join(sourceSeg, ""), alias, nil
  183. }
  184. func (p *Parser) parseFieldNameSections() ([]string, error) {
  185. var fieldNameSects []string
  186. for {
  187. if tok, lit := p.scanIgnoreWhitespace(); tok == IDENT || tok == ASTERISK {
  188. fieldNameSects = append(fieldNameSects, lit)
  189. if tok1, _ := p.scanIgnoreWhitespace(); !tok1.allowedSFNToken() {
  190. p.unscan()
  191. break
  192. }
  193. } else {
  194. p.unscan()
  195. break
  196. }
  197. }
  198. if len(fieldNameSects) == 0 {
  199. return nil, fmt.Errorf("Cannot find any field name.\n")
  200. } else if len(fieldNameSects) > 2 {
  201. return nil, fmt.Errorf("Too many field names. Please use -> to reference keys in struct.\n")
  202. }
  203. return fieldNameSects, nil
  204. }
  205. func (p *Parser) parseJoins() (Joins, error) {
  206. var joins Joins
  207. for {
  208. if tok, lit := p.scanIgnoreWhitespace(); tok == INNER || tok == LEFT || tok == RIGHT || tok == FULL || tok == CROSS {
  209. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == JOIN {
  210. var jt = INNER_JOIN
  211. switch tok {
  212. case INNER:
  213. jt = INNER_JOIN
  214. case LEFT:
  215. jt = LEFT_JOIN
  216. case RIGHT:
  217. jt = RIGHT_JOIN
  218. case FULL:
  219. jt = FULL_JOIN
  220. case CROSS:
  221. jt = CROSS_JOIN
  222. }
  223. if j, err := p.ParseJoin(jt); err != nil {
  224. return nil, err
  225. } else {
  226. joins = append(joins, *j)
  227. }
  228. } else {
  229. return nil, fmt.Errorf("found %q, expected JOIN key word.", lit)
  230. }
  231. } else {
  232. p.unscan()
  233. if len(joins) > 0 {
  234. return joins, nil
  235. }
  236. return nil, nil
  237. }
  238. }
  239. return joins, nil
  240. }
  241. func (p *Parser) ParseJoin(joinType JoinType) (*Join, error) {
  242. var j = &Join{JoinType: joinType}
  243. if src, alias, err := p.parseSourceLiteral(); err != nil {
  244. return nil, err
  245. } else {
  246. j.Name = src
  247. j.Alias = alias
  248. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ON {
  249. if CROSS_JOIN == joinType {
  250. return nil, fmt.Errorf("On expression is not required for cross join type.\n")
  251. }
  252. if exp, err := p.ParseExpr(); err != nil {
  253. return nil, err
  254. } else {
  255. j.Expr = exp
  256. }
  257. } else {
  258. p.unscan()
  259. }
  260. }
  261. return j, nil
  262. }
  263. func (p *Parser) parseDimensions() (Dimensions, error) {
  264. var ds Dimensions
  265. if t, _ := p.scanIgnoreWhitespace(); t == GROUP {
  266. if t1, l1 := p.scanIgnoreWhitespace(); t1 == BY {
  267. for {
  268. if exp, err := p.ParseExpr(); err != nil {
  269. return nil, err
  270. } else {
  271. d := Dimension{Expr: exp}
  272. ds = append(ds, d)
  273. }
  274. if tok, _ := p.scanIgnoreWhitespace(); tok == COMMA {
  275. continue
  276. } else {
  277. p.unscan()
  278. break
  279. }
  280. }
  281. } else {
  282. return nil, fmt.Errorf("found %q, expected BY statement.", l1)
  283. }
  284. } else {
  285. p.unscan()
  286. }
  287. return ds, nil
  288. }
  289. func (p *Parser) parseHaving() (Expr, error) {
  290. if tok, _ := p.scanIgnoreWhitespace(); tok != HAVING {
  291. p.unscan()
  292. return nil, nil
  293. }
  294. expr, err := p.ParseExpr()
  295. if err != nil {
  296. return nil, err
  297. }
  298. return expr, nil
  299. }
  300. func (p *Parser) parseSorts() (SortFields, error) {
  301. var ss SortFields
  302. if t, _ := p.scanIgnoreWhitespace(); t == ORDER {
  303. if t1, l1 := p.scanIgnoreWhitespace(); t1 == BY {
  304. for {
  305. if t1, l1 = p.scanIgnoreWhitespace(); t1 == IDENT {
  306. s := SortField{Ascending: true}
  307. p.unscan()
  308. if name, err := p.parseFieldNameSections(); err == nil {
  309. s.Name = strings.Join(name, tokens[COLSEP])
  310. } else {
  311. return nil, err
  312. }
  313. if t2, _ := p.scanIgnoreWhitespace(); t2 == DESC {
  314. s.Ascending = false
  315. ss = append(ss, s)
  316. } else if t2 == ASC {
  317. ss = append(ss, s)
  318. } else {
  319. ss = append(ss, s)
  320. p.unscan()
  321. continue
  322. }
  323. } else if t1 == COMMA {
  324. continue
  325. } else {
  326. p.unscan()
  327. break
  328. }
  329. }
  330. } else {
  331. return nil, fmt.Errorf("found %q, expected BY keyword.", l1)
  332. }
  333. } else {
  334. p.unscan()
  335. }
  336. return ss, nil
  337. }
  338. func (p *Parser) parseFields() (Fields, error) {
  339. var fields Fields
  340. tok, _ := p.scanIgnoreWhitespace()
  341. if tok == ASTERISK {
  342. fields = append(fields, Field{AName: "", Expr: &Wildcard{Token: tok}})
  343. return fields, nil
  344. }
  345. p.unscan()
  346. for {
  347. field, err := p.parseField()
  348. if err != nil {
  349. return nil, err
  350. } else {
  351. fields = append(fields, *field)
  352. }
  353. tok, _ = p.scanIgnoreWhitespace()
  354. if tok != COMMA {
  355. p.unscan()
  356. break
  357. }
  358. }
  359. return fields, nil
  360. }
  361. func (p *Parser) parseField() (*Field, error) {
  362. field := &Field{}
  363. if exp, err := p.ParseExpr(); err != nil {
  364. return nil, err
  365. } else {
  366. if e, ok := exp.(*FieldRef); ok {
  367. field.Name = e.Name
  368. } else if e, ok := exp.(*Call); ok {
  369. field.Name = e.Name
  370. }
  371. field.Expr = exp
  372. }
  373. if alias, err := p.parseAlias(); err != nil {
  374. return nil, err
  375. } else {
  376. if alias != "" {
  377. field.AName = alias
  378. }
  379. }
  380. return field, nil
  381. }
  382. func (p *Parser) parseAlias() (string, error) {
  383. tok, lit := p.scanIgnoreWhitespace()
  384. if tok == AS {
  385. if tok, lit = p.scanIgnoreWhitespace(); tok != IDENT {
  386. return "", fmt.Errorf("found %q, expected as alias.", lit)
  387. } else {
  388. return lit, nil
  389. }
  390. }
  391. p.unscan()
  392. return "", nil
  393. }
  394. func (p *Parser) ParseExpr() (Expr, error) {
  395. var err error
  396. root := &BinaryExpr{}
  397. root.RHS, err = p.parseUnaryExpr(false)
  398. if err != nil {
  399. return nil, err
  400. }
  401. for {
  402. op, _ := p.scanIgnoreWhitespace()
  403. if !op.isOperator() {
  404. p.unscan()
  405. return root.RHS, nil
  406. } else if op == ASTERISK { //Change the asterisk to Mul token.
  407. op = MUL
  408. } else if op == LBRACKET { //LBRACKET is a special token, need to unscan
  409. op = SUBSET
  410. p.unscan()
  411. }
  412. var rhs Expr
  413. if rhs, err = p.parseUnaryExpr(op == ARROW); err != nil {
  414. return nil, err
  415. }
  416. for node := root; ; {
  417. r, ok := node.RHS.(*BinaryExpr)
  418. if !ok || r.OP.Precedence() >= op.Precedence() {
  419. node.RHS = &BinaryExpr{LHS: node.RHS, RHS: rhs, OP: op}
  420. break
  421. }
  422. node = r
  423. }
  424. }
  425. return nil, nil
  426. }
  427. func (p *Parser) parseUnaryExpr(isSubField bool) (Expr, error) {
  428. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == LPAREN {
  429. expr, err := p.ParseExpr()
  430. if err != nil {
  431. return nil, err
  432. }
  433. // Expect an RPAREN at the end.
  434. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
  435. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  436. }
  437. return &ParenExpr{Expr: expr}, nil
  438. } else if tok1 == LBRACKET {
  439. return p.parseBracketExpr()
  440. }
  441. p.unscan()
  442. tok, lit := p.scanIgnoreWhitespace()
  443. if tok == CASE {
  444. return p.parseCaseExpr()
  445. } else if tok == IDENT {
  446. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == LPAREN {
  447. return p.parseCall(lit)
  448. }
  449. p.unscan() //Back the Lparen token
  450. p.unscan() //Back the ident token
  451. if n, err := p.parseFieldNameSections(); err != nil {
  452. return nil, err
  453. } else {
  454. if p.inmeta {
  455. if len(n) == 2 {
  456. return &MetaRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
  457. }
  458. if isSubField {
  459. return &MetaRef{StreamName: "", Name: n[0]}, nil
  460. }
  461. return &MetaRef{StreamName: DEFAULT_STREAM, Name: n[0]}, nil
  462. } else {
  463. if len(n) == 2 {
  464. return &FieldRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
  465. }
  466. if isSubField {
  467. return &FieldRef{StreamName: "", Name: n[0]}, nil
  468. }
  469. return &FieldRef{StreamName: DEFAULT_STREAM, Name: n[0]}, nil
  470. }
  471. }
  472. } else if tok == STRING {
  473. return &StringLiteral{Val: lit}, nil
  474. } else if tok == INTEGER {
  475. val, _ := strconv.Atoi(lit)
  476. return &IntegerLiteral{Val: val}, nil
  477. } else if tok == NUMBER {
  478. if v, err := strconv.ParseFloat(lit, 64); err != nil {
  479. return nil, fmt.Errorf("found %q, invalid number value.", lit)
  480. } else {
  481. return &NumberLiteral{Val: v}, nil
  482. }
  483. } else if tok == TRUE || tok == FALSE {
  484. if v, err := strconv.ParseBool(lit); err != nil {
  485. return nil, fmt.Errorf("found %q, invalid boolean value.", lit)
  486. } else {
  487. return &BooleanLiteral{Val: v}, nil
  488. }
  489. } else if tok.isTimeLiteral() {
  490. return &TimeLiteral{Val: tok}, nil
  491. }
  492. return nil, fmt.Errorf("found %q, expected expression.", lit)
  493. }
  494. func (p *Parser) parseBracketExpr() (Expr, error) {
  495. tok2, lit2 := p.scanIgnoreWhitespace()
  496. if tok2 == RBRACKET {
  497. //field[]
  498. return &ColonExpr{Start: 0, End: math.MinInt32}, nil
  499. } else if tok2 == INTEGER {
  500. start, err := strconv.Atoi(lit2)
  501. if err != nil {
  502. return nil, fmt.Errorf("The start index %s is not an int value in bracket expression.", lit2)
  503. }
  504. if tok3, _ := p.scanIgnoreWhitespace(); tok3 == RBRACKET {
  505. //Such as field[2]
  506. return &IndexExpr{Index: start}, nil
  507. } else if tok3 == COLON {
  508. //Such as field[2:] or field[2:4]
  509. return p.parseColonExpr(start)
  510. }
  511. } else if tok2 == COLON {
  512. //Such as field[:3] or [:]
  513. return p.parseColonExpr(0)
  514. }
  515. return nil, fmt.Errorf("Unexpected token %q. when parsing bracket expressions.", lit2)
  516. }
  517. func (p *Parser) parseColonExpr(start int) (Expr, error) {
  518. tok, lit := p.scanIgnoreWhitespace()
  519. if tok == INTEGER {
  520. end, err := strconv.Atoi(lit)
  521. if err != nil {
  522. return nil, fmt.Errorf("The end index %s is not an int value in bracket expression.", lit)
  523. }
  524. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == RBRACKET {
  525. return &ColonExpr{Start: start, End: end}, nil
  526. } else {
  527. return nil, fmt.Errorf("Found %q, expected right bracket.", lit1)
  528. }
  529. } else if tok == RBRACKET {
  530. return &ColonExpr{Start: start, End: math.MinInt32}, nil
  531. }
  532. return nil, fmt.Errorf("Found %q, expected right bracket.", lit)
  533. }
  534. func (p *Parser) parseAs(f *Field) (*Field, error) {
  535. tok, lit := p.scanIgnoreWhitespace()
  536. if tok != IDENT {
  537. return nil, fmt.Errorf("found %q, expected as alias.", lit)
  538. }
  539. f.AName = lit
  540. return f, nil
  541. }
  542. func (p *Parser) parseCall(name string) (Expr, error) {
  543. if strings.ToLower(name) == "meta" || strings.ToLower(name) == "mqtt" {
  544. p.inmeta = true
  545. defer func() {
  546. p.inmeta = false
  547. }()
  548. }
  549. var args []Expr
  550. for {
  551. if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
  552. if valErr := validateFuncs(name, nil); valErr != nil {
  553. return nil, valErr
  554. }
  555. return &Call{Name: name, Args: args}, nil
  556. } else if tok == ASTERISK {
  557. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
  558. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  559. } else {
  560. if p.inmeta {
  561. args = append(args, &MetaRef{StreamName: "", Name: "*"})
  562. } else {
  563. args = append(args, &Wildcard{Token: ASTERISK})
  564. }
  565. return &Call{Name: name, Args: args}, nil
  566. }
  567. } else {
  568. p.unscan()
  569. }
  570. if exp, err := p.ParseExpr(); err != nil {
  571. return nil, err
  572. } else {
  573. args = append(args, exp)
  574. }
  575. if tok, _ := p.scanIgnoreWhitespace(); tok != COMMA {
  576. p.unscan()
  577. break
  578. }
  579. }
  580. if tok, lit := p.scanIgnoreWhitespace(); tok != RPAREN {
  581. return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
  582. }
  583. if wt, error := validateWindows(name, args); wt == NOT_WINDOW {
  584. if valErr := validateFuncs(name, args); valErr != nil {
  585. return nil, valErr
  586. }
  587. // Add context for some aggregate func
  588. if name == "deduplicate" {
  589. args = append([]Expr{&Wildcard{Token: ASTERISK}}, args...)
  590. }
  591. return &Call{Name: name, Args: args}, nil
  592. } else {
  593. if error != nil {
  594. return nil, error
  595. }
  596. win, err := p.ConvertToWindows(wt, args)
  597. if err != nil {
  598. return nil, error
  599. }
  600. // parse filter clause
  601. f, err := p.parseFilter()
  602. if err != nil {
  603. return nil, err
  604. } else if f != nil {
  605. win.Filter = f
  606. }
  607. return win, nil
  608. }
  609. }
  610. func (p *Parser) parseCaseExpr() (*CaseExpr, error) {
  611. c := &CaseExpr{}
  612. tok, _ := p.scanIgnoreWhitespace()
  613. p.unscan()
  614. if tok != WHEN { // no condition value for case, additional validation needed
  615. if exp, err := p.ParseExpr(); err != nil {
  616. return nil, err
  617. } else {
  618. c.Value = exp
  619. }
  620. }
  621. loop:
  622. for {
  623. tok, _ := p.scanIgnoreWhitespace()
  624. switch tok {
  625. case WHEN:
  626. if exp, err := p.ParseExpr(); err != nil {
  627. return nil, err
  628. } else {
  629. if c.WhenClauses == nil {
  630. c.WhenClauses = make([]*WhenClause, 0)
  631. }
  632. if c.Value == nil && !isBooleanArg(exp) {
  633. return nil, fmt.Errorf("invalid CASE expression, WHEN expression must be a bool condition")
  634. }
  635. w := &WhenClause{
  636. Expr: exp,
  637. }
  638. tokThen, _ := p.scanIgnoreWhitespace()
  639. if tokThen != THEN {
  640. return nil, fmt.Errorf("invalid CASE expression, THEN expected after WHEN")
  641. } else {
  642. if expThen, err := p.ParseExpr(); err != nil {
  643. return nil, err
  644. } else {
  645. w.Result = expThen
  646. c.WhenClauses = append(c.WhenClauses, w)
  647. }
  648. }
  649. }
  650. case ELSE:
  651. if c.WhenClauses != nil {
  652. if exp, err := p.ParseExpr(); err != nil {
  653. return nil, err
  654. } else {
  655. c.ElseClause = exp
  656. }
  657. } else {
  658. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before ELSE")
  659. }
  660. case END:
  661. if c.WhenClauses != nil {
  662. break loop
  663. } else {
  664. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before END")
  665. }
  666. default:
  667. return nil, fmt.Errorf("invalid CASE expression, END expected")
  668. }
  669. }
  670. return c, nil
  671. }
  672. func validateWindows(name string, args []Expr) (WindowType, error) {
  673. fname := strings.ToLower(name)
  674. switch fname {
  675. case "tumblingwindow":
  676. if err := validateWindow(fname, 2, args); err != nil {
  677. return TUMBLING_WINDOW, err
  678. }
  679. return TUMBLING_WINDOW, nil
  680. case "hoppingwindow":
  681. if err := validateWindow(fname, 3, args); err != nil {
  682. return HOPPING_WINDOW, err
  683. }
  684. return HOPPING_WINDOW, nil
  685. case "sessionwindow":
  686. if err := validateWindow(fname, 3, args); err != nil {
  687. return SESSION_WINDOW, err
  688. }
  689. return SESSION_WINDOW, nil
  690. case "slidingwindow":
  691. if err := validateWindow(fname, 2, args); err != nil {
  692. return SLIDING_WINDOW, err
  693. }
  694. return SLIDING_WINDOW, nil
  695. case "countwindow":
  696. if len(args) == 1 {
  697. if para1, ok := args[0].(*IntegerLiteral); ok && para1.Val > 0 {
  698. return COUNT_WINDOW, nil
  699. } else {
  700. return COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s.", args[0])
  701. }
  702. } else if len(args) == 2 {
  703. if para1, ok1 := args[0].(*IntegerLiteral); ok1 {
  704. if para2, ok2 := args[1].(*IntegerLiteral); ok2 {
  705. if para1.Val < para2.Val {
  706. return COUNT_WINDOW, fmt.Errorf("The second parameter value %d should be less than the first parameter %d.", para2.Val, para1.Val)
  707. } else {
  708. return COUNT_WINDOW, nil
  709. }
  710. }
  711. }
  712. return COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s, %s.", args[0], args[1])
  713. } else {
  714. return COUNT_WINDOW, fmt.Errorf("Invalid parameter count.")
  715. }
  716. }
  717. return NOT_WINDOW, nil
  718. }
  719. func validateWindow(funcName string, expectLen int, args []Expr) error {
  720. if len(args) != expectLen {
  721. return fmt.Errorf("The arguments for %s should be %d.\n", funcName, expectLen)
  722. }
  723. if _, ok := args[0].(*TimeLiteral); !ok {
  724. return fmt.Errorf("The 1st argument for %s is expecting timer literal expression. One value of [dd|hh|mi|ss|ms].\n", funcName)
  725. }
  726. for i := 1; i < len(args); i++ {
  727. if _, ok := args[i].(*IntegerLiteral); !ok {
  728. return fmt.Errorf("The %d argument for %s is expecting interger literal expression. \n", i, funcName)
  729. }
  730. }
  731. return nil
  732. }
  733. func (p *Parser) ConvertToWindows(wtype WindowType, args []Expr) (*Window, error) {
  734. win := &Window{WindowType: wtype}
  735. if wtype == COUNT_WINDOW {
  736. win.Length = &IntegerLiteral{Val: args[0].(*IntegerLiteral).Val}
  737. if len(args) == 2 {
  738. win.Interval = &IntegerLiteral{Val: args[1].(*IntegerLiteral).Val}
  739. }
  740. return win, nil
  741. }
  742. var unit = 1
  743. v := args[0].(*TimeLiteral).Val
  744. switch v {
  745. case DD:
  746. unit = 24 * 3600 * 1000
  747. case HH:
  748. unit = 3600 * 1000
  749. case MI:
  750. unit = 60 * 1000
  751. case SS:
  752. unit = 1000
  753. case MS:
  754. unit = 1
  755. default:
  756. return nil, fmt.Errorf("Invalid timeliteral %s", v)
  757. }
  758. win.Length = &IntegerLiteral{Val: args[1].(*IntegerLiteral).Val * unit}
  759. if len(args) > 2 {
  760. win.Interval = &IntegerLiteral{Val: args[2].(*IntegerLiteral).Val * unit}
  761. } else {
  762. win.Interval = &IntegerLiteral{Val: 0}
  763. }
  764. return win, nil
  765. }
  766. func (p *Parser) ParseCreateStmt() (Statement, error) {
  767. if tok, _ := p.scanIgnoreWhitespace(); tok == CREATE {
  768. tok1, lit1 := p.scanIgnoreWhitespace()
  769. stmt := &StreamStmt{}
  770. switch tok1 {
  771. case STREAM:
  772. stmt.StreamType = TypeStream
  773. case TABLE:
  774. stmt.StreamType = TypeTable
  775. default:
  776. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  777. }
  778. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  779. stmt.Name = StreamName(lit2)
  780. if fields, err := p.parseStreamFields(); err != nil {
  781. return nil, err
  782. } else {
  783. stmt.StreamFields = fields
  784. }
  785. if opts, err := p.parseStreamOptions(); err != nil {
  786. return nil, err
  787. } else {
  788. stmt.Options = opts
  789. }
  790. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == SEMICOLON {
  791. p.unscan()
  792. } else if tok3 == EOF {
  793. //Finish parsing create stream statement. Jump to validate
  794. } else {
  795. return nil, fmt.Errorf("found %q, expected semicolon or EOF.", lit3)
  796. }
  797. } else {
  798. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  799. }
  800. if valErr := validateStream(stmt); valErr != nil {
  801. return nil, valErr
  802. }
  803. return stmt, nil
  804. } else {
  805. p.unscan()
  806. return nil, nil
  807. }
  808. }
  809. // TODO more accurate validation for table
  810. func validateStream(stmt *StreamStmt) error {
  811. f := stmt.Options.FORMAT
  812. if f == "" {
  813. f = common.FORMAT_JSON
  814. }
  815. switch strings.ToLower(f) {
  816. case common.FORMAT_JSON:
  817. //do nothing
  818. case common.FORMAT_BINARY:
  819. if stmt.StreamType == TypeTable {
  820. return fmt.Errorf("'binary' format is not supported for table")
  821. }
  822. switch len(stmt.StreamFields) {
  823. case 0:
  824. // do nothing for schemaless
  825. case 1:
  826. f := stmt.StreamFields[0]
  827. if bt, ok := f.FieldType.(*BasicType); ok {
  828. if bt.Type == BYTEA {
  829. break
  830. }
  831. }
  832. return fmt.Errorf("'binary' format stream can have only 'bytea' type field")
  833. default:
  834. return fmt.Errorf("'binary' format stream can have only one field")
  835. }
  836. default:
  837. return fmt.Errorf("option 'format=%s' is invalid", f)
  838. }
  839. return nil
  840. }
  841. func (p *Parser) parseShowStmt() (Statement, error) {
  842. if tok, _ := p.scanIgnoreWhitespace(); tok == SHOW {
  843. tok1, lit1 := p.scanIgnoreWhitespace()
  844. switch tok1 {
  845. case STREAMS:
  846. ss := &ShowStreamsStatement{}
  847. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EOF || tok2 == SEMICOLON {
  848. return ss, nil
  849. } else {
  850. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  851. }
  852. case TABLES:
  853. ss := &ShowTablesStatement{}
  854. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EOF || tok2 == SEMICOLON {
  855. return ss, nil
  856. } else {
  857. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  858. }
  859. default:
  860. return nil, fmt.Errorf("found %q, expected keyword streams or tables.", lit1)
  861. }
  862. } else {
  863. p.unscan()
  864. return nil, nil
  865. }
  866. }
  867. func (p *Parser) parseDescribeStmt() (Statement, error) {
  868. if tok, _ := p.scanIgnoreWhitespace(); tok == DESCRIBE {
  869. tok1, lit1 := p.scanIgnoreWhitespace()
  870. switch tok1 {
  871. case STREAM:
  872. dss := &DescribeStreamStatement{}
  873. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  874. dss.Name = lit2
  875. return dss, nil
  876. } else {
  877. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  878. }
  879. case TABLE:
  880. dss := &DescribeTableStatement{}
  881. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  882. dss.Name = lit2
  883. return dss, nil
  884. } else {
  885. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  886. }
  887. default:
  888. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  889. }
  890. } else {
  891. p.unscan()
  892. return nil, nil
  893. }
  894. }
  895. func (p *Parser) parseExplainStmt() (Statement, error) {
  896. if tok, _ := p.scanIgnoreWhitespace(); tok == EXPLAIN {
  897. tok1, lit1 := p.scanIgnoreWhitespace()
  898. switch tok1 {
  899. case STREAM:
  900. ess := &ExplainStreamStatement{}
  901. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  902. ess.Name = lit2
  903. return ess, nil
  904. } else {
  905. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  906. }
  907. case TABLE:
  908. ess := &ExplainTableStatement{}
  909. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  910. ess.Name = lit2
  911. return ess, nil
  912. } else {
  913. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  914. }
  915. default:
  916. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  917. }
  918. } else {
  919. p.unscan()
  920. return nil, nil
  921. }
  922. }
  923. func (p *Parser) parseDropStmt() (Statement, error) {
  924. if tok, _ := p.scanIgnoreWhitespace(); tok == DROP {
  925. tok1, lit1 := p.scanIgnoreWhitespace()
  926. switch tok1 {
  927. case STREAM:
  928. ess := &DropStreamStatement{}
  929. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  930. ess.Name = lit2
  931. return ess, nil
  932. } else {
  933. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  934. }
  935. case TABLE:
  936. ess := &DropTableStatement{}
  937. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == IDENT {
  938. ess.Name = lit2
  939. return ess, nil
  940. } else {
  941. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  942. }
  943. default:
  944. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  945. }
  946. } else {
  947. p.unscan()
  948. return nil, nil
  949. }
  950. }
  951. func (p *Parser) parseStreamFields() (StreamFields, error) {
  952. lStack := &stack.Stack{}
  953. var fields StreamFields
  954. if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
  955. lStack.Push(lit)
  956. for {
  957. //For the schemaless streams
  958. //create stream demo () WITH (FORMAT="JSON", DATASOURCE="demo" TYPE="edgex")
  959. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == RPAREN {
  960. lStack.Pop()
  961. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != WITH {
  962. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  963. }
  964. return fields, nil
  965. } else {
  966. p.unscan()
  967. }
  968. if f, err := p.parseStreamField(); err != nil {
  969. return nil, err
  970. } else {
  971. fields = append(fields, *f)
  972. }
  973. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == RPAREN {
  974. lStack.Pop()
  975. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == WITH {
  976. //Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
  977. if lStack.Len() > 0 {
  978. return nil, fmt.Errorf("Parenthesis is not matched.")
  979. }
  980. break
  981. } else if tok2 == COMMA {
  982. if lStack.Len() > 0 {
  983. return nil, fmt.Errorf("Parenthesis is in create record type not matched.")
  984. }
  985. p.unscan()
  986. break
  987. } else if tok2 == RPAREN { //The nested type definition of ARRAY and Struct, such as "field ARRAY(STRUCT(f BIGINT))"
  988. if lStack.Len() > 0 {
  989. return nil, fmt.Errorf("Parenthesis is not matched.")
  990. }
  991. p.unscan()
  992. break
  993. } else {
  994. if lStack.Len() == 0 {
  995. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  996. }
  997. p.unscan()
  998. }
  999. } else {
  1000. p.unscan()
  1001. }
  1002. }
  1003. } else {
  1004. return nil, fmt.Errorf("found %q, expected lparen after stream name.", lit)
  1005. }
  1006. return fields, nil
  1007. }
  1008. func (p *Parser) parseStreamField() (*StreamField, error) {
  1009. field := &StreamField{}
  1010. if tok, lit := p.scanIgnoreWhitespace(); tok == IDENT {
  1011. field.Name = lit
  1012. tok1, lit1 := p.scanIgnoreWhitespace()
  1013. if t := getDataType(tok1); t != UNKNOWN && t.isSimpleType() {
  1014. field.FieldType = &BasicType{Type: t}
  1015. } else if t == ARRAY {
  1016. if f, e := p.parseStreamArrayType(); e != nil {
  1017. return nil, e
  1018. } else {
  1019. field.FieldType = f
  1020. }
  1021. } else if t == STRUCT {
  1022. if f, e := p.parseStreamStructType(); e != nil {
  1023. return nil, e
  1024. } else {
  1025. field.FieldType = f
  1026. }
  1027. } else if t == UNKNOWN {
  1028. return nil, fmt.Errorf("found %q, expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).", lit1)
  1029. }
  1030. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == COMMA {
  1031. //Just consume the comma.
  1032. } else if tok2 == RPAREN {
  1033. p.unscan()
  1034. } else {
  1035. return nil, fmt.Errorf("found %q, expect comma or rparen.", lit2)
  1036. }
  1037. } else {
  1038. return nil, fmt.Errorf("found %q, expect stream field name.", lit)
  1039. }
  1040. return field, nil
  1041. }
  1042. func (p *Parser) parseStreamArrayType() (FieldType, error) {
  1043. lStack := &stack.Stack{}
  1044. if tok, _ := p.scanIgnoreWhitespace(); tok == LPAREN {
  1045. lStack.Push(LPAREN)
  1046. tok1, lit1 := p.scanIgnoreWhitespace()
  1047. if t := getDataType(tok1); t != UNKNOWN && t.isSimpleType() {
  1048. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == RPAREN {
  1049. lStack.Pop()
  1050. if lStack.Len() > 0 {
  1051. return nil, fmt.Errorf("Parenthesis is in array type not matched.")
  1052. }
  1053. return &ArrayType{Type: t}, nil
  1054. } else {
  1055. return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
  1056. }
  1057. } else if tok1 == XSTRUCT {
  1058. if f, err := p.parseStreamStructType(); err != nil {
  1059. return nil, err
  1060. } else {
  1061. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == RPAREN {
  1062. lStack.Pop()
  1063. if lStack.Len() > 0 {
  1064. return nil, fmt.Errorf("Parenthesis is in struct of array type %q not matched.", tok1)
  1065. }
  1066. return &ArrayType{Type: STRUCT, FieldType: f}, nil
  1067. } else {
  1068. return nil, fmt.Errorf("found %q, expect rparen in struct of array type definition.", lit2)
  1069. }
  1070. }
  1071. } else if tok1 == COMMA {
  1072. p.unscan()
  1073. } else {
  1074. return nil, fmt.Errorf("found %q, expect stream data types.", lit1)
  1075. }
  1076. } else {
  1077. }
  1078. return nil, nil
  1079. }
  1080. func (p *Parser) parseStreamStructType() (FieldType, error) {
  1081. rf := &RecType{}
  1082. if sfs, err := p.parseStreamFields(); err != nil {
  1083. return nil, err
  1084. } else {
  1085. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == COMMA {
  1086. rf.StreamFields = sfs
  1087. p.unscan()
  1088. } else if tok2 == RPAREN {
  1089. rf.StreamFields = sfs
  1090. p.unscan()
  1091. } else {
  1092. return nil, fmt.Errorf("found %q, expect comma in create stream record statement.", lit2)
  1093. }
  1094. }
  1095. return rf, nil
  1096. }
  1097. func (p *Parser) parseStreamOptions() (*Options, error) {
  1098. opts := &Options{}
  1099. v := reflect.ValueOf(opts)
  1100. lStack := &stack.Stack{}
  1101. if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
  1102. lStack.Push(LPAREN)
  1103. for {
  1104. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == DATASOURCE || tok1 == FORMAT || tok1 == KEY || tok1 == CONF_KEY || tok1 == STRICT_VALIDATION || tok1 == TYPE || tok1 == TIMESTAMP || tok1 == TIMESTAMP_FORMAT || tok1 == RETAIN_SIZE {
  1105. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EQ {
  1106. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == STRING {
  1107. switch tok1 {
  1108. case STRICT_VALIDATION:
  1109. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  1110. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  1111. } else {
  1112. opts.STRICT_VALIDATION = (val == "TRUE")
  1113. }
  1114. case RETAIN_SIZE:
  1115. if val, err := strconv.Atoi(lit3); err != nil {
  1116. return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, tok1)
  1117. } else {
  1118. opts.RETAIN_SIZE = val
  1119. }
  1120. default:
  1121. f := v.Elem().FieldByName(lit1)
  1122. if f.IsValid() {
  1123. f.SetString(lit3)
  1124. } else { // should not happen
  1125. return nil, fmt.Errorf("invalid field %s.", lit1)
  1126. }
  1127. }
  1128. } else {
  1129. return nil, fmt.Errorf("found %q, expect string value in option.", lit3)
  1130. }
  1131. } else {
  1132. return nil, fmt.Errorf("found %q, expect equals(=) in options.", lit2)
  1133. }
  1134. } else if tok1 == COMMA {
  1135. continue
  1136. } else if tok1 == RPAREN {
  1137. if lStack.Pop(); lStack.Len() == 0 {
  1138. break
  1139. } else {
  1140. return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
  1141. }
  1142. } else {
  1143. return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).", lit1)
  1144. }
  1145. }
  1146. } else {
  1147. return nil, fmt.Errorf("found %q, expect stream options.", lit)
  1148. }
  1149. return opts, nil
  1150. }
  1151. // Only support filter on window now
  1152. func (p *Parser) parseFilter() (Expr, error) {
  1153. if tok, _ := p.scanIgnoreWhitespace(); tok != FILTER {
  1154. p.unscan()
  1155. return nil, nil
  1156. }
  1157. if tok, lit := p.scanIgnoreWhitespace(); tok != LPAREN {
  1158. return nil, fmt.Errorf("Found %q after FILTER, expect parentheses.", lit)
  1159. }
  1160. if tok, lit := p.scanIgnoreWhitespace(); tok != WHERE {
  1161. return nil, fmt.Errorf("Found %q after FILTER(, expect WHERE.", lit)
  1162. }
  1163. expr, err := p.ParseExpr()
  1164. if err != nil {
  1165. return nil, err
  1166. }
  1167. if tok, lit := p.scanIgnoreWhitespace(); tok != RPAREN {
  1168. return nil, fmt.Errorf("Found %q after FILTER, expect right parentheses.", lit)
  1169. }
  1170. return expr, nil
  1171. }