Procházet zdrojové kódy

feat: support slidingwindow condition (#2008)

* support sliding window condition

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* add docs

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao před 1 rokem
rodič
revize
4a30840f74

+ 14 - 0
docs/en_US/sqls/windows.md

@@ -145,3 +145,17 @@ In event time mode, the watermark algorithm is used to calculate a window.
 ## Runtime error in window
 
 If the window receive an error (for example, the data type does not comply to the stream definition) from upstream, the error event will be forwarded immediately to the sink. The current window calculation will ignore the error event.
+
+## The trigger condition of the Sliding Window
+
+Each piece of data can trigger a window. We can filter the data that triggers the window through the `over` clause, and only the data that meets the filtering conditions will be used to trigger the window. The `over` clause can be used alone behind the sliding window, or it can be used after the `filter` clause, the `over` clause must be similar to `Over(When expr)`, for example:
+
+```sql
+SELECT * FROM demo GROUP BY COUNTWINDOW(3,1) FILTER(where revenue > 100) OVER(when revenue > 200)
+```
+
+or:
+
+```sql
+SELECT * FROM demo GROUP BY COUNTWINDOW(3,1) OVER(when revenue > 200)
+```

+ 14 - 0
docs/zh_CN/sqls/windows.md

@@ -145,3 +145,17 @@ CREATE STREAM demo (
 ## 窗口中的运行时错误
 
 如果窗口从上游接收到错误(例如,数据类型不符合流定义),则错误事件将立即转发到目标(sink)。 当前窗口计算将忽略错误事件。
+
+## 过滤窗口的触发条件
+
+对于滑动窗口,每一条数据都可以触发一个窗口,我们可以通过 `over` 子句将触发窗口的数据进行过滤,只会将满足过滤条件的数据去触发窗口。`over` 子句可以单独用在滑动窗口后面,也可以用在 `filter` 子句后,`over` 子句必须类似于 `Over(When expr)`,例如:
+
+```sql
+SELECT * FROM demo GROUP BY COUNTWINDOW(3,1) FILTER(where revenue > 100) OVER(when revenue > 200)
+```
+
+或者:
+
+```sql
+SELECT * FROM demo GROUP BY COUNTWINDOW(3,1) OVER(when revenue > 200)
+```

+ 21 - 2
internal/topo/node/event_window_trigger.go

@@ -115,6 +115,7 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 		prevWindowEndTs int64
 		lastTicked      bool
 	)
+	isTupleMatch := make(map[*xsql.Tuple]bool, 0)
 	for {
 		select {
 		// process incoming item
@@ -158,8 +159,25 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 						o.triggerTime = inputs[0].Timestamp
 					}
 					if windowEndTs > 0 {
-						if o.window.Delay > 0 && o.window.Type == ast.SLIDING_WINDOW {
-							o.delayTS = append(o.delayTS, windowEndTs+o.window.Delay)
+						if o.window.Type == ast.SLIDING_WINDOW {
+							var targetTuple *xsql.Tuple
+							if len(inputs) > 0 && o.window.Type == ast.SLIDING_WINDOW {
+								for _, t := range inputs {
+									if t.Timestamp == windowEndTs {
+										targetTuple = t
+										break
+									}
+								}
+							}
+							isMatch := isTupleMatch[targetTuple]
+							if isMatch {
+								if o.window.Delay > 0 && o.window.Type == ast.SLIDING_WINDOW {
+									o.delayTS = append(o.delayTS, windowEndTs+o.window.Delay)
+								} else {
+									inputs = o.scan(inputs, windowEndTs, ctx)
+								}
+							}
+							delete(isTupleMatch, targetTuple)
 						} else {
 							inputs = o.scan(inputs, windowEndTs, ctx)
 						}
@@ -184,6 +202,7 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 				if o.triggerTime == 0 {
 					o.triggerTime = d.Timestamp
 				}
+				isTupleMatch[d] = o.isMatchCondition(ctx, d)
 				inputs = append(inputs, d)
 				o.statManager.ProcessTimeEnd()
 				_ = ctx.PutState(WindowInputsKey, inputs)

+ 53 - 18
internal/topo/node/window_op.go

@@ -32,12 +32,13 @@ import (
 )
 
 type WindowConfig struct {
-	Type        ast.WindowType
-	Length      int64
-	Interval    int64 // If the interval is not set, it is equals to Length
-	Delay       int64
-	RawInterval int
-	TimeUnit    ast.Token
+	TriggerCondition ast.Expr
+	Type             ast.WindowType
+	Length           int64
+	Interval         int64 // If the interval is not set, it is equals to Length
+	Delay            int64
+	RawInterval      int
+	TimeUnit         ast.Token
 }
 
 type WindowOperator struct {
@@ -50,9 +51,10 @@ type WindowOperator struct {
 	statManager metric.StatManager
 	ticker      *clock.Ticker // For processing time only
 	// states
-	triggerTime int64
-	msgCount    int
-	delayTS     []int64
+	triggerTime      int64
+	msgCount         int
+	delayTS          []int64
+	triggerCondition ast.Expr
 }
 
 const (
@@ -91,6 +93,9 @@ func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowO
 			o.trigger = w
 		}
 	}
+	if w.TriggerCondition != nil {
+		o.triggerCondition = w.TriggerCondition
+	}
 	o.delayTS = make([]int64, 0)
 	return o, nil
 }
@@ -331,16 +336,22 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 				case ast.NOT_WINDOW:
 					inputs = o.scan(inputs, d.Timestamp, ctx)
 				case ast.SLIDING_WINDOW:
-					if o.window.Delay > 0 {
-						go func(ts int64) {
-							after := time.After(time.Duration(o.window.Delay) * time.Millisecond)
-							select {
-							case <-after:
-								delayCh <- ts
-							}
-						}(d.Timestamp + o.window.Delay)
-					} else {
+					if o.window.Type != ast.SLIDING_WINDOW {
 						inputs = o.scan(inputs, d.Timestamp, ctx)
+					} else {
+						if o.isMatchCondition(ctx, d) {
+							if o.window.Delay > 0 {
+								go func(ts int64) {
+									after := time.After(time.Duration(o.window.Delay) * time.Millisecond)
+									select {
+									case <-after:
+										delayCh <- ts
+									}
+								}(d.Timestamp + o.window.Delay)
+							} else {
+								inputs = o.scan(inputs, d.Timestamp, ctx)
+							}
+						}
 					}
 				case ast.SESSION_WINDOW:
 					if timeoutTicker != nil {
@@ -598,3 +609,27 @@ func (o *WindowOperator) GetMetrics() [][]interface{} {
 		return nil
 	}
 }
+
+func (o *WindowOperator) isMatchCondition(ctx api.StreamContext, d *xsql.Tuple) bool {
+	if o.triggerCondition == nil || o.window.Type != ast.SLIDING_WINDOW {
+		return true
+	}
+	log := ctx.GetLogger()
+	fv, _ := xsql.NewFunctionValuersForOp(ctx)
+	ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
+	result := ve.Eval(o.triggerCondition)
+	// not match trigger condition
+	if result == nil {
+		return false
+	}
+	switch v := result.(type) {
+	case error:
+		log.Errorf("window %s trigger condition meet error: %v", o.name, v)
+		return false
+	case bool:
+		// match trigger condition
+		return v
+	default:
+		return false
+	}
+}

+ 10 - 6
internal/topo/planner/planner.go

@@ -147,12 +147,13 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			rawInterval = t.interval
 		}
 		op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), node.WindowConfig{
-			Type:        t.wtype,
-			Delay:       d,
-			Length:      l,
-			Interval:    i,
-			RawInterval: rawInterval,
-			TimeUnit:    t.timeUnit,
+			Type:             t.wtype,
+			Delay:            d,
+			Length:           l,
+			Interval:         i,
+			RawInterval:      rawInterval,
+			TimeUnit:         t.timeUnit,
+			TriggerCondition: t.triggerCondition,
 		}, options)
 		if err != nil {
 			return nil, 0, err
@@ -369,6 +370,9 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 			if w.Filter != nil {
 				wp.condition = w.Filter
 			}
+			if w.TriggerCondition != nil {
+				wp.triggerCondition = w.TriggerCondition
+			}
 			// TODO calculate limit
 			// TODO incremental aggregate
 			wp.SetChildren(children)

+ 9 - 8
internal/topo/planner/windowPlan.go

@@ -18,14 +18,15 @@ import "github.com/lf-edge/ekuiper/pkg/ast"
 
 type WindowPlan struct {
 	baseLogicalPlan
-	condition   ast.Expr
-	wtype       ast.WindowType
-	delay       int64
-	length      int
-	interval    int // If interval is not set, it is equals to Length
-	timeUnit    ast.Token
-	limit       int // If limit is not positive, there will be no limit
-	isEventTime bool
+	triggerCondition ast.Expr
+	condition        ast.Expr
+	wtype            ast.WindowType
+	delay            int64
+	length           int
+	interval         int // If interval is not set, it is equals to Length
+	timeUnit         ast.Token
+	limit            int // If limit is not positive, there will be no limit
+	isEventTime      bool
 }
 
 func (p WindowPlan) Init() *WindowPlan {

+ 17 - 0
internal/topo/topotest/window_rule_test.go

@@ -26,6 +26,23 @@ func TestWindow(t *testing.T) {
 	HandleStream(false, streamList, t)
 	tests := []RuleTest{
 		{
+			Name: `TestWindowRule0`,
+			Sql:  `SELECT size,color FROM demo GROUP BY SlidingWindow(ss, 5) Filter (where color = "red") Over (when size = 1)`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"size":  float64(3),
+						"color": "red",
+					},
+					{
+						"size":  float64(1),
+						"color": "red",
+					},
+				},
+			},
+			M: map[string]interface{}{},
+		},
+		{
 			Name: `TestWindowRule1`,
 			Sql:  `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			R: [][]map[string]interface{}{

+ 29 - 0
internal/xsql/parser.go

@@ -918,6 +918,14 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 		} else if f != nil {
 			win.Filter = f
 		}
+		// parse over when clause
+		c, err := p.ParseOver4Window()
+		if err != nil {
+			return nil, err
+		} else if c != nil {
+			win.TriggerCondition = c
+		}
+
 		return win, nil
 	}
 }
@@ -1516,6 +1524,27 @@ func (p *Parser) parseStreamOptions() (*ast.Options, error) {
 	return opts, nil
 }
 
+func (p *Parser) ParseOver4Window() (ast.Expr, error) {
+	if tok, _ := p.scanIgnoreWhitespace(); tok != ast.OVER {
+		p.unscan()
+		return nil, nil
+	}
+	if tok, lit := p.scanIgnoreWhitespace(); tok != ast.LPAREN {
+		return nil, fmt.Errorf("Found %q after OVER, expect parentheses.", lit)
+	}
+	if tok, lit := p.scanIgnoreWhitespace(); tok != ast.WHEN {
+		return nil, fmt.Errorf("Found %q after OVER(, expect WHEN.", lit)
+	}
+	expr, err := p.ParseExpr()
+	if err != nil {
+		return nil, err
+	}
+	if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
+		return nil, fmt.Errorf("Found %q after OVER, expect right parentheses.", lit)
+	}
+	return expr, nil
+}
+
 // Only support filter on window now
 func (p *Parser) parseFilter() (ast.Expr, error) {
 	if tok, _ := p.scanIgnoreWhitespace(); tok != ast.FILTER {

+ 63 - 0
internal/xsql/parser_test.go

@@ -3038,6 +3038,69 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 		err  string
 	}{
 		{
+			s: `SELECT f1 FROM tbl GROUP BY SLIDINGWINDOW(ms, 5) OVER (WHEN a > 5)`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.FieldRef{Name: "f1", StreamName: ast.DefaultStream},
+						Name:  "f1",
+						AName: "",
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+				Dimensions: ast.Dimensions{
+					ast.Dimension{
+						Expr: &ast.Window{
+							WindowType: ast.SLIDING_WINDOW,
+							Length:     &ast.IntegerLiteral{Val: 5},
+							Interval:   &ast.IntegerLiteral{Val: 0},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.MS},
+							TriggerCondition: &ast.BinaryExpr{
+								OP:  ast.GT,
+								LHS: &ast.FieldRef{Name: "a", StreamName: ast.DefaultStream},
+								RHS: &ast.IntegerLiteral{Val: 5},
+							},
+							Delay: &ast.IntegerLiteral{Val: 0},
+						},
+					},
+				},
+			},
+		},
+		{
+			s: `SELECT f1 FROM tbl GROUP BY SLIDINGWINDOW(ms, 5) FILTER (WHERE a > 4) OVER (WHEN a > 5)`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.FieldRef{Name: "f1", StreamName: ast.DefaultStream},
+						Name:  "f1",
+						AName: "",
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+				Dimensions: ast.Dimensions{
+					ast.Dimension{
+						Expr: &ast.Window{
+							WindowType: ast.SLIDING_WINDOW,
+							Length:     &ast.IntegerLiteral{Val: 5},
+							Interval:   &ast.IntegerLiteral{Val: 0},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.MS},
+							TriggerCondition: &ast.BinaryExpr{
+								OP:  ast.GT,
+								LHS: &ast.FieldRef{Name: "a", StreamName: ast.DefaultStream},
+								RHS: &ast.IntegerLiteral{Val: 5},
+							},
+							Filter: &ast.BinaryExpr{
+								OP:  ast.GT,
+								LHS: &ast.FieldRef{Name: "a", StreamName: ast.DefaultStream},
+								RHS: &ast.IntegerLiteral{Val: 4},
+							},
+							Delay: &ast.IntegerLiteral{Val: 0},
+						},
+					},
+				},
+			},
+		},
+		{
 			s: `SELECT f1 FROM tbl GROUP BY SLIDINGWINDOW(ms, 5)`,
 			stmt: &ast.SelectStatement{
 				Fields: []ast.Field{

+ 7 - 6
pkg/ast/statement.go

@@ -172,12 +172,13 @@ const (
 )
 
 type Window struct {
-	WindowType WindowType
-	Delay      *IntegerLiteral
-	Length     *IntegerLiteral
-	Interval   *IntegerLiteral
-	TimeUnit   *TimeLiteral
-	Filter     Expr
+	TriggerCondition Expr
+	WindowType       WindowType
+	Delay            *IntegerLiteral
+	Length           *IntegerLiteral
+	Interval         *IntegerLiteral
+	TimeUnit         *TimeLiteral
+	Filter           Expr
 	Expr
 }
 

+ 1 - 0
pkg/ast/visitor.go

@@ -77,6 +77,7 @@ func Walk(v Visitor, node Node) {
 		Walk(v, n.Length)
 		Walk(v, n.Interval)
 		Walk(v, n.Filter)
+		Walk(v, n.TriggerCondition)
 
 	case SortFields:
 		for _, sf := range n {