parser.go 36 KB


  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "fmt"
  17. "github.com/golang-collections/collections/stack"
  18. "github.com/lf-edge/ekuiper/internal/binder/function"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. "github.com/lf-edge/ekuiper/pkg/message"
  21. "io"
  22. "math"
  23. "reflect"
  24. "strconv"
  25. "strings"
  26. )
  27. type Parser struct {
  28. s *Scanner
  29. i int // buffer index
  30. n int // buffer char count
  31. buf [3]struct {
  32. tok ast.Token
  33. lit string
  34. }
  35. inmeta bool
  36. f int // anonymous field index number
  37. clause string
  38. }
  39. func (p *Parser) parseCondition() (ast.Expr, error) {
  40. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.WHERE {
  41. p.unscan()
  42. return nil, nil
  43. }
  44. expr, err := p.ParseExpr()
  45. if err != nil {
  46. return nil, err
  47. }
  48. return expr, nil
  49. }
  50. func (p *Parser) scan() (tok ast.Token, lit string) {
  51. if p.n > 0 {
  52. p.n--
  53. return p.curr()
  54. }
  55. tok, lit = p.s.Scan()
  56. if tok != ast.WS && tok != ast.COMMENT {
  57. p.i = (p.i + 1) % len(p.buf)
  58. buf := &p.buf[p.i]
  59. buf.tok, buf.lit = tok, lit
  60. }
  61. return
  62. }
  63. func (p *Parser) curr() (ast.Token, string) {
  64. i := (p.i - p.n + len(p.buf)) % len(p.buf)
  65. buf := &p.buf[i]
  66. return buf.tok, buf.lit
  67. }
  68. func (p *Parser) scanIgnoreWhitespace() (tok ast.Token, lit string) {
  69. tok, lit = p.scan()
  70. for {
  71. if tok == ast.WS || tok == ast.COMMENT {
  72. tok, lit = p.scan()
  73. } else {
  74. break
  75. }
  76. }
  77. return tok, lit
  78. }
  79. func (p *Parser) unscan() { p.n++ }
  80. func NewParser(r io.Reader) *Parser {
  81. return &Parser{s: NewScanner(r)}
  82. }
  83. func (p *Parser) ParseQueries() ([]ast.SelectStatement, error) {
  84. var stmts []ast.SelectStatement
  85. if stmt, err := p.Parse(); err != nil {
  86. return nil, err
  87. } else {
  88. stmts = append(stmts, *stmt)
  89. }
  90. for {
  91. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
  92. if stmt, err := p.Parse(); err != nil {
  93. return nil, err
  94. } else {
  95. if stmt != nil {
  96. stmts = append(stmts, *stmt)
  97. }
  98. }
  99. } else if tok == ast.EOF {
  100. break
  101. }
  102. }
  103. return stmts, nil
  104. }
  105. func (p *Parser) Parse() (*ast.SelectStatement, error) {
  106. selects := &ast.SelectStatement{}
  107. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.EOF {
  108. return nil, nil
  109. } else if tok != ast.SELECT {
  110. return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
  111. }
  112. p.clause = "select"
  113. if fields, err := p.parseFields(); err != nil {
  114. return nil, err
  115. } else {
  116. selects.Fields = fields
  117. }
  118. p.clause = "from"
  119. if src, err := p.parseSource(); err != nil {
  120. return nil, err
  121. } else {
  122. selects.Sources = src
  123. }
  124. p.clause = "join"
  125. if joins, err := p.parseJoins(); err != nil {
  126. return nil, err
  127. } else {
  128. selects.Joins = joins
  129. }
  130. p.clause = "where"
  131. if exp, err := p.parseCondition(); err != nil {
  132. return nil, err
  133. } else {
  134. if exp != nil {
  135. selects.Condition = exp
  136. }
  137. }
  138. p.clause = "groupby"
  139. if dims, err := p.parseDimensions(); err != nil {
  140. return nil, err
  141. } else {
  142. selects.Dimensions = dims
  143. }
  144. p.clause = "having"
  145. if having, err := p.parseHaving(); err != nil {
  146. return nil, err
  147. } else {
  148. selects.Having = having
  149. }
  150. p.clause = "orderby"
  151. if sorts, err := p.parseSorts(); err != nil {
  152. return nil, err
  153. } else {
  154. selects.SortFields = sorts
  155. }
  156. p.clause = ""
  157. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
  158. p.unscan()
  159. return selects, nil
  160. } else if tok != ast.EOF {
  161. return nil, fmt.Errorf("found %q, expected EOF.", lit)
  162. }
  163. if err := Validate(selects); err != nil {
  164. return nil, err
  165. }
  166. return selects, nil
  167. }
  168. func (p *Parser) parseSource() (ast.Sources, error) {
  169. var sources ast.Sources
  170. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.FROM {
  171. return nil, fmt.Errorf("found %q, expected FROM.", lit)
  172. }
  173. if src, alias, err := p.parseSourceLiteral(); err != nil {
  174. return nil, err
  175. } else {
  176. sources = append(sources, &ast.Table{Name: src, Alias: alias})
  177. }
  178. return sources, nil
  179. }
  180. //TODO Current func has problems when the source includes white space.
  181. func (p *Parser) parseSourceLiteral() (string, string, error) {
  182. var sourceSeg []string
  183. var alias string
  184. for {
  185. //HASH, DIV & ADD token is specially support for MQTT topic name patterns.
  186. if tok, lit := p.scanIgnoreWhitespace(); tok.AllowedSourceToken() {
  187. sourceSeg = append(sourceSeg, lit)
  188. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.AS {
  189. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  190. alias = lit2
  191. } else {
  192. return "", "", fmt.Errorf("found %q, expected JOIN key word.", lit)
  193. }
  194. } else if tok1.AllowedSourceToken() {
  195. sourceSeg = append(sourceSeg, lit1)
  196. } else {
  197. p.unscan()
  198. break
  199. }
  200. } else {
  201. p.unscan()
  202. break
  203. }
  204. }
  205. return strings.Join(sourceSeg, ""), alias, nil
  206. }
  207. func (p *Parser) parseFieldNameSections() ([]string, error) {
  208. var fieldNameSects []string
  209. for {
  210. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT || tok == ast.ASTERISK {
  211. fieldNameSects = append(fieldNameSects, lit)
  212. if tok1, _ := p.scanIgnoreWhitespace(); !tok1.AllowedSFNToken() {
  213. p.unscan()
  214. break
  215. }
  216. } else {
  217. p.unscan()
  218. break
  219. }
  220. }
  221. if len(fieldNameSects) == 0 {
  222. return nil, fmt.Errorf("Cannot find any field name.\n")
  223. } else if len(fieldNameSects) > 2 {
  224. return nil, fmt.Errorf("Too many field names. Please use -> to reference keys in struct.\n")
  225. }
  226. return fieldNameSects, nil
  227. }
  228. func (p *Parser) parseJoins() (ast.Joins, error) {
  229. var joins ast.Joins
  230. for {
  231. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.INNER || tok == ast.LEFT || tok == ast.RIGHT || tok == ast.FULL || tok == ast.CROSS {
  232. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.JOIN {
  233. var jt = ast.INNER_JOIN
  234. switch tok {
  235. case ast.INNER:
  236. jt = ast.INNER_JOIN
  237. case ast.LEFT:
  238. jt = ast.LEFT_JOIN
  239. case ast.RIGHT:
  240. jt = ast.RIGHT_JOIN
  241. case ast.FULL:
  242. jt = ast.FULL_JOIN
  243. case ast.CROSS:
  244. jt = ast.CROSS_JOIN
  245. }
  246. if j, err := p.ParseJoin(jt); err != nil {
  247. return nil, err
  248. } else {
  249. joins = append(joins, *j)
  250. }
  251. } else {
  252. return nil, fmt.Errorf("found %q, expected JOIN key word.", lit)
  253. }
  254. } else {
  255. p.unscan()
  256. if len(joins) > 0 {
  257. return joins, nil
  258. }
  259. return nil, nil
  260. }
  261. }
  262. }
  263. func (p *Parser) ParseJoin(joinType ast.JoinType) (*ast.Join, error) {
  264. var j = &ast.Join{JoinType: joinType}
  265. if src, alias, err := p.parseSourceLiteral(); err != nil {
  266. return nil, err
  267. } else {
  268. j.Name = src
  269. j.Alias = alias
  270. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.ON {
  271. if ast.CROSS_JOIN == joinType {
  272. return nil, fmt.Errorf("On expression is not required for cross join type.\n")
  273. }
  274. if exp, err := p.ParseExpr(); err != nil {
  275. return nil, err
  276. } else {
  277. j.Expr = exp
  278. }
  279. } else {
  280. p.unscan()
  281. }
  282. }
  283. return j, nil
  284. }
  285. func (p *Parser) parseDimensions() (ast.Dimensions, error) {
  286. var ds ast.Dimensions
  287. if t, _ := p.scanIgnoreWhitespace(); t == ast.GROUP {
  288. if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {
  289. for {
  290. if exp, err := p.ParseExpr(); err != nil {
  291. return nil, err
  292. } else {
  293. d := ast.Dimension{Expr: exp}
  294. ds = append(ds, d)
  295. }
  296. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.COMMA {
  297. continue
  298. } else {
  299. p.unscan()
  300. break
  301. }
  302. }
  303. } else {
  304. return nil, fmt.Errorf("found %q, expected BY statement.", l1)
  305. }
  306. } else {
  307. p.unscan()
  308. }
  309. return ds, nil
  310. }
  311. func (p *Parser) parseHaving() (ast.Expr, error) {
  312. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.HAVING {
  313. p.unscan()
  314. return nil, nil
  315. }
  316. expr, err := p.ParseExpr()
  317. if err != nil {
  318. return nil, err
  319. }
  320. return expr, nil
  321. }
  322. func (p *Parser) parseSorts() (ast.SortFields, error) {
  323. var ss ast.SortFields
  324. if t, _ := p.scanIgnoreWhitespace(); t == ast.ORDER {
  325. if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {
  326. for {
  327. if t1, l1 = p.scanIgnoreWhitespace(); t1 == ast.IDENT {
  328. s := ast.SortField{Ascending: true}
  329. p.unscan()
  330. if name, err := p.parseFieldNameSections(); err == nil {
  331. if len(name) == 2 {
  332. s.StreamName = ast.StreamName(name[0])
  333. s.Name = name[1]
  334. } else {
  335. s.Name = name[0]
  336. }
  337. s.Uname = strings.Join(name, ast.COLUMN_SEPARATOR)
  338. } else {
  339. return nil, err
  340. }
  341. if t2, _ := p.scanIgnoreWhitespace(); t2 == ast.DESC {
  342. s.Ascending = false
  343. ss = append(ss, s)
  344. } else if t2 == ast.ASC {
  345. ss = append(ss, s)
  346. } else {
  347. ss = append(ss, s)
  348. p.unscan()
  349. continue
  350. }
  351. } else if t1 == ast.COMMA {
  352. continue
  353. } else {
  354. p.unscan()
  355. break
  356. }
  357. }
  358. } else {
  359. return nil, fmt.Errorf("found %q, expected BY keyword.", l1)
  360. }
  361. } else {
  362. p.unscan()
  363. }
  364. return ss, nil
  365. }
  366. func (p *Parser) parseFields() (ast.Fields, error) {
  367. var fields ast.Fields
  368. tok, _ := p.scanIgnoreWhitespace()
  369. if tok == ast.ASTERISK {
  370. fields = append(fields, ast.Field{AName: "", Expr: &ast.Wildcard{Token: tok}})
  371. return fields, nil
  372. }
  373. p.unscan()
  374. for {
  375. field, err := p.parseField()
  376. if err != nil {
  377. return nil, err
  378. } else {
  379. fields = append(fields, *field)
  380. }
  381. tok, _ = p.scanIgnoreWhitespace()
  382. if tok != ast.COMMA {
  383. p.unscan()
  384. break
  385. }
  386. }
  387. return fields, nil
  388. }
  389. func (p *Parser) parseField() (*ast.Field, error) {
  390. field := &ast.Field{}
  391. if exp, err := p.ParseExpr(); err != nil {
  392. return nil, err
  393. } else {
  394. field.Name = nameExpr(exp)
  395. field.Expr = exp
  396. }
  397. if alias, err := p.parseAlias(); err != nil {
  398. return nil, err
  399. } else {
  400. if alias != "" {
  401. field.AName = alias
  402. }
  403. }
  404. if field.Name == "" && field.AName == "" {
  405. field.Name = DEFAULT_FIELD_NAME_PREFIX + strconv.Itoa(p.f)
  406. p.f += 1
  407. }
  408. return field, nil
  409. }
  410. func nameExpr(exp ast.Expr) string {
  411. switch e := exp.(type) {
  412. case *ast.FieldRef:
  413. return e.Name
  414. case *ast.Call:
  415. return e.Name
  416. default:
  417. return ""
  418. }
  419. }
  420. func (p *Parser) parseAlias() (string, error) {
  421. tok, lit := p.scanIgnoreWhitespace()
  422. if tok == ast.AS {
  423. if tok, lit = p.scanIgnoreWhitespace(); tok != ast.IDENT {
  424. return "", fmt.Errorf("found %q, expected as alias.", lit)
  425. } else {
  426. return lit, nil
  427. }
  428. }
  429. p.unscan()
  430. return "", nil
  431. }
  432. func (p *Parser) ParseExpr() (ast.Expr, error) {
  433. var err error
  434. root := &ast.BinaryExpr{}
  435. root.RHS, err = p.parseUnaryExpr(false)
  436. if err != nil {
  437. return nil, err
  438. }
  439. for {
  440. op, _ := p.scanIgnoreWhitespace()
  441. if !op.IsOperator() {
  442. p.unscan()
  443. return root.RHS, nil
  444. } else if op == ast.ASTERISK { //Change the asterisk to Mul token.
  445. op = ast.MUL
  446. } else if op == ast.LBRACKET { //LBRACKET is a special token, need to unscan
  447. op = ast.SUBSET
  448. p.unscan()
  449. }
  450. var rhs ast.Expr
  451. if rhs, err = p.parseUnaryExpr(op == ast.ARROW); err != nil {
  452. return nil, err
  453. }
  454. for node := root; ; {
  455. r, ok := node.RHS.(*ast.BinaryExpr)
  456. if !ok || r.OP.Precedence() >= op.Precedence() {
  457. node.RHS = &ast.BinaryExpr{LHS: node.RHS, RHS: rhs, OP: op}
  458. break
  459. }
  460. node = r
  461. }
  462. }
  463. }
  464. func (p *Parser) parseUnaryExpr(isSubField bool) (ast.Expr, error) {
  465. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
  466. expr, err := p.ParseExpr()
  467. if err != nil {
  468. return nil, err
  469. }
  470. // Expect an RPAREN at the end.
  471. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.RPAREN {
  472. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  473. }
  474. return &ast.ParenExpr{Expr: expr}, nil
  475. } else if tok1 == ast.LBRACKET {
  476. return p.parseBracketExpr()
  477. }
  478. p.unscan()
  479. tok, lit := p.scanIgnoreWhiteSpaceWithNegativeNum()
  480. if tok == ast.CASE {
  481. return p.parseCaseExpr()
  482. } else if tok == ast.IDENT {
  483. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
  484. return p.parseCall(lit)
  485. }
  486. p.unscan() //Back the Lparen token
  487. p.unscan() //Back the ident token
  488. if n, err := p.parseFieldNameSections(); err != nil {
  489. return nil, err
  490. } else {
  491. if p.inmeta {
  492. if len(n) == 2 {
  493. return &ast.MetaRef{StreamName: ast.StreamName(n[0]), Name: n[1]}, nil
  494. }
  495. if isSubField {
  496. return &ast.JsonFieldRef{Name: n[0]}, nil
  497. }
  498. return &ast.MetaRef{StreamName: ast.DefaultStream, Name: n[0]}, nil
  499. } else {
  500. if len(n) == 2 {
  501. return &ast.FieldRef{StreamName: ast.StreamName(n[0]), Name: n[1]}, nil
  502. }
  503. if isSubField {
  504. return &ast.JsonFieldRef{Name: n[0]}, nil
  505. }
  506. return &ast.FieldRef{StreamName: ast.DefaultStream, Name: n[0]}, nil
  507. }
  508. }
  509. } else if tok == ast.STRING {
  510. return &ast.StringLiteral{Val: lit}, nil
  511. } else if tok == ast.INTEGER {
  512. val, _ := strconv.Atoi(lit)
  513. return &ast.IntegerLiteral{Val: val}, nil
  514. } else if tok == ast.NUMBER {
  515. if v, err := strconv.ParseFloat(lit, 64); err != nil {
  516. return nil, fmt.Errorf("found %q, invalid number value.", lit)
  517. } else {
  518. return &ast.NumberLiteral{Val: v}, nil
  519. }
  520. } else if tok == ast.TRUE || tok == ast.FALSE {
  521. if v, err := strconv.ParseBool(lit); err != nil {
  522. return nil, fmt.Errorf("found %q, invalid boolean value.", lit)
  523. } else {
  524. return &ast.BooleanLiteral{Val: v}, nil
  525. }
  526. } else if tok.IsTimeLiteral() {
  527. return &ast.TimeLiteral{Val: tok}, nil
  528. }
  529. return nil, fmt.Errorf("found %q, expected expression.", lit)
  530. }
  531. func (p *Parser) parseBracketExpr() (ast.Expr, error) {
  532. tok2, lit2 := p.scanIgnoreWhiteSpaceWithNegativeNum()
  533. if tok2 == ast.RBRACKET {
  534. //field[]
  535. return &ast.ColonExpr{Start: &ast.IntegerLiteral{Val: 0}, End: &ast.IntegerLiteral{Val: math.MinInt32}}, nil
  536. } else if tok2 == ast.INTEGER {
  537. start, err := strconv.Atoi(lit2)
  538. if err != nil {
  539. return nil, fmt.Errorf("The start index %s is not an int value in bracket expression.", lit2)
  540. }
  541. if tok3, _ := p.scanIgnoreWhitespace(); tok3 == ast.RBRACKET {
  542. //Such as field[2]
  543. return &ast.IndexExpr{Index: &ast.IntegerLiteral{Val: start}}, nil
  544. } else if tok3 == ast.COLON {
  545. //Such as field[2:] or field[2:4]
  546. return p.parseColonExpr(&ast.IntegerLiteral{Val: start})
  547. }
  548. } else if tok2 == ast.COLON {
  549. //Such as field[:3] or [:]
  550. return p.parseColonExpr(&ast.IntegerLiteral{Val: 0})
  551. } else {
  552. p.unscan()
  553. start, err := p.ParseExpr()
  554. if err != nil {
  555. return nil, fmt.Errorf("The start index %s is invalid in bracket expression.", lit2)
  556. }
  557. if tok3, _ := p.scanIgnoreWhitespace(); tok3 == ast.RBRACKET {
  558. //Such as field[2]
  559. return &ast.IndexExpr{Index: start}, nil
  560. } else if tok3 == ast.COLON {
  561. //Such as field[2:] or field[2:4]
  562. return p.parseColonExpr(start)
  563. }
  564. }
  565. return nil, fmt.Errorf("Unexpected token %q. when parsing bracket expressions.", lit2)
  566. }
  567. func (p *Parser) parseColonExpr(start ast.Expr) (ast.Expr, error) {
  568. tok, lit := p.scanIgnoreWhiteSpaceWithNegativeNum()
  569. if tok == ast.INTEGER {
  570. end, err := strconv.Atoi(lit)
  571. if err != nil {
  572. return nil, fmt.Errorf("The end index %s is not an int value in bracket expression.", lit)
  573. }
  574. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.RBRACKET {
  575. return &ast.ColonExpr{Start: start, End: &ast.IntegerLiteral{Val: end}}, nil
  576. } else {
  577. return nil, fmt.Errorf("Found %q, expected right bracket.", lit1)
  578. }
  579. } else if tok == ast.RBRACKET {
  580. return &ast.ColonExpr{Start: start, End: &ast.IntegerLiteral{Val: math.MinInt32}}, nil
  581. }
  582. return nil, fmt.Errorf("Found %q, expected right bracket.", lit)
  583. }
  584. func (p *Parser) scanIgnoreWhiteSpaceWithNegativeNum() (ast.Token, string) {
  585. tok, lit := p.scanIgnoreWhitespace()
  586. if tok == ast.SUB {
  587. _, _ = p.s.ScanWhiteSpace()
  588. r := p.s.read()
  589. if isDigit(r) {
  590. p.s.unread()
  591. tok, lit = p.s.ScanNumber(false, true)
  592. }
  593. }
  594. return tok, lit
  595. }
  596. func (p *Parser) parseAs(f *ast.Field) (*ast.Field, error) {
  597. tok, lit := p.scanIgnoreWhitespace()
  598. if tok != ast.IDENT {
  599. return nil, fmt.Errorf("found %q, expected as alias.", lit)
  600. }
  601. f.AName = lit
  602. return f, nil
  603. }
  604. var WindowFuncs = map[string]struct{}{
  605. "tumblingwindow": {},
  606. "hoppingwindow": {},
  607. "sessionwindow": {},
  608. "slidingwindow": {},
  609. "countwindow": {},
  610. }
  611. func convFuncName(n string) (string, bool) {
  612. lname := strings.ToLower(n)
  613. if _, ok := WindowFuncs[lname]; ok {
  614. return lname, ok
  615. } else {
  616. return function.ConvName(n)
  617. }
  618. }
  619. func (p *Parser) parseCall(n string) (ast.Expr, error) {
  620. // Check if n function exists and convert it to lowercase for built-in func
  621. name, ok := convFuncName(n)
  622. if !ok {
  623. return nil, fmt.Errorf("function %s not found", n)
  624. }
  625. if name == "meta" || name == "mqtt" {
  626. p.inmeta = true
  627. defer func() {
  628. p.inmeta = false
  629. }()
  630. }
  631. ft := function.GetFuncType(name)
  632. if ft == function.FuncTypeCols && p.clause != "select" {
  633. return nil, fmt.Errorf("function %s can only be used inside the select clause", n)
  634. }
  635. var args []ast.Expr
  636. for {
  637. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
  638. if valErr := validateFuncs(name, nil); valErr != nil {
  639. return nil, valErr
  640. }
  641. return &ast.Call{Name: name, Args: args}, nil
  642. } else if tok == ast.ASTERISK {
  643. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.RPAREN {
  644. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  645. } else {
  646. if p.inmeta {
  647. args = append(args, &ast.MetaRef{StreamName: ast.DefaultStream, Name: "*"})
  648. } else {
  649. args = append(args, &ast.Wildcard{Token: ast.ASTERISK})
  650. }
  651. return &ast.Call{Name: name, Args: args}, nil
  652. }
  653. } else {
  654. p.unscan()
  655. }
  656. if exp, err := p.ParseExpr(); err != nil {
  657. return nil, err
  658. } else {
  659. if ft == function.FuncTypeCols {
  660. field := &ast.ColFuncField{Expr: exp, Name: nameExpr(exp)}
  661. args = append(args, field)
  662. } else {
  663. args = append(args, exp)
  664. }
  665. }
  666. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.COMMA {
  667. p.unscan()
  668. break
  669. }
  670. }
  671. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
  672. return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
  673. }
  674. if wt, err := validateWindows(name, args); wt == ast.NOT_WINDOW {
  675. if valErr := validateFuncs(name, args); valErr != nil {
  676. return nil, valErr
  677. }
  678. // Add context for some aggregate func
  679. if name == "deduplicate" {
  680. args = append([]ast.Expr{&ast.Wildcard{Token: ast.ASTERISK}}, args...)
  681. }
  682. return &ast.Call{Name: name, Args: args}, nil
  683. } else {
  684. if err != nil {
  685. return nil, err
  686. }
  687. win, err := p.ConvertToWindows(wt, args)
  688. if err != nil {
  689. return nil, err
  690. }
  691. // parse filter clause
  692. f, err := p.parseFilter()
  693. if err != nil {
  694. return nil, err
  695. } else if f != nil {
  696. win.Filter = f
  697. }
  698. return win, nil
  699. }
  700. }
  701. func (p *Parser) parseCaseExpr() (*ast.CaseExpr, error) {
  702. c := &ast.CaseExpr{}
  703. tok, _ := p.scanIgnoreWhitespace()
  704. p.unscan()
  705. if tok != ast.WHEN { // no condition value for case, additional validation needed
  706. if exp, err := p.ParseExpr(); err != nil {
  707. return nil, err
  708. } else {
  709. c.Value = exp
  710. }
  711. }
  712. loop:
  713. for {
  714. tok, _ := p.scanIgnoreWhitespace()
  715. switch tok {
  716. case ast.WHEN:
  717. if exp, err := p.ParseExpr(); err != nil {
  718. return nil, err
  719. } else {
  720. if c.WhenClauses == nil {
  721. c.WhenClauses = make([]*ast.WhenClause, 0)
  722. }
  723. if c.Value == nil && !ast.IsBooleanArg(exp) {
  724. return nil, fmt.Errorf("invalid CASE expression, WHEN expression must be a bool condition")
  725. }
  726. w := &ast.WhenClause{
  727. Expr: exp,
  728. }
  729. tokThen, _ := p.scanIgnoreWhitespace()
  730. if tokThen != ast.THEN {
  731. return nil, fmt.Errorf("invalid CASE expression, THEN expected after WHEN")
  732. } else {
  733. if expThen, err := p.ParseExpr(); err != nil {
  734. return nil, err
  735. } else {
  736. w.Result = expThen
  737. c.WhenClauses = append(c.WhenClauses, w)
  738. }
  739. }
  740. }
  741. case ast.ELSE:
  742. if c.WhenClauses != nil {
  743. if exp, err := p.ParseExpr(); err != nil {
  744. return nil, err
  745. } else {
  746. c.ElseClause = exp
  747. }
  748. } else {
  749. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before ELSE")
  750. }
  751. case ast.END:
  752. if c.WhenClauses != nil {
  753. break loop
  754. } else {
  755. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before END")
  756. }
  757. default:
  758. return nil, fmt.Errorf("invalid CASE expression, END expected")
  759. }
  760. }
  761. return c, nil
  762. }
  763. func validateWindows(fname string, args []ast.Expr) (ast.WindowType, error) {
  764. switch fname {
  765. case "tumblingwindow":
  766. if err := validateWindow(fname, 2, args); err != nil {
  767. return ast.TUMBLING_WINDOW, err
  768. }
  769. return ast.TUMBLING_WINDOW, nil
  770. case "hoppingwindow":
  771. if err := validateWindow(fname, 3, args); err != nil {
  772. return ast.HOPPING_WINDOW, err
  773. }
  774. return ast.HOPPING_WINDOW, nil
  775. case "sessionwindow":
  776. if err := validateWindow(fname, 3, args); err != nil {
  777. return ast.SESSION_WINDOW, err
  778. }
  779. return ast.SESSION_WINDOW, nil
  780. case "slidingwindow":
  781. if err := validateWindow(fname, 2, args); err != nil {
  782. return ast.SLIDING_WINDOW, err
  783. }
  784. return ast.SLIDING_WINDOW, nil
  785. case "countwindow":
  786. if len(args) == 1 {
  787. if para1, ok := args[0].(*ast.IntegerLiteral); ok && para1.Val > 0 {
  788. return ast.COUNT_WINDOW, nil
  789. } else {
  790. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s.", args[0])
  791. }
  792. } else if len(args) == 2 {
  793. if para1, ok1 := args[0].(*ast.IntegerLiteral); ok1 {
  794. if para2, ok2 := args[1].(*ast.IntegerLiteral); ok2 {
  795. if para1.Val < para2.Val {
  796. return ast.COUNT_WINDOW, fmt.Errorf("The second parameter value %d should be less than the first parameter %d.", para2.Val, para1.Val)
  797. } else {
  798. return ast.COUNT_WINDOW, nil
  799. }
  800. }
  801. }
  802. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s, %s.", args[0], args[1])
  803. } else {
  804. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter count.")
  805. }
  806. }
  807. return ast.NOT_WINDOW, nil
  808. }
  809. func validateWindow(funcName string, expectLen int, args []ast.Expr) error {
  810. if len(args) != expectLen {
  811. return fmt.Errorf("The arguments for %s should be %d.\n", funcName, expectLen)
  812. }
  813. if _, ok := args[0].(*ast.TimeLiteral); !ok {
  814. return fmt.Errorf("The 1st argument for %s is expecting timer literal expression. One value of [dd|hh|mi|ss|ms].\n", funcName)
  815. }
  816. for i := 1; i < len(args); i++ {
  817. if _, ok := args[i].(*ast.IntegerLiteral); !ok {
  818. return fmt.Errorf("The %d argument for %s is expecting interger literal expression. \n", i, funcName)
  819. }
  820. }
  821. return nil
  822. }
  823. func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error) {
  824. win := &ast.Window{WindowType: wtype}
  825. if wtype == ast.COUNT_WINDOW {
  826. win.Length = &ast.IntegerLiteral{Val: args[0].(*ast.IntegerLiteral).Val}
  827. if len(args) == 2 {
  828. win.Interval = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val}
  829. }
  830. return win, nil
  831. }
  832. var unit = 1
  833. v := args[0].(*ast.TimeLiteral).Val
  834. switch v {
  835. case ast.DD:
  836. unit = 24 * 3600 * 1000
  837. case ast.HH:
  838. unit = 3600 * 1000
  839. case ast.MI:
  840. unit = 60 * 1000
  841. case ast.SS:
  842. unit = 1000
  843. case ast.MS:
  844. unit = 1
  845. default:
  846. return nil, fmt.Errorf("Invalid timeliteral %s", v)
  847. }
  848. win.Length = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val * unit}
  849. if len(args) > 2 {
  850. win.Interval = &ast.IntegerLiteral{Val: args[2].(*ast.IntegerLiteral).Val * unit}
  851. } else {
  852. win.Interval = &ast.IntegerLiteral{Val: 0}
  853. }
  854. return win, nil
  855. }
  856. func (p *Parser) ParseCreateStmt() (ast.Statement, error) {
  857. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.CREATE {
  858. tok1, lit1 := p.scanIgnoreWhitespace()
  859. stmt := &ast.StreamStmt{}
  860. switch tok1 {
  861. case ast.STREAM:
  862. stmt.StreamType = ast.TypeStream
  863. case ast.TABLE:
  864. stmt.StreamType = ast.TypeTable
  865. default:
  866. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  867. }
  868. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  869. stmt.Name = ast.StreamName(lit2)
  870. if fields, err := p.parseStreamFields(); err != nil {
  871. return nil, err
  872. } else {
  873. stmt.StreamFields = fields
  874. }
  875. if opts, err := p.parseStreamOptions(); err != nil {
  876. return nil, err
  877. } else {
  878. stmt.Options = opts
  879. }
  880. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.SEMICOLON {
  881. p.unscan()
  882. } else if tok3 == ast.EOF {
  883. //Finish parsing create stream statement. Jump to validate
  884. } else {
  885. return nil, fmt.Errorf("found %q, expected semicolon or EOF.", lit3)
  886. }
  887. } else {
  888. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  889. }
  890. if valErr := validateStream(stmt); valErr != nil {
  891. return nil, valErr
  892. }
  893. return stmt, nil
  894. } else {
  895. p.unscan()
  896. return nil, nil
  897. }
  898. }
  899. // TODO more accurate validation for table
  900. func validateStream(stmt *ast.StreamStmt) error {
  901. f := stmt.Options.FORMAT
  902. if f == "" {
  903. f = message.FormatJson
  904. }
  905. switch strings.ToLower(f) {
  906. case message.FormatJson:
  907. //do nothing
  908. case message.FormatBinary:
  909. if stmt.StreamType == ast.TypeTable {
  910. return fmt.Errorf("'binary' format is not supported for table")
  911. }
  912. switch len(stmt.StreamFields) {
  913. case 0:
  914. // do nothing for schemaless
  915. case 1:
  916. f := stmt.StreamFields[0]
  917. if bt, ok := f.FieldType.(*ast.BasicType); ok {
  918. if bt.Type == ast.BYTEA {
  919. break
  920. }
  921. }
  922. return fmt.Errorf("'binary' format stream can have only 'bytea' type field")
  923. default:
  924. return fmt.Errorf("'binary' format stream can have only one field")
  925. }
  926. default:
  927. return fmt.Errorf("option 'format=%s' is invalid", f)
  928. }
  929. return nil
  930. }
  931. func (p *Parser) parseShowStmt() (ast.Statement, error) {
  932. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.SHOW {
  933. tok1, lit1 := p.scanIgnoreWhitespace()
  934. switch tok1 {
  935. case ast.STREAMS:
  936. ss := &ast.ShowStreamsStatement{}
  937. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
  938. return ss, nil
  939. } else {
  940. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  941. }
  942. case ast.TABLES:
  943. ss := &ast.ShowTablesStatement{}
  944. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
  945. return ss, nil
  946. } else {
  947. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  948. }
  949. default:
  950. return nil, fmt.Errorf("found %q, expected keyword streams or tables.", lit1)
  951. }
  952. } else {
  953. p.unscan()
  954. return nil, nil
  955. }
  956. }
  957. func (p *Parser) parseDescribeStmt() (ast.Statement, error) {
  958. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DESCRIBE {
  959. tok1, lit1 := p.scanIgnoreWhitespace()
  960. switch tok1 {
  961. case ast.STREAM:
  962. dss := &ast.DescribeStreamStatement{}
  963. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  964. dss.Name = lit2
  965. return dss, nil
  966. } else {
  967. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  968. }
  969. case ast.TABLE:
  970. dss := &ast.DescribeTableStatement{}
  971. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  972. dss.Name = lit2
  973. return dss, nil
  974. } else {
  975. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  976. }
  977. default:
  978. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  979. }
  980. } else {
  981. p.unscan()
  982. return nil, nil
  983. }
  984. }
  985. func (p *Parser) parseExplainStmt() (ast.Statement, error) {
  986. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.EXPLAIN {
  987. tok1, lit1 := p.scanIgnoreWhitespace()
  988. switch tok1 {
  989. case ast.STREAM:
  990. ess := &ast.ExplainStreamStatement{}
  991. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  992. ess.Name = lit2
  993. return ess, nil
  994. } else {
  995. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  996. }
  997. case ast.TABLE:
  998. ess := &ast.ExplainTableStatement{}
  999. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  1000. ess.Name = lit2
  1001. return ess, nil
  1002. } else {
  1003. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  1004. }
  1005. default:
  1006. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  1007. }
  1008. } else {
  1009. p.unscan()
  1010. return nil, nil
  1011. }
  1012. }
  1013. func (p *Parser) parseDropStmt() (ast.Statement, error) {
  1014. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DROP {
  1015. tok1, lit1 := p.scanIgnoreWhitespace()
  1016. switch tok1 {
  1017. case ast.STREAM:
  1018. ess := &ast.DropStreamStatement{}
  1019. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  1020. ess.Name = lit2
  1021. return ess, nil
  1022. } else {
  1023. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  1024. }
  1025. case ast.TABLE:
  1026. ess := &ast.DropTableStatement{}
  1027. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  1028. ess.Name = lit2
  1029. return ess, nil
  1030. } else {
  1031. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  1032. }
  1033. default:
  1034. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  1035. }
  1036. } else {
  1037. p.unscan()
  1038. return nil, nil
  1039. }
  1040. }
  1041. func (p *Parser) parseStreamFields() (ast.StreamFields, error) {
  1042. lStack := &stack.Stack{}
  1043. var fields ast.StreamFields
  1044. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  1045. lStack.Push(lit)
  1046. for {
  1047. //For the schemaless streams
  1048. //create stream demo () WITH (FORMAT="JSON", DATASOURCE="demo" TYPE="edgex")
  1049. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
  1050. lStack.Pop()
  1051. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.WITH {
  1052. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  1053. }
  1054. return fields, nil
  1055. } else {
  1056. p.unscan()
  1057. }
  1058. if f, err := p.parseStreamField(); err != nil {
  1059. return nil, err
  1060. } else {
  1061. fields = append(fields, *f)
  1062. }
  1063. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
  1064. lStack.Pop()
  1065. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.WITH {
  1066. //Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
  1067. if lStack.Len() > 0 {
  1068. return nil, fmt.Errorf("Parenthesis is not matched.")
  1069. }
  1070. break
  1071. } else if tok2 == ast.COMMA {
  1072. if lStack.Len() > 0 {
  1073. return nil, fmt.Errorf("Parenthesis is in create record type not matched.")
  1074. }
  1075. p.unscan()
  1076. break
  1077. } else if tok2 == ast.RPAREN { //The nested type definition of ARRAY and Struct, such as "field ARRAY(STRUCT(f BIGINT))"
  1078. if lStack.Len() > 0 {
  1079. return nil, fmt.Errorf("Parenthesis is not matched.")
  1080. }
  1081. p.unscan()
  1082. break
  1083. } else {
  1084. if lStack.Len() == 0 {
  1085. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  1086. }
  1087. p.unscan()
  1088. }
  1089. } else {
  1090. p.unscan()
  1091. }
  1092. }
  1093. } else {
  1094. return nil, fmt.Errorf("found %q, expected lparen after stream name.", lit)
  1095. }
  1096. return fields, nil
  1097. }
  1098. func (p *Parser) parseStreamField() (*ast.StreamField, error) {
  1099. field := &ast.StreamField{}
  1100. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT {
  1101. field.Name = lit
  1102. tok1, lit1 := p.scanIgnoreWhitespace()
  1103. if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
  1104. field.FieldType = &ast.BasicType{Type: t}
  1105. } else if t == ast.ARRAY {
  1106. if f, e := p.parseStreamArrayType(); e != nil {
  1107. return nil, e
  1108. } else {
  1109. field.FieldType = f
  1110. }
  1111. } else if t == ast.STRUCT {
  1112. if f, e := p.parseStreamStructType(); e != nil {
  1113. return nil, e
  1114. } else {
  1115. field.FieldType = f
  1116. }
  1117. } else if t == ast.UNKNOWN {
  1118. return nil, fmt.Errorf("found %q, expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).", lit1)
  1119. }
  1120. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.COMMA {
  1121. //Just consume the comma.
  1122. } else if tok2 == ast.RPAREN {
  1123. p.unscan()
  1124. } else {
  1125. return nil, fmt.Errorf("found %q, expect comma or rparen.", lit2)
  1126. }
  1127. } else {
  1128. return nil, fmt.Errorf("found %q, expect stream field name.", lit)
  1129. }
  1130. return field, nil
  1131. }
  1132. func (p *Parser) parseStreamArrayType() (ast.FieldType, error) {
  1133. lStack := &stack.Stack{}
  1134. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  1135. lStack.Push(ast.LPAREN)
  1136. tok1, lit1 := p.scanIgnoreWhitespace()
  1137. if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
  1138. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
  1139. lStack.Pop()
  1140. if lStack.Len() > 0 {
  1141. return nil, fmt.Errorf("Parenthesis is in array type not matched.")
  1142. }
  1143. return &ast.ArrayType{Type: t}, nil
  1144. } else {
  1145. return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
  1146. }
  1147. } else if tok1 == ast.XSTRUCT {
  1148. if f, err := p.parseStreamStructType(); err != nil {
  1149. return nil, err
  1150. } else {
  1151. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
  1152. lStack.Pop()
  1153. if lStack.Len() > 0 {
  1154. return nil, fmt.Errorf("Parenthesis is in struct of array type %q not matched.", tok1)
  1155. }
  1156. return &ast.ArrayType{Type: ast.STRUCT, FieldType: f}, nil
  1157. } else {
  1158. return nil, fmt.Errorf("found %q, expect rparen in struct of array type definition.", lit2)
  1159. }
  1160. }
  1161. } else if tok1 == ast.COMMA {
  1162. p.unscan()
  1163. } else {
  1164. return nil, fmt.Errorf("found %q, expect stream data types.", lit1)
  1165. }
  1166. } else {
  1167. }
  1168. return nil, nil
  1169. }
  1170. func (p *Parser) parseStreamStructType() (ast.FieldType, error) {
  1171. rf := &ast.RecType{}
  1172. if sfs, err := p.parseStreamFields(); err != nil {
  1173. return nil, err
  1174. } else {
  1175. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.COMMA {
  1176. rf.StreamFields = sfs
  1177. p.unscan()
  1178. } else if tok2 == ast.RPAREN {
  1179. rf.StreamFields = sfs
  1180. p.unscan()
  1181. } else {
  1182. return nil, fmt.Errorf("found %q, expect comma in create stream record statement.", lit2)
  1183. }
  1184. }
  1185. return rf, nil
  1186. }
  1187. func (p *Parser) parseStreamOptions() (*ast.Options, error) {
  1188. opts := &ast.Options{STRICT_VALIDATION: true}
  1189. v := reflect.ValueOf(opts)
  1190. lStack := &stack.Stack{}
  1191. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  1192. lStack.Push(ast.LPAREN)
  1193. for {
  1194. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.DATASOURCE || tok1 == ast.FORMAT || tok1 == ast.KEY || tok1 == ast.CONF_KEY || tok1 == ast.STRICT_VALIDATION || tok1 == ast.TYPE || tok1 == ast.TIMESTAMP || tok1 == ast.TIMESTAMP_FORMAT || tok1 == ast.RETAIN_SIZE || tok1 == ast.SHARED {
  1195. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EQ {
  1196. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.STRING {
  1197. switch tok1 {
  1198. case ast.STRICT_VALIDATION:
  1199. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  1200. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  1201. } else {
  1202. opts.STRICT_VALIDATION = val == "TRUE"
  1203. }
  1204. case ast.RETAIN_SIZE:
  1205. if val, err := strconv.Atoi(lit3); err != nil {
  1206. return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, tok1)
  1207. } else {
  1208. opts.RETAIN_SIZE = val
  1209. }
  1210. case ast.SHARED:
  1211. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  1212. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  1213. } else {
  1214. opts.SHARED = val == "TRUE"
  1215. }
  1216. default:
  1217. f := v.Elem().FieldByName(lit1)
  1218. if f.IsValid() {
  1219. f.SetString(lit3)
  1220. } else { // should not happen
  1221. return nil, fmt.Errorf("invalid field %s.", lit1)
  1222. }
  1223. }
  1224. } else {
  1225. return nil, fmt.Errorf("found %q, expect string value in option.", lit3)
  1226. }
  1227. } else {
  1228. return nil, fmt.Errorf("found %q, expect equals(=) in options.", lit2)
  1229. }
  1230. } else if tok1 == ast.COMMA {
  1231. continue
  1232. } else if tok1 == ast.RPAREN {
  1233. if lStack.Pop(); lStack.Len() == 0 {
  1234. break
  1235. } else {
  1236. return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
  1237. }
  1238. } else {
  1239. return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).", lit1)
  1240. }
  1241. }
  1242. } else {
  1243. return nil, fmt.Errorf("found %q, expect stream options.", lit)
  1244. }
  1245. return opts, nil
  1246. }
  1247. // Only support filter on window now
  1248. func (p *Parser) parseFilter() (ast.Expr, error) {
  1249. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.FILTER {
  1250. p.unscan()
  1251. return nil, nil
  1252. }
  1253. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.LPAREN {
  1254. return nil, fmt.Errorf("Found %q after FILTER, expect parentheses.", lit)
  1255. }
  1256. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.WHERE {
  1257. return nil, fmt.Errorf("Found %q after FILTER(, expect WHERE.", lit)
  1258. }
  1259. expr, err := p.ParseExpr()
  1260. if err != nil {
  1261. return nil, err
  1262. }
  1263. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
  1264. return nil, fmt.Errorf("Found %q after FILTER, expect right parentheses.", lit)
  1265. }
  1266. return expr, nil
  1267. }