parser.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/pkg/ast"
  5. "github.com/emqx/kuiper/pkg/message"
  6. "github.com/golang-collections/collections/stack"
  7. "io"
  8. "math"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. )
  13. type Parser struct {
  14. s *Scanner
  15. i int // buffer index
  16. n int // buffer char count
  17. buf [3]struct {
  18. tok ast.Token
  19. lit string
  20. }
  21. inmeta bool
  22. }
  23. func (p *Parser) parseCondition() (ast.Expr, error) {
  24. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.WHERE {
  25. p.unscan()
  26. return nil, nil
  27. }
  28. expr, err := p.ParseExpr()
  29. if err != nil {
  30. return nil, err
  31. }
  32. return expr, nil
  33. }
  34. func (p *Parser) scan() (tok ast.Token, lit string) {
  35. if p.n > 0 {
  36. p.n--
  37. return p.curr()
  38. }
  39. tok, lit = p.s.Scan()
  40. if tok != ast.WS && tok != ast.COMMENT {
  41. p.i = (p.i + 1) % len(p.buf)
  42. buf := &p.buf[p.i]
  43. buf.tok, buf.lit = tok, lit
  44. }
  45. return
  46. }
  47. func (p *Parser) curr() (ast.Token, string) {
  48. i := (p.i - p.n + len(p.buf)) % len(p.buf)
  49. buf := &p.buf[i]
  50. return buf.tok, buf.lit
  51. }
  52. func (p *Parser) scanIgnoreWhitespace() (tok ast.Token, lit string) {
  53. tok, lit = p.scan()
  54. for {
  55. if tok == ast.WS || tok == ast.COMMENT {
  56. tok, lit = p.scan()
  57. } else {
  58. break
  59. }
  60. }
  61. return tok, lit
  62. }
  63. func (p *Parser) unscan() { p.n++ }
  64. func NewParser(r io.Reader) *Parser {
  65. return &Parser{s: NewScanner(r)}
  66. }
  67. func (p *Parser) ParseQueries() ([]ast.SelectStatement, error) {
  68. var stmts []ast.SelectStatement
  69. if stmt, err := p.Parse(); err != nil {
  70. return nil, err
  71. } else {
  72. stmts = append(stmts, *stmt)
  73. }
  74. for {
  75. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
  76. if stmt, err := p.Parse(); err != nil {
  77. return nil, err
  78. } else {
  79. if stmt != nil {
  80. stmts = append(stmts, *stmt)
  81. }
  82. }
  83. } else if tok == ast.EOF {
  84. break
  85. }
  86. }
  87. return stmts, nil
  88. }
  89. func (p *Parser) Parse() (*ast.SelectStatement, error) {
  90. selects := &ast.SelectStatement{}
  91. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.EOF {
  92. return nil, nil
  93. } else if tok != ast.SELECT {
  94. return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
  95. }
  96. if fields, err := p.parseFields(); err != nil {
  97. return nil, err
  98. } else {
  99. selects.Fields = fields
  100. }
  101. if src, err := p.parseSource(); err != nil {
  102. return nil, err
  103. } else {
  104. selects.Sources = src
  105. }
  106. if joins, err := p.parseJoins(); err != nil {
  107. return nil, err
  108. } else {
  109. selects.Joins = joins
  110. }
  111. if exp, err := p.parseCondition(); err != nil {
  112. return nil, err
  113. } else {
  114. if exp != nil {
  115. selects.Condition = exp
  116. }
  117. }
  118. if dims, err := p.parseDimensions(); err != nil {
  119. return nil, err
  120. } else {
  121. selects.Dimensions = dims
  122. }
  123. if having, err := p.parseHaving(); err != nil {
  124. return nil, err
  125. } else {
  126. selects.Having = having
  127. }
  128. if sorts, err := p.parseSorts(); err != nil {
  129. return nil, err
  130. } else {
  131. selects.SortFields = sorts
  132. }
  133. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
  134. p.unscan()
  135. return selects, nil
  136. } else if tok != ast.EOF {
  137. return nil, fmt.Errorf("found %q, expected EOF.", lit)
  138. }
  139. if err := Validate(selects); err != nil {
  140. return nil, err
  141. }
  142. return selects, nil
  143. }
  144. func (p *Parser) parseSource() (ast.Sources, error) {
  145. var sources ast.Sources
  146. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.FROM {
  147. return nil, fmt.Errorf("found %q, expected FROM.", lit)
  148. }
  149. if src, alias, err := p.parseSourceLiteral(); err != nil {
  150. return nil, err
  151. } else {
  152. sources = append(sources, &ast.Table{Name: src, Alias: alias})
  153. }
  154. return sources, nil
  155. }
  156. //TODO Current func has problems when the source includes white space.
  157. func (p *Parser) parseSourceLiteral() (string, string, error) {
  158. var sourceSeg []string
  159. var alias string
  160. for {
  161. //HASH, DIV & ADD token is specially support for MQTT topic name patterns.
  162. if tok, lit := p.scanIgnoreWhitespace(); tok.AllowedSourceToken() {
  163. sourceSeg = append(sourceSeg, lit)
  164. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.AS {
  165. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  166. alias = lit2
  167. } else {
  168. return "", "", fmt.Errorf("found %q, expected JOIN key word.", lit)
  169. }
  170. } else if tok1.AllowedSourceToken() {
  171. sourceSeg = append(sourceSeg, lit1)
  172. } else {
  173. p.unscan()
  174. break
  175. }
  176. } else {
  177. p.unscan()
  178. break
  179. }
  180. }
  181. return strings.Join(sourceSeg, ""), alias, nil
  182. }
  183. func (p *Parser) parseFieldNameSections() ([]string, error) {
  184. var fieldNameSects []string
  185. for {
  186. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT || tok == ast.ASTERISK {
  187. fieldNameSects = append(fieldNameSects, lit)
  188. if tok1, _ := p.scanIgnoreWhitespace(); !tok1.AllowedSFNToken() {
  189. p.unscan()
  190. break
  191. }
  192. } else {
  193. p.unscan()
  194. break
  195. }
  196. }
  197. if len(fieldNameSects) == 0 {
  198. return nil, fmt.Errorf("Cannot find any field name.\n")
  199. } else if len(fieldNameSects) > 2 {
  200. return nil, fmt.Errorf("Too many field names. Please use -> to reference keys in struct.\n")
  201. }
  202. return fieldNameSects, nil
  203. }
  204. func (p *Parser) parseJoins() (ast.Joins, error) {
  205. var joins ast.Joins
  206. for {
  207. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.INNER || tok == ast.LEFT || tok == ast.RIGHT || tok == ast.FULL || tok == ast.CROSS {
  208. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.JOIN {
  209. var jt = ast.INNER_JOIN
  210. switch tok {
  211. case ast.INNER:
  212. jt = ast.INNER_JOIN
  213. case ast.LEFT:
  214. jt = ast.LEFT_JOIN
  215. case ast.RIGHT:
  216. jt = ast.RIGHT_JOIN
  217. case ast.FULL:
  218. jt = ast.FULL_JOIN
  219. case ast.CROSS:
  220. jt = ast.CROSS_JOIN
  221. }
  222. if j, err := p.ParseJoin(jt); err != nil {
  223. return nil, err
  224. } else {
  225. joins = append(joins, *j)
  226. }
  227. } else {
  228. return nil, fmt.Errorf("found %q, expected JOIN key word.", lit)
  229. }
  230. } else {
  231. p.unscan()
  232. if len(joins) > 0 {
  233. return joins, nil
  234. }
  235. return nil, nil
  236. }
  237. }
  238. return joins, nil
  239. }
  240. func (p *Parser) ParseJoin(joinType ast.JoinType) (*ast.Join, error) {
  241. var j = &ast.Join{JoinType: joinType}
  242. if src, alias, err := p.parseSourceLiteral(); err != nil {
  243. return nil, err
  244. } else {
  245. j.Name = src
  246. j.Alias = alias
  247. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.ON {
  248. if ast.CROSS_JOIN == joinType {
  249. return nil, fmt.Errorf("On expression is not required for cross join type.\n")
  250. }
  251. if exp, err := p.ParseExpr(); err != nil {
  252. return nil, err
  253. } else {
  254. j.Expr = exp
  255. }
  256. } else {
  257. p.unscan()
  258. }
  259. }
  260. return j, nil
  261. }
  262. func (p *Parser) parseDimensions() (ast.Dimensions, error) {
  263. var ds ast.Dimensions
  264. if t, _ := p.scanIgnoreWhitespace(); t == ast.GROUP {
  265. if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {
  266. for {
  267. if exp, err := p.ParseExpr(); err != nil {
  268. return nil, err
  269. } else {
  270. d := ast.Dimension{Expr: exp}
  271. ds = append(ds, d)
  272. }
  273. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.COMMA {
  274. continue
  275. } else {
  276. p.unscan()
  277. break
  278. }
  279. }
  280. } else {
  281. return nil, fmt.Errorf("found %q, expected BY statement.", l1)
  282. }
  283. } else {
  284. p.unscan()
  285. }
  286. return ds, nil
  287. }
  288. func (p *Parser) parseHaving() (ast.Expr, error) {
  289. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.HAVING {
  290. p.unscan()
  291. return nil, nil
  292. }
  293. expr, err := p.ParseExpr()
  294. if err != nil {
  295. return nil, err
  296. }
  297. return expr, nil
  298. }
  299. func (p *Parser) parseSorts() (ast.SortFields, error) {
  300. var ss ast.SortFields
  301. if t, _ := p.scanIgnoreWhitespace(); t == ast.ORDER {
  302. if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {
  303. for {
  304. if t1, l1 = p.scanIgnoreWhitespace(); t1 == ast.IDENT {
  305. s := ast.SortField{Ascending: true}
  306. p.unscan()
  307. if name, err := p.parseFieldNameSections(); err == nil {
  308. s.Name = strings.Join(name, ast.COLUMN_SEPARATOR)
  309. } else {
  310. return nil, err
  311. }
  312. if t2, _ := p.scanIgnoreWhitespace(); t2 == ast.DESC {
  313. s.Ascending = false
  314. ss = append(ss, s)
  315. } else if t2 == ast.ASC {
  316. ss = append(ss, s)
  317. } else {
  318. ss = append(ss, s)
  319. p.unscan()
  320. continue
  321. }
  322. } else if t1 == ast.COMMA {
  323. continue
  324. } else {
  325. p.unscan()
  326. break
  327. }
  328. }
  329. } else {
  330. return nil, fmt.Errorf("found %q, expected BY keyword.", l1)
  331. }
  332. } else {
  333. p.unscan()
  334. }
  335. return ss, nil
  336. }
  337. func (p *Parser) parseFields() (ast.Fields, error) {
  338. var fields ast.Fields
  339. tok, _ := p.scanIgnoreWhitespace()
  340. if tok == ast.ASTERISK {
  341. fields = append(fields, ast.Field{AName: "", Expr: &ast.Wildcard{Token: tok}})
  342. return fields, nil
  343. }
  344. p.unscan()
  345. for {
  346. field, err := p.parseField()
  347. if err != nil {
  348. return nil, err
  349. } else {
  350. fields = append(fields, *field)
  351. }
  352. tok, _ = p.scanIgnoreWhitespace()
  353. if tok != ast.COMMA {
  354. p.unscan()
  355. break
  356. }
  357. }
  358. return fields, nil
  359. }
  360. func (p *Parser) parseField() (*ast.Field, error) {
  361. field := &ast.Field{}
  362. if exp, err := p.ParseExpr(); err != nil {
  363. return nil, err
  364. } else {
  365. if e, ok := exp.(*ast.FieldRef); ok {
  366. field.Name = e.Name
  367. } else if e, ok := exp.(*ast.Call); ok {
  368. field.Name = e.Name
  369. }
  370. field.Expr = exp
  371. }
  372. if alias, err := p.parseAlias(); err != nil {
  373. return nil, err
  374. } else {
  375. if alias != "" {
  376. field.AName = alias
  377. }
  378. }
  379. return field, nil
  380. }
  381. func (p *Parser) parseAlias() (string, error) {
  382. tok, lit := p.scanIgnoreWhitespace()
  383. if tok == ast.AS {
  384. if tok, lit = p.scanIgnoreWhitespace(); tok != ast.IDENT {
  385. return "", fmt.Errorf("found %q, expected as alias.", lit)
  386. } else {
  387. return lit, nil
  388. }
  389. }
  390. p.unscan()
  391. return "", nil
  392. }
  393. func (p *Parser) ParseExpr() (ast.Expr, error) {
  394. var err error
  395. root := &ast.BinaryExpr{}
  396. root.RHS, err = p.parseUnaryExpr(false)
  397. if err != nil {
  398. return nil, err
  399. }
  400. for {
  401. op, _ := p.scanIgnoreWhitespace()
  402. if !op.IsOperator() {
  403. p.unscan()
  404. return root.RHS, nil
  405. } else if op == ast.ASTERISK { //Change the asterisk to Mul token.
  406. op = ast.MUL
  407. } else if op == ast.LBRACKET { //LBRACKET is a special token, need to unscan
  408. op = ast.SUBSET
  409. p.unscan()
  410. }
  411. var rhs ast.Expr
  412. if rhs, err = p.parseUnaryExpr(op == ast.ARROW); err != nil {
  413. return nil, err
  414. }
  415. for node := root; ; {
  416. r, ok := node.RHS.(*ast.BinaryExpr)
  417. if !ok || r.OP.Precedence() >= op.Precedence() {
  418. node.RHS = &ast.BinaryExpr{LHS: node.RHS, RHS: rhs, OP: op}
  419. break
  420. }
  421. node = r
  422. }
  423. }
  424. return nil, nil
  425. }
  426. func (p *Parser) parseUnaryExpr(isSubField bool) (ast.Expr, error) {
  427. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
  428. expr, err := p.ParseExpr()
  429. if err != nil {
  430. return nil, err
  431. }
  432. // Expect an RPAREN at the end.
  433. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.RPAREN {
  434. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  435. }
  436. return &ast.ParenExpr{Expr: expr}, nil
  437. } else if tok1 == ast.LBRACKET {
  438. return p.parseBracketExpr()
  439. }
  440. p.unscan()
  441. tok, lit := p.scanIgnoreWhiteSpaceWithNegativeNum()
  442. if tok == ast.CASE {
  443. return p.parseCaseExpr()
  444. } else if tok == ast.IDENT {
  445. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
  446. return p.parseCall(lit)
  447. }
  448. p.unscan() //Back the Lparen token
  449. p.unscan() //Back the ident token
  450. if n, err := p.parseFieldNameSections(); err != nil {
  451. return nil, err
  452. } else {
  453. if p.inmeta {
  454. if len(n) == 2 {
  455. return &ast.MetaRef{StreamName: ast.StreamName(n[0]), Name: n[1]}, nil
  456. }
  457. if isSubField {
  458. return &ast.MetaRef{StreamName: "", Name: n[0]}, nil
  459. }
  460. return &ast.MetaRef{StreamName: ast.DefaultStream, Name: n[0]}, nil
  461. } else {
  462. if len(n) == 2 {
  463. return &ast.FieldRef{StreamName: ast.StreamName(n[0]), Name: n[1]}, nil
  464. }
  465. if isSubField {
  466. return &ast.FieldRef{StreamName: "", Name: n[0]}, nil
  467. }
  468. return &ast.FieldRef{StreamName: ast.DefaultStream, Name: n[0]}, nil
  469. }
  470. }
  471. } else if tok == ast.STRING {
  472. return &ast.StringLiteral{Val: lit}, nil
  473. } else if tok == ast.INTEGER {
  474. val, _ := strconv.Atoi(lit)
  475. return &ast.IntegerLiteral{Val: val}, nil
  476. } else if tok == ast.NUMBER {
  477. if v, err := strconv.ParseFloat(lit, 64); err != nil {
  478. return nil, fmt.Errorf("found %q, invalid number value.", lit)
  479. } else {
  480. return &ast.NumberLiteral{Val: v}, nil
  481. }
  482. } else if tok == ast.TRUE || tok == ast.FALSE {
  483. if v, err := strconv.ParseBool(lit); err != nil {
  484. return nil, fmt.Errorf("found %q, invalid boolean value.", lit)
  485. } else {
  486. return &ast.BooleanLiteral{Val: v}, nil
  487. }
  488. } else if tok.IsTimeLiteral() {
  489. return &ast.TimeLiteral{Val: tok}, nil
  490. }
  491. return nil, fmt.Errorf("found %q, expected expression.", lit)
  492. }
  493. func (p *Parser) parseBracketExpr() (ast.Expr, error) {
  494. tok2, lit2 := p.scanIgnoreWhiteSpaceWithNegativeNum()
  495. if tok2 == ast.RBRACKET {
  496. //field[]
  497. return &ast.ColonExpr{Start: 0, End: math.MinInt32}, nil
  498. } else if tok2 == ast.INTEGER {
  499. start, err := strconv.Atoi(lit2)
  500. if err != nil {
  501. return nil, fmt.Errorf("The start index %s is not an int value in bracket expression.", lit2)
  502. }
  503. if tok3, _ := p.scanIgnoreWhitespace(); tok3 == ast.RBRACKET {
  504. //Such as field[2]
  505. return &ast.IndexExpr{Index: start}, nil
  506. } else if tok3 == ast.COLON {
  507. //Such as field[2:] or field[2:4]
  508. return p.parseColonExpr(start)
  509. }
  510. } else if tok2 == ast.COLON {
  511. //Such as field[:3] or [:]
  512. return p.parseColonExpr(0)
  513. }
  514. return nil, fmt.Errorf("Unexpected token %q. when parsing bracket expressions.", lit2)
  515. }
  516. func (p *Parser) parseColonExpr(start int) (ast.Expr, error) {
  517. tok, lit := p.scanIgnoreWhiteSpaceWithNegativeNum()
  518. if tok == ast.INTEGER {
  519. end, err := strconv.Atoi(lit)
  520. if err != nil {
  521. return nil, fmt.Errorf("The end index %s is not an int value in bracket expression.", lit)
  522. }
  523. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.RBRACKET {
  524. return &ast.ColonExpr{Start: start, End: end}, nil
  525. } else {
  526. return nil, fmt.Errorf("Found %q, expected right bracket.", lit1)
  527. }
  528. } else if tok == ast.RBRACKET {
  529. return &ast.ColonExpr{Start: start, End: math.MinInt32}, nil
  530. }
  531. return nil, fmt.Errorf("Found %q, expected right bracket.", lit)
  532. }
  533. func (p *Parser) scanIgnoreWhiteSpaceWithNegativeNum() (ast.Token, string) {
  534. tok, lit := p.scanIgnoreWhitespace()
  535. if tok == ast.SUB {
  536. _, _ = p.s.ScanWhiteSpace()
  537. r := p.s.read()
  538. if isDigit(r) {
  539. p.s.unread()
  540. tok, lit = p.s.ScanNumber(false, true)
  541. }
  542. }
  543. return tok, lit
  544. }
  545. func (p *Parser) parseAs(f *ast.Field) (*ast.Field, error) {
  546. tok, lit := p.scanIgnoreWhitespace()
  547. if tok != ast.IDENT {
  548. return nil, fmt.Errorf("found %q, expected as alias.", lit)
  549. }
  550. f.AName = lit
  551. return f, nil
  552. }
  553. func (p *Parser) parseCall(name string) (ast.Expr, error) {
  554. if strings.ToLower(name) == "meta" || strings.ToLower(name) == "mqtt" {
  555. p.inmeta = true
  556. defer func() {
  557. p.inmeta = false
  558. }()
  559. }
  560. var args []ast.Expr
  561. for {
  562. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
  563. if valErr := validateFuncs(name, nil); valErr != nil {
  564. return nil, valErr
  565. }
  566. return &ast.Call{Name: name, Args: args}, nil
  567. } else if tok == ast.ASTERISK {
  568. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.RPAREN {
  569. return nil, fmt.Errorf("found %q, expected right paren.", lit2)
  570. } else {
  571. if p.inmeta {
  572. args = append(args, &ast.MetaRef{StreamName: "", Name: "*"})
  573. } else {
  574. args = append(args, &ast.Wildcard{Token: ast.ASTERISK})
  575. }
  576. return &ast.Call{Name: name, Args: args}, nil
  577. }
  578. } else {
  579. p.unscan()
  580. }
  581. if exp, err := p.ParseExpr(); err != nil {
  582. return nil, err
  583. } else {
  584. args = append(args, exp)
  585. }
  586. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.COMMA {
  587. p.unscan()
  588. break
  589. }
  590. }
  591. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
  592. return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
  593. }
  594. if wt, error := validateWindows(name, args); wt == ast.NOT_WINDOW {
  595. if valErr := validateFuncs(name, args); valErr != nil {
  596. return nil, valErr
  597. }
  598. // Add context for some aggregate func
  599. if name == "deduplicate" {
  600. args = append([]ast.Expr{&ast.Wildcard{Token: ast.ASTERISK}}, args...)
  601. }
  602. return &ast.Call{Name: name, Args: args}, nil
  603. } else {
  604. if error != nil {
  605. return nil, error
  606. }
  607. win, err := p.ConvertToWindows(wt, args)
  608. if err != nil {
  609. return nil, error
  610. }
  611. // parse filter clause
  612. f, err := p.parseFilter()
  613. if err != nil {
  614. return nil, err
  615. } else if f != nil {
  616. win.Filter = f
  617. }
  618. return win, nil
  619. }
  620. }
  621. func (p *Parser) parseCaseExpr() (*ast.CaseExpr, error) {
  622. c := &ast.CaseExpr{}
  623. tok, _ := p.scanIgnoreWhitespace()
  624. p.unscan()
  625. if tok != ast.WHEN { // no condition value for case, additional validation needed
  626. if exp, err := p.ParseExpr(); err != nil {
  627. return nil, err
  628. } else {
  629. c.Value = exp
  630. }
  631. }
  632. loop:
  633. for {
  634. tok, _ := p.scanIgnoreWhitespace()
  635. switch tok {
  636. case ast.WHEN:
  637. if exp, err := p.ParseExpr(); err != nil {
  638. return nil, err
  639. } else {
  640. if c.WhenClauses == nil {
  641. c.WhenClauses = make([]*ast.WhenClause, 0)
  642. }
  643. if c.Value == nil && !ast.IsBooleanArg(exp) {
  644. return nil, fmt.Errorf("invalid CASE expression, WHEN expression must be a bool condition")
  645. }
  646. w := &ast.WhenClause{
  647. Expr: exp,
  648. }
  649. tokThen, _ := p.scanIgnoreWhitespace()
  650. if tokThen != ast.THEN {
  651. return nil, fmt.Errorf("invalid CASE expression, THEN expected after WHEN")
  652. } else {
  653. if expThen, err := p.ParseExpr(); err != nil {
  654. return nil, err
  655. } else {
  656. w.Result = expThen
  657. c.WhenClauses = append(c.WhenClauses, w)
  658. }
  659. }
  660. }
  661. case ast.ELSE:
  662. if c.WhenClauses != nil {
  663. if exp, err := p.ParseExpr(); err != nil {
  664. return nil, err
  665. } else {
  666. c.ElseClause = exp
  667. }
  668. } else {
  669. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before ELSE")
  670. }
  671. case ast.END:
  672. if c.WhenClauses != nil {
  673. break loop
  674. } else {
  675. return nil, fmt.Errorf("invalid CASE expression, WHEN expected before END")
  676. }
  677. default:
  678. return nil, fmt.Errorf("invalid CASE expression, END expected")
  679. }
  680. }
  681. return c, nil
  682. }
  683. func validateWindows(name string, args []ast.Expr) (ast.WindowType, error) {
  684. fname := strings.ToLower(name)
  685. switch fname {
  686. case "tumblingwindow":
  687. if err := validateWindow(fname, 2, args); err != nil {
  688. return ast.TUMBLING_WINDOW, err
  689. }
  690. return ast.TUMBLING_WINDOW, nil
  691. case "hoppingwindow":
  692. if err := validateWindow(fname, 3, args); err != nil {
  693. return ast.HOPPING_WINDOW, err
  694. }
  695. return ast.HOPPING_WINDOW, nil
  696. case "sessionwindow":
  697. if err := validateWindow(fname, 3, args); err != nil {
  698. return ast.SESSION_WINDOW, err
  699. }
  700. return ast.SESSION_WINDOW, nil
  701. case "slidingwindow":
  702. if err := validateWindow(fname, 2, args); err != nil {
  703. return ast.SLIDING_WINDOW, err
  704. }
  705. return ast.SLIDING_WINDOW, nil
  706. case "countwindow":
  707. if len(args) == 1 {
  708. if para1, ok := args[0].(*ast.IntegerLiteral); ok && para1.Val > 0 {
  709. return ast.COUNT_WINDOW, nil
  710. } else {
  711. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s.", args[0])
  712. }
  713. } else if len(args) == 2 {
  714. if para1, ok1 := args[0].(*ast.IntegerLiteral); ok1 {
  715. if para2, ok2 := args[1].(*ast.IntegerLiteral); ok2 {
  716. if para1.Val < para2.Val {
  717. return ast.COUNT_WINDOW, fmt.Errorf("The second parameter value %d should be less than the first parameter %d.", para2.Val, para1.Val)
  718. } else {
  719. return ast.COUNT_WINDOW, nil
  720. }
  721. }
  722. }
  723. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter value %s, %s.", args[0], args[1])
  724. } else {
  725. return ast.COUNT_WINDOW, fmt.Errorf("Invalid parameter count.")
  726. }
  727. }
  728. return ast.NOT_WINDOW, nil
  729. }
  730. func validateWindow(funcName string, expectLen int, args []ast.Expr) error {
  731. if len(args) != expectLen {
  732. return fmt.Errorf("The arguments for %s should be %d.\n", funcName, expectLen)
  733. }
  734. if _, ok := args[0].(*ast.TimeLiteral); !ok {
  735. return fmt.Errorf("The 1st argument for %s is expecting timer literal expression. One value of [dd|hh|mi|ss|ms].\n", funcName)
  736. }
  737. for i := 1; i < len(args); i++ {
  738. if _, ok := args[i].(*ast.IntegerLiteral); !ok {
  739. return fmt.Errorf("The %d argument for %s is expecting interger literal expression. \n", i, funcName)
  740. }
  741. }
  742. return nil
  743. }
  744. func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error) {
  745. win := &ast.Window{WindowType: wtype}
  746. if wtype == ast.COUNT_WINDOW {
  747. win.Length = &ast.IntegerLiteral{Val: args[0].(*ast.IntegerLiteral).Val}
  748. if len(args) == 2 {
  749. win.Interval = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val}
  750. }
  751. return win, nil
  752. }
  753. var unit = 1
  754. v := args[0].(*ast.TimeLiteral).Val
  755. switch v {
  756. case ast.DD:
  757. unit = 24 * 3600 * 1000
  758. case ast.HH:
  759. unit = 3600 * 1000
  760. case ast.MI:
  761. unit = 60 * 1000
  762. case ast.SS:
  763. unit = 1000
  764. case ast.MS:
  765. unit = 1
  766. default:
  767. return nil, fmt.Errorf("Invalid timeliteral %s", v)
  768. }
  769. win.Length = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val * unit}
  770. if len(args) > 2 {
  771. win.Interval = &ast.IntegerLiteral{Val: args[2].(*ast.IntegerLiteral).Val * unit}
  772. } else {
  773. win.Interval = &ast.IntegerLiteral{Val: 0}
  774. }
  775. return win, nil
  776. }
  777. func (p *Parser) ParseCreateStmt() (ast.Statement, error) {
  778. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.CREATE {
  779. tok1, lit1 := p.scanIgnoreWhitespace()
  780. stmt := &ast.StreamStmt{}
  781. switch tok1 {
  782. case ast.STREAM:
  783. stmt.StreamType = ast.TypeStream
  784. case ast.TABLE:
  785. stmt.StreamType = ast.TypeTable
  786. default:
  787. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  788. }
  789. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  790. stmt.Name = ast.StreamName(lit2)
  791. if fields, err := p.parseStreamFields(); err != nil {
  792. return nil, err
  793. } else {
  794. stmt.StreamFields = fields
  795. }
  796. if opts, err := p.parseStreamOptions(); err != nil {
  797. return nil, err
  798. } else {
  799. stmt.Options = opts
  800. }
  801. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.SEMICOLON {
  802. p.unscan()
  803. } else if tok3 == ast.EOF {
  804. //Finish parsing create stream statement. Jump to validate
  805. } else {
  806. return nil, fmt.Errorf("found %q, expected semicolon or EOF.", lit3)
  807. }
  808. } else {
  809. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  810. }
  811. if valErr := validateStream(stmt); valErr != nil {
  812. return nil, valErr
  813. }
  814. return stmt, nil
  815. } else {
  816. p.unscan()
  817. return nil, nil
  818. }
  819. }
  820. // TODO more accurate validation for table
  821. func validateStream(stmt *ast.StreamStmt) error {
  822. f := stmt.Options.FORMAT
  823. if f == "" {
  824. f = message.FormatJson
  825. }
  826. switch strings.ToLower(f) {
  827. case message.FormatJson:
  828. //do nothing
  829. case message.FormatBinary:
  830. if stmt.StreamType == ast.TypeTable {
  831. return fmt.Errorf("'binary' format is not supported for table")
  832. }
  833. switch len(stmt.StreamFields) {
  834. case 0:
  835. // do nothing for schemaless
  836. case 1:
  837. f := stmt.StreamFields[0]
  838. if bt, ok := f.FieldType.(*ast.BasicType); ok {
  839. if bt.Type == ast.BYTEA {
  840. break
  841. }
  842. }
  843. return fmt.Errorf("'binary' format stream can have only 'bytea' type field")
  844. default:
  845. return fmt.Errorf("'binary' format stream can have only one field")
  846. }
  847. default:
  848. return fmt.Errorf("option 'format=%s' is invalid", f)
  849. }
  850. return nil
  851. }
  852. func (p *Parser) parseShowStmt() (ast.Statement, error) {
  853. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.SHOW {
  854. tok1, lit1 := p.scanIgnoreWhitespace()
  855. switch tok1 {
  856. case ast.STREAMS:
  857. ss := &ast.ShowStreamsStatement{}
  858. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
  859. return ss, nil
  860. } else {
  861. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  862. }
  863. case ast.TABLES:
  864. ss := &ast.ShowTablesStatement{}
  865. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
  866. return ss, nil
  867. } else {
  868. return nil, fmt.Errorf("found %q, expected semecolon or EOF.", lit2)
  869. }
  870. default:
  871. return nil, fmt.Errorf("found %q, expected keyword streams or tables.", lit1)
  872. }
  873. } else {
  874. p.unscan()
  875. return nil, nil
  876. }
  877. }
  878. func (p *Parser) parseDescribeStmt() (ast.Statement, error) {
  879. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DESCRIBE {
  880. tok1, lit1 := p.scanIgnoreWhitespace()
  881. switch tok1 {
  882. case ast.STREAM:
  883. dss := &ast.DescribeStreamStatement{}
  884. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  885. dss.Name = lit2
  886. return dss, nil
  887. } else {
  888. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  889. }
  890. case ast.TABLE:
  891. dss := &ast.DescribeTableStatement{}
  892. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  893. dss.Name = lit2
  894. return dss, nil
  895. } else {
  896. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  897. }
  898. default:
  899. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  900. }
  901. } else {
  902. p.unscan()
  903. return nil, nil
  904. }
  905. }
  906. func (p *Parser) parseExplainStmt() (ast.Statement, error) {
  907. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.EXPLAIN {
  908. tok1, lit1 := p.scanIgnoreWhitespace()
  909. switch tok1 {
  910. case ast.STREAM:
  911. ess := &ast.ExplainStreamStatement{}
  912. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  913. ess.Name = lit2
  914. return ess, nil
  915. } else {
  916. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  917. }
  918. case ast.TABLE:
  919. ess := &ast.ExplainTableStatement{}
  920. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  921. ess.Name = lit2
  922. return ess, nil
  923. } else {
  924. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  925. }
  926. default:
  927. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  928. }
  929. } else {
  930. p.unscan()
  931. return nil, nil
  932. }
  933. }
  934. func (p *Parser) parseDropStmt() (ast.Statement, error) {
  935. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DROP {
  936. tok1, lit1 := p.scanIgnoreWhitespace()
  937. switch tok1 {
  938. case ast.STREAM:
  939. ess := &ast.DropStreamStatement{}
  940. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  941. ess.Name = lit2
  942. return ess, nil
  943. } else {
  944. return nil, fmt.Errorf("found %q, expected stream name.", lit2)
  945. }
  946. case ast.TABLE:
  947. ess := &ast.DropTableStatement{}
  948. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
  949. ess.Name = lit2
  950. return ess, nil
  951. } else {
  952. return nil, fmt.Errorf("found %q, expected table name.", lit2)
  953. }
  954. default:
  955. return nil, fmt.Errorf("found %q, expected keyword stream or table.", lit1)
  956. }
  957. } else {
  958. p.unscan()
  959. return nil, nil
  960. }
  961. }
  962. func (p *Parser) parseStreamFields() (ast.StreamFields, error) {
  963. lStack := &stack.Stack{}
  964. var fields ast.StreamFields
  965. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  966. lStack.Push(lit)
  967. for {
  968. //For the schemaless streams
  969. //create stream demo () WITH (FORMAT="JSON", DATASOURCE="demo" TYPE="edgex")
  970. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
  971. lStack.Pop()
  972. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.WITH {
  973. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  974. }
  975. return fields, nil
  976. } else {
  977. p.unscan()
  978. }
  979. if f, err := p.parseStreamField(); err != nil {
  980. return nil, err
  981. } else {
  982. fields = append(fields, *f)
  983. }
  984. if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
  985. lStack.Pop()
  986. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.WITH {
  987. //Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
  988. if lStack.Len() > 0 {
  989. return nil, fmt.Errorf("Parenthesis is not matched.")
  990. }
  991. break
  992. } else if tok2 == ast.COMMA {
  993. if lStack.Len() > 0 {
  994. return nil, fmt.Errorf("Parenthesis is in create record type not matched.")
  995. }
  996. p.unscan()
  997. break
  998. } else if tok2 == ast.RPAREN { //The nested type definition of ARRAY and Struct, such as "field ARRAY(STRUCT(f BIGINT))"
  999. if lStack.Len() > 0 {
  1000. return nil, fmt.Errorf("Parenthesis is not matched.")
  1001. }
  1002. p.unscan()
  1003. break
  1004. } else {
  1005. if lStack.Len() == 0 {
  1006. return nil, fmt.Errorf("found %q, expected is with.", lit2)
  1007. }
  1008. p.unscan()
  1009. }
  1010. } else {
  1011. p.unscan()
  1012. }
  1013. }
  1014. } else {
  1015. return nil, fmt.Errorf("found %q, expected lparen after stream name.", lit)
  1016. }
  1017. return fields, nil
  1018. }
  1019. func (p *Parser) parseStreamField() (*ast.StreamField, error) {
  1020. field := &ast.StreamField{}
  1021. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT {
  1022. field.Name = lit
  1023. tok1, lit1 := p.scanIgnoreWhitespace()
  1024. if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
  1025. field.FieldType = &ast.BasicType{Type: t}
  1026. } else if t == ast.ARRAY {
  1027. if f, e := p.parseStreamArrayType(); e != nil {
  1028. return nil, e
  1029. } else {
  1030. field.FieldType = f
  1031. }
  1032. } else if t == ast.STRUCT {
  1033. if f, e := p.parseStreamStructType(); e != nil {
  1034. return nil, e
  1035. } else {
  1036. field.FieldType = f
  1037. }
  1038. } else if t == ast.UNKNOWN {
  1039. return nil, fmt.Errorf("found %q, expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).", lit1)
  1040. }
  1041. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.COMMA {
  1042. //Just consume the comma.
  1043. } else if tok2 == ast.RPAREN {
  1044. p.unscan()
  1045. } else {
  1046. return nil, fmt.Errorf("found %q, expect comma or rparen.", lit2)
  1047. }
  1048. } else {
  1049. return nil, fmt.Errorf("found %q, expect stream field name.", lit)
  1050. }
  1051. return field, nil
  1052. }
  1053. func (p *Parser) parseStreamArrayType() (ast.FieldType, error) {
  1054. lStack := &stack.Stack{}
  1055. if tok, _ := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  1056. lStack.Push(ast.LPAREN)
  1057. tok1, lit1 := p.scanIgnoreWhitespace()
  1058. if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
  1059. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
  1060. lStack.Pop()
  1061. if lStack.Len() > 0 {
  1062. return nil, fmt.Errorf("Parenthesis is in array type not matched.")
  1063. }
  1064. return &ast.ArrayType{Type: t}, nil
  1065. } else {
  1066. return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
  1067. }
  1068. } else if tok1 == ast.XSTRUCT {
  1069. if f, err := p.parseStreamStructType(); err != nil {
  1070. return nil, err
  1071. } else {
  1072. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
  1073. lStack.Pop()
  1074. if lStack.Len() > 0 {
  1075. return nil, fmt.Errorf("Parenthesis is in struct of array type %q not matched.", tok1)
  1076. }
  1077. return &ast.ArrayType{Type: ast.STRUCT, FieldType: f}, nil
  1078. } else {
  1079. return nil, fmt.Errorf("found %q, expect rparen in struct of array type definition.", lit2)
  1080. }
  1081. }
  1082. } else if tok1 == ast.COMMA {
  1083. p.unscan()
  1084. } else {
  1085. return nil, fmt.Errorf("found %q, expect stream data types.", lit1)
  1086. }
  1087. } else {
  1088. }
  1089. return nil, nil
  1090. }
  1091. func (p *Parser) parseStreamStructType() (ast.FieldType, error) {
  1092. rf := &ast.RecType{}
  1093. if sfs, err := p.parseStreamFields(); err != nil {
  1094. return nil, err
  1095. } else {
  1096. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.COMMA {
  1097. rf.StreamFields = sfs
  1098. p.unscan()
  1099. } else if tok2 == ast.RPAREN {
  1100. rf.StreamFields = sfs
  1101. p.unscan()
  1102. } else {
  1103. return nil, fmt.Errorf("found %q, expect comma in create stream record statement.", lit2)
  1104. }
  1105. }
  1106. return rf, nil
  1107. }
  1108. func (p *Parser) parseStreamOptions() (*ast.Options, error) {
  1109. opts := &ast.Options{}
  1110. v := reflect.ValueOf(opts)
  1111. lStack := &stack.Stack{}
  1112. if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
  1113. lStack.Push(ast.LPAREN)
  1114. for {
  1115. if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.DATASOURCE || tok1 == ast.FORMAT || tok1 == ast.KEY || tok1 == ast.CONF_KEY || tok1 == ast.STRICT_VALIDATION || tok1 == ast.TYPE || tok1 == ast.TIMESTAMP || tok1 == ast.TIMESTAMP_FORMAT || tok1 == ast.RETAIN_SIZE || tok1 == ast.SHARED {
  1116. if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EQ {
  1117. if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.STRING {
  1118. switch tok1 {
  1119. case ast.STRICT_VALIDATION:
  1120. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  1121. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  1122. } else {
  1123. opts.STRICT_VALIDATION = (val == "TRUE")
  1124. }
  1125. case ast.RETAIN_SIZE:
  1126. if val, err := strconv.Atoi(lit3); err != nil {
  1127. return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, tok1)
  1128. } else {
  1129. opts.RETAIN_SIZE = val
  1130. }
  1131. case ast.SHARED:
  1132. if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
  1133. return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
  1134. } else {
  1135. opts.SHARED = (val == "TRUE")
  1136. }
  1137. default:
  1138. f := v.Elem().FieldByName(lit1)
  1139. if f.IsValid() {
  1140. f.SetString(lit3)
  1141. } else { // should not happen
  1142. return nil, fmt.Errorf("invalid field %s.", lit1)
  1143. }
  1144. }
  1145. } else {
  1146. return nil, fmt.Errorf("found %q, expect string value in option.", lit3)
  1147. }
  1148. } else {
  1149. return nil, fmt.Errorf("found %q, expect equals(=) in options.", lit2)
  1150. }
  1151. } else if tok1 == ast.COMMA {
  1152. continue
  1153. } else if tok1 == ast.RPAREN {
  1154. if lStack.Pop(); lStack.Len() == 0 {
  1155. break
  1156. } else {
  1157. return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
  1158. }
  1159. } else {
  1160. return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).", lit1)
  1161. }
  1162. }
  1163. } else {
  1164. return nil, fmt.Errorf("found %q, expect stream options.", lit)
  1165. }
  1166. return opts, nil
  1167. }
  1168. // Only support filter on window now
  1169. func (p *Parser) parseFilter() (ast.Expr, error) {
  1170. if tok, _ := p.scanIgnoreWhitespace(); tok != ast.FILTER {
  1171. p.unscan()
  1172. return nil, nil
  1173. }
  1174. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.LPAREN {
  1175. return nil, fmt.Errorf("Found %q after FILTER, expect parentheses.", lit)
  1176. }
  1177. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.WHERE {
  1178. return nil, fmt.Errorf("Found %q after FILTER(, expect WHERE.", lit)
  1179. }
  1180. expr, err := p.ParseExpr()
  1181. if err != nil {
  1182. return nil, err
  1183. }
  1184. if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
  1185. return nil, fmt.Errorf("Found %q after FILTER, expect right parentheses.", lit)
  1186. }
  1187. return expr, nil
  1188. }