Procházet zdrojové kódy

fix: let window Left-closed right-open (#2053)

* add code

Signed-off-by: yisaer <disxiaofei@163.com>

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao před 1 rokem
rodič
revize
60764effa3

+ 33 - 17
internal/topo/node/window_op.go

@@ -341,21 +341,17 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 				case ast.NOT_WINDOW:
 					inputs = o.scan(inputs, d.Timestamp, ctx)
 				case ast.SLIDING_WINDOW:
-					if o.window.Type != ast.SLIDING_WINDOW {
-						inputs = o.scan(inputs, d.Timestamp, ctx)
-					} else {
-						if o.isMatchCondition(ctx, d) {
-							if o.window.Delay > 0 {
-								go func(ts int64) {
-									after := time.After(time.Duration(o.window.Delay) * time.Millisecond)
-									select {
-									case <-after:
-										delayCh <- ts
-									}
-								}(d.Timestamp + o.window.Delay)
-							} else {
-								inputs = o.scan(inputs, d.Timestamp, ctx)
-							}
+					if o.isMatchCondition(ctx, d) {
+						if o.window.Delay > 0 {
+							go func(ts int64) {
+								after := time.After(time.Duration(o.window.Delay) * time.Millisecond)
+								select {
+								case <-after:
+									delayCh <- ts
+								}
+							}(d.Timestamp + o.window.Delay)
+						} else {
+							inputs = o.scan(inputs, d.Timestamp, ctx)
 						}
 					}
 				case ast.SESSION_WINDOW:
@@ -526,6 +522,20 @@ func (tl *TupleList) getRestTuples() []*xsql.Tuple {
 	return tl.tuples[len(tl.tuples)-tl.size+1:]
 }
 
+func (o *WindowOperator) isTimeRelatedWindow() bool {
+	switch o.window.Type {
+	case ast.SLIDING_WINDOW:
+		return o.window.Delay > 0
+	case ast.TUMBLING_WINDOW:
+		return true
+	case ast.HOPPING_WINDOW:
+		return true
+	case ast.SESSION_WINDOW:
+		return true
+	}
+	return false
+}
+
 func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.StreamContext) []*xsql.Tuple {
 	log := ctx.GetLogger()
 	log.Debugf("window %s triggered at %s(%d)", o.name, time.Unix(triggerTime/1000, triggerTime%1000), triggerTime)
@@ -560,8 +570,14 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 			inputs[i] = tuple
 			i++
 		}
-		if tuple.Timestamp <= triggerTime {
-			results = results.AddTuple(tuple)
+		if o.isTimeRelatedWindow() {
+			if tuple.Timestamp < triggerTime {
+				results = results.AddTuple(tuple)
+			}
+		} else {
+			if tuple.Timestamp <= triggerTime {
+				results = results.AddTuple(tuple)
+			}
 		}
 	}
 

+ 2 - 0
internal/topo/topotest/mock_topo.go

@@ -270,6 +270,8 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 		var sql string
 		if createOrDrop {
 			switch name {
+			case "demoE2":
+				sql = `CREATE STREAM demoE2 () WITH (DATASOURCE="demoE2", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
 			case "demoArr":
 				sql = `CREATE STREAM demoArr () WITH (DATASOURCE="demoArr", TYPE="mock", FORMAT="json", KEY="ts");`
 			case "demo":

+ 29 - 0
internal/topo/topotest/mocknode/mock_data.go

@@ -1025,6 +1025,35 @@ var TestData = map[string][]*xsql.Tuple{
 			Timestamp: 1541152489253,
 		},
 	},
+	"demoE2": {
+		{
+			Emitter: "demoE2",
+			Message: map[string]interface{}{
+				"temp": 27.5,
+				"hum":  59,
+				"ts":   1541152486000,
+			},
+			Timestamp: 1541152486000,
+		},
+		{
+			Emitter: "demoE2",
+			Message: map[string]interface{}{
+				"temp": 25.5,
+				"hum":  65,
+				"ts":   1541152487000,
+			},
+			Timestamp: 1541152487000,
+		},
+		{
+			Emitter: "demoE2",
+			Message: map[string]interface{}{
+				"temp": 25.5,
+				"hum":  65,
+				"ts":   1541152488000,
+			},
+			Timestamp: 1541152488000,
+		},
+	},
 }
 
 var Image, _ = getImg()

+ 77 - 4
internal/topo/topotest/window_rule_test.go

@@ -22,7 +22,7 @@ import (
 
 func TestWindow(t *testing.T) {
 	// Reset
-	streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1"}
+	streamList := []string{"demo", "demoError", "demo1", "sessionDemo", "table1", "demoE2"}
 	HandleStream(false, streamList, t)
 	tests := []RuleTest{
 		{
@@ -747,12 +747,14 @@ func TestWindow(t *testing.T) {
 		{
 			BufferLength: 100,
 			SendError:    true,
-		}, {
+		},
+		{
 			BufferLength:       100,
 			SendError:          true,
 			Qos:                api.AtLeastOnce,
 			CheckpointInterval: 5000,
-		}, {
+		},
+		{
 			BufferLength:       100,
 			SendError:          true,
 			Qos:                api.ExactlyOnce,
@@ -766,7 +768,7 @@ func TestWindow(t *testing.T) {
 
 func TestEventWindow(t *testing.T) {
 	// Reset
-	streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE"}
+	streamList := []string{"demoE", "demoErr", "demo1E", "sessionDemoE", "demoE2"}
 	HandleStream(false, streamList, t)
 	tests := []RuleTest{
 		{
@@ -1362,6 +1364,77 @@ func TestEventWindow(t *testing.T) {
 				"op_3_window_0_exceptions_total":  int64(0),
 			},
 		},
+		{
+			Name: `TestSlidingWindowInterval11`,
+			Sql:  `SELECT temp FROM demoE2 GROUP BY SLIDINGWINDOW(ss, 1, 1)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"temp": float64(27.5),
+					},
+				},
+			},
+			M: map[string]interface{}{
+				"source_demoE2_0_records_in_total":   int64(3),
+				"source_demoE2_0_records_out_total":  int64(3),
+				"op_2_watermark_0_records_in_total":  int64(3),
+				"op_2_watermark_0_records_out_total": int64(2),
+				"op_3_window_0_records_in_total":     int64(2),
+				"op_3_window_0_records_out_total":    int64(1),
+				"sink_mockSink_0_records_in_total":   int64(1),
+				"sink_mockSink_0_records_out_total":  int64(1),
+			},
+		},
+		{
+			Name: `TestSlidingWindowInterval12`,
+			Sql:  `SELECT temp FROM demoE2 GROUP BY SLIDINGWINDOW(ss, 1)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"temp": float64(27.5),
+					},
+				},
+				{
+					{
+						"temp": float64(27.5),
+					},
+					{
+						"temp": float64(25.5),
+					},
+				},
+			},
+			M: map[string]interface{}{
+				"source_demoE2_0_records_in_total":   int64(3),
+				"source_demoE2_0_records_out_total":  int64(3),
+				"op_2_watermark_0_records_in_total":  int64(3),
+				"op_2_watermark_0_records_out_total": int64(2),
+				"op_3_window_0_records_in_total":     int64(2),
+				"op_3_window_0_records_out_total":    int64(2),
+				"sink_mockSink_0_records_in_total":   int64(2),
+				"sink_mockSink_0_records_out_total":  int64(2),
+			},
+		},
+		{
+			Name: `TestTUMBLINGWindowInterval13`,
+			Sql:  `SELECT temp FROM demoE2 GROUP BY TUMBLINGWINDOW(ss, 1)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"temp": float64(27.5),
+					},
+				},
+			},
+			M: map[string]interface{}{
+				"source_demoE2_0_records_in_total":   int64(3),
+				"source_demoE2_0_records_out_total":  int64(3),
+				"op_2_watermark_0_records_in_total":  int64(3),
+				"op_2_watermark_0_records_out_total": int64(2),
+				"op_3_window_0_records_in_total":     int64(2),
+				"op_3_window_0_records_out_total":    int64(1),
+				"sink_mockSink_0_records_in_total":   int64(1),
+				"sink_mockSink_0_records_out_total":  int64(1),
+			},
+		},
 	}
 	HandleStream(true, streamList, t)
 	options := []*api.RuleOption{