ソースを参照

feat(topo): optimize topo based on SQL (#643)

Infrastructure for logical plan
Logical plan optimization framework: opt rules
PushDownPredicate rule implementation
New stream creator
ngjaying 4 年 前
コミット
de7a92a105
42 ファイル変更1561 行追加838 行削除
  1. 2 1
      examples/testExtension.go
  2. 12 12
      xsql/processors/checkpoint_test.go
  3. 5 5
      xsql/processors/common_test.go
  4. 26 25
      xsql/processors/extension_test.go
  5. 144 144
      xsql/processors/rule_test.go
  6. 382 379
      xsql/processors/window_rule_test.go
  7. 3 191
      xsql/processors/xsql_processor.go
  8. 13 0
      xsql/util.go
  9. 5 16
      xstream/nodes/window_op.go
  10. 4 4
      xsql/plans/aggregate_operator.go
  11. 5 5
      xsql/plans/aggregate_test.go
  12. 3 3
      xsql/plans/filter_operator.go
  13. 3 3
      xsql/plans/filter_test.go
  14. 3 3
      xsql/plans/having_operator.go
  15. 4 4
      xsql/plans/having_test.go
  16. 2 2
      xsql/plans/join_multi_test.go
  17. 7 7
      xsql/plans/join_operator.go
  18. 7 7
      xsql/plans/join_test.go
  19. 2 2
      xsql/plans/math_func_test.go
  20. 5 5
      xsql/plans/misc_func_test.go
  21. 3 3
      xsql/plans/order_operator.go
  22. 2 2
      xsql/plans/order_test.go
  23. 1 1
      xsql/plans/preprocessor.go
  24. 1 1
      xsql/plans/preprocessor_test.go
  25. 4 4
      xsql/plans/project_operator.go
  26. 6 6
      xsql/plans/project_test.go
  27. 2 2
      xsql/plans/str_func_test.go
  28. 14 0
      xstream/planner/aggregatePlan.go
  29. 32 0
      xstream/planner/dataSourcePlan.go
  30. 70 0
      xstream/planner/filterPlan.go
  31. 13 0
      xstream/planner/havingPlan.go
  32. 14 0
      xstream/planner/joinPlan.go
  33. 41 0
      xstream/planner/logicalPlan.go
  34. 16 0
      xstream/planner/optimizer.go
  35. 13 0
      xstream/planner/orderPlan.go
  36. 300 0
      xstream/planner/planner.go
  37. 300 0
      xstream/planner/planner_test.go
  38. 15 0
      xstream/planner/projectPlan.go
  39. 17 0
      xstream/planner/rules.go
  40. 20 0
      xstream/planner/util.go
  41. 37 0
      xstream/planner/windowPlan.go
  42. 3 1
      xstream/server/server/ruleManager.go

+ 2 - 1
examples/testExtension.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql/processors"
+	"github.com/emqx/kuiper/xstream/planner"
 	"path"
 	"time"
 )
@@ -33,7 +34,7 @@ func main() {
 		log.Printf(msg)
 	}
 
-	tp, err := rp.ExecInitRule(rs)
+	tp, err := planner.Plan(rs, dbDir)
 	if err != nil {
 		log.Panicf("fail to init rule: %v", err)
 	}

+ 12 - 12
xsql/processors/checkpoint_test.go

@@ -67,11 +67,11 @@ func TestCheckpoint(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_records_in_total":  int64(3),
-				"op_preprocessor_demo_0_records_out_total": int64(3),
+				"op_1_preprocessor_demo_0_records_in_total":  int64(3),
+				"op_1_preprocessor_demo_0_records_out_total": int64(3),
 
-				"op_project_0_records_in_total":  int64(3),
-				"op_project_0_records_out_total": int64(3),
+				"op_3_project_0_records_in_total":  int64(3),
+				"op_3_project_0_records_out_total": int64(3),
 
 				"sink_mockSink_0_records_in_total":  int64(3),
 				"sink_mockSink_0_records_out_total": int64(3),
@@ -79,18 +79,18 @@ func TestCheckpoint(t *testing.T) {
 				"source_demo_0_records_in_total":  int64(3),
 				"source_demo_0_records_out_total": int64(3),
 
-				"op_window_0_records_in_total":  int64(3),
-				"op_window_0_records_out_total": int64(3),
+				"op_2_window_0_records_in_total":  int64(3),
+				"op_2_window_0_records_out_total": int64(3),
 			},
 		},
 		pauseSize: 3,
 		cc:        2,
 		pauseMetric: map[string]interface{}{
-			"op_preprocessor_demo_0_records_in_total":  int64(3),
-			"op_preprocessor_demo_0_records_out_total": int64(3),
+			"op_1_preprocessor_demo_0_records_in_total":  int64(3),
+			"op_1_preprocessor_demo_0_records_out_total": int64(3),
 
-			"op_project_0_records_in_total":  int64(1),
-			"op_project_0_records_out_total": int64(1),
+			"op_3_project_0_records_in_total":  int64(1),
+			"op_3_project_0_records_out_total": int64(1),
 
 			"sink_mockSink_0_records_in_total":  int64(1),
 			"sink_mockSink_0_records_out_total": int64(1),
@@ -98,8 +98,8 @@ func TestCheckpoint(t *testing.T) {
 			"source_demo_0_records_in_total":  int64(3),
 			"source_demo_0_records_out_total": int64(3),
 
-			"op_window_0_records_in_total":  int64(3),
-			"op_window_0_records_out_total": int64(1),
+			"op_2_window_0_records_in_total":  int64(3),
+			"op_2_window_0_records_out_total": int64(1),
 		}},
 	}
 	handleStream(true, streamList, t)

+ 5 - 5
xsql/processors/common_test.go

@@ -9,6 +9,7 @@ import (
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
+	"github.com/emqx/kuiper/xstream/planner"
 	"github.com/emqx/kuiper/xstream/test"
 	"io/ioutil"
 	"os"
@@ -1027,7 +1028,6 @@ func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkPro
 		dataLength int
 	)
 
-	p := NewRuleProcessor(DbDir)
 	parser := xsql.NewParser(strings.NewReader(tt.sql))
 	if stmt, err := xsql.Language.Parse(parser); err != nil {
 		t.Errorf("parse sql %s error: %s", tt.sql, err)
@@ -1047,13 +1047,13 @@ func createStream(t *testing.T, tt ruleTest, j int, opt *api.RuleOption, sinkPro
 			}
 		}
 	}
-	tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, sources)
+	mockSink := test.NewMockSink()
+	sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
+	tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, DbDir, sources, []*nodes.SinkNode{sink})
 	if err != nil {
 		t.Error(err)
+		return nil, 0, nil, nil, nil
 	}
-	mockSink := test.NewMockSink()
-	sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
-	tp.AddSink(inputs, sink)
 	errCh := tp.Open()
 	return datas, dataLength, tp, mockSink, errCh
 }

+ 26 - 25
xsql/processors/extension_test.go

@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/planner"
 	"github.com/emqx/kuiper/xstream/test"
 	"os"
 	"testing"
@@ -57,7 +58,7 @@ func TestExtensions(t *testing.T) {
 			t.Errorf("failed to create rule: %s.", err)
 			continue
 		}
-		tp, err := p.ExecInitRule(rs)
+		tp, err := planner.Plan(rs, DbDir)
 		if err != nil {
 			t.Errorf("fail to init rule: %v", err)
 			continue
@@ -179,15 +180,15 @@ func TestFuncState(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_preprocessor_text_0_process_latency_us": int64(0),
-				"op_preprocessor_text_0_records_in_total":   int64(8),
-				"op_preprocessor_text_0_records_out_total":  int64(8),
+				"op_1_preprocessor_text_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_text_0_process_latency_us": int64(0),
+				"op_1_preprocessor_text_0_records_in_total":   int64(8),
+				"op_1_preprocessor_text_0_records_out_total":  int64(8),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(8),
-				"op_project_0_records_out_total":  int64(8),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(8),
+				"op_2_project_0_records_out_total":  int64(8),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(8),
@@ -244,15 +245,15 @@ func TestFuncStateCheckpoint(t *testing.T) {
 					}},
 				},
 				m: map[string]interface{}{
-					"op_preprocessor_text_0_exceptions_total":   int64(0),
-					"op_preprocessor_text_0_process_latency_us": int64(0),
-					"op_preprocessor_text_0_records_in_total":   int64(6),
-					"op_preprocessor_text_0_records_out_total":  int64(6),
+					"op_1_preprocessor_text_0_exceptions_total":   int64(0),
+					"op_1_preprocessor_text_0_process_latency_us": int64(0),
+					"op_1_preprocessor_text_0_records_in_total":   int64(6),
+					"op_1_preprocessor_text_0_records_out_total":  int64(6),
 
-					"op_project_0_exceptions_total":   int64(0),
-					"op_project_0_process_latency_us": int64(0),
-					"op_project_0_records_in_total":   int64(6),
-					"op_project_0_records_out_total":  int64(6),
+					"op_2_project_0_exceptions_total":   int64(0),
+					"op_2_project_0_process_latency_us": int64(0),
+					"op_2_project_0_records_in_total":   int64(6),
+					"op_2_project_0_records_out_total":  int64(6),
 
 					"sink_mockSink_0_exceptions_total":  int64(0),
 					"sink_mockSink_0_records_in_total":  int64(6),
@@ -266,15 +267,15 @@ func TestFuncStateCheckpoint(t *testing.T) {
 			pauseSize: 3,
 			cc:        1,
 			pauseMetric: map[string]interface{}{
-				"op_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_preprocessor_text_0_process_latency_us": int64(0),
-				"op_preprocessor_text_0_records_in_total":   int64(3),
-				"op_preprocessor_text_0_records_out_total":  int64(3),
+				"op_1_preprocessor_text_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_text_0_process_latency_us": int64(0),
+				"op_1_preprocessor_text_0_records_in_total":   int64(3),
+				"op_1_preprocessor_text_0_records_out_total":  int64(3),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(3),
-				"op_project_0_records_out_total":  int64(3),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(3),

+ 144 - 144
xsql/processors/rule_test.go

@@ -44,15 +44,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(5),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -65,9 +65,9 @@ func TestSingleSQL(t *testing.T) {
 			t: &xstream.PrintableTopo{
 				Sources: []string{"source_demo"},
 				Edges: map[string][]string{
-					"source_demo":          {"op_preprocessor_demo"},
-					"op_preprocessor_demo": {"op_project"},
-					"op_project":           {"sink_mockSink"},
+					"source_demo":            {"op_1_preprocessor_demo"},
+					"op_1_preprocessor_demo": {"op_2_project"},
+					"op_2_project":           {"sink_mockSink"},
 				},
 			},
 		}, {
@@ -84,15 +84,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(2),
-				"op_project_0_records_out_total":  int64(2),
+				"op_3_project_0_exceptions_total":   int64(0),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(2),
+				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(2),
@@ -102,10 +102,10 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_in_total":  int64(5),
 				"source_demo_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(2),
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
 			},
 		}, {
 			name: `TestSingleSQLRule3`,
@@ -121,15 +121,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(2),
-				"op_project_0_records_out_total":  int64(2),
+				"op_3_project_0_exceptions_total":   int64(0),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(2),
+				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(2),
@@ -139,10 +139,10 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_in_total":  int64(5),
 				"source_demo_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(2),
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
 			},
 		}, {
 			name: `TestSingleSQLRule4`,
@@ -163,15 +163,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_preprocessor_demoError_0_process_latency_us": int64(0),
-				"op_preprocessor_demoError_0_records_in_total":   int64(5),
-				"op_preprocessor_demoError_0_records_out_total":  int64(2),
+				"op_1_preprocessor_demoError_0_exceptions_total":   int64(3),
+				"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demoError_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demoError_0_records_out_total":  int64(2),
 
-				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(4),
-				"op_project_0_records_out_total":  int64(1),
+				"op_3_project_0_exceptions_total":   int64(3),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(4),
+				"op_3_project_0_records_out_total":  int64(1),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(4),
@@ -181,10 +181,10 @@ func TestSingleSQL(t *testing.T) {
 				"source_demoError_0_records_in_total":  int64(5),
 				"source_demoError_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(3),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(1),
+				"op_2_filter_0_exceptions_total":   int64(3),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(1),
 			},
 		}, {
 			name: `TestSingleSQLRule4`,
@@ -205,15 +205,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_preprocessor_demoError_0_process_latency_us": int64(0),
-				"op_preprocessor_demoError_0_records_in_total":   int64(5),
-				"op_preprocessor_demoError_0_records_out_total":  int64(2),
+				"op_1_preprocessor_demoError_0_exceptions_total":   int64(3),
+				"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demoError_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demoError_0_records_out_total":  int64(2),
 
-				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(4),
-				"op_project_0_records_out_total":  int64(1),
+				"op_3_project_0_exceptions_total":   int64(3),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(4),
+				"op_3_project_0_records_out_total":  int64(1),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(4),
@@ -223,10 +223,10 @@ func TestSingleSQL(t *testing.T) {
 				"source_demoError_0_records_in_total":  int64(5),
 				"source_demoError_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(3),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(1),
+				"op_2_filter_0_exceptions_total":   int64(3),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(1),
 			},
 		}, {
 			name: `TestSingleSQLRule5`,
@@ -254,15 +254,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(5),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -286,15 +286,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(2),
-				"op_project_0_records_out_total":  int64(2),
+				"op_3_project_0_exceptions_total":   int64(0),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(2),
+				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(2),
@@ -304,10 +304,10 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_in_total":  int64(5),
 				"source_demo_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(2),
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
 			},
 		}, {
 			name: `TestSingleSQLRule7`,
@@ -330,15 +330,15 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_preprocessor_demo1_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo1_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo1_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo1_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo1_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(5),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -366,20 +366,20 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_preprocessor_demo1_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo1_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo1_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo1_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo1_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(2),
-				"op_project_0_records_out_total":  int64(2),
+				"op_3_project_0_exceptions_total":   int64(0),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(2),
+				"op_3_project_0_records_out_total":  int64(2),
 
-				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(2),
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(2),
@@ -432,15 +432,15 @@ func TestSingleSQLError(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(3),
-				"op_project_0_records_out_total":  int64(2),
+				"op_3_project_0_exceptions_total":   int64(1),
+				"op_3_project_0_process_latency_us": int64(0),
+				"op_3_project_0_records_in_total":   int64(3),
+				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(3),
@@ -450,10 +450,10 @@ func TestSingleSQLError(t *testing.T) {
 				"source_ldemo_0_records_in_total":  int64(5),
 				"source_ldemo_0_records_out_total": int64(5),
 
-				"op_filter_0_exceptions_total":   int64(1),
-				"op_filter_0_process_latency_us": int64(0),
-				"op_filter_0_records_in_total":   int64(5),
-				"op_filter_0_records_out_total":  int64(2),
+				"op_2_filter_0_exceptions_total":   int64(1),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
 			},
 		}, {
 			name: `TestSingleSQLErrorRule2`,
@@ -474,15 +474,15 @@ func TestSingleSQLError(t *testing.T) {
 				{{}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(4),
+				"op_2_project_0_exceptions_total":   int64(1),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -532,15 +532,15 @@ func TestSingleSQLTemplate(t *testing.T) {
 				},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(5),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -590,15 +590,15 @@ func TestNoneSingleSQLTemplate(t *testing.T) {
 				[]byte("<div>results</div><ul><li>red - 1</li></ul>"),
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_preprocessor_demo_0_records_out_total":  int64(5),
+				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
 
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(5),
-				"op_project_0_records_out_total":  int64(5),
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -635,15 +635,15 @@ func TestSingleSQLForBinary(t *testing.T) {
 				}},
 			},
 			m: map[string]interface{}{
-				"op_preprocessor_binDemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_binDemo_0_process_latency_us": int64(0),
-				"op_preprocessor_binDemo_0_records_in_total":   int64(1),
-				"op_preprocessor_binDemo_0_records_out_total":  int64(1),
-
-				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_us": int64(0),
-				"op_project_0_records_in_total":   int64(1),
-				"op_project_0_records_out_total":  int64(1),
+				"op_1_preprocessor_binDemo_0_exceptions_total":   int64(0),
+				"op_1_preprocessor_binDemo_0_process_latency_us": int64(0),
+				"op_1_preprocessor_binDemo_0_records_in_total":   int64(1),
+				"op_1_preprocessor_binDemo_0_records_out_total":  int64(1),
+
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(1),
+				"op_2_project_0_records_out_total":  int64(1),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(1),

ファイルの差分が大きいため隠しています
+ 382 - 379
xsql/processors/window_rule_test.go


+ 3 - 191
xsql/processors/xsql_processor.go

@@ -6,10 +6,10 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"github.com/emqx/kuiper/xsql/plans"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
+	"github.com/emqx/kuiper/xstream/planner"
 	"os"
 	"path"
 	"strings"
@@ -207,21 +207,6 @@ func (p *StreamProcessor) DropStream(name string) (string, error) {
 	}
 }
 
-func GetStream(m *common.SqliteKVStore, name string) (stmt *xsql.StreamStmt, err error) {
-	var s1 string
-	f, _ := m.Get(name, &s1)
-	if !f {
-		return nil, fmt.Errorf("Cannot find key %s. ", name)
-	}
-	parser := xsql.NewParser(strings.NewReader(s1))
-	stream, err := xsql.Language.Parse(parser)
-	stmt, ok := stream.(*xsql.StreamStmt)
-	if !ok {
-		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
-	}
-	return
-}
-
 type RuleProcessor struct {
 	db        common.KeyValue
 	rootDbDir string
@@ -335,19 +320,6 @@ func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 	}
 }
 
-func getStatementFromSql(sql string) (*xsql.SelectStatement, error) {
-	parser := xsql.NewParser(strings.NewReader(sql))
-	if stmt, err := xsql.Language.Parse(parser); err != nil {
-		return nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
-	} else {
-		if r, ok := stmt.(*xsql.SelectStatement); !ok {
-			return nil, fmt.Errorf("SQL %s is not a select statement.", sql)
-		} else {
-			return r, nil
-		}
-	}
-}
-
 func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
 	opt := common.Config.Rule
 	//set default rule options
@@ -371,7 +343,7 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 	if rule.Sql == "" {
 		return nil, fmt.Errorf("Missing rule SQL.")
 	}
-	if _, err := getStatementFromSql(rule.Sql); err != nil {
+	if _, err := xsql.GetStatementFromSql(rule.Sql); err != nil {
 		return nil, err
 	}
 	if rule.Actions == nil || len(rule.Actions) == 0 {
@@ -396,28 +368,10 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 	return rule, nil
 }
 
-func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {
-	if tp, inputs, err := p.createTopo(rule); err != nil {
-		return nil, err
-	} else {
-		for i, m := range rule.Actions {
-			for name, action := range m {
-				props, ok := action.(map[string]interface{})
-				if !ok {
-					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
-				}
-				tp.AddSink(inputs, nodes.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
-			}
-		}
-		return tp, nil
-	}
-}
-
 func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
-	if tp, inputs, err := p.createTopo(p.getDefaultRule(ruleid, sql)); err != nil {
+	if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), p.rootDbDir, nil, []*nodes.SinkNode{nodes.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
 		return nil, err
 	} else {
-		tp.AddSink(inputs, nodes.NewSinkNode("sink_memory_log", "logToMemory", nil))
 		go func() {
 			select {
 			case err := <-tp.Open():
@@ -521,145 +475,3 @@ func cleanSinkCache(rule *api.Rule) error {
 	}
 	return nil
 }
-
-func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.Emitter, error) {
-	return p.createTopoWithSources(rule, nil)
-}
-
-//For test to mock source
-func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error) {
-	name := rule.Id
-	sql := rule.Sql
-
-	log.Infof("Init rule with options %+v", rule.Options)
-	shouldCreateSource := sources == nil
-
-	if selectStmt, err := getStatementFromSql(sql); err != nil {
-		return nil, nil, err
-	} else {
-		tp, err := xstream.NewWithNameAndQos(name, rule.Options.Qos, rule.Options.CheckpointInterval)
-		if err != nil {
-			return nil, nil, err
-		}
-		var inputs []api.Emitter
-		streamsFromStmt := xsql.GetStreams(selectStmt)
-		dimensions := selectStmt.Dimensions
-		if !shouldCreateSource && len(streamsFromStmt) != len(sources) {
-			return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
-		}
-		if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || dimensions != nil) {
-			return nil, nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
-		}
-		store := common.GetSqliteKVStore(path.Join(p.rootDbDir, "stream"))
-		err = store.Open()
-		if err != nil {
-			return nil, nil, err
-		}
-		defer store.Close()
-
-		var alias, aggregateAlias xsql.Fields
-		for _, f := range selectStmt.Fields {
-			if f.AName != "" {
-				if !xsql.HasAggFuncs(f.Expr) {
-					alias = append(alias, f)
-				} else {
-					aggregateAlias = append(aggregateAlias, f)
-				}
-			}
-		}
-		for i, s := range streamsFromStmt {
-			streamStmt, err := GetStream(store, s)
-			if err != nil {
-				return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
-			}
-			isBinary := false
-			if f, ok := streamStmt.Options["FORMAT"]; ok {
-				if strings.ToLower(f) == common.FORMAT_BINARY {
-					isBinary = true
-				}
-			}
-			pp, err := plans.NewPreprocessor(streamStmt, alias, rule.Options.IsEventTime, isBinary)
-			if err != nil {
-				return nil, nil, err
-			}
-			var srcNode *nodes.SourceNode
-			if shouldCreateSource {
-				node := nodes.NewSourceNode(s, streamStmt.Options)
-				srcNode = node
-			} else {
-				srcNode = sources[i]
-			}
-			tp.AddSrc(srcNode)
-			preprocessorOp := xstream.Transform(pp, "preprocessor_"+s, rule.Options.BufferLength)
-			preprocessorOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator([]api.Emitter{srcNode}, preprocessorOp)
-			inputs = append(inputs, preprocessorOp)
-		}
-
-		var w *xsql.Window
-		if dimensions != nil {
-			w = dimensions.GetWindow()
-			if w != nil {
-				if w.Filter != nil {
-					wfilterOp := xstream.Transform(&plans.FilterPlan{Condition: w.Filter}, "windowFilter", rule.Options.BufferLength)
-					wfilterOp.SetConcurrency(rule.Options.Concurrency)
-					tp.AddOperator(inputs, wfilterOp)
-					inputs = []api.Emitter{wfilterOp}
-				}
-				wop, err := nodes.NewWindowOp("window", w, rule.Options.IsEventTime, rule.Options.LateTol, streamsFromStmt, rule.Options.BufferLength)
-				if err != nil {
-					return nil, nil, err
-				}
-				tp.AddOperator(inputs, wop)
-				inputs = []api.Emitter{wop}
-			}
-		}
-
-		if w != nil && selectStmt.Joins != nil {
-			joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join", rule.Options.BufferLength)
-			joinOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator(inputs, joinOp)
-			inputs = []api.Emitter{joinOp}
-		}
-
-		if selectStmt.Condition != nil {
-			filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter", rule.Options.BufferLength)
-			filterOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator(inputs, filterOp)
-			inputs = []api.Emitter{filterOp}
-		}
-
-		var ds xsql.Dimensions
-		if dimensions != nil || len(aggregateAlias) > 0 {
-			ds = dimensions.GetGroups()
-			if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
-				aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", rule.Options.BufferLength)
-				aggregateOp.SetConcurrency(rule.Options.Concurrency)
-				tp.AddOperator(inputs, aggregateOp)
-				inputs = []api.Emitter{aggregateOp}
-			}
-		}
-
-		if selectStmt.Having != nil {
-			havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having", rule.Options.BufferLength)
-			havingOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator(inputs, havingOp)
-			inputs = []api.Emitter{havingOp}
-		}
-
-		if selectStmt.SortFields != nil {
-			orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order", rule.Options.BufferLength)
-			orderOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator(inputs, orderOp)
-			inputs = []api.Emitter{orderOp}
-		}
-
-		if selectStmt.Fields != nil {
-			projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt), SendMeta: rule.Options.SendMetaToSink}, "project", rule.Options.BufferLength)
-			projectOp.SetConcurrency(rule.Options.Concurrency)
-			tp.AddOperator(inputs, projectOp)
-			inputs = []api.Emitter{projectOp}
-		}
-		return tp, inputs, nil
-	}
-}

+ 13 - 0
xsql/util.go

@@ -111,3 +111,16 @@ func LowercaseKeyMap(m map[string]interface{}) map[string]interface{} {
 	}
 	return m1
 }
+
+func GetStatementFromSql(sql string) (*SelectStatement, error) {
+	parser := NewParser(strings.NewReader(sql))
+	if stmt, err := Language.Parse(parser); err != nil {
+		return nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
+	} else {
+		if r, ok := stmt.(*SelectStatement); !ok {
+			return nil, fmt.Errorf("SQL %s is not a select statement.", sql)
+		} else {
+			return r, nil
+		}
+	}
+}

+ 5 - 16
xstream/nodes/window_op.go

@@ -39,7 +39,7 @@ func init() {
 	gob.Register([]*xsql.Tuple{})
 }
 
-func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
+func NewWindowOp(name string, w WindowConfig, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
 	o := new(WindowOperator)
 
 	o.defaultSinkNode = &defaultSinkNode{
@@ -50,21 +50,10 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 		},
 	}
 	o.isEventTime = isEventTime
-	if w != nil {
-		o.window = &WindowConfig{
-			Type:   w.WindowType,
-			Length: w.Length.Val,
-		}
-		if w.Interval != nil {
-			o.window.Interval = w.Interval.Val
-		} else if o.window.Type == xsql.COUNT_WINDOW {
-			//if no interval value is set and it's count window, then set interval to length value.
-			o.window.Interval = o.window.Length
-		}
-	} else {
-		o.window = &WindowConfig{
-			Type: xsql.NOT_WINDOW,
-		}
+	o.window = &w
+	if o.window.Interval == 0 && o.window.Type == xsql.COUNT_WINDOW {
+		//if no interval value is set and it's count window, then set interval to length value.
+		o.window.Interval = o.window.Length
 	}
 	if isEventTime {
 		//Create watermark generator

+ 4 - 4
xsql/plans/aggregate_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -6,7 +6,7 @@ import (
 	"github.com/emqx/kuiper/xstream/api"
 )
 
-type AggregatePlan struct {
+type AggregateOp struct {
 	Dimensions xsql.Dimensions
 	Alias      xsql.Fields
 }
@@ -15,7 +15,7 @@ type AggregatePlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: xsql.GroupedTuplesSet
  */
-func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("aggregate plan receive %s", data)
 	grouped := data
@@ -105,7 +105,7 @@ func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.
 	return grouped
 }
 
-func (p *AggregatePlan) calculateAlias(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) error {
+func (p *AggregateOp) calculateAlias(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) error {
 	afv.SetData(agg)
 	ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
 	for _, f := range p.Alias {

+ 5 - 5
xsql/plans/aggregate_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"errors"
@@ -310,7 +310,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
+		pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups()}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
@@ -552,7 +552,7 @@ func TestAggregatePlanGroupAlias_Apply(t *testing.T) {
 			}
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
@@ -699,7 +699,7 @@ func TestAggregatePlanAlias_Apply(t *testing.T) {
 			}
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -752,7 +752,7 @@ func TestAggregatePlanError(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
+		pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups()}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 3 - 3
xsql/plans/filter_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -6,7 +6,7 @@ import (
 	"github.com/emqx/kuiper/xstream/api"
 )
 
-type FilterPlan struct {
+type FilterOp struct {
 	Condition xsql.Expr
 }
 
@@ -14,7 +14,7 @@ type FilterPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
-func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("filter plan receive %s", data)
 	switch input := data.(type) {

+ 3 - 3
xsql/plans/filter_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"errors"
@@ -334,7 +334,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &FilterPlan{Condition: stmt.Condition}
+		pp := &FilterOp{Condition: stmt.Condition}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -457,7 +457,7 @@ func TestFilterPlanError(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &FilterPlan{Condition: stmt.Condition}
+		pp := &FilterOp{Condition: stmt.Condition}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 3 - 3
xsql/plans/having_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -6,11 +6,11 @@ import (
 	"github.com/emqx/kuiper/xstream/api"
 )
 
-type HavingPlan struct {
+type HavingOp struct {
 	Condition xsql.Expr
 }
 
-func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (p *HavingOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("having plan receive %s", data)
 	switch input := data.(type) {

+ 4 - 4
xsql/plans/having_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"errors"
@@ -262,7 +262,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &HavingPlan{Condition: stmt.Having}
+		pp := &HavingOp{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -415,7 +415,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &HavingPlan{Condition: stmt.Having}
+		pp := &HavingOp{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -487,7 +487,7 @@ func TestHavingPlanError(t *testing.T) {
 			break
 		}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		pp := &HavingPlan{Condition: stmt.Having}
+		pp := &HavingOp{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 2 - 2
xsql/plans/join_multi_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -397,7 +397,7 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 7 - 7
xsql/plans/join_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -7,14 +7,14 @@ import (
 )
 
 //TODO join expr should only be the equal op between 2 streams like tb1.id = tb2.id
-type JoinPlan struct {
+type JoinOp struct {
 	From  *xsql.Table
 	Joins xsql.Joins
 }
 
 // input:  xsql.WindowTuplesSet from windowOp, window is required for join
 // output: xsql.JoinTupleSets
-func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+func (jp *JoinOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	var input xsql.WindowTuplesSet
 	switch v := data.(type) {
@@ -67,7 +67,7 @@ func getStreamNames(join *xsql.Join) ([]string, error) {
 	return srcs, nil
 }
 
-func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
+func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var leftStream, rightStream string
 
 	if join.JoinType != xsql.CROSS_JOIN {
@@ -153,7 +153,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql
 	return sets, nil
 }
 
-func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
+func (jp *JoinOp) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	streams, err := getStreamNames(&join)
 	if err != nil {
 		return nil, err
@@ -203,7 +203,7 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 	return sets, nil
 }
 
-func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (interface{}, error) {
+func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (interface{}, error) {
 	var rightStream string
 	if join.Alias == "" {
 		rightStream = join.Name
@@ -262,7 +262,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 	return newSets, nil
 }
 
-func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
+func (jp *JoinOp) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var rightStream string
 	if join.Alias == "" {
 		rightStream = join.Name

+ 7 - 7
xsql/plans/join_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -663,7 +663,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -1129,7 +1129,7 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -1313,7 +1313,7 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -1564,7 +1564,7 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -1695,7 +1695,7 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -1769,7 +1769,7 @@ func TestCrossJoinPlanError(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		} else {
 			fv, afv := xsql.NewFunctionValuersForOp(nil)
-			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			pp := &JoinOp{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 2 - 2
xsql/plans/math_func_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -465,7 +465,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 			t.Errorf("%q", err)
 			continue
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)

+ 5 - 5
xsql/plans/misc_func_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -181,7 +181,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -234,7 +234,7 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -351,7 +351,7 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -697,7 +697,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)

+ 3 - 3
xsql/plans/order_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"fmt"
@@ -6,7 +6,7 @@ import (
 	"github.com/emqx/kuiper/xstream/api"
 )
 
-type OrderPlan struct {
+type OrderOp struct {
 	SortFields xsql.SortFields
 }
 
@@ -14,7 +14,7 @@ type OrderPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
-func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+func (p *OrderOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("order plan receive %s", data)
 	sorter := xsql.OrderedBy(p.SortFields, fv)

+ 2 - 2
xsql/plans/order_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"errors"
@@ -472,7 +472,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			break
 		}
 
-		pp := &OrderPlan{SortFields: stmt.SortFields}
+		pp := &OrderOp{SortFields: stmt.SortFields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {

+ 1 - 1
xsql/plans/preprocessor.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/base64"

+ 1 - 1
xsql/plans/preprocessor_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/base64"

+ 4 - 4
xsql/plans/project_operator.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -10,7 +10,7 @@ import (
 	"strings"
 )
 
-type ProjectPlan struct {
+type ProjectOp struct {
 	Fields      xsql.Fields
 	IsAggregate bool
 	SendMeta    bool
@@ -21,7 +21,7 @@ type ProjectPlan struct {
  *  input: *xsql.Tuple from preprocessor or filterOp | xsql.WindowTuplesSet from windowOp or filterOp | xsql.JoinTupleSets from joinOp or filterOp
  *  output: []map[string]interface{}
  */
-func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("project plan receive %s", data)
 	var results []map[string]interface{}
@@ -87,7 +87,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F
 	}
 }
 
-func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
+func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
 	afv.SetData(agg)
 	if pp.IsAggregate {
 		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}

+ 6 - 6
xsql/plans/project_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -516,7 +516,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectPlan{Fields: stmt.Fields, SendMeta: true}
+		pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -1067,7 +1067,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -1268,7 +1268,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
@@ -1971,7 +1971,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true, isTest: true}
+		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true, isTest: true}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
@@ -2121,7 +2121,7 @@ func TestProjectPlanError(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)

+ 2 - 2
xsql/plans/str_func_test.go

@@ -1,4 +1,4 @@
-package plans
+package operators
 
 import (
 	"encoding/json"
@@ -417,7 +417,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectOp{Fields: stmt.Fields}
 		pp.isTest = true
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)

+ 14 - 0
xstream/planner/aggregatePlan.go

@@ -0,0 +1,14 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
// AggregatePlan is the logical plan node holding the GROUP BY dimensions and
// the aggregate alias fields to compute.
type AggregatePlan struct {
	baseLogicalPlan
	dimensions xsql.Dimensions
	alias      xsql.Fields
}

// Init sets the embedded baseLogicalPlan.self to point at the concrete plan
// (a copy of the value receiver) and returns that copy, so base-type methods
// can return the derived instance.
func (p AggregatePlan) Init() *AggregatePlan {
	p.baseLogicalPlan.self = &p
	return &p
}

+ 32 - 0
xstream/planner/dataSourcePlan.go

@@ -0,0 +1,32 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
// DataSourcePlan is the leaf logical plan node representing a source stream.
type DataSourcePlan struct {
	baseLogicalPlan
	name       string
	isWildCard bool
	needMeta   bool
	// if is wildCard, leave it empty
	fields     xsql.Fields
	metaFields xsql.Fields
	alias      xsql.Fields
}

// Init sets the embedded baseLogicalPlan.self to point at the concrete plan
// (a copy of the value receiver) and returns that copy, so base-type methods
// can return the derived instance.
func (p DataSourcePlan) Init() *DataSourcePlan {
	p.baseLogicalPlan.self = &p
	return &p
}
+
+// Presume no children for data source
+func (p *DataSourcePlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
+	if condition != nil {
+		// Add a filter plan for children
+		f := FilterPlan{
+			condition: condition,
+		}.Init()
+		f.SetChildren([]LogicalPlan{p})
+		return nil, f
+	}
+	return nil, p
+}

+ 70 - 0
xstream/planner/filterPlan.go

@@ -0,0 +1,70 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
// FilterPlan is the logical plan node for a predicate (WHERE/ON/HAVING-style
// condition) applied to its child plan.
type FilterPlan struct {
	baseLogicalPlan
	condition xsql.Expr
}

// Init sets the embedded baseLogicalPlan.self to point at the concrete plan
// (a copy of the value receiver) and returns that copy, so base-type methods
// can return the derived instance.
func (p FilterPlan) Init() *FilterPlan {
	p.baseLogicalPlan.self = &p
	return &p
}
+
// PushDownPredicate merges the incoming condition into this filter's own
// condition, then tries to push the combined predicate further down the tree.
// It returns nil as the leftover condition (a filter absorbs everything it is
// given) plus the possibly-restructured subtree root.
func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
	// if no child, swallow all conditions
	a := combine(condition, p.condition)
	if len(p.children) == 0 {
		p.condition = a
		return nil, p
	}
	// if has child, try to move pushable condition out
	up, pp := extractCondition(a)

	// push the pushable part into the subtree; rest is whatever the
	// children could not absorb
	rest, _ := p.baseLogicalPlan.PushDownPredicate(pp)

	up = combine(up, rest)
	if up != nil {
		// some condition must still be evaluated at this level, so keep
		// this filter with the leftover predicate
		p.condition = up
		return nil, p
	} else if len(p.children) == 1 {
		// eliminate this filter
		return nil, p.children[0]
	} else {
		return nil, p
	}
}
+
+// Return the unpushable condition and pushable condition
+func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) {
+	s := GetRefSources(condition)
+	if len(s) < 2 {
+		pushable = condition
+		return
+	} else {
+		if be, ok := condition.(*xsql.BinaryExpr); ok && be.OP == xsql.AND {
+			ul, pl := extractCondition(be.LHS)
+			ur, pr := extractCondition(be.RHS)
+			unpushable = combine(ul, ur)
+			pushable = combine(pl, pr)
+			return
+		}
+	}
+	//default case: all condition are unpushable
+	return condition, nil
+}
+
+func combine(l xsql.Expr, r xsql.Expr) xsql.Expr {
+	if l != nil && r != nil {
+		return &xsql.BinaryExpr{
+			OP:  xsql.AND,
+			LHS: l,
+			RHS: r,
+		}
+	} else if l != nil {
+		return l
+	} else {
+		return r
+	}
+}

+ 13 - 0
xstream/planner/havingPlan.go

@@ -0,0 +1,13 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+type HavingPlan struct {
+	baseLogicalPlan
+	condition xsql.Expr
+}
+
+func (p HavingPlan) Init() *HavingPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}

+ 14 - 0
xstream/planner/joinPlan.go

@@ -0,0 +1,14 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+type JoinPlan struct {
+	baseLogicalPlan
+	from  *xsql.Table
+	joins xsql.Joins
+}
+
+func (p JoinPlan) Init() *JoinPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}

+ 41 - 0
xstream/planner/logicalPlan.go

@@ -0,0 +1,41 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
// LogicalPlan is a node in the logical plan tree that is built from a SELECT
// statement and rewritten by the optimizer rules.
type LogicalPlan interface {
	Children() []LogicalPlan
	SetChildren(children []LogicalPlan)
	// PushDownPredicate pushes down the filter in the filter/where/on/having clauses as deeply as possible.
	// It accepts a single condition expression (nil means no condition) and returns the part of it
	// that could not be pushed down.
	// It also returns the new plan tree, as pushing predicates can possibly change the tree.
	PushDownPredicate(xsql.Expr) (xsql.Expr, LogicalPlan)
}
+
// baseLogicalPlan provides the child bookkeeping shared by all plan nodes.
// Concrete nodes embed it and set self so base methods can hand back the
// derived instance instead of the embedded base.
type baseLogicalPlan struct {
	children []LogicalPlan
	// Can be used to return the derived instance from the base type
	self LogicalPlan
}

// Children returns the direct child plans of this node.
func (p *baseLogicalPlan) Children() []LogicalPlan {
	return p.children
}

// SetChildren replaces the direct child plans of this node.
func (p *baseLogicalPlan) SetChildren(children []LogicalPlan) {
	p.children = children
}
+
+// By default, push down the predicate to the first child instead of the children
+// as most plan cannot have multiple children
+func (p *baseLogicalPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
+	if len(p.children) == 0 {
+		return condition, p.self
+	}
+	rest := condition
+	for i, child := range p.children {
+		var newChild LogicalPlan
+		rest, newChild = child.PushDownPredicate(rest)
+		p.children[i] = newChild
+	}
+	return rest, p.self
+}

+ 16 - 0
xstream/planner/optimizer.go

@@ -0,0 +1,16 @@
+package planner
+
+var optRuleList = []logicalOptRule{
+	&predicatePushDown{},
+}
+
+func optimize(p LogicalPlan) (LogicalPlan, error) {
+	var err error
+	for _, rule := range optRuleList {
+		p, err = rule.optimize(p)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return p, err
+}

+ 13 - 0
xstream/planner/orderPlan.go

@@ -0,0 +1,13 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+type OrderPlan struct {
+	baseLogicalPlan
+	SortFields xsql.SortFields
+}
+
+func (p OrderPlan) Init() *OrderPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}

+ 300 - 0
xstream/planner/planner.go

@@ -0,0 +1,300 @@
+package planner
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream"
+	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/nodes"
+	"github.com/emqx/kuiper/xstream/operators"
+	"path"
+	"strings"
+)
+
// Plan builds and optimizes the execution topology for a rule, using the
// sources derived from the statement and the sinks declared in the rule's
// actions.
func Plan(rule *api.Rule, storePath string) (*xstream.TopologyNew, error) {
	return PlanWithSourcesAndSinks(rule, storePath, nil, nil)
}
+
// PlanWithSourcesAndSinks parses and validates the rule's SQL, builds the
// optimized logical plan, and turns it into a runnable topology. Non-nil
// sources/sinks are injected in place of the ones derived from the statement
// and the rule actions — primarily used by tests to mock I/O (production
// callers go through Plan, which passes nil for both).
func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.SourceNode, sinks []*nodes.SinkNode) (*xstream.TopologyNew, error) {
	sql := rule.Sql

	common.Log.Infof("Init rule with options %+v", rule.Options)
	stmt, err := xsql.GetStatementFromSql(sql)
	if err != nil {
		return nil, err
	}
	// validation: injected sources must match the streams referenced in the SQL
	streamsFromStmt := xsql.GetStreams(stmt)
	if len(sources) > 0 && len(sources) != len(streamsFromStmt) {
		return nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
	}
	// sendMetaToSink is incompatible with joins/windows (multiple streams or dimensions).
	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
	}
	// Stream definitions live in the sqlite KV store under <storePath>/stream.
	store := common.GetSqliteKVStore(path.Join(storePath, "stream"))
	err = store.Open()
	if err != nil {
		return nil, err
	}
	defer store.Close()
	// Create logical plan and optimize. Logical plans are a linked list
	lp, err := createLogicalPlan(stmt, rule.Options)
	if err != nil {
		return nil, err
	}
	tp, err := createTopo(rule, lp, sources, sinks, store, streamsFromStmt)
	if err != nil {
		return nil, err
	}
	return tp, nil
}
+
+func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, store common.KeyValue, streamsFromStmt []string) (*xstream.TopologyNew, error) {
+	// Create topology
+	tp, err := xstream.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
+	if err != nil {
+		return nil, err
+	}
+
+	input, _, err := buildOps(lp, tp, rule.Options, sources, store, streamsFromStmt, 0)
+	if err != nil {
+		return nil, err
+	}
+	inputs := []api.Emitter{input}
+	// Add actions
+	if len(sinks) > 0 { // For use of mock sink in testing
+		for _, sink := range sinks {
+			tp.AddSink(inputs, sink)
+		}
+	} else {
+		for i, m := range rule.Actions {
+			for name, action := range m {
+				props, ok := action.(map[string]interface{})
+				if !ok {
+					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
+				}
+				tp.AddSink(inputs, nodes.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
+			}
+		}
+	}
+
+	return tp, nil
+}
+
// buildOps recursively translates the logical plan tree into physical
// operator nodes on the topology, bottom-up. index is a running counter used
// to give every operator a unique ordered name prefix; the advanced counter
// is returned so sibling subtrees keep incrementing it. It returns the
// emitter representing this subtree's output.
func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, sources []*nodes.SourceNode, store common.KeyValue, streamsFromStmt []string, index int) (api.Emitter, int, error) {
	var inputs []api.Emitter
	newIndex := index
	// Build children first so their emitters can feed this node.
	for _, c := range lp.Children() {
		input, ni, err := buildOps(c, tp, options, sources, store, streamsFromStmt, newIndex)
		if err != nil {
			return nil, 0, err
		}
		newIndex = ni
		inputs = append(inputs, input)
	}
	newIndex++
	var (
		op  nodes.OperatorNode
		err error
	)
	switch t := lp.(type) {
	case *DataSourcePlan:
		// Resolve the stored stream definition to configure the source node
		// and its preprocessor.
		streamStmt, err := getStream(store, t.name)
		if err != nil {
			return nil, 0, fmt.Errorf("fail to get stream %s, please check if stream is created", t.name)
		}
		// The binary format skips decoding in the preprocessor.
		isBinary := false
		if f, ok := streamStmt.Options["FORMAT"]; ok {
			if strings.ToLower(f) == common.FORMAT_BINARY {
				isBinary = true
			}
		}
		pp, err := operators.NewPreprocessor(streamStmt, t.alias, options.IsEventTime, isBinary)
		if err != nil {
			return nil, 0, err
		}
		var srcNode *nodes.SourceNode
		if len(sources) == 0 {
			node := nodes.NewSourceNode(t.name, streamStmt.Options)
			srcNode = node
		} else {
			// Sources were injected (tests): pick the one matching this stream.
			found := false
			for _, source := range sources {
				if t.name == source.GetName() {
					srcNode = source
					found = true
				}
			}
			if !found {
				return nil, 0, fmt.Errorf("can't find predefined source %s", t.name)
			}
		}
		tp.AddSrc(srcNode)
		op = xstream.Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options.BufferLength)
		inputs = []api.Emitter{srcNode}
	case *WindowPlan:
		if t.condition != nil {
			// A window filter that could not be pushed below the window runs
			// immediately before it.
			wfilterOp := xstream.Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_windowFilter", newIndex), options.BufferLength)
			wfilterOp.SetConcurrency(options.Concurrency)
			tp.AddOperator(inputs, wfilterOp)
			inputs = []api.Emitter{wfilterOp}
		}

		op, err = nodes.NewWindowOp(fmt.Sprintf("%d_window", newIndex), nodes.WindowConfig{
			Type:     t.wtype,
			Length:   t.length,
			Interval: t.interval,
		}, t.isEventTime, options.LateTol, streamsFromStmt, options.BufferLength)
		if err != nil {
			return nil, 0, err
		}
	case *JoinPlan:
		op = xstream.Transform(&operators.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options.BufferLength)
	case *FilterPlan:
		op = xstream.Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options.BufferLength)
	case *AggregatePlan:
		op = xstream.Transform(&operators.AggregateOp{Dimensions: t.dimensions, Alias: t.alias}, fmt.Sprintf("%d_aggregate", newIndex), options.BufferLength)
	case *HavingPlan:
		op = xstream.Transform(&operators.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options.BufferLength)
	case *OrderPlan:
		op = xstream.Transform(&operators.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options.BufferLength)
	case *ProjectPlan:
		op = xstream.Transform(&operators.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options.BufferLength)
	default:
		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
	}
	// All unary operators share the rule-level concurrency setting.
	if uop, ok := op.(*nodes.UnaryOperator); ok {
		uop.SetConcurrency(options.Concurrency)
	}
	tp.AddOperator(inputs, op)
	return op, newIndex, nil
}
+
+func getStream(m common.KeyValue, name string) (stmt *xsql.StreamStmt, err error) {
+	var s string
+	f, err := m.Get(name, &s)
+	if !f || err != nil {
+		return nil, fmt.Errorf("Cannot find key %s. ", name)
+	}
+	parser := xsql.NewParser(strings.NewReader(s))
+	stream, err := xsql.Language.Parse(parser)
+	stmt, ok := stream.(*xsql.StreamStmt)
+	if !ok {
+		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
+	}
+	return
+}
+
// createLogicalPlan builds the logical plan tree for a parsed SELECT
// statement in a fixed bottom-up order: data sources -> window -> join ->
// where-filter -> aggregate -> having -> order -> project. The assembled
// tree is then passed through the optimizer rules before being returned.
func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption) (LogicalPlan, error) {
	streamsFromStmt := xsql.GetStreams(stmt)
	dimensions := stmt.Dimensions
	var (
		p                     LogicalPlan
		children              []LogicalPlan
		w                     *xsql.Window
		ds                    xsql.Dimensions
		alias, aggregateAlias xsql.Fields
	)
	// Split aliased select fields: plain aliases can be computed at the
	// source, aggregate aliases must wait for the aggregate operator.
	for _, f := range stmt.Fields {
		if f.AName != "" {
			if !xsql.HasAggFuncs(f.Expr) {
				alias = append(alias, f)
			} else {
				aggregateAlias = append(aggregateAlias, f)
			}
		}
	}
	// One data source plan per stream referenced in the statement.
	for _, s := range streamsFromStmt {
		p = DataSourcePlan{
			name:       s,
			isWildCard: true,
			needMeta:   opt.SendMetaToSink,
			fields:     nil,
			metaFields: nil,
			alias:      alias,
		}.Init()
		children = append(children, p)
	}
	if dimensions != nil {
		w = dimensions.GetWindow()
		if w != nil {
			wp := WindowPlan{
				wtype:       w.WindowType,
				length:      w.Length.Val,
				isEventTime: opt.IsEventTime,
			}.Init()
			if w.Interval != nil {
				wp.interval = w.Interval.Val
			} else if w.WindowType == xsql.COUNT_WINDOW {
				//if no interval value is set and it's count window, then set interval to length value.
				wp.interval = w.Length.Val
			}
			if w.Filter != nil {
				wp.condition = w.Filter
			}
			// TODO calculate limit
			// TODO incremental aggregate
			wp.SetChildren(children)
			children = []LogicalPlan{wp}
			p = wp
		}
	}
	// Joins are only planned when a window exists (joins need bounded input).
	if w != nil && stmt.Joins != nil {
		// TODO extract on filter
		p = JoinPlan{
			from:  stmt.Sources[0].(*xsql.Table),
			joins: stmt.Joins,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	if stmt.Condition != nil {
		p = FilterPlan{
			condition: stmt.Condition,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	// TODO handle aggregateAlias in optimization as it does not only happen in select fields
	// NOTE(review): when dimensions is nil but aggregateAlias is non-empty,
	// GetGroups is invoked on a nil receiver — presumably safe for
	// xsql.Dimensions; confirm against the xsql package.
	if dimensions != nil || len(aggregateAlias) > 0 {
		ds = dimensions.GetGroups()
		if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
			p = AggregatePlan{
				dimensions: ds,
				alias:      aggregateAlias,
			}.Init()
			p.SetChildren(children)
			children = []LogicalPlan{p}
		}
	}

	if stmt.Having != nil {
		p = HavingPlan{
			condition: stmt.Having,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}

	if stmt.SortFields != nil {
		p = OrderPlan{
			SortFields: stmt.SortFields,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}

	if stmt.Fields != nil {
		p = ProjectPlan{
			fields:      stmt.Fields,
			isAggregate: xsql.IsAggStatement(stmt),
			sendMeta:    opt.SendMetaToSink,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}

	// Run the logical optimization rules (e.g. predicate push-down).
	return optimize(p)
}

+ 300 - 0
xstream/planner/planner_test.go

@@ -0,0 +1,300 @@
+package planner
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/api"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func Test_createLogicalPlan(t *testing.T) {
+	var tests = []struct {
+		sql string
+		p   LogicalPlan
+		err string
+	}{
+		{ // 0
+			sql: `SELECT name FROM tbl`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						DataSourcePlan{
+							baseLogicalPlan: baseLogicalPlan{},
+							name:            "tbl",
+							isWildCard:      true,
+							needMeta:        false,
+							fields:          nil,
+							metaFields:      nil,
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "name"},
+						Name:  "name",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 1 optimize where to data source
+			sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name:       "src1",
+													isWildCard: true,
+													needMeta:   false,
+													fields:     nil,
+													metaFields: nil,
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											LHS: &xsql.FieldRef{Name: "f1"},
+											OP:  xsql.EQ,
+											RHS: &xsql.StringLiteral{Val: "v1"},
+										},
+									}.Init(),
+								},
+							},
+							condition: nil,
+							wtype:     xsql.TUMBLING_WINDOW,
+							length:    10000,
+							interval:  0,
+							limit:     0,
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "abc"},
+						Name:  "abc",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 2 condition that cannot be optimized
+			sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						FilterPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									JoinPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												WindowPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name:       "src1",
+																isWildCard: true,
+																needMeta:   false,
+																fields:     nil,
+																metaFields: nil,
+															}.Init(),
+															DataSourcePlan{
+																name:       "src2",
+																isWildCard: true,
+																needMeta:   false,
+																fields:     nil,
+																metaFields: nil,
+															}.Init(),
+														},
+													},
+													condition: nil,
+													wtype:     xsql.TUMBLING_WINDOW,
+													length:    10000,
+													interval:  0,
+													limit:     0,
+												}.Init(),
+											},
+										},
+										from: &xsql.Table{Name: "src1"},
+										joins: xsql.Joins{xsql.Join{
+											Name:     "src2",
+											JoinType: xsql.INNER_JOIN,
+											Expr: &xsql.BinaryExpr{
+												OP:  xsql.EQ,
+												LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
+												RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
+											},
+										}},
+									}.Init(),
+								},
+							},
+							condition: &xsql.BinaryExpr{
+								LHS: &xsql.BinaryExpr{
+									OP:  xsql.GT,
+									LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
+									RHS: &xsql.IntegerLiteral{Val: 20},
+								},
+								OP: xsql.OR,
+								RHS: &xsql.BinaryExpr{
+									OP:  xsql.GT,
+									LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
+									RHS: &xsql.IntegerLiteral{Val: 60},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "id1"},
+						Name:  "id1",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 3 optimize window filter
+			sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE size > 2)`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												DataSourcePlan{
+													name:       "src1",
+													isWildCard: true,
+													needMeta:   false,
+													fields:     nil,
+													metaFields: nil,
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											OP: xsql.AND,
+											LHS: &xsql.BinaryExpr{
+												LHS: &xsql.FieldRef{Name: "f1"},
+												OP:  xsql.EQ,
+												RHS: &xsql.StringLiteral{Val: "v1"},
+											},
+											RHS: &xsql.BinaryExpr{
+												LHS: &xsql.FieldRef{Name: "size"},
+												OP:  xsql.GT,
+												RHS: &xsql.IntegerLiteral{Val: 2},
+											},
+										},
+									}.Init(),
+								},
+							},
+							condition: nil,
+							wtype:     xsql.TUMBLING_WINDOW,
+							length:    10000,
+							interval:  0,
+							limit:     0,
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "abc"},
+						Name:  "abc",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		}, { // 4. do not optimize count window
+			sql: `SELECT * FROM demo WHERE temperature > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						HavingPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												WindowPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name:       "demo",
+																isWildCard: true,
+																needMeta:   false,
+																fields:     nil,
+																metaFields: nil,
+															}.Init(),
+														},
+													},
+													condition: nil,
+													wtype:     xsql.COUNT_WINDOW,
+													length:    5,
+													interval:  1,
+													limit:     0,
+												}.Init(),
+											},
+										},
+										condition: &xsql.BinaryExpr{
+											LHS: &xsql.FieldRef{Name: "temperature"},
+											OP:  xsql.GT,
+											RHS: &xsql.IntegerLiteral{Val: 20},
+										},
+									}.Init(),
+								},
+							},
+							condition: &xsql.BinaryExpr{
+								LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.StringLiteral{
+									Val: "*",
+								}}},
+								OP:  xsql.GT,
+								RHS: &xsql.IntegerLiteral{Val: 2},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.Wildcard{Token: xsql.ASTERISK},
+						Name:  "",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
+		},
+	}
+	//TODO optimize having, optimize on
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		//fmt.Printf("Parsing SQL %q.\n", tt.s)
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
+		}
+		p, err := createLogicalPlan(stmt, &api.RuleOption{
+			IsEventTime:        false,
+			LateTol:            0,
+			Concurrency:        0,
+			BufferLength:       0,
+			SendMetaToSink:     false,
+			Qos:                0,
+			CheckpointInterval: 0,
+		})
+		if err != nil {
+			t.Errorf("%d. %q\n\nerror:%v\n\n", i, tt.sql, err)
+		}
+		if !reflect.DeepEqual(tt.p, p) {
+			t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.p, p)
+		}
+	}
+}

+ 15 - 0
xstream/planner/projectPlan.go

@@ -0,0 +1,15 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+type ProjectPlan struct {
+	baseLogicalPlan
+	fields      xsql.Fields
+	isAggregate bool
+	sendMeta    bool
+}
+
+func (p ProjectPlan) Init() *ProjectPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}

+ 17 - 0
xstream/planner/rules.go

@@ -0,0 +1,17 @@
+package planner
+
+type logicalOptRule interface {
+	optimize(LogicalPlan) (LogicalPlan, error)
+	name() string
+}
+
+type predicatePushDown struct{}
+
+func (r *predicatePushDown) optimize(lp LogicalPlan) (LogicalPlan, error) {
+	_, p := lp.PushDownPredicate(nil)
+	return p, nil
+}
+
+func (r *predicatePushDown) name() string {
+	return "predicatePushDown"
+}

+ 20 - 0
xstream/planner/util.go

@@ -0,0 +1,20 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+func GetRefSources(node xsql.Node) []string {
+	result := make(map[string]bool)
+	keys := make([]string, 0, len(result))
+	if node == nil {
+		return keys
+	}
+	xsql.WalkFunc(node, func(n xsql.Node) {
+		if f, ok := n.(*xsql.FieldRef); ok {
+			result[string(f.StreamName)] = true
+		}
+	})
+	for k := range result {
+		keys = append(keys, k)
+	}
+	return keys
+}

+ 37 - 0
xstream/planner/windowPlan.go

@@ -0,0 +1,37 @@
+package planner
+
+import "github.com/emqx/kuiper/xsql"
+
+type WindowPlan struct {
+	baseLogicalPlan
+	condition   xsql.Expr
+	wtype       xsql.WindowType
+	length      int
+	interval    int //If interval is not set, it is equals to Length
+	limit       int //If limit is not positive, there will be no limit
+	isEventTime bool
+}
+
+func (p WindowPlan) Init() *WindowPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}
+
+func (p *WindowPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
+	if p.wtype == xsql.COUNT_WINDOW {
+		return condition, p
+	} else if p.isEventTime {
+		// TODO event time filter, need event window op support
+		//p.condition = combine(condition, p.condition)
+		//// push nil condition won't return any
+		//p.baseLogicalPlan.PushDownPredicate(nil)
+		// return nil, p
+		return condition, p
+	} else {
+		//Presume window condition are only one table related.
+		// TODO window condition validation
+		a := combine(condition, p.condition)
+		p.condition, _ = p.baseLogicalPlan.PushDownPredicate(a)
+		return nil, p
+	}
+}

+ 3 - 1
xstream/server/server/ruleManager.go

@@ -5,6 +5,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/xstream/planner"
+	"path"
 	"sort"
 	"sync"
 
@@ -54,7 +56,7 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
 		Name: rule.Id,
 	}
 	registry.Store(rule.Id, rs)
-	if tp, err := ruleProcessor.ExecInitRule(rule); err != nil {
+	if tp, err := planner.Plan(rule, path.Dir(dataDir)); err != nil {
 		return rs, err
 	} else {
 		rs.Topology = tp