Browse Source

fix(join): problem when joining without stream name in fieldRef

Refactor parser to add default stream name for stream field in order to distinguish it from non stream field
ngjaying 4 years atrás
parent
commit
a94715f055

+ 1 - 1
fvt_scripts/change_stream_rule.jmx

@@ -116,7 +116,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (temperature float, light string) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>

+ 2 - 2
fvt_scripts/change_stream_rule.txt

@@ -1,3 +1,3 @@
-[{}]
-[{}]
+error
+error
 light

+ 10 - 3
xsql/ast.go

@@ -463,7 +463,7 @@ type Visitor interface {
 }
 
 func Walk(v Visitor, node Node) {
-	if node == nil {
+	if node == nil || reflect.ValueOf(node).IsNil() {
 		return
 	}
 
@@ -482,9 +482,16 @@ func Walk(v Visitor, node Node) {
 			Walk(v, expr)
 		}
 
+	case Dimensions:
+		Walk(v, n.GetWindow())
+		for _, dimension := range n.GetGroups() {
+			Walk(v, dimension.Expr)
+		}
+
 	case *Window:
 		Walk(v, n.Length)
 		Walk(v, n.Interval)
+		Walk(v, n.Filter)
 
 	case *Field:
 		Walk(v, n.Expr)
@@ -1214,7 +1221,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 		}
 		return nil
 	case *FieldRef:
-		if expr.StreamName == "" {
+		if expr.StreamName == "" || expr.StreamName == DEFAULT_STREAM {
 			val, _ := v.Valuer.Value(expr.Name)
 			return val
 		} else {
@@ -1223,7 +1230,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 			return val
 		}
 	case *MetaRef:
-		if expr.StreamName == "" {
+		if expr.StreamName == "" || expr.StreamName == DEFAULT_STREAM {
 			val, _ := v.Valuer.Meta(expr.Name)
 			return val
 		} else {

+ 14 - 14
xsql/funcs_ast_validator_test.go

@@ -24,7 +24,7 @@ func TestFuncValidator(t *testing.T) {
 
 		{
 			s: `SELECT abs(field1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "abs", Expr: &Call{Name: "abs", Args: []Expr{&FieldRef{Name: "field1"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "abs", Expr: &Call{Name: "abs", Args: []Expr{&FieldRef{Name: "field1", StreamName: DEFAULT_STREAM}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -206,7 +206,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT concat(field, "hello") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "concat", Expr: &Call{Name: "concat", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "concat", Expr: &Call{Name: "concat", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "hello"}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -232,7 +232,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT regexp_matches(field, "hello") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_matches", Expr: &Call{Name: "regexp_matches", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_matches", Expr: &Call{Name: "regexp_matches", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "hello"}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -252,7 +252,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT regexp_replace(field, "hello", "h") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_replace", Expr: &Call{Name: "regexp_replace", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}, &StringLiteral{Val: "h"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_replace", Expr: &Call{Name: "regexp_replace", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "hello"}, &StringLiteral{Val: "h"}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -266,7 +266,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT trim(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trim", Expr: &Call{Name: "trim", Args: []Expr{&FieldRef{Name: "field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trim", Expr: &Call{Name: "trim", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -280,7 +280,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT rpad(field, 3) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "rpad", Expr: &Call{Name: "rpad", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "rpad", Expr: &Call{Name: "rpad", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &IntegerLiteral{Val: 3}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -294,7 +294,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT substring(field, 3, 4) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "substring", Expr: &Call{Name: "substring", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}, &IntegerLiteral{Val: 4}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "substring", Expr: &Call{Name: "substring", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &IntegerLiteral{Val: 3}, &IntegerLiteral{Val: 4}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -320,7 +320,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT cast(field, "bigint") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "cast", Expr: &Call{Name: "cast", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "bigint"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "cast", Expr: &Call{Name: "cast", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "bigint"}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -334,7 +334,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT chr(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "chr", Expr: &Call{Name: "chr", Args: []Expr{&FieldRef{Name: "field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "chr", Expr: &Call{Name: "chr", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -348,7 +348,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT encode(field, "base64") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "encode", Expr: &Call{Name: "encode", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "base64"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "encode", Expr: &Call{Name: "encode", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "base64"}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -362,7 +362,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT trunc(field, 3) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trunc", Expr: &Call{Name: "trunc", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trunc", Expr: &Call{Name: "trunc", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}, &IntegerLiteral{Val: 3}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -376,7 +376,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT sha512(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "sha512", Expr: &Call{Name: "sha512", Args: []Expr{&FieldRef{Name: "field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "sha512", Expr: &Call{Name: "sha512", Args: []Expr{&FieldRef{Name: "field", StreamName: DEFAULT_STREAM}}}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -427,7 +427,7 @@ func TestFuncValidator(t *testing.T) {
 		},
 		{
 			s:    `SELECT meta(device) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "meta", Expr: &Call{Name: "meta", Args: []Expr{&MetaRef{Name: "device"}}}}}, Sources: []Source{&Table{Name: "tbl"}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "meta", Expr: &Call{Name: "meta", Args: []Expr{&MetaRef{Name: "device", StreamName: DEFAULT_STREAM}}}}}, Sources: []Source{&Table{Name: "tbl"}}},
 		},
 		{
 			s:    `SELECT meta(tbl.device) FROM tbl`,
@@ -439,7 +439,7 @@ func TestFuncValidator(t *testing.T) {
 				OP: ARROW,
 				LHS: &BinaryExpr{
 					OP:  ARROW,
-					LHS: &MetaRef{Name: "device"},
+					LHS: &MetaRef{Name: "device", StreamName: DEFAULT_STREAM},
 					RHS: &MetaRef{Name: "reading"},
 				},
 				RHS: &MetaRef{Name: "topic"},

+ 13 - 5
xsql/parser.go

@@ -11,6 +11,8 @@ import (
 	"strings"
 )
 
+const DEFAULT_STREAM = "$default"
+
 type Parser struct {
 	s *Scanner
 
@@ -446,7 +448,7 @@ func (p *Parser) ParseExpr() (Expr, error) {
 	var err error
 	root := &BinaryExpr{}
 
-	root.RHS, err = p.parseUnaryExpr()
+	root.RHS, err = p.parseUnaryExpr(false)
 	if err != nil {
 		return nil, err
 	}
@@ -464,7 +466,7 @@ func (p *Parser) ParseExpr() (Expr, error) {
 		}
 
 		var rhs Expr
-		if rhs, err = p.parseUnaryExpr(); err != nil {
+		if rhs, err = p.parseUnaryExpr(op == ARROW); err != nil {
 			return nil, err
 		}
 
@@ -481,7 +483,7 @@ func (p *Parser) ParseExpr() (Expr, error) {
 	return nil, nil
 }
 
-func (p *Parser) parseUnaryExpr() (Expr, error) {
+func (p *Parser) parseUnaryExpr(isSubField bool) (Expr, error) {
 	if tok1, _ := p.scanIgnoreWhitespace(); tok1 == LPAREN {
 		expr, err := p.ParseExpr()
 		if err != nil {
@@ -515,12 +517,18 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
 				if len(n) == 2 {
 					return &MetaRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
 				}
-				return &MetaRef{StreamName: "", Name: n[0]}, nil
+				if isSubField {
+					return &MetaRef{StreamName: "", Name: n[0]}, nil
+				}
+				return &MetaRef{StreamName: DEFAULT_STREAM, Name: n[0]}, nil
 			} else {
 				if len(n) == 2 {
 					return &FieldRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
 				}
-				return &FieldRef{StreamName: "", Name: n[0]}, nil
+				if isSubField {
+					return &FieldRef{StreamName: "", Name: n[0]}, nil
+				}
+				return &FieldRef{StreamName: DEFAULT_STREAM, Name: n[0]}, nil
 			}
 		}
 	} else if tok == STRING {

+ 132 - 132
xsql/parser_test.go

@@ -21,7 +21,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -33,7 +33,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "select"},
+						Expr:  &FieldRef{Name: "select", StreamName: DEFAULT_STREAM},
 						Name:  "select",
 						AName: ""},
 				},
@@ -45,7 +45,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -83,7 +83,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -96,7 +96,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -109,7 +109,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -122,7 +122,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -135,7 +135,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -148,7 +148,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -161,7 +161,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -185,8 +185,8 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT a,b FROM tbl`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "a"}, Name: "a", AName: ""},
-					{Expr: &FieldRef{Name: "b"}, Name: "b", AName: ""},
+					{Expr: &FieldRef{Name: "a", StreamName: DEFAULT_STREAM}, Name: "a", AName: ""},
+					{Expr: &FieldRef{Name: "b", StreamName: DEFAULT_STREAM}, Name: "b", AName: ""},
 				},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
@@ -195,9 +195,9 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT a, b,c FROM tbl`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "a"}, Name: "a", AName: ""},
-					{Expr: &FieldRef{Name: "b"}, Name: "b", AName: ""},
-					{Expr: &FieldRef{Name: "c"}, Name: "c", AName: ""},
+					{Expr: &FieldRef{Name: "a", StreamName: DEFAULT_STREAM}, Name: "a", AName: ""},
+					{Expr: &FieldRef{Name: "b", StreamName: DEFAULT_STREAM}, Name: "b", AName: ""},
+					{Expr: &FieldRef{Name: "c", StreamName: DEFAULT_STREAM}, Name: "c", AName: ""},
 				},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
@@ -206,7 +206,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT a AS alias FROM tbl`,
 			stmt: &SelectStatement{
-				Fields:  []Field{{Expr: &FieldRef{Name: "a"}, Name: "a", AName: "alias"}},
+				Fields:  []Field{{Expr: &FieldRef{Name: "a", StreamName: DEFAULT_STREAM}, Name: "a", AName: "alias"}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -215,8 +215,8 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT a AS alias1, b as Alias2 FROM tbl`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "a"}, Name: "a", AName: "alias1"},
-					{Expr: &FieldRef{Name: "b"}, Name: "b", AName: "Alias2"},
+					{Expr: &FieldRef{Name: "a", StreamName: DEFAULT_STREAM}, Name: "a", AName: "alias1"},
+					{Expr: &FieldRef{Name: "b", StreamName: DEFAULT_STREAM}, Name: "b", AName: "Alias2"},
 				},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
@@ -248,7 +248,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "length",
 						Expr: &Call{
 							Name: "length",
-							Args: []Expr{&FieldRef{Name: "test"}},
+							Args: []Expr{&FieldRef{Name: "test", StreamName: DEFAULT_STREAM}},
 						},
 					},
 				},
@@ -318,7 +318,7 @@ func TestParser_ParseStatement(t *testing.T) {
 							Name: "indexof",
 							Args: []Expr{
 								&StringLiteral{Val: "abc"},
-								&FieldRef{Name: "field1"},
+								&FieldRef{Name: "field1", StreamName: DEFAULT_STREAM},
 							},
 						},
 					},
@@ -340,7 +340,7 @@ func TestParser_ParseStatement(t *testing.T) {
 								&Call{
 									Name: "lower",
 									Args: []Expr{
-										&FieldRef{Name: "test"},
+										&FieldRef{Name: "test", StreamName: DEFAULT_STREAM},
 									},
 								},
 								&IntegerLiteral{Val: 1},
@@ -365,7 +365,7 @@ func TestParser_ParseStatement(t *testing.T) {
 								&Call{
 									Name: "lower",
 									Args: []Expr{
-										&FieldRef{Name: "test"},
+										&FieldRef{Name: "test", StreamName: DEFAULT_STREAM},
 									},
 								},
 								&IntegerLiteral{Val: 1},
@@ -433,7 +433,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "deduplicate",
 						Expr: &Call{
 							Name: "deduplicate",
-							Args: []Expr{&Wildcard{Token: ASTERISK}, &FieldRef{Name: "temperature"}, &BooleanLiteral{Val: false}},
+							Args: []Expr{&Wildcard{Token: ASTERISK}, &FieldRef{Name: "temperature", StreamName: DEFAULT_STREAM}, &BooleanLiteral{Val: false}},
 						},
 					},
 				},
@@ -461,9 +461,9 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT field0,   "abc" AS field1, field2 FROM tbl`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{AName: "", Name: "field0", Expr: &FieldRef{Name: "field0"}},
+					{AName: "", Name: "field0", Expr: &FieldRef{Name: "field0", StreamName: DEFAULT_STREAM}},
 					{AName: "field1", Name: "", Expr: &StringLiteral{Val: "abc"}},
-					{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2"}}},
+					{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2", StreamName: DEFAULT_STREAM}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 			},
 		},
@@ -489,10 +489,10 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT abc FROM tbl WHERE abc > 12 `,
 			stmt: &SelectStatement{
-				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc"}}},
+				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "abc"},
+					LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 					OP:  GT,
 					RHS: &IntegerLiteral{Val: 12},
 				},
@@ -502,10 +502,10 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT abc FROM tbl WHERE abc = "hello" `,
 			stmt: &SelectStatement{
-				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc"}}},
+				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "abc"},
+					LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 					OP:  EQ,
 					RHS: &StringLiteral{Val: "hello"},
 				},
@@ -528,10 +528,10 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT abc, "fff" AS fa FROM tbl WHERE fa >= 5 `,
 			stmt: &SelectStatement{
-				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc"}}, {AName: "fa", Name: "", Expr: &StringLiteral{Val: "fff"}}},
+				Fields:  []Field{{AName: "", Name: "abc", Expr: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM}}, {AName: "fa", Name: "", Expr: &StringLiteral{Val: "fff"}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "fa"},
+					LHS: &FieldRef{Name: "fa", StreamName: DEFAULT_STREAM},
 					OP:  GTE,
 					RHS: &IntegerLiteral{Val: 5},
 				},
@@ -541,10 +541,10 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT field2 FROM tbl WHERE field2 != 5 `,
 			stmt: &SelectStatement{
-				Fields:  []Field{{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2"}}},
+				Fields:  []Field{{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2", StreamName: DEFAULT_STREAM}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "field2"},
+					LHS: &FieldRef{Name: "field2", StreamName: DEFAULT_STREAM},
 					OP:  NEQ,
 					RHS: &IntegerLiteral{Val: 5},
 				},
@@ -554,10 +554,10 @@ func TestParser_ParseStatement(t *testing.T) {
 		{
 			s: `SELECT field2 FROM tbl WHERE field2 !   = 5 `, //Add space char in expression
 			stmt: &SelectStatement{
-				Fields:  []Field{{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2"}}},
+				Fields:  []Field{{AName: "", Name: "field2", Expr: &FieldRef{Name: "field2", StreamName: DEFAULT_STREAM}}},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "field2"},
+					LHS: &FieldRef{Name: "field2", StreamName: DEFAULT_STREAM},
 					OP:  NEQ,
 					RHS: &IntegerLiteral{Val: 5},
 				},
@@ -585,7 +585,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						AName: "",
 						Name:  "",
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "abc"},
+							LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 							OP:  ADD,
 							RHS: &IntegerLiteral{Val: 2},
 						},
@@ -621,7 +621,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						AName: "",
 						Name:  "",
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "abc"},
+							LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 							OP:  ADD,
 							RHS: &StringLiteral{Val: "hello"},
 						},
@@ -640,7 +640,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "",
 						Expr: &BinaryExpr{
 							LHS: &BinaryExpr{
-								LHS: &FieldRef{Name: "abc"},
+								LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 								OP:  MUL,
 								RHS: &IntegerLiteral{Val: 2},
 							},
@@ -665,7 +665,7 @@ func TestParser_ParseStatement(t *testing.T) {
 							Args: []Expr{
 								&BinaryExpr{
 									LHS: &BinaryExpr{
-										LHS: &FieldRef{Name: "abc"},
+										LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 										OP:  MUL,
 										RHS: &IntegerLiteral{Val: 2},
 									},
@@ -720,7 +720,7 @@ func TestParser_ParseStatement(t *testing.T) {
 								&StringLiteral{Val: "param2"},
 								&BinaryExpr{
 									LHS: &BinaryExpr{
-										LHS: &FieldRef{Name: "abc"},
+										LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 										OP:  MUL,
 										RHS: &IntegerLiteral{Val: 2},
 									},
@@ -812,7 +812,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				},
 				Sources: []Source{&Table{Name: "tbl"}},
 				Condition: &BinaryExpr{
-					LHS: &FieldRef{Name: "f1"},
+					LHS: &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 					OP:  GT,
 					RHS: &NumberLiteral{Val: 2.2},
 				},
@@ -823,14 +823,14 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT deviceId, name FROM topic/sensor1 WHERE deviceId=1 AND name = "dname"`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "deviceId"}, Name: "deviceId", AName: ""},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "deviceId", StreamName: DEFAULT_STREAM}, Name: "deviceId", AName: ""},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources: []Source{&Table{Name: "topic/sensor1"}},
 				Condition: &BinaryExpr{
-					LHS: &BinaryExpr{LHS: &FieldRef{Name: "deviceId"}, OP: EQ, RHS: &IntegerLiteral{Val: 1}},
+					LHS: &BinaryExpr{LHS: &FieldRef{Name: "deviceId", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &IntegerLiteral{Val: 1}},
 					OP:  AND,
-					RHS: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+					RHS: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				},
 			},
 		},
@@ -839,8 +839,8 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT deviceId, name FROM topic/sensor1 AS t1 WHERE t1.deviceId=1 AND t1.name = "dname"`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "deviceId"}, Name: "deviceId", AName: ""},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "deviceId", StreamName: DEFAULT_STREAM}, Name: "deviceId", AName: ""},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources: []Source{&Table{Name: "topic/sensor1", Alias: "t1"}},
 				Condition: &BinaryExpr{
@@ -855,14 +855,14 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE t> = 20.5 OR name = "dname"`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources: []Source{&Table{Name: "topic/sensor1"}},
 				Condition: &BinaryExpr{
-					LHS: &BinaryExpr{LHS: &FieldRef{Name: "t"}, OP: GTE, RHS: &NumberLiteral{Val: 20.5}},
+					LHS: &BinaryExpr{LHS: &FieldRef{Name: "t", StreamName: DEFAULT_STREAM}, OP: GTE, RHS: &NumberLiteral{Val: 20.5}},
 					OP:  OR,
-					RHS: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+					RHS: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				},
 			},
 		},
@@ -871,12 +871,12 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY name`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:    []Source{&Table{Name: "topic/sensor1"}},
-				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
-				Dimensions: Dimensions{Dimension{Expr: &FieldRef{Name: "name"}}},
+				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Dimensions: Dimensions{Dimension{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}}},
 			},
 		},
 
@@ -884,13 +884,13 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY name HAVING count(name) > 3`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:    []Source{&Table{Name: "topic/sensor1"}},
-				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
-				Dimensions: Dimensions{Dimension{Expr: &FieldRef{Name: "name"}}},
-				Having:     &BinaryExpr{LHS: &Call{Name: "count", Args: []Expr{&FieldRef{StreamName: "", Name: "name"}}}, OP: GT, RHS: &IntegerLiteral{Val: 3}},
+				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Dimensions: Dimensions{Dimension{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}}},
+				Having:     &BinaryExpr{LHS: &Call{Name: "count", Args: []Expr{&FieldRef{StreamName: DEFAULT_STREAM, Name: "name"}}}, OP: GT, RHS: &IntegerLiteral{Val: 3}},
 			},
 		},
 
@@ -898,12 +898,12 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" HAVING count(name) > 3`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
-				Having:    &BinaryExpr{LHS: &Call{Name: "count", Args: []Expr{&FieldRef{StreamName: "", Name: "name"}}}, OP: GT, RHS: &IntegerLiteral{Val: 3}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Having:    &BinaryExpr{LHS: &Call{Name: "count", Args: []Expr{&FieldRef{StreamName: DEFAULT_STREAM, Name: "name"}}}, OP: GT, RHS: &IntegerLiteral{Val: 3}},
 			},
 		},
 
@@ -930,10 +930,10 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{Expr: &FieldRef{StreamName: "s1", Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:    []Source{&Table{Name: "topic/sensor1", Alias: "s1"}},
-				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "t"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Condition:  &BinaryExpr{LHS: &FieldRef{Name: "t", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				Dimensions: Dimensions{Dimension{Expr: &FieldRef{StreamName: "s1", Name: "temp"}}},
 			},
 		},
@@ -942,13 +942,13 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY lpad(name,1)`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				Dimensions: Dimensions{Dimension{
-					Expr: &Call{Name: "lpad", Args: []Expr{&FieldRef{Name: "name"}, &IntegerLiteral{Val: 1}}},
+					Expr: &Call{Name: "lpad", Args: []Expr{&FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, &IntegerLiteral{Val: 1}}},
 				},
 				},
 			},
@@ -958,11 +958,11 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 AS s1 WHERE name = "dname" GROUP BY lpad(s1.name,1)`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1", Alias: "s1"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				Dimensions: Dimensions{Dimension{
 					Expr: &Call{Name: "lpad", Args: []Expr{&FieldRef{StreamName: StreamName("s1"), Name: "name"}, &IntegerLiteral{Val: 1}}},
 				},
@@ -974,15 +974,15 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY lpad(name,1) ORDER BY name`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				Dimensions: Dimensions{
 					Dimension{
 						Expr: &Call{Name: "lpad", Args: []Expr{
-							&FieldRef{Name: "name"},
+							&FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							&IntegerLiteral{Val: 1}},
 						},
 					},
@@ -995,8 +995,8 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 AS s1 WHERE s1.name = "dname" GROUP BY lpad(s1.name,1) ORDER BY s1.name`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1", Alias: "s1"}},
 				Condition: &BinaryExpr{LHS: &FieldRef{StreamName: StreamName("s1"), Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
@@ -1016,15 +1016,15 @@ func TestParser_ParseStatement(t *testing.T) {
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY lpad(name,1) ORDER BY name DESC`,
 			stmt: &SelectStatement{
 				Fields: []Field{
-					{Expr: &FieldRef{Name: "temp"}, Name: "temp", AName: "t"},
-					{Expr: &FieldRef{Name: "name"}, Name: "name", AName: ""},
+					{Expr: &FieldRef{Name: "temp", StreamName: DEFAULT_STREAM}, Name: "temp", AName: "t"},
+					{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, Name: "name", AName: ""},
 				},
 				Sources:   []Source{&Table{Name: "topic/sensor1"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name"}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &StringLiteral{Val: "dname"}},
 				Dimensions: Dimensions{
 					Dimension{
 						Expr: &Call{Name: "lpad", Args: []Expr{
-							&FieldRef{Name: "name"},
+							&FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							&IntegerLiteral{Val: 1}},
 						},
 					},
@@ -1072,11 +1072,11 @@ func TestParser_ParseStatement(t *testing.T) {
 				},
 				Sources: []Source{&Table{Name: "topic/sensor1"}},
 				Dimensions: Dimensions{
-					Dimension{Expr: &FieldRef{Name: "name"}},
-					Dimension{Expr: &FieldRef{Name: "name2"}},
+					Dimension{Expr: &FieldRef{Name: "name", StreamName: DEFAULT_STREAM}},
+					Dimension{Expr: &FieldRef{Name: "name2", StreamName: DEFAULT_STREAM}},
 					Dimension{
 						Expr: &Call{Name: "power", Args: []Expr{
-							&FieldRef{Name: "name3"},
+							&FieldRef{Name: "name3", StreamName: DEFAULT_STREAM},
 							&NumberLiteral{Val: 1.8}},
 						},
 					},
@@ -1103,7 +1103,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -1116,7 +1116,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -1129,7 +1129,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "name"},
+						Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 						Name:  "name",
 						AName: ""},
 				},
@@ -1155,7 +1155,7 @@ func TestParser_ParseStatement(t *testing.T) {
 					{AName: "f1", Name: "", Expr: &BooleanLiteral{Val: true}},
 				},
 				Sources:   []Source{&Table{Name: "tbl"}},
-				Condition: &BinaryExpr{LHS: &FieldRef{Name: "f2"}, OP: EQ, RHS: &BooleanLiteral{Val: true}},
+				Condition: &BinaryExpr{LHS: &FieldRef{Name: "f2", StreamName: DEFAULT_STREAM}, OP: EQ, RHS: &BooleanLiteral{Val: true}},
 			},
 		},
 
@@ -1168,7 +1168,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "indexof",
 						Expr: &Call{
 							Name: "indexof",
-							Args: []Expr{&FieldRef{Name: "field1"}, &StringLiteral{Val: "abc"}},
+							Args: []Expr{&FieldRef{Name: "field1", StreamName: DEFAULT_STREAM}, &StringLiteral{Val: "abc"}},
 						},
 					},
 				},
@@ -1268,7 +1268,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "space var"},
+						Expr:  &FieldRef{Name: "space var", StreamName: DEFAULT_STREAM},
 						Name:  "space var",
 						AName: ""},
 				},
@@ -1280,7 +1280,7 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "中文 Chinese"},
+						Expr:  &FieldRef{Name: "中文 Chinese", StreamName: DEFAULT_STREAM},
 						Name:  "中文 Chinese",
 						AName: ""},
 				},
@@ -1292,7 +1292,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &CaseExpr{
-							Value: &FieldRef{Name: "temperature"},
+							Value: &FieldRef{Name: "temperature", StreamName: DEFAULT_STREAM},
 							WhenClauses: []*WhenClause{
 								{
 									Expr:   &IntegerLiteral{Val: 25},
@@ -1307,7 +1307,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "",
 						AName: "label",
 					}, {
-						Expr:  &FieldRef{Name: "humidity"},
+						Expr:  &FieldRef{Name: "humidity", StreamName: DEFAULT_STREAM},
 						Name:  "humidity",
 						AName: "",
 					},
@@ -1320,7 +1320,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &CaseExpr{
-							Value: &FieldRef{Name: "temperature"},
+							Value: &FieldRef{Name: "temperature", StreamName: DEFAULT_STREAM},
 							WhenClauses: []*WhenClause{
 								{
 									Expr:   &IntegerLiteral{Val: 25},
@@ -1335,7 +1335,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "",
 						AName: "label",
 					}, {
-						Expr:  &FieldRef{Name: "humidity"},
+						Expr:  &FieldRef{Name: "humidity", StreamName: DEFAULT_STREAM},
 						Name:  "humidity",
 						AName: "",
 					},
@@ -1357,7 +1357,7 @@ func TestParser_ParseStatement(t *testing.T) {
 								{
 									Expr: &BinaryExpr{
 										OP:  GT,
-										LHS: &FieldRef{Name: "temperature"},
+										LHS: &FieldRef{Name: "temperature", StreamName: DEFAULT_STREAM},
 										RHS: &IntegerLiteral{Val: 30},
 									},
 									Result: &StringLiteral{Val: "high"},
@@ -1368,7 +1368,7 @@ func TestParser_ParseStatement(t *testing.T) {
 						Name:  "",
 						AName: "label",
 					}, {
-						Expr:  &FieldRef{Name: "humidity"},
+						Expr:  &FieldRef{Name: "humidity", StreamName: DEFAULT_STREAM},
 						Name:  "humidity",
 						AName: "",
 					},
@@ -1420,7 +1420,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1442,7 +1442,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1464,7 +1464,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1486,7 +1486,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1520,7 +1520,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1541,7 +1541,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
-						Expr:  &FieldRef{Name: "f1"},
+						Expr:  &FieldRef{Name: "f1", StreamName: DEFAULT_STREAM},
 						Name:  "f1",
 						AName: ""},
 				},
@@ -1580,7 +1580,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 							Length:     &IntegerLiteral{Val: 3},
 							Interval:   &IntegerLiteral{Val: 1},
 							Filter: &BinaryExpr{
-								LHS: &FieldRef{Name: "revenue"},
+								LHS: &FieldRef{Name: "revenue", StreamName: DEFAULT_STREAM},
 								OP:  GT,
 								RHS: &IntegerLiteral{Val: 100},
 							},
@@ -1600,20 +1600,20 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 				},
 				Sources: []Source{&Table{Name: "demo"}},
 				Dimensions: Dimensions{
-					Dimension{Expr: &FieldRef{Name: "department"}},
+					Dimension{Expr: &FieldRef{Name: "department", StreamName: DEFAULT_STREAM}},
 					Dimension{
 						Expr: &Window{
 							WindowType: COUNT_WINDOW,
 							Length:     &IntegerLiteral{Val: 3},
 							Interval:   &IntegerLiteral{Val: 1},
 							Filter: &BinaryExpr{
-								LHS: &FieldRef{Name: "revenue"},
+								LHS: &FieldRef{Name: "revenue", StreamName: DEFAULT_STREAM},
 								OP:  GT,
 								RHS: &IntegerLiteral{Val: 100},
 							},
 						},
 					},
-					Dimension{Expr: &FieldRef{Name: "year"}},
+					Dimension{Expr: &FieldRef{Name: "year", StreamName: DEFAULT_STREAM}},
 				},
 			},
 		},
@@ -1659,7 +1659,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &IndexExpr{Index: 0},
 						},
@@ -1677,7 +1677,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 					{
 						Expr: &BinaryExpr{
 							LHS: &BinaryExpr{
-								LHS: &FieldRef{Name: "children"},
+								LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 								OP:  SUBSET,
 								RHS: &IndexExpr{Index: 0},
 							},
@@ -1699,7 +1699,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 					{
 						Expr: &BinaryExpr{
 							LHS: &BinaryExpr{
-								LHS: &FieldRef{Name: "children"},
+								LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 								OP:  ARROW,
 								RHS: &FieldRef{Name: "first"},
 							},
@@ -1722,7 +1722,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 						Expr: &BinaryExpr{
 							LHS: &BinaryExpr{
 								LHS: &BinaryExpr{
-									LHS: &FieldRef{Name: "children"},
+									LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 									OP:  ARROW,
 									RHS: &FieldRef{Name: "first"},
 								},
@@ -1746,7 +1746,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &ColonExpr{Start: 0, End: 1},
 						},
@@ -1763,7 +1763,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &ColonExpr{Start: 0, End: 1},
 						},
@@ -1780,7 +1780,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &ColonExpr{Start: 0, End: math.MinInt32},
 						},
@@ -1797,7 +1797,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &ColonExpr{Start: 2, End: math.MinInt32},
 						},
@@ -1814,7 +1814,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &BinaryExpr{LHS: &FieldRef{Name: "children"}, OP: SUBSET, RHS: &ColonExpr{Start: 2, End: math.MinInt32}},
+							LHS: &BinaryExpr{LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM}, OP: SUBSET, RHS: &ColonExpr{Start: 2, End: math.MinInt32}},
 							OP:  ARROW,
 							RHS: &FieldRef{Name: "first"},
 						},
@@ -1883,7 +1883,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Fields: []Field{
 					{
 						Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "children"},
+							LHS: &FieldRef{Name: "children", StreamName: DEFAULT_STREAM},
 							OP:  SUBSET,
 							RHS: &ColonExpr{Start: 0, End: 1},
 						},
@@ -1893,7 +1893,7 @@ func TestParser_ParseJsonExpr(t *testing.T) {
 				Sources: []Source{&Table{Name: "demo"}},
 				Condition: &BinaryExpr{
 					LHS: &BinaryExpr{
-						LHS: &FieldRef{Name: "abc"},
+						LHS: &FieldRef{Name: "abc", StreamName: DEFAULT_STREAM},
 						OP:  SUBSET,
 						RHS: &IndexExpr{Index: 0},
 					},
@@ -1940,9 +1940,9 @@ func TestParser_ParseJoins(t *testing.T) {
 				Joins: []Join{
 					{
 						Name: "topic1", Alias: "", JoinType: LEFT_JOIN, Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "f"},
+							LHS: &FieldRef{Name: "f", StreamName: DEFAULT_STREAM},
 							OP:  EQ,
-							RHS: &FieldRef{Name: "k"},
+							RHS: &FieldRef{Name: "k", StreamName: DEFAULT_STREAM},
 						},
 					},
 				},
@@ -1962,9 +1962,9 @@ func TestParser_ParseJoins(t *testing.T) {
 				Joins: []Join{
 					{
 						Name: "topic1", Alias: "t2", JoinType: INNER_JOIN, Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "f"},
+							LHS: &FieldRef{Name: "f", StreamName: DEFAULT_STREAM},
 							OP:  EQ,
-							RHS: &FieldRef{Name: "k"},
+							RHS: &FieldRef{Name: "k", StreamName: DEFAULT_STREAM},
 						},
 					},
 				},
@@ -1984,9 +1984,9 @@ func TestParser_ParseJoins(t *testing.T) {
 				Joins: []Join{
 					{
 						Name: "topic1/sensor2", Alias: "t2", JoinType: LEFT_JOIN, Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "f"},
+							LHS: &FieldRef{Name: "f", StreamName: DEFAULT_STREAM},
 							OP:  EQ,
-							RHS: &FieldRef{Name: "k"},
+							RHS: &FieldRef{Name: "k", StreamName: DEFAULT_STREAM},
 						},
 					},
 				},
@@ -2006,9 +2006,9 @@ func TestParser_ParseJoins(t *testing.T) {
 				Joins: []Join{
 					{
 						Name: "topic1/sensor2", Alias: "t2", JoinType: LEFT_JOIN, Expr: &BinaryExpr{
-							LHS: &FieldRef{Name: "f"},
+							LHS: &FieldRef{Name: "f", StreamName: DEFAULT_STREAM},
 							OP:  EQ,
-							RHS: &FieldRef{Name: "k"},
+							RHS: &FieldRef{Name: "k", StreamName: DEFAULT_STREAM},
 						},
 					},
 				},
@@ -2154,7 +2154,7 @@ func TestParser_ParseStatements(t *testing.T) {
 				{
 					Fields: []Field{
 						{
-							Expr:  &FieldRef{Name: "name"},
+							Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							Name:  "name",
 							AName: ""},
 					},
@@ -2163,7 +2163,7 @@ func TestParser_ParseStatements(t *testing.T) {
 				{
 					Fields: []Field{
 						{
-							Expr:  &FieldRef{Name: "name"},
+							Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							Name:  "name",
 							AName: ""},
 					},
@@ -2176,7 +2176,7 @@ func TestParser_ParseStatements(t *testing.T) {
 				{
 					Fields: []Field{
 						{
-							Expr:  &FieldRef{Name: "name"},
+							Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							Name:  "name",
 							AName: ""},
 					},
@@ -2185,7 +2185,7 @@ func TestParser_ParseStatements(t *testing.T) {
 				{
 					Fields: []Field{
 						{
-							Expr:  &FieldRef{Name: "name"},
+							Expr:  &FieldRef{Name: "name", StreamName: DEFAULT_STREAM},
 							Name:  "name",
 							AName: ""},
 					},

+ 1 - 1
xstream/operators/project_operator.go

@@ -102,7 +102,7 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]inter
 		expr := f.Expr
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
 		if f.AName != "" && !isTest {
-			expr = &xsql.FieldRef{StreamName: "", Name: f.AName}
+			expr = &xsql.FieldRef{StreamName: xsql.DEFAULT_STREAM, Name: f.AName}
 		}
 		v := ve.Eval(expr)
 		if e, ok := v.(error); ok {

+ 2 - 2
xstream/planner/dataSourcePlan.go

@@ -86,7 +86,7 @@ func (p *DataSourcePlan) PruneColumns(fields []xsql.Expr) error {
 		case *xsql.Wildcard:
 			p.isWildCard = true
 		case *xsql.FieldRef:
-			if !p.isWildCard && (f.StreamName == "" || string(f.StreamName) == p.name) {
+			if !p.isWildCard && (f.StreamName == xsql.DEFAULT_STREAM || string(f.StreamName) == p.name) {
 				if _, ok := p.fields[f.Name]; !ok {
 					sf := p.getField(f.Name)
 					if sf != nil {
@@ -98,7 +98,7 @@ func (p *DataSourcePlan) PruneColumns(fields []xsql.Expr) error {
 			if p.allMeta {
 				break
 			}
-			if f.StreamName == "" || string(f.StreamName) == p.name {
+			if f.StreamName == xsql.DEFAULT_STREAM || string(f.StreamName) == p.name {
 				if f.Name == "*" {
 					p.allMeta = true
 					p.metaMap = nil

+ 66 - 1
xstream/planner/planner.go

@@ -11,6 +11,7 @@ import (
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/operators"
 	"path"
+	"strings"
 )
 
 func Plan(rule *api.Rule, storePath string) (*xstream.TopologyNew, error) {
@@ -52,6 +53,64 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.
 	return tp, nil
 }
 
+func decorateStmt(s *xsql.SelectStatement, ss []*xsql.StreamStmt, alias xsql.Fields, aggregateAlias xsql.Fields) (err error) {
+	isSchemaless := false
+	for _, streamStmt := range ss {
+		if streamStmt.StreamFields == nil {
+			isSchemaless = true
+			break
+		}
+	}
+	xsql.WalkFunc(s, func(n xsql.Node) {
+		if f, ok := n.(*xsql.FieldRef); ok && f.StreamName != "" {
+			fname := f.Name
+			isAlias := false
+			if f.StreamName == xsql.DEFAULT_STREAM {
+				for _, alias := range alias {
+					if strings.EqualFold(fname, alias.AName) {
+						fname = alias.Name
+						isAlias = true
+						break
+					}
+				}
+				if !isAlias {
+					for _, alias := range aggregateAlias {
+						if strings.EqualFold(fname, alias.AName) {
+							fname = alias.Name
+							isAlias = true
+							break
+						}
+					}
+				}
+			}
+			count := 0
+			for _, streamStmt := range ss {
+				for _, field := range streamStmt.StreamFields {
+					if strings.EqualFold(fname, field.Name) {
+						if f.StreamName == xsql.DEFAULT_STREAM {
+							f.StreamName = streamStmt.Name
+							count++
+						} else if f.StreamName == streamStmt.Name {
+							count++
+						}
+						break
+					}
+				}
+			}
+			if count > 1 {
+				err = fmt.Errorf("ambiguous field %s", fname)
+			} else if count == 0 && !isAlias && f.StreamName == xsql.DEFAULT_STREAM { // alias may refer to non stream field
+				if !isSchemaless {
+					err = fmt.Errorf("unknown field %s.%s", f.StreamName, f.Name)
+				} else if len(ss) == 1 { // If only one schemaless stream, all the fields must be a field of that stream
+					f.StreamName = ss[0].Name
+				}
+			}
+		}
+	})
+	return
+}
+
 func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, streamsFromStmt []string) (*xstream.TopologyNew, error) {
 	// Create topology
 	tp, err := xstream.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
@@ -209,11 +268,13 @@ func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv
 		}
 	}
 
-	for _, s := range streamsFromStmt {
+	streamStmts := make([]*xsql.StreamStmt, len(streamsFromStmt))
+	for i, s := range streamsFromStmt {
 		streamStmt, err := xsql.GetDataSource(store, s)
 		if err != nil {
 			return nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 		}
+		streamStmts[i] = streamStmt
 		p = DataSourcePlan{
 			name:       s,
 			streamStmt: streamStmt,
@@ -227,7 +288,11 @@ func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv
 			tableChildren = append(tableChildren, p)
 			tableEmitters = append(tableEmitters, string(streamStmt.Name))
 		}
+	}
 
+	err := decorateStmt(stmt, streamStmts, alias, aggregateAlias)
+	if err != nil {
+		return nil, err
 	}
 	if dimensions != nil {
 		w = dimensions.GetWindow()

+ 27 - 24
xstream/planner/planner_test.go

@@ -93,7 +93,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "name"},
+						Expr:  &xsql.FieldRef{Name: "name", StreamName: "src1"},
 						Name:  "name",
 						AName: ""},
 				},
@@ -129,7 +129,7 @@ func Test_createLogicalPlan(t *testing.T) {
 											},
 										},
 										condition: &xsql.BinaryExpr{
-											LHS: &xsql.FieldRef{Name: "name"},
+											LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
 											OP:  xsql.EQ,
 											RHS: &xsql.StringLiteral{Val: "v1"},
 										},
@@ -146,7 +146,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "temp"},
+						Expr:  &xsql.FieldRef{Name: "temp", StreamName: "src1"},
 						Name:  "temp",
 						AName: ""},
 				},
@@ -235,7 +235,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
@@ -277,12 +277,12 @@ func Test_createLogicalPlan(t *testing.T) {
 										condition: &xsql.BinaryExpr{
 											OP: xsql.AND,
 											LHS: &xsql.BinaryExpr{
-												LHS: &xsql.FieldRef{Name: "name"},
+												LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
 												OP:  xsql.EQ,
 												RHS: &xsql.StringLiteral{Val: "v1"},
 											},
 											RHS: &xsql.BinaryExpr{
-												LHS: &xsql.FieldRef{Name: "temp"},
+												LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
 												OP:  xsql.GT,
 												RHS: &xsql.IntegerLiteral{Val: 2},
 											},
@@ -300,7 +300,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
@@ -352,7 +352,7 @@ func Test_createLogicalPlan(t *testing.T) {
 											},
 										},
 										condition: &xsql.BinaryExpr{
-											LHS: &xsql.FieldRef{Name: "temp"},
+											LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
 											OP:  xsql.GT,
 											RHS: &xsql.IntegerLiteral{Val: 20},
 										},
@@ -479,7 +479,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
@@ -487,7 +487,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				sendMeta:    false,
 			}.Init(),
 		}, { // 6. optimize outter join on
-			sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 					children: []LogicalPlan{
@@ -519,7 +519,7 @@ func Test_createLogicalPlan(t *testing.T) {
 													},
 													condition: &xsql.BinaryExpr{
 														OP:  xsql.GT,
-														LHS: &xsql.FieldRef{Name: "id", StreamName: "src1"},
+														LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 														RHS: &xsql.IntegerLiteral{Val: 111},
 													},
 												}.Init(),
@@ -584,7 +584,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
@@ -592,11 +592,11 @@ func Test_createLogicalPlan(t *testing.T) {
 				sendMeta:    false,
 			}.Init(),
 		}, { // 7 window error for table
-			sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
+			sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
 			p:   nil,
 			err: "cannot run window for TABLE sources",
 		}, { // 8 join table without window
-			sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111`,
+			sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 					children: []LogicalPlan{
@@ -692,14 +692,14 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
 				isAggregate: false,
 				sendMeta:    false,
 			}.Init(),
-		}, { // 8 join table with window
+		}, { // 9 join table with window
 			sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
@@ -807,14 +807,14 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "id1"},
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
 						Name:  "id1",
 						AName: ""},
 				},
 				isAggregate: false,
 				sendMeta:    false,
 			}.Init(),
-		}, { // 9 meta
+		}, { // 10 meta
 			sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
@@ -835,7 +835,8 @@ func Test_createLogicalPlan(t *testing.T) {
 										alias: xsql.Fields{
 											xsql.Field{
 												Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
-													Name: "id",
+													Name:       "id",
+													StreamName: xsql.DEFAULT_STREAM,
 												}}},
 												Name:  "meta",
 												AName: "eid",
@@ -844,7 +845,7 @@ func Test_createLogicalPlan(t *testing.T) {
 												Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
 													&xsql.BinaryExpr{
 														OP:  xsql.ARROW,
-														LHS: &xsql.MetaRef{Name: "Humidity"},
+														LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
 														RHS: &xsql.MetaRef{Name: "Device"},
 													},
 												}},
@@ -859,7 +860,8 @@ func Test_createLogicalPlan(t *testing.T) {
 								LHS: &xsql.Call{
 									Name: "meta",
 									Args: []xsql.Expr{&xsql.MetaRef{
-										Name: "device",
+										Name:       "device",
+										StreamName: xsql.DEFAULT_STREAM,
 									}},
 								},
 								OP: xsql.EQ,
@@ -872,12 +874,13 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []xsql.Field{
 					{
-						Expr:  &xsql.FieldRef{Name: "temp"},
+						Expr:  &xsql.FieldRef{Name: "temp", StreamName: "src1"},
 						Name:  "temp",
 						AName: "",
 					}, {
 						Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
-							Name: "id",
+							Name:       "id",
+							StreamName: xsql.DEFAULT_STREAM,
 						}}},
 						Name:  "meta",
 						AName: "eid",
@@ -885,7 +888,7 @@ func Test_createLogicalPlan(t *testing.T) {
 						Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
 							&xsql.BinaryExpr{
 								OP:  xsql.ARROW,
-								LHS: &xsql.MetaRef{Name: "Humidity"},
+								LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DEFAULT_STREAM},
 								RHS: &xsql.MetaRef{Name: "Device"},
 							},
 						}},

+ 7 - 3
xstream/planner/util.go

@@ -9,7 +9,7 @@ func getRefSources(node xsql.Node) []string {
 		return keys
 	}
 	xsql.WalkFunc(node, func(n xsql.Node) {
-		if f, ok := n.(*xsql.FieldRef); ok {
+		if f, ok := n.(*xsql.FieldRef); ok && f.StreamName != "" {
 			result[string(f.StreamName)] = true
 		}
 	})
@@ -38,11 +38,15 @@ func getFields(node xsql.Node) []xsql.Expr {
 	xsql.WalkFunc(node, func(n xsql.Node) {
 		switch t := n.(type) {
 		case *xsql.FieldRef:
-			result = append(result, t)
+			if t.StreamName != "" {
+				result = append(result, t)
+			}
 		case *xsql.Wildcard:
 			result = append(result, t)
 		case *xsql.MetaRef:
-			result = append(result, t)
+			if t.StreamName != "" {
+				result = append(result, t)
+			}
 		case *xsql.SortField:
 			result = append(result, t)
 		}