ソースを参照

refactor(parser): move stream management related keyword into common word

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年 前
コミット
da9fc781e5

+ 0 - 6
docs/en_US/sqls/lexical_elements.md

@@ -40,12 +40,6 @@ The following is an example for using a stream named `from`, which is a reserved
 SELECT * FROM demo1 where `from`="device1"
 SELECT * FROM demo1 where `from`="device1"
 ```
 ```
 
 
-**Reserved keywords for streams management**: If you'd like to use the following keywords in stream management command, you will have to use backtick to enclose them.
-
-```
-CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, KIND, SCHEMAID, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
-```
-
 The following is an example for how to use reserved keywords in stream creation statement.
 The following is an example for how to use reserved keywords in stream creation statement.
 
 
 ```sql
 ```sql

+ 0 - 6
docs/zh_CN/sqls/lexical_elements.md

@@ -40,12 +40,6 @@ SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC,
 SELECT * FROM demo1 where `from`="device1"
 SELECT * FROM demo1 where `from`="device1"
 ```
 ```
 
 
-**用于流管理的保留关键字**:如果您想在流管理命令中使用以下关键字,则必须使用反撇号将其括起来。
-
-```
-CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, KIND, SCHEMAID, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
-```
-
 以下是如何在流创建语句中使用保留关键字的示例。
 以下是如何在流创建语句中使用保留关键字的示例。
 
 
 ```sql
 ```sql

+ 4 - 4
internal/topo/rule/ruleState_test.go

@@ -72,7 +72,7 @@ func TestCreate(t *testing.T) {
 			r: &api.Rule{
 			r: &api.Rule{
 				Triggered: false,
 				Triggered: false,
 				Id:        "test",
 				Id:        "test",
-				Sql:       "SELECT timestamp FROM demo",
+				Sql:       "SELECT FROM demo",
 				Actions: []map[string]interface{}{
 				Actions: []map[string]interface{}{
 					{
 					{
 						"log": map[string]interface{}{},
 						"log": map[string]interface{}{},
@@ -80,7 +80,7 @@ func TestCreate(t *testing.T) {
 				},
 				},
 				Options: defaultOption,
 				Options: defaultOption,
 			},
 			},
-			e: errors.New("Parse SQL SELECT timestamp FROM demo error: found \"TIMESTAMP\", expected expression.."),
+			e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
 		},
 		},
 		{
 		{
 			r: &api.Rule{
 			r: &api.Rule{
@@ -138,7 +138,7 @@ func TestUpdate(t *testing.T) {
 			r: &api.Rule{
 			r: &api.Rule{
 				Triggered: false,
 				Triggered: false,
 				Id:        "test",
 				Id:        "test",
-				Sql:       "SELECT timestamp FROM demo",
+				Sql:       "SELECT FROM demo",
 				Actions: []map[string]interface{}{
 				Actions: []map[string]interface{}{
 					{
 					{
 						"log": map[string]interface{}{},
 						"log": map[string]interface{}{},
@@ -146,7 +146,7 @@ func TestUpdate(t *testing.T) {
 				},
 				},
 				Options: defaultOption,
 				Options: defaultOption,
 			},
 			},
-			e: errors.New("Parse SQL SELECT timestamp FROM demo error: found \"TIMESTAMP\", expected expression.."),
+			e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
 		},
 		},
 		{
 		{
 			r: &api.Rule{
 			r: &api.Rule{

+ 0 - 61
internal/xsql/lexical.go

@@ -225,70 +225,10 @@ func (s *Scanner) ScanIdent() (tok ast.Token, lit string) {
 		return ast.OVER, lit
 		return ast.OVER, lit
 	case "PARTITION":
 	case "PARTITION":
 		return ast.PARTITION, lit
 		return ast.PARTITION, lit
-	case "CREATE":
-		return ast.CREATE, lit
-	case "DROP":
-		return ast.DROP, lit
-	case "EXPLAIN":
-		return ast.EXPLAIN, lit
-	case "DESCRIBE":
-		return ast.DESCRIBE, lit
-	case "SHOW":
-		return ast.SHOW, lit
-	case "STREAM":
-		return ast.STREAM, lit
-	case "STREAMS":
-		return ast.STREAMS, lit
-	case "TABLE":
-		return ast.TABLE, lit
-	case "TABLES":
-		return ast.TABLES, lit
-	case "WITH":
-		return ast.WITH, lit
-	case "BIGINT":
-		return ast.XBIGINT, lit
-	case "FLOAT":
-		return ast.XFLOAT, lit
-	case "DATETIME":
-		return ast.XDATETIME, lit
-	case "STRING":
-		return ast.XSTRING, lit
-	case "BYTEA":
-		return ast.XBYTEA, lit
-	case "BOOLEAN":
-		return ast.XBOOLEAN, lit
-	case "ARRAY":
-		return ast.XARRAY, lit
-	case "STRUCT":
-		return ast.XSTRUCT, lit
-	case "DATASOURCE":
-		return ast.DATASOURCE, lit
-	case "KEY":
-		return ast.KEY, lit
-	case "FORMAT":
-		return ast.FORMAT, lit
-	case "CONF_KEY":
-		return ast.CONF_KEY, lit
-	case "TYPE":
-		return ast.TYPE, lit
 	case "TRUE":
 	case "TRUE":
 		return ast.TRUE, lit
 		return ast.TRUE, lit
 	case "FALSE":
 	case "FALSE":
 		return ast.FALSE, lit
 		return ast.FALSE, lit
-	case "STRICT_VALIDATION":
-		return ast.STRICT_VALIDATION, lit
-	case "TIMESTAMP":
-		return ast.TIMESTAMP, lit
-	case "TIMESTAMP_FORMAT":
-		return ast.TIMESTAMP_FORMAT, lit
-	case "RETAIN_SIZE":
-		return ast.RETAIN_SIZE, lit
-	case "SHARED":
-		return ast.SHARED, lit
-	case "SCHEMAID":
-		return ast.SCHEMAID, lit
-	case "KIND":
-		return ast.KIND, lit
 	case "DD":
 	case "DD":
 		return ast.DD, lit
 		return ast.DD, lit
 	case "HH":
 	case "HH":
@@ -300,7 +240,6 @@ func (s *Scanner) ScanIdent() (tok ast.Token, lit string) {
 	case "MS":
 	case "MS":
 		return ast.MS, lit
 		return ast.MS, lit
 	}
 	}
-
 	return ast.IDENT, buf.String()
 	return ast.IDENT, buf.String()
 }
 }
 
 

+ 16 - 14
internal/xsql/manager.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@ package xsql
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"strings"
 )
 )
 
 
 var (
 var (
@@ -25,39 +26,40 @@ var (
 )
 )
 
 
 type ParseTree struct {
 type ParseTree struct {
-	Handlers map[ast.Token]func(*Parser) (ast.Statement, error)
-	Tokens   map[ast.Token]*ParseTree
+	Handlers map[string]func(*Parser) (ast.Statement, error)
+	Tokens   map[string]*ParseTree
 	Keys     []string
 	Keys     []string
 }
 }
 
 
-func (t *ParseTree) Handle(tok ast.Token, fn func(*Parser) (ast.Statement, error)) {
+func (t *ParseTree) Handle(lit string, fn func(*Parser) (ast.Statement, error)) {
 	// Verify that there is no conflict for this token in this parse tree.
 	// Verify that there is no conflict for this token in this parse tree.
-	if _, conflict := t.Tokens[tok]; conflict {
-		panic(fmt.Sprintf("conflict for token %s", tok))
+	if _, conflict := t.Tokens[lit]; conflict {
+		panic(fmt.Sprintf("conflict for token %s", lit))
 	}
 	}
 
 
-	if _, conflict := t.Handlers[tok]; conflict {
-		panic(fmt.Sprintf("conflict for token %s", tok))
+	if _, conflict := t.Handlers[lit]; conflict {
+		panic(fmt.Sprintf("conflict for token %s", lit))
 	}
 	}
 
 
 	if t.Handlers == nil {
 	if t.Handlers == nil {
-		t.Handlers = make(map[ast.Token]func(*Parser) (ast.Statement, error))
+		t.Handlers = make(map[string]func(*Parser) (ast.Statement, error))
 	}
 	}
-	t.Handlers[tok] = fn
-	t.Keys = append(t.Keys, tok.String())
+	t.Handlers[lit] = fn
+	t.Keys = append(t.Keys, lit)
 }
 }
 
 
 func (pt *ParseTree) Parse(p *Parser) (ast.Statement, error) {
 func (pt *ParseTree) Parse(p *Parser) (ast.Statement, error) {
-	tok, _ := p.scanIgnoreWhitespace()
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
 	p.unscan()
 	p.unscan()
-	if f, ok := pt.Handlers[tok]; ok {
+	if f, ok := pt.Handlers[lit]; ok {
 		return f(p)
 		return f(p)
 	}
 	}
 	return nil, nil
 	return nil, nil
 }
 }
 
 
 func init() {
 func init() {
-	Language.Handle(ast.SELECT, func(p *Parser) (ast.Statement, error) {
+	Language.Handle(ast.SELECT_LIT, func(p *Parser) (ast.Statement, error) {
 		return p.Parse()
 		return p.Parse()
 	})
 	})
 
 

+ 46 - 26
internal/xsql/parser.go

@@ -1045,10 +1045,13 @@ func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.W
 }
 }
 
 
 func (p *Parser) ParseCreateStmt() (ast.Statement, error) {
 func (p *Parser) ParseCreateStmt() (ast.Statement, error) {
-	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.CREATE {
-		tok1, lit1 := p.scanIgnoreWhitespace()
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
+	if lit == ast.CREATE {
+		_, lit1 := p.scanIgnoreWhitespace()
 		stmt := &ast.StreamStmt{}
 		stmt := &ast.StreamStmt{}
-		switch tok1 {
+		lit1 = strings.ToUpper(lit1)
+		switch lit1 {
 		case ast.STREAM:
 		case ast.STREAM:
 			stmt.StreamType = ast.TypeStream
 			stmt.StreamType = ast.TypeStream
 		case ast.TABLE:
 		case ast.TABLE:
@@ -1124,9 +1127,12 @@ func validateStream(stmt *ast.StreamStmt) error {
 }
 }
 
 
 func (p *Parser) parseShowStmt() (ast.Statement, error) {
 func (p *Parser) parseShowStmt() (ast.Statement, error) {
-	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.SHOW {
-		tok1, lit1 := p.scanIgnoreWhitespace()
-		switch tok1 {
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
+	if lit == ast.SHOW {
+		_, lit1 := p.scanIgnoreWhitespace()
+		lit1 = strings.ToUpper(lit1)
+		switch lit1 {
 		case ast.STREAMS:
 		case ast.STREAMS:
 			ss := &ast.ShowStreamsStatement{}
 			ss := &ast.ShowStreamsStatement{}
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EOF || tok2 == ast.SEMICOLON {
@@ -1151,9 +1157,12 @@ func (p *Parser) parseShowStmt() (ast.Statement, error) {
 }
 }
 
 
 func (p *Parser) parseDescribeStmt() (ast.Statement, error) {
 func (p *Parser) parseDescribeStmt() (ast.Statement, error) {
-	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DESCRIBE {
-		tok1, lit1 := p.scanIgnoreWhitespace()
-		switch tok1 {
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
+	if lit == ast.DESCRIBE {
+		_, lit1 := p.scanIgnoreWhitespace()
+		lit1 = strings.ToUpper(lit1)
+		switch lit1 {
 		case ast.STREAM:
 		case ast.STREAM:
 			dss := &ast.DescribeStreamStatement{}
 			dss := &ast.DescribeStreamStatement{}
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
@@ -1180,9 +1189,12 @@ func (p *Parser) parseDescribeStmt() (ast.Statement, error) {
 }
 }
 
 
 func (p *Parser) parseExplainStmt() (ast.Statement, error) {
 func (p *Parser) parseExplainStmt() (ast.Statement, error) {
-	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.EXPLAIN {
-		tok1, lit1 := p.scanIgnoreWhitespace()
-		switch tok1 {
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
+	if lit == ast.EXPLAIN {
+		_, lit1 := p.scanIgnoreWhitespace()
+		lit1 = strings.ToUpper(lit1)
+		switch lit1 {
 		case ast.STREAM:
 		case ast.STREAM:
 			ess := &ast.ExplainStreamStatement{}
 			ess := &ast.ExplainStreamStatement{}
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
@@ -1209,9 +1221,12 @@ func (p *Parser) parseExplainStmt() (ast.Statement, error) {
 }
 }
 
 
 func (p *Parser) parseDropStmt() (ast.Statement, error) {
 func (p *Parser) parseDropStmt() (ast.Statement, error) {
-	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.DROP {
-		tok1, lit1 := p.scanIgnoreWhitespace()
-		switch tok1 {
+	_, lit := p.scanIgnoreWhitespace()
+	lit = strings.ToUpper(lit)
+	if lit == ast.DROP {
+		_, lit1 := p.scanIgnoreWhitespace()
+		lit1 = strings.ToUpper(lit1)
+		switch lit1 {
 		case ast.STREAM:
 		case ast.STREAM:
 			ess := &ast.DropStreamStatement{}
 			ess := &ast.DropStreamStatement{}
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.IDENT {
@@ -1247,7 +1262,7 @@ func (p *Parser) parseStreamFields() (ast.StreamFields, error) {
 			//create stream demo () WITH (FORMAT="JSON", DATASOURCE="demo" TYPE="edgex")
 			//create stream demo () WITH (FORMAT="JSON", DATASOURCE="demo" TYPE="edgex")
 			if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
 			if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
 				lStack.Pop()
 				lStack.Pop()
-				if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != ast.WITH {
+				if _, lit2 := p.scanIgnoreWhitespace(); strings.ToUpper(lit2) != ast.WITH {
 					return nil, fmt.Errorf("found %q, expected is with.", lit2)
 					return nil, fmt.Errorf("found %q, expected is with.", lit2)
 				}
 				}
 				return fields, nil
 				return fields, nil
@@ -1262,7 +1277,9 @@ func (p *Parser) parseStreamFields() (ast.StreamFields, error) {
 
 
 			if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
 			if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.RPAREN {
 				lStack.Pop()
 				lStack.Pop()
-				if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.WITH {
+				tok2, lit2 := p.scanIgnoreWhitespace()
+				lit2 = strings.ToUpper(lit2)
+				if lit2 == ast.WITH {
 					//Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
 					//Check the stack for LPAREN; If the stack for LPAREN is not zero, then it's not correct.
 					if lStack.Len() > 0 {
 					if lStack.Len() > 0 {
 						return nil, fmt.Errorf("Parenthesis is not matched.")
 						return nil, fmt.Errorf("Parenthesis is not matched.")
@@ -1301,8 +1318,8 @@ func (p *Parser) parseStreamField() (*ast.StreamField, error) {
 	field := &ast.StreamField{}
 	field := &ast.StreamField{}
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT {
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.IDENT {
 		field.Name = lit
 		field.Name = lit
-		tok1, lit1 := p.scanIgnoreWhitespace()
-		if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
+		_, lit1 := p.scanIgnoreWhitespace()
+		if t := ast.GetDataType(lit1); t != ast.UNKNOWN && t.IsSimpleType() {
 			field.FieldType = &ast.BasicType{Type: t}
 			field.FieldType = &ast.BasicType{Type: t}
 		} else if t == ast.ARRAY {
 		} else if t == ast.ARRAY {
 			if f, e := p.parseStreamArrayType(); e != nil {
 			if f, e := p.parseStreamArrayType(); e != nil {
@@ -1338,7 +1355,8 @@ func (p *Parser) parseStreamArrayType() (ast.FieldType, error) {
 	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
 	if tok, _ := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
 		lStack.Push(ast.LPAREN)
 		lStack.Push(ast.LPAREN)
 		tok1, lit1 := p.scanIgnoreWhitespace()
 		tok1, lit1 := p.scanIgnoreWhitespace()
-		if t := ast.GetDataType(tok1); t != ast.UNKNOWN && t.IsSimpleType() {
+		t := ast.GetDataType(lit1)
+		if t != ast.UNKNOWN && t.IsSimpleType() {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.RPAREN {
 				lStack.Pop()
 				lStack.Pop()
 				if lStack.Len() > 0 {
 				if lStack.Len() > 0 {
@@ -1348,7 +1366,7 @@ func (p *Parser) parseStreamArrayType() (ast.FieldType, error) {
 			} else {
 			} else {
 				return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
 				return nil, fmt.Errorf("found %q, expect rparen in array type definition.", lit2)
 			}
 			}
-		} else if tok1 == ast.XSTRUCT {
+		} else if t == ast.STRUCT {
 			if f, err := p.parseStreamStructType(); err != nil {
 			if f, err := p.parseStreamStructType(); err != nil {
 				return nil, err
 				return nil, err
 			} else {
 			} else {
@@ -1398,25 +1416,27 @@ func (p *Parser) parseStreamOptions() (*ast.Options, error) {
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
 		lStack.Push(ast.LPAREN)
 		lStack.Push(ast.LPAREN)
 		for {
 		for {
-			if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == ast.DATASOURCE || tok1 == ast.FORMAT || tok1 == ast.KEY || tok1 == ast.CONF_KEY || tok1 == ast.STRICT_VALIDATION || tok1 == ast.TYPE || tok1 == ast.TIMESTAMP || tok1 == ast.TIMESTAMP_FORMAT || tok1 == ast.RETAIN_SIZE || tok1 == ast.SHARED || tok1 == ast.SCHEMAID || tok1 == ast.KIND {
+			tok1, lit1 := p.scanIgnoreWhitespace()
+			lit1 = strings.ToUpper(lit1)
+			if ast.IsStreamOptionKeyword(tok1, lit1) {
 				if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EQ {
 				if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == ast.EQ {
 					if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.STRING {
 					if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == ast.STRING {
-						switch tok1 {
+						switch lit1 {
 						case ast.STRICT_VALIDATION:
 						case ast.STRICT_VALIDATION:
 							if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
 							if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
-								return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
+								return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, lit1)
 							} else {
 							} else {
 								opts.STRICT_VALIDATION = val == "TRUE"
 								opts.STRICT_VALIDATION = val == "TRUE"
 							}
 							}
 						case ast.RETAIN_SIZE:
 						case ast.RETAIN_SIZE:
 							if val, err := strconv.Atoi(lit3); err != nil {
 							if val, err := strconv.Atoi(lit3); err != nil {
-								return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, tok1)
+								return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, lit1)
 							} else {
 							} else {
 								opts.RETAIN_SIZE = val
 								opts.RETAIN_SIZE = val
 							}
 							}
 						case ast.SHARED:
 						case ast.SHARED:
 							if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
 							if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
-								return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
+								return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, lit1)
 							} else {
 							} else {
 								opts.SHARED = val == "TRUE"
 								opts.SHARED = val == "TRUE"
 							}
 							}

+ 2 - 2
internal/xsql/parser_stream_test.go

@@ -241,7 +241,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: nil,
 				StreamFields: nil,
 				Options:      nil,
 				Options:      nil,
 			},
 			},
-			err: `found "WITHs", expected is with.`,
+			err: `found "WITHS", expected is with.`,
 		},
 		},
 
 
 		{
 		{
@@ -261,7 +261,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: nil,
 				StreamFields: nil,
 				Options:      nil,
 				Options:      nil,
 			},
 			},
-			err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID).`,
+			err: `found "SOURCES", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID).`,
 		},
 		},
 
 
 		{
 		{

+ 1 - 1
internal/xsql/parser_test.go

@@ -1504,7 +1504,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 		{
 			s:    `select timestamp() as tp from demo`,
 			s:    `select timestamp() as tp from demo`,
 			stmt: nil,
 			stmt: nil,
-			err:  "found \"TIMESTAMP\", expected expression.",
+			err:  "function timestamp not found",
 		},
 		},
 
 
 		{
 		{

+ 1 - 1
internal/xsql/parser_tree_test.go

@@ -150,7 +150,7 @@ func TestParser_ParseTree(t *testing.T) {
 		{
 		{
 			s:    `SHOW STREAMSf`,
 			s:    `SHOW STREAMSf`,
 			stmt: nil,
 			stmt: nil,
-			err:  `found "STREAMSf", expected keyword streams or tables.`,
+			err:  `found "STREAMSF", expected keyword streams or tables.`,
 		},
 		},
 
 
 		{
 		{

+ 74 - 82
pkg/ast/token.go

@@ -14,6 +14,8 @@
 
 
 package ast
 package ast
 
 
+import "strings"
+
 type Token int
 type Token int
 
 
 const (
 const (
@@ -107,39 +109,6 @@ const (
 	TRUE
 	TRUE
 	FALSE
 	FALSE
 
 
-	CREATE
-	DROP
-	EXPLAIN
-	DESCRIBE
-	SHOW
-	STREAM
-	TABLE
-	STREAMS
-	TABLES
-	WITH
-
-	XBIGINT
-	XFLOAT
-	XSTRING
-	XBYTEA
-	XDATETIME
-	XBOOLEAN
-	XARRAY
-	XSTRUCT
-
-	DATASOURCE
-	KEY
-	FORMAT
-	CONF_KEY
-	TYPE
-	STRICT_VALIDATION
-	TIMESTAMP
-	TIMESTAMP_FORMAT
-	RETAIN_SIZE
-	SHARED
-	SCHEMAID
-	KIND
-
 	DD
 	DD
 	HH
 	HH
 	MI
 	MI
@@ -212,37 +181,6 @@ var Tokens = []string{
 	OVER:      "OVER",
 	OVER:      "OVER",
 	PARTITION: "PARTITION",
 	PARTITION: "PARTITION",
 
 
-	CREATE:   "CREATE",
-	DROP:     "RROP",
-	EXPLAIN:  "EXPLAIN",
-	DESCRIBE: "DESCRIBE",
-	SHOW:     "SHOW",
-	STREAM:   "STREAM",
-	TABLE:    "TABLE",
-	STREAMS:  "STREAMS",
-	TABLES:   "TABLES",
-	WITH:     "WITH",
-
-	XBIGINT:   "BIGINT",
-	XFLOAT:    "FLOAT",
-	XSTRING:   "STRING",
-	XBYTEA:    "BYTEA",
-	XDATETIME: "DATETIME",
-	XBOOLEAN:  "BOOLEAN",
-	XARRAY:    "ARRAY",
-	XSTRUCT:   "STRUCT",
-
-	DATASOURCE:        "DATASOURCE",
-	KEY:               "KEY",
-	FORMAT:            "FORMAT",
-	CONF_KEY:          "CONF_KEY",
-	TYPE:              "TYPE",
-	STRICT_VALIDATION: "STRICT_VALIDATION",
-	TIMESTAMP:         "TIMESTAMP",
-	TIMESTAMP_FORMAT:  "TIMESTAMP_FORMAT",
-	RETAIN_SIZE:       "RETAIN_SIZE",
-	SHARED:            "SHARED",
-
 	AND:        "AND",
 	AND:        "AND",
 	OR:         "OR",
 	OR:         "OR",
 	TRUE:       "TRUE",
 	TRUE:       "TRUE",
@@ -260,6 +198,74 @@ var Tokens = []string{
 	MS: "MS",
 	MS: "MS",
 }
 }
 
 
+const (
+	SELECT_LIT = "SELECT"
+	CREATE     = "CREATE"
+	DROP       = "DROP"
+	EXPLAIN    = "EXPLAIN"
+	DESCRIBE   = "DESCRIBE"
+	SHOW       = "SHOW"
+	STREAM     = "STREAM"
+	TABLE      = "TABLE"
+	STREAMS    = "STREAMS"
+	TABLES     = "TABLES"
+	WITH       = "WITH"
+
+	DATASOURCE        = "DATASOURCE"
+	KEY               = "KEY"
+	FORMAT            = "FORMAT"
+	CONF_KEY          = "CONF_KEY"
+	TYPE              = "TYPE"
+	STRICT_VALIDATION = "STRICT_VALIDATION"
+	TIMESTAMP         = "TIMESTAMP"
+	TIMESTAMP_FORMAT  = "TIMESTAMP_FORMAT"
+	RETAIN_SIZE       = "RETAIN_SIZE"
+	SHARED            = "SHARED"
+	SCHEMAID          = "SCHEMAID"
+	KIND              = "KIND"
+
+	XBIGINT   = "BIGINT"
+	XFLOAT    = "FLOAT"
+	XSTRING   = "STRING"
+	XBYTEA    = "BYTEA"
+	XDATETIME = "DATETIME"
+	XBOOLEAN  = "BOOLEAN"
+	XARRAY    = "ARRAY"
+	XSTRUCT   = "STRUCT"
+)
+
+var StreamTokens = map[string]struct{}{
+	DATASOURCE:        {},
+	KEY:               {},
+	FORMAT:            {},
+	CONF_KEY:          {},
+	TYPE:              {},
+	STRICT_VALIDATION: {},
+	TIMESTAMP:         {},
+	TIMESTAMP_FORMAT:  {},
+	RETAIN_SIZE:       {},
+	SHARED:            {},
+	SCHEMAID:          {},
+	KIND:              {},
+}
+
+var StreamDataTypes = map[string]DataType{
+	XBIGINT:   BIGINT,
+	XFLOAT:    FLOAT,
+	XSTRING:   STRINGS,
+	XBYTEA:    BYTEA,
+	XDATETIME: DATETIME,
+	XBOOLEAN:  BOOLEAN,
+	XARRAY:    ARRAY,
+	XSTRUCT:   STRUCT,
+}
+
+func IsStreamOptionKeyword(_ Token, lit string) bool {
+	// token is always IDENT
+	_, ok := StreamTokens[lit]
+	return ok
+}
+
 var COLUMN_SEPARATOR = Tokens[COLSEP]
 var COLUMN_SEPARATOR = Tokens[COLSEP]
 
 
 func (tok Token) String() string {
 func (tok Token) String() string {
@@ -334,24 +340,10 @@ func (d DataType) String() string {
 	return ""
 	return ""
 }
 }
 
 
-func GetDataType(tok Token) DataType {
-	switch tok {
-	case XBIGINT:
-		return BIGINT
-	case XFLOAT:
-		return FLOAT
-	case XSTRING:
-		return STRINGS
-	case XBYTEA:
-		return BYTEA
-	case XDATETIME:
-		return DATETIME
-	case XBOOLEAN:
-		return BOOLEAN
-	case XARRAY:
-		return ARRAY
-	case XSTRUCT:
-		return STRUCT
+func GetDataType(lit string) DataType {
+	lit = strings.ToUpper(lit)
+	if dt, ok := StreamDataTypes[lit]; ok {
+		return dt
 	}
 	}
 	return UNKNOWN
 	return UNKNOWN
 }
 }