Bladeren bron

fix(planner): schemaless stream name default to the stream if only one stream

Also add unit test case for schemaless streams

Signed-off-by: ngjaying <ngjaying@gmail.com>
ngjaying 3 jaren geleden
bovenliggende
commit
2f3a7fe61e
3 gewijzigde bestanden met toevoegingen van 1086 en 71 verwijderingen
  1. 25 13
      xstream/planner/analyzer.go
  2. 150 58
      xstream/planner/analyzer_test.go
  3. 911 0
      xstream/planner/planner_test.go

+ 25 - 13
xstream/planner/analyzer.go

@@ -26,8 +26,12 @@ func decorateStmt(s *xsql.SelectStatement, store kv.KeyValue) ([]*xsql.StreamStm
 		}
 		}
 	}
 	}
 
 
+	dsn := xsql.DefaultStream
+	if len(streamsFromStmt) == 1 {
+		dsn = streamStmts[0].Name
+	}
 	// [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
 	// [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
-	fieldsMap := newFieldsMap(isSchemaless)
+	fieldsMap := newFieldsMap(isSchemaless, dsn)
 	if !isSchemaless {
 	if !isSchemaless {
 		for _, streamStmt := range streamStmts {
 		for _, streamStmt := range streamStmts {
 			for _, field := range streamStmt.StreamFields {
 			for _, field := range streamStmt.StreamFields {
@@ -149,19 +153,20 @@ func allAggregate(expr xsql.Expr) (r bool) {
 }
 }
 
 
 type fieldsMap struct {
 type fieldsMap struct {
-	content      map[string]streamFieldStore
-	isSchemaless bool
+	content       map[string]streamFieldStore
+	isSchemaless  bool
+	defaultStream xsql.StreamName
 }
 }
 
 
-func newFieldsMap(isSchemaless bool) *fieldsMap {
-	return &fieldsMap{content: make(map[string]streamFieldStore), isSchemaless: isSchemaless}
+func newFieldsMap(isSchemaless bool, defaultStream xsql.StreamName) *fieldsMap {
+	return &fieldsMap{content: make(map[string]streamFieldStore), isSchemaless: isSchemaless, defaultStream: defaultStream}
 }
 }
 
 
 func (f *fieldsMap) reserve(fieldName string, streamName xsql.StreamName) {
 func (f *fieldsMap) reserve(fieldName string, streamName xsql.StreamName) {
 	if fm, ok := f.content[strings.ToLower(fieldName)]; ok {
 	if fm, ok := f.content[strings.ToLower(fieldName)]; ok {
 		fm.add(streamName)
 		fm.add(streamName)
 	} else {
 	} else {
-		fm := newStreamFieldStore(f.isSchemaless)
+		fm := newStreamFieldStore(f.isSchemaless, f.defaultStream)
 		fm.add(streamName)
 		fm.add(streamName)
 		f.content[strings.ToLower(fieldName)] = fm
 		f.content[strings.ToLower(fieldName)] = fm
 	}
 	}
@@ -171,7 +176,7 @@ func (f *fieldsMap) save(fieldName string, streamName xsql.StreamName, field *xs
 	fm, ok := f.content[strings.ToLower(fieldName)]
 	fm, ok := f.content[strings.ToLower(fieldName)]
 	if !ok {
 	if !ok {
 		if streamName == xsql.AliasStream || f.isSchemaless {
 		if streamName == xsql.AliasStream || f.isSchemaless {
-			fm = newStreamFieldStore(f.isSchemaless)
+			fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
 			f.content[strings.ToLower(fieldName)] = fm
 			f.content[strings.ToLower(fieldName)] = fm
 		} else {
 		} else {
 			return fmt.Errorf("unknown field %s", fieldName)
 			return fmt.Errorf("unknown field %s", fieldName)
@@ -188,7 +193,7 @@ func (f *fieldsMap) bind(fr *xsql.FieldRef) error {
 	fm, ok := f.content[strings.ToLower(fr.Name)]
 	fm, ok := f.content[strings.ToLower(fr.Name)]
 	if !ok {
 	if !ok {
 		if f.isSchemaless && fr.Name != "" {
 		if f.isSchemaless && fr.Name != "" {
-			fm = newStreamFieldStore(f.isSchemaless)
+			fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
 			f.content[strings.ToLower(fr.Name)] = fm
 			f.content[strings.ToLower(fr.Name)] = fm
 		} else {
 		} else {
 			return fmt.Errorf("unknown field %s", fr.Name)
 			return fmt.Errorf("unknown field %s", fr.Name)
@@ -217,11 +222,11 @@ type streamFieldStore interface {
 	bindRef(f *xsql.FieldRef) error
 	bindRef(f *xsql.FieldRef) error
 }
 }
 
 
-func newStreamFieldStore(isSchemaless bool) streamFieldStore {
+func newStreamFieldStore(isSchemaless bool, defaultStream xsql.StreamName) streamFieldStore {
 	if !isSchemaless {
 	if !isSchemaless {
 		return &streamFieldMap{content: make(map[xsql.StreamName]*xsql.AliasRef)}
 		return &streamFieldMap{content: make(map[xsql.StreamName]*xsql.AliasRef)}
 	} else {
 	} else {
-		return &streamFieldMapSchemaless{content: make(map[xsql.StreamName]*xsql.AliasRef)}
+		return &streamFieldMapSchemaless{content: make(map[xsql.StreamName]*xsql.AliasRef), defaultStream: defaultStream}
 	}
 	}
 }
 }
 
 
@@ -302,7 +307,8 @@ func (s *streamFieldMap) bindRef(fr *xsql.FieldRef) error {
 }
 }
 
 
 type streamFieldMapSchemaless struct {
 type streamFieldMapSchemaless struct {
-	content map[xsql.StreamName]*xsql.AliasRef
+	content       map[xsql.StreamName]*xsql.AliasRef
+	defaultStream xsql.StreamName
 }
 }
 
 
 // add this should not be called for schemaless
 // add this should not be called for schemaless
@@ -343,8 +349,14 @@ func (s *streamFieldMapSchemaless) ref(k xsql.StreamName, v *xsql.AliasRef) erro
 
 
 func (s *streamFieldMapSchemaless) bindRef(fr *xsql.FieldRef) error {
 func (s *streamFieldMapSchemaless) bindRef(fr *xsql.FieldRef) error {
 	l := len(s.content)
 	l := len(s.content)
-	if fr.StreamName == "" {
-		fr.StreamName = xsql.DefaultStream
+	if fr.StreamName == "" || fr.StreamName == xsql.DefaultStream {
+		if l == 1 {
+			for sk := range s.content {
+				fr.StreamName = sk
+			}
+		} else {
+			fr.StreamName = s.defaultStream
+		}
 	}
 	}
 	k := fr.StreamName
 	k := fr.StreamName
 	if k == xsql.DefaultStream {
 	if k == xsql.DefaultStream {

+ 150 - 58
xstream/planner/analyzer_test.go

@@ -13,6 +13,93 @@ import (
 	"testing"
 	"testing"
 )
 )
 
 
+type errorStruct struct {
+	err  string
+	serr *string
+}
+
+func newErrorStruct(err string) *errorStruct {
+	return &errorStruct{
+		err: err,
+	}
+}
+
+func newErrorStructWithS(err string, serr string) *errorStruct {
+	return &errorStruct{
+		err:  err,
+		serr: &serr,
+	}
+}
+
+func (e *errorStruct) Serr() string {
+	if e.serr != nil {
+		return *e.serr
+	}
+	return e.err
+}
+
+var tests = []struct {
+	sql string
+	r   *errorStruct
+}{
+	{ // 0
+		sql: `SELECT count(*) FROM src1 HAVING sin(temp) > 0.3`,
+		r:   newErrorStruct("Not allowed to call non-aggregate functions in HAVING clause."),
+	},
+	{ // 1
+		sql: `SELECT count(*) FROM src1 WHERE name = "dname" HAVING sin(count(*)) > 0.3`,
+		r:   newErrorStruct(""),
+	},
+	{ // 2
+		sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sin(c) > 0.3`,
+		r:   newErrorStruct(""),
+	},
+	{ // 3
+		sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sum(c) > 0.3`,
+		r:   newErrorStruct("invalid argument for func sum: aggregate argument is not allowed"),
+	},
+	{ // 4
+		sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" GROUP BY sin(c)`,
+		r:   newErrorStruct("Not allowed to call aggregate functions in GROUP BY clause."),
+	},
+	{ // 5
+		sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sum(c) > 0.3 OR sin(temp) > 3`,
+		r:   newErrorStruct("Not allowed to call non-aggregate functions in HAVING clause."),
+	},
+	{ // 6
+		sql: `SELECT collect(*) as c FROM src1 WHERE name = "dname" HAVING c[2]->temp > 20 AND sin(c[0]->temp) > 0`,
+		r:   newErrorStruct(""),
+	},
+	{ // 7
+		sql: `SELECT collect(*) as c FROM src1 WHERE name = "dname" HAVING c[2]->temp + temp > 0`,
+		r:   newErrorStruct("Not allowed to call non-aggregate functions in HAVING clause."),
+	},
+	{ // 8
+		sql: `SELECT deduplicate(temp, true) as de FROM src1 HAVING cardinality(de) > 20`,
+		r:   newErrorStruct(""),
+	},
+	{ // 9
+		sql: `SELECT sin(temp) as temp FROM src1`,
+		r:   newErrorStruct(""),
+	},
+	{ // 10
+		sql: `SELECT sum(temp) as temp, count(temp) as temp FROM src1`,
+		r:   newErrorStruct("duplicate alias temp"),
+	},
+	{ // 11
+		sql: `SELECT sum(temp) as temp, count(temp) as ct FROM src1`,
+		r:   newErrorStruct(""),
+	},
+	{ // 12
+		sql: `SELECT collect(*)->abc FROM src1`,
+		r:   newErrorStruct(""),
+	},
+	{ // 13
+		sql: `SELECT sin(temp) as temp1, cos(temp1) FROM src1`,
+		r:   newErrorStructWithS("unknown field temp1", ""),
+	},
+}
+
 func Test_validation(t *testing.T) {
 func Test_validation(t *testing.T) {
 	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
 	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
 	err := store.Open()
 	err := store.Open()
@@ -51,63 +138,67 @@ func Test_validation(t *testing.T) {
 		}
 		}
 		streams[n] = streamStmt
 		streams[n] = streamStmt
 	}
 	}
-	var tests = []struct {
-		sql string
-		err string
-	}{
-		{ // 0
-			sql: `SELECT count(*) FROM src1 HAVING sin(temp) > 0.3`,
-			err: "Not allowed to call non-aggregate functions in HAVING clause.",
-		},
-		{ // 1
-			sql: `SELECT count(*) FROM src1 WHERE name = "dname" HAVING sin(count(*)) > 0.3`,
-			err: "",
-		},
-		{ // 2
-			sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sin(c) > 0.3`,
-			err: "",
-		},
-		{ // 3
-			sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sum(c) > 0.3`,
-			err: "invalid argument for func sum: aggregate argument is not allowed",
-		},
-		{ // 4
-			sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" GROUP BY sin(c)`,
-			err: "Not allowed to call aggregate functions in GROUP BY clause.",
-		},
-		{ // 5
-			sql: `SELECT count(*) as c FROM src1 WHERE name = "dname" HAVING sum(c) > 0.3 OR sin(temp) > 3`,
-			err: "Not allowed to call non-aggregate functions in HAVING clause.",
-		},
-		{ // 6
-			sql: `SELECT collect(*) as c FROM src1 WHERE name = "dname" HAVING c[2]->temp > 20 AND sin(c[0]->temp) > 0`,
-			err: "",
-		},
-		{ // 7
-			sql: `SELECT collect(*) as c FROM src1 WHERE name = "dname" HAVING c[2]->temp + temp > 0`,
-			err: "Not allowed to call non-aggregate functions in HAVING clause.",
-		},
-		{ // 8
-			sql: `SELECT deduplicate(temp, true) as de FROM src1 HAVING cardinality(de) > 20`,
-			err: "",
-		},
-		{ // 9
-			sql: `SELECT sin(temp) as temp FROM src1`,
-			err: "",
-		},
-		{ // 10
-			sql: `SELECT sum(temp) as temp, count(temp) as temp FROM src1`,
-			err: "duplicate alias temp",
-		},
-		{ // 11
-			sql: `SELECT sum(temp) as temp, count(temp) as ct FROM src1`,
-			err: "",
-		},
-		{ // 12
-			sql: `SELECT collect(*)->abc FROM src1`,
-			err: "",
-		},
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
+			continue
+		}
+		_, err = createLogicalPlan(stmt, &api.RuleOption{
+			IsEventTime:        false,
+			LateTol:            0,
+			Concurrency:        0,
+			BufferLength:       0,
+			SendMetaToSink:     false,
+			Qos:                0,
+			CheckpointInterval: 0,
+			SendError:          true,
+		}, store)
+		if !reflect.DeepEqual(tt.r.err, common.Errstring(err)) {
+			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.r.err, err)
+		}
+	}
+}
+
+func Test_validationSchemaless(t *testing.T) {
+	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
+	err := store.Open()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer store.Close()
+	streamSqls := map[string]string{
+		"src1": `CREATE STREAM src1 (
+				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
+	}
+	types := map[string]xsql.StreamType{
+		"src1": xsql.TypeStream,
 	}
 	}
+	for name, sql := range streamSqls {
+		s, err := json.Marshal(&xsql.StreamInfo{
+			StreamType: types[name],
+			Statement:  sql,
+		})
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
+		store.Set(name, string(s))
+	}
+	streams := make(map[string]*xsql.StreamStmt)
+	for n := range streamSqls {
+		streamStmt, err := xsql.GetDataSource(store, n)
+		if err != nil {
+			t.Errorf("fail to get stream %s, please check if stream is created", n)
+			return
+		}
+		streams[n] = streamStmt
+	}
+
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -126,8 +217,9 @@ func Test_validation(t *testing.T) {
 			CheckpointInterval: 0,
 			CheckpointInterval: 0,
 			SendError:          true,
 			SendError:          true,
 		}, store)
 		}, store)
-		if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
-			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.err, err)
+		serr := tt.r.Serr()
+		if !reflect.DeepEqual(serr, common.Errstring(err)) {
+			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, serr, err)
 		}
 		}
 	}
 	}
 }
 }

+ 911 - 0
xstream/planner/planner_test.go

@@ -1036,3 +1036,914 @@ func Test_createLogicalPlan(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func Test_createLogicalPlanSchemaless(t *testing.T) {
+	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
+	err := store.Open()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer store.Close()
+	streamSqls := map[string]string{
+		"src1": `CREATE STREAM src1 (
+				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
+		"src2": `CREATE STREAM src2 (
+				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
+		"tableInPlanner": `CREATE TABLE tableInPlanner (
+					id BIGINT,
+					name STRING,
+					value STRING,
+					hum BIGINT
+				) WITH (TYPE="file");`,
+	}
+	types := map[string]xsql.StreamType{
+		"src1":           xsql.TypeStream,
+		"src2":           xsql.TypeStream,
+		"tableInPlanner": xsql.TypeTable,
+	}
+	for name, sql := range streamSqls {
+		s, err := json.Marshal(&xsql.StreamInfo{
+			StreamType: types[name],
+			Statement:  sql,
+		})
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
+		store.Set(name, string(s))
+	}
+	streams := make(map[string]*xsql.StreamStmt)
+	for n := range streamSqls {
+		streamStmt, err := xsql.GetDataSource(store, n)
+		if err != nil {
+			t.Errorf("fail to get stream %s, please check if stream is created", n)
+			return
+		}
+		streams[n] = streamStmt
+	}
+
+	var (
+		//boolTrue = true
+		boolFalse = false
+	)
+
+	var tests = []struct {
+		sql string
+		p   LogicalPlan
+		err string
+	}{
+		{ // 0
+			sql: `SELECT name FROM src1`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						DataSourcePlan{
+							baseLogicalPlan: baseLogicalPlan{},
+							name:            "src1",
+							streamFields: []interface{}{
+								"name",
+							},
+							streamStmt: streams["src1"],
+							metaFields: []string{},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "name", StreamName: "src1"},
+						Name:  "name",
+						AName: "",
+					},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 1 optimize where to data source
+			sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name: "src1",
+													streamFields: []interface{}{
+														"name", "temp",
+													},
+													streamStmt: streams["src1"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
+											OP:  xsql.EQ,
+											RHS: &xsql.StringLiteral{Val: "v1"},
+										},
+									}.Init(),
+								},
+							},
+							condition: nil,
+							wtype:     xsql.TUMBLING_WINDOW,
+							length:    10000,
+							interval:  0,
+							limit:     0,
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+						Name:  "temp",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 2 condition that cannot be optimized
+			sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									WindowPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name: "src1",
+													streamFields: []interface{}{
+														"id1", "temp",
+													},
+													streamStmt: streams["src1"],
+													metaFields: []string{},
+												}.Init(),
+												DataSourcePlan{
+													name: "src2",
+													streamFields: []interface{}{ // can't determine where is id1 belonged to
+														"hum", "id1", "id2",
+													},
+													streamStmt: streams["src2"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										condition: nil,
+										wtype:     xsql.TUMBLING_WINDOW,
+										length:    10000,
+										interval:  0,
+										limit:     0,
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{Name: "src1"},
+							joins: xsql.Joins{xsql.Join{
+								Name:     "src2",
+								JoinType: xsql.INNER_JOIN,
+								Expr: &xsql.BinaryExpr{
+									OP: xsql.AND,
+									LHS: &xsql.BinaryExpr{
+										LHS: &xsql.BinaryExpr{
+											OP:  xsql.GT,
+											LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+											RHS: &xsql.IntegerLiteral{Val: 20},
+										},
+										OP: xsql.OR,
+										RHS: &xsql.BinaryExpr{
+											OP:  xsql.GT,
+											LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
+											RHS: &xsql.IntegerLiteral{Val: 60},
+										},
+									},
+									RHS: &xsql.BinaryExpr{
+										OP:  xsql.EQ,
+										LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+										RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
+									},
+								},
+							}},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 3 optimize window filter
+			sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name: "src1",
+													streamFields: []interface{}{
+														"id1", "name", "temp",
+													},
+													streamStmt: streams["src1"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											OP: xsql.AND,
+											LHS: &xsql.BinaryExpr{
+												LHS: &xsql.FieldRef{Name: "name", StreamName: "src1"},
+												OP:  xsql.EQ,
+												RHS: &xsql.StringLiteral{Val: "v1"},
+											},
+											RHS: &xsql.BinaryExpr{
+												LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+												OP:  xsql.GT,
+												RHS: &xsql.IntegerLiteral{Val: 2},
+											},
+										},
+									}.Init(),
+								},
+							},
+							condition: nil,
+							wtype:     xsql.TUMBLING_WINDOW,
+							length:    10000,
+							interval:  0,
+							limit:     0,
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 4. do not optimize count window
+			sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						HavingPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												WindowPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name:         "src1",
+																isWildCard:   true,
+																streamFields: nil,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
+															}.Init(),
+														},
+													},
+													condition: nil,
+													wtype:     xsql.COUNT_WINDOW,
+													length:    5,
+													interval:  1,
+													limit:     0,
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+											OP:  xsql.GT,
+											RHS: &xsql.IntegerLiteral{Val: 20},
+										},
+									}.Init(),
+								},
+							},
+							condition: &xsql.BinaryExpr{
+								LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.Wildcard{
+									Token: xsql.ASTERISK,
+								}}},
+								OP:  xsql.GT,
+								RHS: &xsql.IntegerLiteral{Val: 2},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.Wildcard{Token: xsql.ASTERISK},
+						Name:  "kuiper_field_0",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 5. optimize join on
+			sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									WindowPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												FilterPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name: "src1",
+																streamFields: []interface{}{
+																	"id1", "temp",
+																},
+																streamStmt: streams["src1"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: &xsql.BinaryExpr{
+														RHS: &xsql.BinaryExpr{
+															OP:  xsql.GT,
+															LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+															RHS: &xsql.IntegerLiteral{Val: 20},
+														},
+														OP: xsql.AND,
+														LHS: &xsql.BinaryExpr{
+															OP:  xsql.GT,
+															LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+															RHS: &xsql.IntegerLiteral{Val: 111},
+														},
+													},
+												}.Init(),
+												FilterPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name: "src2",
+																streamFields: []interface{}{
+																	"hum", "id1", "id2",
+																},
+																streamStmt: streams["src2"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: &xsql.BinaryExpr{
+														OP:  xsql.LT,
+														LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
+														RHS: &xsql.IntegerLiteral{Val: 60},
+													},
+												}.Init(),
+											},
+										},
+										condition: nil,
+										wtype:     xsql.TUMBLING_WINDOW,
+										length:    10000,
+										interval:  0,
+										limit:     0,
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{
+								Name: "src1",
+							},
+							joins: []xsql.Join{
+								{
+									Name:     "src2",
+									Alias:    "",
+									JoinType: xsql.INNER_JOIN,
+									Expr: &xsql.BinaryExpr{
+										LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+										OP:  xsql.EQ,
+										RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 6. optimize outter join on
+			sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									WindowPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												FilterPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name: "src1",
+																streamFields: []interface{}{
+																	"id1", "temp",
+																},
+																streamStmt: streams["src1"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: &xsql.BinaryExpr{
+														OP:  xsql.GT,
+														LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+														RHS: &xsql.IntegerLiteral{Val: 111},
+													},
+												}.Init(),
+												DataSourcePlan{
+													name: "src2",
+													streamFields: []interface{}{
+														"hum", "id1", "id2",
+													},
+													streamStmt: streams["src2"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										condition: nil,
+										wtype:     xsql.TUMBLING_WINDOW,
+										length:    10000,
+										interval:  0,
+										limit:     0,
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{
+								Name: "src1",
+							},
+							joins: []xsql.Join{
+								{
+									Name:     "src2",
+									Alias:    "",
+									JoinType: xsql.FULL_JOIN,
+									Expr: &xsql.BinaryExpr{
+										OP: xsql.AND,
+										LHS: &xsql.BinaryExpr{
+											OP: xsql.AND,
+											LHS: &xsql.BinaryExpr{
+												LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+												OP:  xsql.EQ,
+												RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
+											},
+											RHS: &xsql.BinaryExpr{
+												OP:  xsql.GT,
+												LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+												RHS: &xsql.IntegerLiteral{Val: 20},
+											},
+										},
+										RHS: &xsql.BinaryExpr{
+											OP:  xsql.LT,
+											LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
+											RHS: &xsql.IntegerLiteral{Val: 60},
+										},
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 7 window error for table
+			sql: `SELECT value FROM tableInPlanner WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p:   nil,
+			err: "cannot run window for TABLE sources",
+		}, { // 8 join table without window
+			sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and hum < 60 WHERE src1.id1 > 111`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									JoinAlignPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												FilterPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name: "src1",
+																streamFields: []interface{}{
+																	"hum", "id1", "temp",
+																},
+																streamStmt: streams["src1"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: &xsql.BinaryExpr{
+														RHS: &xsql.BinaryExpr{
+															OP:  xsql.GT,
+															LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+															RHS: &xsql.IntegerLiteral{Val: 20},
+														},
+														OP: xsql.AND,
+														LHS: &xsql.BinaryExpr{
+															OP:  xsql.GT,
+															LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+															RHS: &xsql.IntegerLiteral{Val: 111},
+														},
+													},
+												}.Init(),
+												DataSourcePlan{
+													name: "tableInPlanner",
+													streamFields: []interface{}{
+														&xsql.StreamField{
+															Name:      "hum",
+															FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+														},
+														&xsql.StreamField{
+															Name:      "id",
+															FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+														},
+													},
+													streamStmt: streams["tableInPlanner"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										Emitters: []string{"tableInPlanner"},
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{
+								Name: "src1",
+							},
+							joins: []xsql.Join{
+								{
+									Name:     "tableInPlanner",
+									Alias:    "",
+									JoinType: xsql.INNER_JOIN,
+									Expr: &xsql.BinaryExpr{
+										OP: xsql.AND,
+										LHS: &xsql.BinaryExpr{
+											LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+											OP:  xsql.EQ,
+											RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										},
+										RHS: &xsql.BinaryExpr{
+											OP:  xsql.LT,
+											LHS: &xsql.FieldRef{Name: "hum", StreamName: xsql.DefaultStream},
+											RHS: &xsql.IntegerLiteral{Val: 60},
+										},
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 9 join table with window
+			sql: `SELECT id1 FROM src1 INNER JOIN tableInPlanner on src1.id1 = tableInPlanner.id and src1.temp > 20 and tableInPlanner.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									JoinAlignPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												WindowPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															FilterPlan{
+																baseLogicalPlan: baseLogicalPlan{
+																	children: []LogicalPlan{
+																		DataSourcePlan{
+																			name: "src1",
+																			streamFields: []interface{}{
+																				"id1", "temp",
+																			},
+																			streamStmt: streams["src1"],
+																			metaFields: []string{},
+																		}.Init(),
+																	},
+																},
+																condition: &xsql.BinaryExpr{
+																	RHS: &xsql.BinaryExpr{
+																		OP:  xsql.GT,
+																		LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+																		RHS: &xsql.IntegerLiteral{Val: 20},
+																	},
+																	OP: xsql.AND,
+																	LHS: &xsql.BinaryExpr{
+																		OP:  xsql.GT,
+																		LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+																		RHS: &xsql.IntegerLiteral{Val: 111},
+																	},
+																},
+															}.Init(),
+														},
+													},
+													condition: nil,
+													wtype:     xsql.TUMBLING_WINDOW,
+													length:    10000,
+													interval:  0,
+													limit:     0,
+												}.Init(),
+												FilterPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name: "tableInPlanner",
+																streamFields: []interface{}{
+																	&xsql.StreamField{
+																		Name:      "hum",
+																		FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+																	},
+																	&xsql.StreamField{
+																		Name:      "id",
+																		FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+																	},
+																},
+																streamStmt: streams["tableInPlanner"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: &xsql.BinaryExpr{
+														OP:  xsql.LT,
+														LHS: &xsql.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
+														RHS: &xsql.IntegerLiteral{Val: 60},
+													},
+												}.Init(),
+											},
+										},
+										Emitters: []string{"tableInPlanner"},
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{
+								Name: "src1",
+							},
+							joins: []xsql.Join{
+								{
+									Name:     "tableInPlanner",
+									Alias:    "",
+									JoinType: xsql.INNER_JOIN,
+									Expr: &xsql.BinaryExpr{
+										LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+										OP:  xsql.EQ,
+										RHS: &xsql.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1", StreamName: xsql.DefaultStream},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 10 meta
+			sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						FilterPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										name: "src1",
+										streamFields: []interface{}{
+											"temp",
+										},
+										streamStmt: streams["src1"],
+										metaFields: []string{"Humidity", "device", "id"},
+									}.Init(),
+								},
+							},
+							condition: &xsql.BinaryExpr{
+								LHS: &xsql.Call{
+									Name: "meta",
+									Args: []xsql.Expr{&xsql.MetaRef{
+										Name:       "device",
+										StreamName: xsql.DefaultStream,
+									}},
+								},
+								OP: xsql.EQ,
+								RHS: &xsql.StringLiteral{
+									Val: "demo2",
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+						Name:  "temp",
+						AName: "",
+					}, {
+						Expr: &xsql.FieldRef{Name: "eid", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+							&xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
+								Name:       "id",
+								StreamName: xsql.DefaultStream,
+							}}},
+							[]xsql.StreamName{},
+							nil,
+						)},
+						Name:  "meta",
+						AName: "eid",
+					}, {
+						Expr: &xsql.FieldRef{Name: "hdevice", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+							&xsql.Call{Name: "meta", Args: []xsql.Expr{
+								&xsql.BinaryExpr{
+									OP:  xsql.ARROW,
+									LHS: &xsql.MetaRef{Name: "Humidity", StreamName: xsql.DefaultStream},
+									RHS: &xsql.MetaRef{Name: "Device"},
+								},
+							}},
+							[]xsql.StreamName{},
+							nil,
+						)},
+						Name:  "meta",
+						AName: "hdevice",
+					},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 11 join with same name field and aliased
+			sql: `SELECT src2.hum AS hum1, tableInPlanner.hum AS hum2 FROM src2 INNER JOIN tableInPlanner on id2 = id WHERE hum1 > hum2`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						JoinPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									JoinAlignPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name: "src2",
+													streamFields: []interface{}{
+														"hum", "id", "id2",
+													},
+													streamStmt: streams["src2"],
+													metaFields: []string{},
+												}.Init(),
+												DataSourcePlan{
+													name: "tableInPlanner",
+													streamFields: []interface{}{
+														&xsql.StreamField{
+															Name:      "hum",
+															FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+														},
+														&xsql.StreamField{
+															Name:      "id",
+															FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+														},
+													},
+													streamStmt: streams["tableInPlanner"],
+													metaFields: []string{},
+												}.Init(),
+											},
+										},
+										Emitters: []string{"tableInPlanner"},
+									}.Init(),
+								},
+							},
+							from: &xsql.Table{
+								Name: "src2",
+							},
+							joins: []xsql.Join{
+								{
+									Name:     "tableInPlanner",
+									Alias:    "",
+									JoinType: xsql.INNER_JOIN,
+									Expr: &xsql.BinaryExpr{
+										RHS: &xsql.BinaryExpr{
+											OP:  xsql.EQ,
+											LHS: &xsql.FieldRef{Name: "id2", StreamName: xsql.DefaultStream},
+											RHS: &xsql.FieldRef{Name: "id", StreamName: xsql.DefaultStream},
+										},
+										OP: xsql.AND,
+										LHS: &xsql.BinaryExpr{
+											OP: xsql.GT,
+											LHS: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+												&xsql.FieldRef{
+													Name:       "hum",
+													StreamName: "src2",
+												},
+												[]xsql.StreamName{"src2"},
+												&boolFalse,
+											)},
+											RHS: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+												&xsql.FieldRef{
+													Name:       "hum",
+													StreamName: "tableInPlanner",
+												},
+												[]xsql.StreamName{"tableInPlanner"},
+												&boolFalse,
+											)},
+										},
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr: &xsql.FieldRef{Name: "hum1", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+							&xsql.FieldRef{
+								Name:       "hum",
+								StreamName: "src2",
+							},
+							[]xsql.StreamName{"src2"},
+							&boolFalse,
+						)},
+						Name:  "hum",
+						AName: "hum1",
+					}, {
+						Expr: &xsql.FieldRef{Name: "hum2", StreamName: xsql.AliasStream, AliasRef: xsql.MockAliasRef(
+							&xsql.FieldRef{
+								Name:       "hum",
+								StreamName: "tableInPlanner",
+							},
+							[]xsql.StreamName{"tableInPlanner"},
+							&boolFalse,
+						)},
+						Name:  "hum",
+						AName: "hum2",
+					},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
+			continue
+		}
+		p, err := createLogicalPlan(stmt, &api.RuleOption{
+			IsEventTime:        false,
+			LateTol:            0,
+			Concurrency:        0,
+			BufferLength:       0,
+			SendMetaToSink:     false,
+			Qos:                0,
+			CheckpointInterval: 0,
+			SendError:          true,
+		}, store)
+		if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
+			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.err, err)
+		} else if !reflect.DeepEqual(tt.p, p) {
+			t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
+		}
+	}
+}