Browse Source

fix(window): fix wrong first window start

In event window, tumbling window start time for is wrong

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 năm trước cách đây
mục cha
commit
53eccc3c66

+ 5 - 1
internal/topo/node/watermark.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -251,6 +251,10 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 						log.Debugf("receive non tuple element %v", d)
 						log.Debugf("receive non tuple element %v", d)
 					}
 					}
 					log.Debugf("event window receive tuple %s", tuple.Message)
 					log.Debugf("event window receive tuple %s", tuple.Message)
+					// first tuple, set the window start time, which will set to triggerTime
+					if o.triggerTime == 0 {
+						o.triggerTime = tuple.Timestamp
+					}
 					if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
 					if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
 						inputs = append(inputs, tuple)
 						inputs = append(inputs, tuple)
 					}
 					}

+ 3 - 1
internal/topo/node/window_op.go

@@ -123,7 +123,9 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 	} else {
 	} else {
 		log.Warnf("Restore window state fails: %s", err)
 		log.Warnf("Restore window state fails: %s", err)
 	}
 	}
-	o.triggerTime = conf.GetNowInMilli()
+	if !o.isEventTime {
+		o.triggerTime = conf.GetNowInMilli()
+	}
 	if s, err := ctx.GetState(TRIGGER_TIME_KEY); err == nil && s != nil {
 	if s, err := ctx.GetState(TRIGGER_TIME_KEY); err == nil && s != nil {
 		if si, ok := s.(int64); ok {
 		if si, ok := s.(int64); ok {
 			o.triggerTime = si
 			o.triggerTime = si

+ 24 - 24
internal/topo/topotest/mocknode/mock_data.go

@@ -280,7 +280,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  3,
 				"size":  3,
 				"ts":    1541152486013,
 				"ts":    1541152486013,
 			},
 			},
-			Timestamp: 1541152486023,
+			Timestamp: 1541152586023,
 		},
 		},
 		{
 		{
 			Emitter: "demoE",
 			Emitter: "demoE",
@@ -289,7 +289,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  2,
 				"size":  2,
 				"ts":    1541152487632,
 				"ts":    1541152487632,
 			},
 			},
-			Timestamp: 1541152487822,
+			Timestamp: 1541152587822,
 		},
 		},
 		{
 		{
 			Emitter: "demoE",
 			Emitter: "demoE",
@@ -298,7 +298,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  1,
 				"size":  1,
 				"ts":    1541152489252,
 				"ts":    1541152489252,
 			},
 			},
-			Timestamp: 1541152489632,
+			Timestamp: 1541152589632,
 		},
 		},
 		{ // dropped item
 		{ // dropped item
 			Emitter: "demoE",
 			Emitter: "demoE",
@@ -307,7 +307,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  6,
 				"size":  6,
 				"ts":    1541152486822,
 				"ts":    1541152486822,
 			},
 			},
-			Timestamp: 1541152489842,
+			Timestamp: 1541152589842,
 		},
 		},
 		{
 		{
 			Emitter: "demoE",
 			Emitter: "demoE",
@@ -316,7 +316,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  4,
 				"size":  4,
 				"ts":    1541152488442,
 				"ts":    1541152488442,
 			},
 			},
-			Timestamp: 1541152490052,
+			Timestamp: 1541152590052,
 		},
 		},
 		{ // To lift the watermark and issue all windows
 		{ // To lift the watermark and issue all windows
 			Emitter: "demoE",
 			Emitter: "demoE",
@@ -325,7 +325,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"size":  4,
 				"size":  4,
 				"ts":    1541152492342,
 				"ts":    1541152492342,
 			},
 			},
-			Timestamp: 1541152498888,
+			Timestamp: 1541152598888,
 		},
 		},
 	},
 	},
 	"demo1E": {
 	"demo1E": {
@@ -336,7 +336,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  59,
 				"hum":  59,
 				"ts":   1541152486823,
 				"ts":   1541152486823,
 			},
 			},
-			Timestamp: 1541152487250,
+			Timestamp: 1541152587250,
 		},
 		},
 		{
 		{
 			Emitter: "demo1E",
 			Emitter: "demo1E",
@@ -345,7 +345,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  65,
 				"hum":  65,
 				"ts":   1541152486013,
 				"ts":   1541152486013,
 			},
 			},
-			Timestamp: 1541152487751,
+			Timestamp: 1541152587751,
 		},
 		},
 		{
 		{
 			Emitter: "demo1E",
 			Emitter: "demo1E",
@@ -354,7 +354,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  80,
 				"hum":  80,
 				"ts":   1541152488442,
 				"ts":   1541152488442,
 			},
 			},
-			Timestamp: 1541152489252,
+			Timestamp: 1541152589252,
 		},
 		},
 		{
 		{
 			Emitter: "demo1E",
 			Emitter: "demo1E",
@@ -363,7 +363,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  75,
 				"hum":  75,
 				"ts":   1541152487632,
 				"ts":   1541152487632,
 			},
 			},
-			Timestamp: 1541152489753,
+			Timestamp: 1541152589753,
 		},
 		},
 		{
 		{
 			Emitter: "demo1E",
 			Emitter: "demo1E",
@@ -372,7 +372,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  62,
 				"hum":  62,
 				"ts":   1541152489252,
 				"ts":   1541152489252,
 			},
 			},
-			Timestamp: 1541152489954,
+			Timestamp: 1541152589954,
 		},
 		},
 		{
 		{
 			Emitter: "demo1E",
 			Emitter: "demo1E",
@@ -381,7 +381,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  62,
 				"hum":  62,
 				"ts":   1541152499252,
 				"ts":   1541152499252,
 			},
 			},
-			Timestamp: 1541152499755,
+			Timestamp: 1541152599755,
 		},
 		},
 	},
 	},
 	"sessionDemoE": {
 	"sessionDemoE": {
@@ -392,7 +392,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  65,
 				"hum":  65,
 				"ts":   1541152486013,
 				"ts":   1541152486013,
 			},
 			},
-			Timestamp: 1541152486250,
+			Timestamp: 1541152586250,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -401,7 +401,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  75,
 				"hum":  75,
 				"ts":   1541152487932,
 				"ts":   1541152487932,
 			},
 			},
-			Timestamp: 1541152487951,
+			Timestamp: 1541152587951,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -410,7 +410,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  59,
 				"hum":  59,
 				"ts":   1541152486823,
 				"ts":   1541152486823,
 			},
 			},
-			Timestamp: 1541152488552,
+			Timestamp: 1541152588552,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -419,7 +419,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  62,
 				"hum":  62,
 				"ts":   1541152489252,
 				"ts":   1541152489252,
 			},
 			},
-			Timestamp: 1541152489353,
+			Timestamp: 1541152589353,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -428,7 +428,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  80,
 				"hum":  80,
 				"ts":   1541152488442,
 				"ts":   1541152488442,
 			},
 			},
-			Timestamp: 1541152489854,
+			Timestamp: 1541152589854,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -437,7 +437,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  63,
 				"hum":  63,
 				"ts":   1541152490062,
 				"ts":   1541152490062,
 			},
 			},
-			Timestamp: 1541152490155,
+			Timestamp: 1541152590155,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -446,7 +446,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  85,
 				"hum":  85,
 				"ts":   1541152491682,
 				"ts":   1541152491682,
 			},
 			},
-			Timestamp: 1541152491686,
+			Timestamp: 1541152591686,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -455,7 +455,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  71,
 				"hum":  71,
 				"ts":   1541152490872,
 				"ts":   1541152490872,
 			},
 			},
-			Timestamp: 1541152491972,
+			Timestamp: 1541152591972,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -464,7 +464,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  92,
 				"hum":  92,
 				"ts":   1541152492492,
 				"ts":   1541152492492,
 			},
 			},
-			Timestamp: 1541152492592,
+			Timestamp: 1541152592592,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -473,7 +473,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  87,
 				"hum":  87,
 				"ts":   1541152494112,
 				"ts":   1541152494112,
 			},
 			},
-			Timestamp: 1541152494212,
+			Timestamp: 1541152594212,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -482,7 +482,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  99,
 				"hum":  99,
 				"ts":   1541152493202,
 				"ts":   1541152493202,
 			},
 			},
-			Timestamp: 1541152495202,
+			Timestamp: 1541152595202,
 		},
 		},
 		{
 		{
 			Emitter: "sessionDemoE",
 			Emitter: "sessionDemoE",
@@ -491,7 +491,7 @@ var TestData = map[string][]*xsql.Tuple{
 				"hum":  99,
 				"hum":  99,
 				"ts":   1541152499202,
 				"ts":   1541152499202,
 			},
 			},
-			Timestamp: 1541152499402,
+			Timestamp: 1541152599402,
 		},
 		},
 	},
 	},
 	"demoErr": {
 	"demoErr": {

+ 2 - 2
internal/topo/topotest/window_rule_test.go

@@ -794,7 +794,7 @@ func TestEventWindow(t *testing.T) {
 			Sql:  `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
 			Sql:  `SELECT window_start(), window_end(), color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
-					"window_start": float64(1541152486000),
+					"window_start": float64(1541152486013),
 					"window_end":   float64(1541152487000),
 					"window_end":   float64(1541152487000),
 					"color":        "red",
 					"color":        "red",
 					"ts":           float64(1541152486013),
 					"ts":           float64(1541152486013),
@@ -1179,7 +1179,7 @@ func TestEventWindow(t *testing.T) {
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
 					"color":        "red",
 					"color":        "red",
-					"window_start": float64(1541152485000),
+					"window_start": float64(1541152485013),
 					"window_end":   float64(1541152487000),
 					"window_end":   float64(1541152487000),
 				}},
 				}},
 				{{
 				{{