ソースを参照

fix: fix tumbling window would miss data (#2212)

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 年間 前
コミット
82e73903bf

+ 15 - 4
internal/topo/node/window_op.go

@@ -565,10 +565,21 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 			// Added back all inputs for non expired events
 			inputs[i] = tuple
 			i++
-		} else if tuple.Timestamp > triggerTime {
-			// Only added back early arrived events
-			inputs[i] = tuple
-			i++
+		} else {
+			// time-related window is left-closed,right-opened, so that we need keep the tuple if its timestamp >= trigger time
+			if o.isTimeRelatedWindow() {
+				if tuple.Timestamp >= triggerTime {
+					// Only added back early arrived events
+					inputs[i] = tuple
+					i++
+				}
+			} else {
+				if tuple.Timestamp > triggerTime {
+					// Only added back early arrived events
+					inputs[i] = tuple
+					i++
+				}
+			}
 		}
 		if o.isTimeRelatedWindow() {
 			if tuple.Timestamp < triggerTime {

+ 2 - 0
internal/topo/topotest/mock_topo.go

@@ -270,6 +270,8 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 		var sql string
 		if createOrDrop {
 			switch name {
+			case "demoE3":
+				sql = `CREATE STREAM demoE3 () WITH (DATASOURCE="demoE3", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
 			case "demoE2":
 				sql = `CREATE STREAM demoE2 () WITH (DATASOURCE="demoE2", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
 			case "demoArr2":

+ 50 - 0
internal/topo/topotest/mocknode/mock_data.go

@@ -1072,6 +1072,56 @@ var TestData = map[string][]*xsql.Tuple{
 			Timestamp: 1541152488000,
 		},
 	},
+	"demoE3": {
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 26.0,
+				"ts":   1541152486000,
+			},
+			Timestamp: 1541152486000,
+		},
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 27.0,
+				"ts":   1541152487000,
+			},
+			Timestamp: 1541152487000,
+		},
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 28.0,
+				"ts":   1541152488000,
+			},
+			Timestamp: 1541152488000,
+		},
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 28.5,
+				"ts":   1541152488500,
+			},
+			Timestamp: 1541152488500,
+		},
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 29.0,
+				"ts":   1541152489000,
+			},
+			Timestamp: 1541152488500,
+		},
+		{
+			Emitter: "demoE3",
+			Message: map[string]interface{}{
+				"temp": 29.5,
+				"ts":   1541152489500,
+			},
+			Timestamp: 1541152489500,
+		},
+	},
 }
 
 var Image, _ = getImg()

+ 19 - 1
internal/topo/topotest/window_rule_test.go

@@ -768,7 +768,7 @@ func TestWindow(t *testing.T) {
 
 func TestEventWindow(t *testing.T) {
 	// Reset
-	streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE", "demoE2"}
+	streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE", "demoE2", "demoE3"}
 	HandleStream(false, streamList, t)
 	tests := []RuleTest{
 		{
@@ -1435,6 +1435,24 @@ func TestEventWindow(t *testing.T) {
 				"sink_mockSink_0_records_out_total":  int64(1),
 			},
 		},
+		{
+			Name: `TestTUMBLINGWindowInterval14`,
+			Sql:  `SELECT temp,ts FROM demoE3 GROUP BY TUMBLINGWINDOW(ss, 1)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"temp": float64(26.0),
+						"ts":   float64(1541152486000),
+					},
+				},
+				{
+					{
+						"temp": float64(27.0),
+						"ts":   float64(1541152487000),
+					},
+				},
+			},
+		},
 	}
 	HandleStream(true, streamList, t)
 	options := []*api.RuleOption{