Переглянути джерело

fix(op): filter collection should stop propagation

When all rows are filtered, stop propagation to next op

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 рік тому
батько
коміт
8804e96954

+ 4 - 1
internal/topo/operator/filter_operator.go

@@ -75,7 +75,10 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 			return err
 			return err
 		}
 		}
 		r := input.Filter(sel)
 		r := input.Filter(sel)
-		return r
+		// Only return if any row meets the condition, otherwise filter all
+		if r.Len() > 0 {
+			return r
+		}
 	default:
 	default:
 		return fmt.Errorf("run Where error: invalid input %[1]T(%[1]v)", input)
 		return fmt.Errorf("run Where error: invalid input %[1]T(%[1]v)", input)
 	}
 	}

+ 2 - 6
internal/topo/operator/filter_test.go

@@ -374,9 +374,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 					},
 					},
 				},
 				},
 			},
 			},
-			result: &xsql.WindowTuples{
-				Content: []xsql.TupleRow{},
-			},
+			result: nil,
 		},
 		},
 		{
 		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
@@ -483,9 +481,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 					},
 					},
 				},
 				},
 			},
 			},
-			result: &xsql.JoinTuples{
-				Content: []*xsql.JoinTuple{},
-			},
+			result: nil,
 		},
 		},
 		{
 		{
 			sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",
 			sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",

+ 5 - 8
internal/topo/topotest/window_rule_test.go

@@ -799,25 +799,22 @@ func TestEventWindow(t *testing.T) {
 					"color":        "red",
 					"color":        "red",
 					"ts":           float64(1541152486013),
 					"ts":           float64(1541152486013),
 				}},
 				}},
-				{},
 				{{
 				{{
 					"window_start": float64(1541152488000),
 					"window_start": float64(1541152488000),
 					"window_end":   float64(1541152489000),
 					"window_end":   float64(1541152489000),
 					"color":        "yellow",
 					"color":        "yellow",
 					"ts":           float64(1541152488442),
 					"ts":           float64(1541152488442),
 				}},
 				}},
-				{},
-				{},
 			},
 			},
 			M: map[string]interface{}{
 			M: map[string]interface{}{
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(5),
-				"op_4_project_0_records_out_total":  int64(5),
+				"op_4_project_0_records_in_total":   int64(2),
+				"op_4_project_0_records_out_total":  int64(2),
 
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
 
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -831,7 +828,7 @@ func TestEventWindow(t *testing.T) {
 				"op_3_filter_0_exceptions_total":   int64(0),
 				"op_3_filter_0_exceptions_total":   int64(0),
 				"op_3_filter_0_process_latency_us": int64(0),
 				"op_3_filter_0_process_latency_us": int64(0),
 				"op_3_filter_0_records_in_total":   int64(5),
 				"op_3_filter_0_records_in_total":   int64(5),
-				"op_3_filter_0_records_out_total":  int64(5),
+				"op_3_filter_0_records_out_total":  int64(2),
 			},
 			},
 		}, {
 		}, {
 			Name: `TestEventWindowRule3`,
 			Name: `TestEventWindowRule3`,