Просмотр исходного кода

fix(planner): filter must not apply on table before join align

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 лет назад
Родитель
Сommit
e400e20032

+ 4 - 1
internal/topo/planner/dataSourcePlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -50,6 +50,9 @@ func (p DataSourcePlan) Init() *DataSourcePlan {
 
 // Presume no children for data source
 func (p *DataSourcePlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
+	if p.streamStmt.StreamType == ast.TypeTable {
+		return condition, p.self
+	}
 	owned, other := p.extract(condition)
 	if owned != nil {
 		// Add a filter plan for children

+ 1 - 8
internal/topo/planner/joinAlignPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -39,12 +39,5 @@ func (p *JoinAlignPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, Logical
 			p.children[i] = newChild
 		}
 	}
-	for i, child := range p.children {
-		if _, ok := child.(*DataSourcePlan); !ok {
-			var newChild LogicalPlan
-			rest, newChild = child.PushDownPredicate(rest)
-			p.children[i] = newChild
-		}
-	}
 	return rest, p.self
 }

+ 119 - 133
internal/topo/planner/planner_test.go

@@ -686,31 +686,20 @@ func Test_createLogicalPlan(t *testing.T) {
 														},
 													},
 												}.Init(),
-												FilterPlan{
-													baseLogicalPlan: baseLogicalPlan{
-														children: []LogicalPlan{
-															DataSourcePlan{
-																name: "tableInPlanner",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "hum",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
-																	},
-																	&ast.StreamField{
-																		Name:      "id",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
-																	},
-																},
-																streamStmt: streams["tableInPlanner"],
-																metaFields: []string{},
-															}.Init(),
+												DataSourcePlan{
+													name: "tableInPlanner",
+													streamFields: []interface{}{
+														&ast.StreamField{
+															Name:      "hum",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														},
+														&ast.StreamField{
+															Name:      "id",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
 														},
 													},
-													condition: &ast.BinaryExpr{
-														OP:  ast.LT,
-														LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
-														RHS: &ast.IntegerLiteral{Val: 60},
-													},
+													streamStmt: streams["tableInPlanner"],
+													metaFields: []string{},
 												}.Init(),
 											},
 										},
@@ -727,9 +716,17 @@ func Test_createLogicalPlan(t *testing.T) {
 									Alias:    "",
 									JoinType: ast.INNER_JOIN,
 									Expr: &ast.BinaryExpr{
-										LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
-										OP:  ast.EQ,
-										RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										OP: ast.AND,
+										LHS: &ast.BinaryExpr{
+											LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
+											OP:  ast.EQ,
+											RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										},
+										RHS: &ast.BinaryExpr{
+											OP:  ast.LT,
+											LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
+											RHS: &ast.IntegerLiteral{Val: 60},
+										},
 									},
 								},
 							},
@@ -759,73 +756,44 @@ func Test_createLogicalPlan(t *testing.T) {
 												WindowPlan{
 													baseLogicalPlan: baseLogicalPlan{
 														children: []LogicalPlan{
-															FilterPlan{
-																baseLogicalPlan: baseLogicalPlan{
-																	children: []LogicalPlan{
-																		DataSourcePlan{
-																			name: "src1",
-																			streamFields: []interface{}{
-																				&ast.StreamField{
-																					Name:      "id1",
-																					FieldType: &ast.BasicType{Type: ast.BIGINT},
-																				},
-																				&ast.StreamField{
-																					Name:      "temp",
-																					FieldType: &ast.BasicType{Type: ast.BIGINT},
-																				},
-																			},
-																			streamStmt: streams["src1"],
-																			metaFields: []string{},
-																		}.Init(),
-																	},
-																},
-																condition: &ast.BinaryExpr{
-																	RHS: &ast.BinaryExpr{
-																		OP:  ast.GT,
-																		LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
-																		RHS: &ast.IntegerLiteral{Val: 20},
-																	},
-																	OP: ast.AND,
-																	LHS: &ast.BinaryExpr{
-																		OP:  ast.GT,
-																		LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
-																		RHS: &ast.IntegerLiteral{Val: 111},
-																	},
-																},
-															}.Init(),
-														},
-													},
-													condition: nil,
-													wtype:     ast.TUMBLING_WINDOW,
-													length:    10000,
-													interval:  0,
-													limit:     0,
-												}.Init(),
-												FilterPlan{
-													baseLogicalPlan: baseLogicalPlan{
-														children: []LogicalPlan{
+
 															DataSourcePlan{
-																name: "tableInPlanner",
+																name: "src1",
 																streamFields: []interface{}{
 																	&ast.StreamField{
-																		Name:      "hum",
+																		Name:      "id1",
 																		FieldType: &ast.BasicType{Type: ast.BIGINT},
 																	},
 																	&ast.StreamField{
-																		Name:      "id",
+																		Name:      "temp",
 																		FieldType: &ast.BasicType{Type: ast.BIGINT},
 																	},
 																},
-																streamStmt: streams["tableInPlanner"],
+																streamStmt: streams["src1"],
 																metaFields: []string{},
 															}.Init(),
 														},
 													},
-													condition: &ast.BinaryExpr{
-														OP:  ast.LT,
-														LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
-														RHS: &ast.IntegerLiteral{Val: 60},
+													condition: nil,
+													wtype:     ast.TUMBLING_WINDOW,
+													length:    10000,
+													interval:  0,
+													limit:     0,
+												}.Init(),
+												DataSourcePlan{
+													name: "tableInPlanner",
+													streamFields: []interface{}{
+														&ast.StreamField{
+															Name:      "hum",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														},
+														&ast.StreamField{
+															Name:      "id",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														},
 													},
+													streamStmt: streams["tableInPlanner"],
+													metaFields: []string{},
 												}.Init(),
 											},
 										},
@@ -842,9 +810,33 @@ func Test_createLogicalPlan(t *testing.T) {
 									Alias:    "",
 									JoinType: ast.INNER_JOIN,
 									Expr: &ast.BinaryExpr{
-										LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
-										OP:  ast.EQ,
-										RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										OP: ast.AND,
+										LHS: &ast.BinaryExpr{
+											LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
+											OP:  ast.EQ,
+											RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										},
+										RHS: &ast.BinaryExpr{
+											RHS: &ast.BinaryExpr{
+												OP: ast.AND,
+												LHS: &ast.BinaryExpr{
+													OP:  ast.GT,
+													LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
+													RHS: &ast.IntegerLiteral{Val: 20},
+												},
+												RHS: &ast.BinaryExpr{
+													OP:  ast.LT,
+													LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
+													RHS: &ast.IntegerLiteral{Val: 60},
+												},
+											},
+											OP: ast.AND,
+											LHS: &ast.BinaryExpr{
+												OP:  ast.GT,
+												LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
+												RHS: &ast.IntegerLiteral{Val: 111},
+											},
+										},
 									},
 								},
 							},
@@ -1751,32 +1743,13 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 												WindowPlan{
 													baseLogicalPlan: baseLogicalPlan{
 														children: []LogicalPlan{
-															FilterPlan{
-																baseLogicalPlan: baseLogicalPlan{
-																	children: []LogicalPlan{
-																		DataSourcePlan{
-																			name: "src1",
-																			streamFields: []interface{}{
-																				"id1", "temp",
-																			},
-																			streamStmt: streams["src1"],
-																			metaFields: []string{},
-																		}.Init(),
-																	},
-																},
-																condition: &ast.BinaryExpr{
-																	RHS: &ast.BinaryExpr{
-																		OP:  ast.GT,
-																		LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
-																		RHS: &ast.IntegerLiteral{Val: 20},
-																	},
-																	OP: ast.AND,
-																	LHS: &ast.BinaryExpr{
-																		OP:  ast.GT,
-																		LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
-																		RHS: &ast.IntegerLiteral{Val: 111},
-																	},
+															DataSourcePlan{
+																name: "src1",
+																streamFields: []interface{}{
+																	"id1", "temp",
 																},
+																streamStmt: streams["src1"],
+																metaFields: []string{},
 															}.Init(),
 														},
 													},
@@ -1786,31 +1759,20 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 													interval:  0,
 													limit:     0,
 												}.Init(),
-												FilterPlan{
-													baseLogicalPlan: baseLogicalPlan{
-														children: []LogicalPlan{
-															DataSourcePlan{
-																name: "tableInPlanner",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "hum",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
-																	},
-																	&ast.StreamField{
-																		Name:      "id",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
-																	},
-																},
-																streamStmt: streams["tableInPlanner"],
-																metaFields: []string{},
-															}.Init(),
+												DataSourcePlan{
+													name: "tableInPlanner",
+													streamFields: []interface{}{
+														&ast.StreamField{
+															Name:      "hum",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														},
+														&ast.StreamField{
+															Name:      "id",
+															FieldType: &ast.BasicType{Type: ast.BIGINT},
 														},
 													},
-													condition: &ast.BinaryExpr{
-														OP:  ast.LT,
-														LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
-														RHS: &ast.IntegerLiteral{Val: 60},
-													},
+													streamStmt: streams["tableInPlanner"],
+													metaFields: []string{},
 												}.Init(),
 											},
 										},
@@ -1827,9 +1789,33 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 									Alias:    "",
 									JoinType: ast.INNER_JOIN,
 									Expr: &ast.BinaryExpr{
-										LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
-										OP:  ast.EQ,
-										RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										OP: ast.AND,
+										LHS: &ast.BinaryExpr{
+											LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
+											OP:  ast.EQ,
+											RHS: &ast.FieldRef{Name: "id", StreamName: "tableInPlanner"},
+										},
+										RHS: &ast.BinaryExpr{
+											RHS: &ast.BinaryExpr{
+												OP: ast.AND,
+												LHS: &ast.BinaryExpr{
+													OP:  ast.GT,
+													LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
+													RHS: &ast.IntegerLiteral{Val: 20},
+												},
+												RHS: &ast.BinaryExpr{
+													OP:  ast.LT,
+													LHS: &ast.FieldRef{Name: "hum", StreamName: "tableInPlanner"},
+													RHS: &ast.IntegerLiteral{Val: 60},
+												},
+											},
+											OP: ast.AND,
+											LHS: &ast.BinaryExpr{
+												OP:  ast.GT,
+												LHS: &ast.FieldRef{Name: "id1", StreamName: "src1"},
+												RHS: &ast.IntegerLiteral{Val: 111},
+											},
+										},
 									},
 								},
 							},

+ 13 - 27
internal/topo/topotest/window_rule_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -629,35 +629,21 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				//"op_4_project_0_exceptions_total":   int64(0),
-				//"op_4_project_0_process_latency_us": int64(0),
-				//"op_4_project_0_records_in_total":   int64(2),
-				//"op_4_project_0_records_out_total":  int64(2),
-
-				"op_3_window_0_exceptions_total":   int64(0),
-				"op_3_window_0_process_latency_us": int64(0),
-				"op_3_window_0_records_in_total":   int64(3),
-				"op_3_window_0_records_out_total":  int64(2),
-
-				"op_2_filter_0_exceptions_total":   int64(0),
-				"op_2_filter_0_process_latency_us": int64(0),
-				"op_2_filter_0_records_in_total":   int64(5),
-				"op_2_filter_0_records_out_total":  int64(3),
-
-				"op_5_filter_0_exceptions_total":  int64(0),
-				"op_5_filter_0_records_in_total":  int64(1),
-				"op_5_filter_0_records_out_total": int64(1),
+				"op_2_window_0_exceptions_total":   int64(0),
+				"op_2_window_0_process_latency_us": int64(0),
+				"op_2_window_0_records_in_total":   int64(5),
+				"op_2_window_0_records_out_total":  int64(4),
 
-				"op_6_join_aligner_0_records_in_total":  int64(3),
-				"op_6_join_aligner_0_records_out_total": int64(2),
+				"op_4_join_aligner_0_records_in_total":  int64(5),
+				"op_4_join_aligner_0_records_out_total": int64(4),
 
-				"op_7_join_0_exceptions_total":  int64(0),
-				"op_7_join_0_records_in_total":  int64(2),
-				"op_7_join_0_records_out_total": int64(1),
+				"op_5_join_0_exceptions_total":  int64(0),
+				"op_5_join_0_records_in_total":  int64(4),
+				"op_5_join_0_records_out_total": int64(1),
 
-				"op_8_project_0_exceptions_total":  int64(0),
-				"op_8_project_0_records_in_total":  int64(1),
-				"op_8_project_0_records_out_total": int64(1),
+				"op_6_project_0_exceptions_total":  int64(0),
+				"op_6_project_0_records_in_total":  int64(1),
+				"op_6_project_0_records_out_total": int64(1),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(1),