Kaynağa Gözat

fix(filter): window filter must continue running

Fix the problem which do not produce anything if all window content are filtered

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

tbf

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 yıl önce
ebeveyn
işleme
c199707c78

+ 1 - 4
internal/topo/operator/filter_operator.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -74,9 +74,6 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 			return err
 		}
 		r := input.Filter(sel)
-		if r.Len() == 0 {
-			return nil
-		}
 		return r
 	default:
 		return fmt.Errorf("run Where error: invalid input %[1]T(%[1]v)", input)

+ 7 - 3
internal/topo/operator/filter_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -374,7 +374,9 @@ func TestFilterPlan_Apply(t *testing.T) {
 					},
 				},
 			},
-			result: nil,
+			result: &xsql.WindowTuples{
+				Content: []xsql.TupleRow{},
+			},
 		},
 		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
@@ -481,7 +483,9 @@ func TestFilterPlan_Apply(t *testing.T) {
 					},
 				},
 			},
-			result: nil,
+			result: &xsql.JoinTuples{
+				Content: []*xsql.JoinTuple{},
+			},
 		},
 		{
 			sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",

+ 9 - 6
internal/topo/topotest/window_rule_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -757,22 +757,25 @@ func TestEventWindow(t *testing.T) {
 					"color":        "red",
 					"ts":           float64(1541152486013),
 				}},
+				{},
 				{{
 					"window_start": float64(1541152488000),
 					"window_end":   float64(1541152489000),
 					"color":        "yellow",
 					"ts":           float64(1541152488442),
 				}},
+				{},
+				{},
 			},
 			M: map[string]interface{}{
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(2),
-				"op_4_project_0_records_out_total":  int64(2),
+				"op_4_project_0_records_in_total":   int64(5),
+				"op_4_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(2),
-				"sink_mockSink_0_records_out_total": int64(2),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -786,7 +789,7 @@ func TestEventWindow(t *testing.T) {
 				"op_3_filter_0_exceptions_total":   int64(0),
 				"op_3_filter_0_process_latency_us": int64(0),
 				"op_3_filter_0_records_in_total":   int64(5),
-				"op_3_filter_0_records_out_total":  int64(2),
+				"op_3_filter_0_records_out_total":  int64(5),
 			},
 		}, {
 			Name: `TestEventWindowRule3`,

+ 4 - 4
test/graph_mix_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2022 EMQ Technologies Co., Ltd.
+  ~ Copyright 2023 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -179,7 +179,7 @@
         &quot;type&quot;: &quot;operator&quot;,&#xd;
         &quot;nodeType&quot;: &quot;filter&quot;,&#xd;
         &quot;props&quot;: {&#xd;
-          &quot;expr&quot;: &quot;humidity &gt; 30&quot;&#xd;
+          &quot;expr&quot;: &quot;humidity &gt; 20&quot;&#xd;
         }&#xd;
       },&#xd;
       &quot;aggfunc&quot;: {&#xd;
@@ -356,7 +356,7 @@
             <hashTree/>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
               <stringProp name="JSON_PATH">$.sink_mqtt2_0_records_out_total</stringProp>
-              <stringProp name="EXPECTED_VALUE">4</stringProp>
+              <stringProp name="EXPECTED_VALUE">5</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
               <boolProp name="INVERT">false</boolProp>
@@ -521,7 +521,7 @@
         <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
         <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
           <boolProp name="LoopController.continue_forever">false</boolProp>
-          <stringProp name="LoopController.loops">4</stringProp>
+          <stringProp name="LoopController.loops">5</stringProp>
         </elementProp>
         <stringProp name="ThreadGroup.num_threads">1</stringProp>
         <stringProp name="ThreadGroup.ramp_time">1</stringProp>

+ 2 - 1
test/graph_mix_rule_result2.txt

@@ -1,4 +1,5 @@
-27.2,0.8792730616507244
+22.5,-0.4871745124605095
+24.85,-0.4871745124605095
 27.15,0.8792730616507244
 26.950000000000003,0.9224328169230859
 26.85,0.9953511049115592