Просмотр исходного кода

fix: sliding window should trigger for each matched event (#2037)

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 год назад
Родитель
Сommit
dd63ab83a8

+ 8 - 17
internal/topo/node/event_window_trigger.go

@@ -115,7 +115,6 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 		prevWindowEndTs int64
 		lastTicked      bool
 	)
-	isTupleMatch := make(map[*xsql.Tuple]bool, 0)
 	for {
 		select {
 		// process incoming item
@@ -160,24 +159,14 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 					}
 					if windowEndTs > 0 {
 						if o.window.Type == ast.SLIDING_WINDOW {
-							var targetTuple *xsql.Tuple
-							if len(inputs) > 0 && o.window.Type == ast.SLIDING_WINDOW {
-								for _, t := range inputs {
-									if t.Timestamp == windowEndTs {
-										targetTuple = t
-										break
-									}
-								}
-							}
-							isMatch := isTupleMatch[targetTuple]
-							if isMatch {
-								if o.window.Delay > 0 && o.window.Type == ast.SLIDING_WINDOW {
-									o.delayTS = append(o.delayTS, windowEndTs+o.window.Delay)
+							for len(o.triggerTS) > 0 && o.triggerTS[0] <= watermarkTs {
+								if o.window.Delay > 0 {
+									o.delayTS = append(o.delayTS, o.triggerTS[0]+o.window.Delay)
 								} else {
-									inputs = o.scan(inputs, windowEndTs, ctx)
+									inputs = o.scan(inputs, o.triggerTS[0], ctx)
 								}
+								o.triggerTS = o.triggerTS[1:]
 							}
-							delete(isTupleMatch, targetTuple)
 						} else {
 							inputs = o.scan(inputs, windowEndTs, ctx)
 						}
@@ -202,7 +191,9 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 				if o.triggerTime == 0 {
 					o.triggerTime = d.Timestamp
 				}
-				isTupleMatch[d] = o.isMatchCondition(ctx, d)
+				if o.window.Type == ast.SLIDING_WINDOW && o.isMatchCondition(ctx, d) {
+					o.triggerTS = append(o.triggerTS, d.GetTimestamp())
+				}
 				inputs = append(inputs, d)
 				o.statManager.ProcessTimeEnd()
 				_ = ctx.PutState(WindowInputsKey, inputs)

+ 2 - 0
internal/topo/node/window_op.go

@@ -54,6 +54,7 @@ type WindowOperator struct {
 	triggerTime      int64
 	msgCount         int
 	delayTS          []int64
+	triggerTS        []int64
 	triggerCondition ast.Expr
 }
 
@@ -97,6 +98,7 @@ func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowO
 		o.triggerCondition = w.TriggerCondition
 	}
 	o.delayTS = make([]int64, 0)
+	o.triggerTS = make([]int64, 0)
 	return o, nil
 }
 

+ 221 - 94
internal/topo/topotest/window_rule_test.go

@@ -910,45 +910,99 @@ func TestEventWindow(t *testing.T) {
 			Name: `TestEventWindowRule3`,
 			Sql:  `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
 			R: [][]map[string]interface{}{
-				{{
-					"color": "red",
-					"temp":  25.5,
-					"ts":    float64(1541152486013),
-				}}, {{
-					"color": "red",
-					"temp":  25.5,
-					"ts":    float64(1541152486013),
-				}}, {{
-					"color": "blue",
-					"temp":  28.1,
-					"ts":    float64(1541152487632),
-				}}, {{
-					"color": "blue",
-					"temp":  28.1,
-					"ts":    float64(1541152487632),
-				}, {
-					"color": "yellow",
-					"temp":  27.4,
-					"ts":    float64(1541152488442),
-				}}, {{
-					"color": "yellow",
-					"temp":  27.4,
-					"ts":    float64(1541152488442),
-				}, {
-					"color": "red",
-					"temp":  25.5,
-					"ts":    float64(1541152489252),
-				}},
+				{
+					{
+						"color": "red",
+						"temp":  25.5,
+						"ts":    float64(1541152486013),
+					},
+				},
+				{
+					{
+						"color": "red",
+						"temp":  25.5,
+						"ts":    float64(1541152486013),
+					},
+				},
+				{
+					{
+						"color": "red",
+						"temp":  25.5,
+						"ts":    float64(1541152486013),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"temp":  28.1,
+						"ts":    float64(1541152487632),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"temp":  28.1,
+						"ts":    float64(1541152487632),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"temp":  28.1,
+						"ts":    float64(1541152487632),
+					},
+					{
+						"color": "yellow",
+						"temp":  27.4,
+						"ts":    float64(1541152488442),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"temp":  28.1,
+						"ts":    float64(1541152487632),
+					},
+					{
+						"color": "yellow",
+						"temp":  27.4,
+						"ts":    float64(1541152488442),
+					},
+				},
+				{
+					{
+						"color": "yellow",
+						"temp":  27.4,
+						"ts":    float64(1541152488442),
+					},
+					{
+						"color": "red",
+						"temp":  25.5,
+						"ts":    float64(1541152489252),
+					},
+				},
+				{
+					{
+						"color": "yellow",
+						"temp":  27.4,
+						"ts":    float64(1541152488442),
+					},
+					{
+						"color": "red",
+						"temp":  25.5,
+						"ts":    float64(1541152489252),
+					},
+				},
 			},
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(0),
 				"op_6_project_0_process_latency_us": int64(0),
-				"op_6_project_0_records_in_total":   int64(5),
-				"op_6_project_0_records_out_total":  int64(5),
+				"op_6_project_0_records_in_total":   int64(9),
+				"op_6_project_0_records_out_total":  int64(9),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(9),
+				"sink_mockSink_0_records_out_total": int64(9),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -961,51 +1015,64 @@ func TestEventWindow(t *testing.T) {
 				"op_4_window_0_exceptions_total":   int64(0),
 				"op_4_window_0_process_latency_us": int64(0),
 				"op_4_window_0_records_in_total":   int64(9),
-				"op_4_window_0_records_out_total":  int64(5),
+				"op_4_window_0_records_out_total":  int64(9),
 
 				"op_5_join_0_exceptions_total":   int64(0),
 				"op_5_join_0_process_latency_us": int64(0),
-				"op_5_join_0_records_in_total":   int64(5),
-				"op_5_join_0_records_out_total":  int64(5),
+				"op_5_join_0_records_in_total":   int64(9),
+				"op_5_join_0_records_out_total":  int64(9),
 			},
 		},
 		{
 			Name: `TestEventWindowRule4`,
 			Sql:  `SELECT  window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
 			R: [][]map[string]interface{}{
-				{{
-					"color": "red",
-					"ws":    float64(1541152484013),
-					"we":    float64(1541152486013),
-				}}, {{
-					"color": "blue",
-					"ws":    float64(1541152485632),
-					"we":    float64(1541152487632),
-				}, {
-					"color": "red",
-					"ws":    float64(1541152485632),
-					"we":    float64(1541152487632),
-				}}, {{
-					"color": "blue",
-					"ws":    float64(1541152486442),
-					"we":    float64(1541152488442),
-				}, {
-					"color": "yellow",
-					"ws":    float64(1541152486442),
-					"we":    float64(1541152488442),
-				}}, {{
-					"color": "blue",
-					"ws":    float64(1541152487252),
-					"we":    float64(1541152489252),
-				}, {
-					"color": "red",
-					"ws":    float64(1541152487252),
-					"we":    float64(1541152489252),
-				}, {
-					"color": "yellow",
-					"ws":    float64(1541152487252),
-					"we":    float64(1541152489252),
-				}},
+				{
+					{
+						"color": "red",
+						"ws":    float64(1541152484013),
+						"we":    float64(1541152486013),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"ws":    float64(1541152485632),
+						"we":    float64(1541152487632),
+					}, {
+						"color": "red",
+						"ws":    float64(1541152485632),
+						"we":    float64(1541152487632),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"ws":    float64(1541152486442),
+						"we":    float64(1541152488442),
+					}, {
+						"color": "yellow",
+						"ws":    float64(1541152486442),
+						"we":    float64(1541152488442),
+					},
+				},
+				{
+					{
+						"color": "blue",
+						"ws":    float64(1541152487252),
+						"we":    float64(1541152489252),
+					},
+					{
+						"color": "red",
+						"ws":    float64(1541152487252),
+						"we":    float64(1541152489252),
+					},
+					{
+						"color": "yellow",
+						"ws":    float64(1541152487252),
+						"we":    float64(1541152489252),
+					},
+				},
 			},
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(0),
@@ -1087,32 +1154,70 @@ func TestEventWindow(t *testing.T) {
 			Name: `TestEventWindowRule6`,
 			Sql:  `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
 			R: [][]map[string]interface{}{
-				{{
-					"m": 25.5,
-					"c": float64(1),
-				}}, {{
-					"m": 25.5,
-					"c": float64(1),
-				}}, {{
-					"m": 28.1,
-					"c": float64(1),
-				}}, {{
-					"m": 28.1,
-					"c": float64(2),
-				}}, {{
-					"m": 27.4,
-					"c": float64(2),
-				}},
+				{
+					{
+						"m": 25.5,
+						"c": float64(1),
+					},
+				},
+				{
+					{
+						"m": 25.5,
+						"c": float64(1),
+					},
+				},
+				{
+					{
+						"m": 25.5,
+						"c": float64(1),
+					},
+				},
+				{
+					{
+						"m": 28.1,
+						"c": float64(1),
+					},
+				},
+				{
+					{
+						"m": 28.1,
+						"c": float64(1),
+					},
+				},
+				{
+					{
+						"m": 28.1,
+						"c": float64(2),
+					},
+				},
+				{
+					{
+						"m": 28.1,
+						"c": float64(2),
+					},
+				},
+				{
+					{
+						"m": 27.4,
+						"c": float64(2),
+					},
+				},
+				{
+					{
+						"m": 27.4,
+						"c": float64(2),
+					},
+				},
 			},
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(0),
 				"op_6_project_0_process_latency_us": int64(0),
-				"op_6_project_0_records_in_total":   int64(5),
-				"op_6_project_0_records_out_total":  int64(5),
+				"op_6_project_0_records_in_total":   int64(9),
+				"op_6_project_0_records_out_total":  int64(9),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(9),
+				"sink_mockSink_0_records_out_total": int64(9),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -1124,12 +1229,12 @@ func TestEventWindow(t *testing.T) {
 
 				"op_4_window_0_exceptions_total":  int64(0),
 				"op_4_window_0_records_in_total":  int64(9),
-				"op_4_window_0_records_out_total": int64(5),
+				"op_4_window_0_records_out_total": int64(9),
 
 				"op_5_join_0_exceptions_total":   int64(0),
 				"op_5_join_0_process_latency_us": int64(0),
-				"op_5_join_0_records_in_total":   int64(5),
-				"op_5_join_0_records_out_total":  int64(5),
+				"op_5_join_0_records_in_total":   int64(9),
+				"op_5_join_0_records_out_total":  int64(9),
 			},
 		},
 		{
@@ -1317,6 +1422,26 @@ func TestEventWindow(t *testing.T) {
 				"op_3_window_0_records_out_total":  int64(5),
 			},
 		},
+		{
+			Name: `TestEventWindowCondition10`,
+			Sql:  `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 1) Over (When size = 3)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"color": "red",
+					},
+				},
+			},
+			M: map[string]interface{}{
+				"op_2_watermark_0_records_in_total":  int64(6),
+				"op_2_watermark_0_records_out_total": int64(4),
+				"op_2_watermark_0_exceptions_total":  int64(0),
+
+				"op_3_window_0_records_in_total":  int64(4),
+				"op_3_window_0_records_out_total": int64(1),
+				"op_3_window_0_exceptions_total":  int64(0),
+			},
+		},
 	}
 	HandleStream(true, streamList, t)
 	options := []*api.RuleOption{
@@ -1325,14 +1450,16 @@ func TestEventWindow(t *testing.T) {
 			SendError:    true,
 			IsEventTime:  true,
 			LateTol:      1000,
-		}, {
+		},
+		{
 			BufferLength:       100,
 			SendError:          true,
 			Qos:                api.AtLeastOnce,
 			CheckpointInterval: 5000,
 			IsEventTime:        true,
 			LateTol:            1000,
-		}, {
+		},
+		{
 			BufferLength:       100,
 			SendError:          true,
 			Qos:                api.ExactlyOnce,