Преглед на файлове

feat: support trigger meta func in window condition (#2038)

1. last_hit_time will return 0 by default.
2. Allow these funcs in the sliding window trigger condition, as it is also an event trigger.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying преди 1 година
родител
ревизия
e04113f228

+ 18 - 0
docs/en_US/sqls/functions/other_functions.md

@@ -34,6 +34,18 @@ tstamp()
 
 Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970.
 
+## EVENT_TIME
+
+```text
+event_time()
+```
+
+Returns the int64 timestamp of the current processing event.
+It may be earlier than the current time due to processing
+latency.
+
+If it is used in a window rule as an aggregate function, it returns the window end time.
+
 ## RULE_ID
 
 ```text
@@ -77,6 +89,9 @@ If the function is used in `WHERE` clause, it will only update the count when th
 Notice that, this function is not supported in aggregate rule except using in `WHEN` clause of a sliding window.
 To get the hit count of an aggregate rule, use [last_agg_hit_count](./aggregate_functions.md#lastagghitcount) instead.
 
+If used in a sliding window trigger condition,
+the status will be updated only when the trigger condition is met regardless of the rule trigger result.
+
 ## LAST_HIT_TIME
 
 ```text
@@ -90,6 +105,9 @@ If the function is used in `WHERE` clause, it will only update the timestamp whe
 Notice that, this function is not supported in aggregate rule except using in `WHEN` clause of a sliding window.
 To get the hit time of an aggregate rule, use [last_agg_hit_time](./aggregate_functions.md#lastagghittime) instead.
 
+If used in a sliding window trigger condition,
+the status will be updated only when the trigger condition is met regardless of the rule trigger result.
+
 ## WINDOW_START
 
 ```text

+ 13 - 0
docs/zh_CN/sqls/functions/other_functions.md

@@ -34,6 +34,15 @@ tstamp()
 
 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。
 
+## EVENT_TIME
+
+```text
+event_time()
+```
+
+返回当前处理事件的 int64 格式时间戳。由于处理延迟,该时间戳可能早于当前时间。
+若在窗口规则中用作聚合函数,则返回窗口结束时间。
+
 ## RULE_ID
 
 ```text
@@ -73,6 +82,8 @@ last_hit_count()
 该函数仅可用于非聚合规则中或 Sliding Window
 的条件中。若要在聚合规则中实现类似功能,请使用 [last_agg_hit_count](./aggregate_functions.md#lastagghitcount)。
 
+若在滑动窗口触发条件中使用,当触发条件满足时就会更新计数而无需考虑规则整体触发情况。
+
 ## LAST_HIT_TIME
 
 ```text
@@ -83,6 +94,8 @@ last_hit_time()
 该函数仅可用于非聚合规则中或 Sliding Window
 的条件中。若要在聚合规则中实现类似功能,请使用 [last_agg_hit_time](./aggregate_functions.md#lastagghittime)。
 
+若在滑动窗口触发条件中使用,当触发条件满足时就会更新时间而无需考虑规则整体触发情况。
+
 ## WINDOW_START
 
 ```text

+ 6 - 0
internal/binder/function/funcs_global_state.go

@@ -55,6 +55,9 @@ func registerGlobalStateFunc() {
 			if err != nil {
 				return err, false
 			}
+			if lv == nil {
+				lv = 0
+			}
 			if args0 {
 				err := ctx.PutState(timeKey, args1)
 				if err != nil {
@@ -93,6 +96,9 @@ func registerGlobalStateFunc() {
 			if err != nil {
 				return err, false
 			}
+			if lv == nil {
+				lv = 0
+			}
 			if args0 {
 				err := ctx.PutState(aggTimeKey, args1)
 				if err != nil {

+ 1 - 1
internal/binder/function/funcs_global_state_test.go

@@ -59,7 +59,7 @@ func TestHitFuncs(t *testing.T) {
 				int64(10100),
 			},
 			result: []any{
-				0, nil, 0, nil,
+				0, 0, 0, 0,
 			},
 		},
 		{

+ 8 - 0
internal/topo/node/window_op.go

@@ -33,6 +33,7 @@ import (
 
 type WindowConfig struct {
 	TriggerCondition ast.Expr
+	StateFuncs       []*ast.Call
 	Type             ast.WindowType
 	Length           int64
 	Interval         int64 // If the interval is not set, it is equals to Length
@@ -56,6 +57,7 @@ type WindowOperator struct {
 	delayTS          []int64
 	triggerTS        []int64
 	triggerCondition ast.Expr
+	stateFuncs       []*ast.Call
 }
 
 const (
@@ -96,6 +98,7 @@ func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowO
 	}
 	if w.TriggerCondition != nil {
 		o.triggerCondition = w.TriggerCondition
+		o.stateFuncs = w.StateFuncs
 	}
 	o.delayTS = make([]int64, 0)
 	o.triggerTS = make([]int64, 0)
@@ -630,6 +633,11 @@ func (o *WindowOperator) isMatchCondition(ctx api.StreamContext, d *xsql.Tuple)
 		return false
 	case bool:
 		// match trigger condition
+		if v {
+			for _, f := range o.stateFuncs {
+				_ = ve.Eval(f)
+			}
+		}
 		return v
 	default:
 		return false

+ 5 - 1
internal/topo/planner/analyzer.go

@@ -153,7 +153,7 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
 		return nil, nil, walkErr
 	}
 	// walk sources at last to let them run firstly
-	// because other clause may depend on the alias defined here
+	// because another clause may depend on the alias defined here
 	ast.WalkFunc(s.Fields, func(n ast.Node) bool {
 		switch f := n.(type) {
 		case *ast.Call:
@@ -213,6 +213,10 @@ func validate(s *ast.SelectStatement) (err error) {
 				err = fmt.Errorf("function %s is not allowed in an aggregate query", f.Name)
 				return false
 			}
+		case *ast.Window:
+			// agg func check is done in dimensions.
+			// in window trigger condition, NoAggFunc is allowed unlike normal condition so return false to skip that check
+			return false
 		}
 		return true
 	})

+ 11 - 3
internal/topo/planner/analyzer_test.go

@@ -21,6 +21,8 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
@@ -132,6 +134,14 @@ var tests = []struct {
 		sql: `SELECT last_hit_time() FROM src1 GROUP BY SlidingWindow(ss,5) HAVING last_agg_hit_count() < 3`,
 		r:   newErrorStruct("function last_hit_time is not allowed in an aggregate query"),
 	},
+	{ // 18
+		sql: `SELECT * FROM src1 GROUP BY SlidingWindow(ss,5) Over (WHEN last_hit_time() > 1) HAVING last_agg_hit_count() < 3`,
+		r:   newErrorStruct(""),
+	},
+	//{ // 19 already captured in parser
+	//	sql: `SELECT * FROM src1 GROUP BY SlidingWindow(ss,5) Over (WHEN abs(sum(a)) > 1) HAVING last_agg_hit_count() < 3`,
+	//	r:   newErrorStruct("error compile sql: Not allowed to call aggregate functions in GROUP BY clause."),
+	//},
 }
 
 func Test_validation(t *testing.T) {
@@ -190,9 +200,7 @@ func Test_validation(t *testing.T) {
 			CheckpointInterval: 0,
 			SendError:          true,
 		}, store)
-		if !reflect.DeepEqual(tt.r.err, testx.Errstring(err)) {
-			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.r.err, err)
-		}
+		assert.Equal(t, tt.r.err, testx.Errstring(err))
 	}
 }
 

+ 2 - 0
internal/topo/planner/planner.go

@@ -146,6 +146,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 		case ast.HOPPING_WINDOW:
 			rawInterval = t.interval
 		}
+		t.ExtractStateFunc()
 		op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), node.WindowConfig{
 			Type:             t.wtype,
 			Delay:            d,
@@ -154,6 +155,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			RawInterval:      rawInterval,
 			TimeUnit:         t.timeUnit,
 			TriggerCondition: t.triggerCondition,
+			StateFuncs:       t.stateFuncs,
 		}, options)
 		if err != nil {
 			return nil, 0, err

+ 42 - 1
internal/topo/planner/windowPlan.go

@@ -14,7 +14,10 @@
 
 package planner
 
-import "github.com/lf-edge/ekuiper/pkg/ast"
+import (
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
 
 type WindowPlan struct {
 	baseLogicalPlan
@@ -27,6 +30,8 @@ type WindowPlan struct {
 	timeUnit         ast.Token
 	limit            int // If limit is not positive, there will be no limit
 	isEventTime      bool
+
+	stateFuncs []*ast.Call
 }
 
 func (p WindowPlan) Init() *WindowPlan {
@@ -56,5 +61,41 @@ func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPla
 
 func (p *WindowPlan) PruneColumns(fields []ast.Expr) error {
 	f := getFields(p.condition)
+	f = append(f, getFields(p.triggerCondition)...)
 	return p.baseLogicalPlan.PruneColumns(append(fields, f...))
 }
+
+func (p *WindowPlan) ExtractStateFunc() { // ExtractStateFunc scans the trigger condition and hands every function call it finds to p.transform.
+	aliases := make(map[string]ast.Expr) // alias name -> aliased expression, collected for the second pass below
+	ast.WalkFunc(p.triggerCondition, func(n ast.Node) bool {
+		switch f := n.(type) {
+		case *ast.Call:
+			p.transform(f) // call written directly in the trigger condition
+		case *ast.FieldRef:
+			if f.AliasRef != nil { // the field refers to an alias; remember its expression so calls inside it are inspected too
+				aliases[f.Name] = f.AliasRef.Expression
+			}
+		}
+		return true // NOTE(review): presumably tells WalkFunc to keep descending; confirm against ast.WalkFunc
+	})
+	for _, ex := range aliases { // second pass: inspect the calls inside each aliased expression (map order is not deterministic)
+		ast.WalkFunc(ex, func(n ast.Node) bool {
+			switch f := n.(type) {
+			case *ast.Call:
+				p.transform(f) // call reached through an alias used in the trigger condition
+			}
+			return true
+		})
+	}
+}
+
+func (p *WindowPlan) transform(f *ast.Call) { // transform flags f as cached and records a copy in p.stateFuncs when f is an implicit state function.
+	if _, ok := xsql.ImplicitStateFuncs[f.Name]; ok { // calls whose name is not a key of xsql.ImplicitStateFuncs are left untouched
+		f.Cached = true // NOTE(review): presumably makes the original call read a cached value instead of updating state; confirm in the evaluator
+		p.stateFuncs = append(p.stateFuncs, &ast.Call{ // the recorded copy sets only Name, FuncId and FuncType; no other field of f is carried over
+			Name:     f.Name,
+			FuncId:   f.FuncId,
+			FuncType: f.FuncType,
+		})
+	}
+}

+ 1 - 0
internal/topo/topotest/rule_test.go

@@ -978,6 +978,7 @@ func TestSingleSQLWithEventTime(t *testing.T) {
 					"size":  float64(2),
 					"ts":    float64(1541152487632),
 					"lc":    float64(0),
+					"lt":    float64(0),
 					"et":    float64(1541152487632),
 				}},
 				{{

+ 50 - 130
internal/topo/topotest/window_rule_test.go

@@ -250,31 +250,19 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule4`,
-			Sql:  `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
+			Sql:  `SELECT color, count(*) as c FROM demo GROUP BY SlidingWindow(ss, 2) OVER(WHEN ts - last_hit_time() > 1000) , color ORDER BY color`,
 			R: [][]map[string]interface{}{
 				{{
 					"color": "red",
 					"c":     float64(1),
 				}}, {{
 					"color": "blue",
-					"c":     float64(1),
-				}, {
-					"color": "red",
-					"c":     float64(1),
-				}}, {{
-					"color": "blue",
 					"c":     float64(2),
 				}, {
 					"color": "red",
 					"c":     float64(1),
 				}}, {{
 					"color": "blue",
-					"c":     float64(2),
-				}, {
-					"color": "yellow",
-					"c":     float64(1),
-				}}, {{
-					"color": "blue",
 					"c":     float64(1),
 				}, {
 					"color": "red",
@@ -287,12 +275,12 @@ func TestWindow(t *testing.T) {
 			M: map[string]interface{}{
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(5),
-				"op_5_project_0_records_out_total":  int64(5),
+				"op_5_project_0_records_in_total":   int64(3),
+				"op_5_project_0_records_out_total":  int64(3),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
 
 				"source_demo_0_exceptions_total":  int64(0),
 				"source_demo_0_records_in_total":  int64(5),
@@ -301,17 +289,7 @@ func TestWindow(t *testing.T) {
 				"op_2_window_0_exceptions_total":   int64(0),
 				"op_2_window_0_process_latency_us": int64(0),
 				"op_2_window_0_records_in_total":   int64(5),
-				"op_2_window_0_records_out_total":  int64(5),
-
-				"op_3_aggregate_0_exceptions_total":   int64(0),
-				"op_3_aggregate_0_process_latency_us": int64(0),
-				"op_3_aggregate_0_records_in_total":   int64(5),
-				"op_3_aggregate_0_records_out_total":  int64(5),
-
-				"op_4_order_0_exceptions_total":   int64(0),
-				"op_4_order_0_process_latency_us": int64(0),
-				"op_4_order_0_records_in_total":   int64(5),
-				"op_4_order_0_records_out_total":  int64(5),
+				"op_2_window_0_records_out_total":  int64(3),
 			},
 		},
 		{
@@ -825,6 +803,7 @@ func TestEventWindow(t *testing.T) {
 				{{
 					"count": float64(1),
 					"lc":    float64(0),
+					"lt":    float64(0),
 					"et":    float64(1541152487000),
 				}},
 				{{
@@ -908,7 +887,7 @@ func TestEventWindow(t *testing.T) {
 		},
 		{
 			Name: `TestEventWindowRule3`,
-			Sql:  `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
+			Sql:  `SELECT color, temp, demoE.ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1) OVER (WHEN demoE.ts - last_hit_time() > 400 or demo1E.ts - last_hit_time() > 400)`,
 			R: [][]map[string]interface{}{
 				{
 					{
@@ -926,36 +905,10 @@ func TestEventWindow(t *testing.T) {
 				},
 				{
 					{
-						"color": "red",
-						"temp":  25.5,
-						"ts":    float64(1541152486013),
-					},
-				},
-				{
-					{
-						"color": "blue",
-						"temp":  28.1,
-						"ts":    float64(1541152487632),
-					},
-				},
-				{
-					{
-						"color": "blue",
-						"temp":  28.1,
-						"ts":    float64(1541152487632),
-					},
-				},
-				{
-					{
 						"color": "blue",
 						"temp":  28.1,
 						"ts":    float64(1541152487632),
 					},
-					{
-						"color": "yellow",
-						"temp":  27.4,
-						"ts":    float64(1541152488442),
-					},
 				},
 				{
 					{
@@ -981,28 +934,16 @@ func TestEventWindow(t *testing.T) {
 						"ts":    float64(1541152489252),
 					},
 				},
-				{
-					{
-						"color": "yellow",
-						"temp":  27.4,
-						"ts":    float64(1541152488442),
-					},
-					{
-						"color": "red",
-						"temp":  25.5,
-						"ts":    float64(1541152489252),
-					},
-				},
 			},
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(0),
 				"op_6_project_0_process_latency_us": int64(0),
-				"op_6_project_0_records_in_total":   int64(9),
-				"op_6_project_0_records_out_total":  int64(9),
+				"op_6_project_0_records_in_total":   int64(5),
+				"op_6_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(9),
-				"sink_mockSink_0_records_out_total": int64(9),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -1015,74 +956,53 @@ func TestEventWindow(t *testing.T) {
 				"op_4_window_0_exceptions_total":   int64(0),
 				"op_4_window_0_process_latency_us": int64(0),
 				"op_4_window_0_records_in_total":   int64(9),
-				"op_4_window_0_records_out_total":  int64(9),
+				"op_4_window_0_records_out_total":  int64(5),
 
 				"op_5_join_0_exceptions_total":   int64(0),
 				"op_5_join_0_process_latency_us": int64(0),
-				"op_5_join_0_records_in_total":   int64(9),
-				"op_5_join_0_records_out_total":  int64(9),
+				"op_5_join_0_records_in_total":   int64(5),
+				"op_5_join_0_records_out_total":  int64(5),
 			},
 		},
 		{
 			Name: `TestEventWindowRule4`,
-			Sql:  `SELECT  window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
+			Sql:  `SELECT  window_start() as ws, color, window_end() as we FROM demoE GROUP BY SlidingWindow(ss, 2) OVER (WHEN ts - last_hit_time() > 1000), color ORDER BY color`,
 			R: [][]map[string]interface{}{
-				{
-					{
-						"color": "red",
-						"ws":    float64(1541152484013),
-						"we":    float64(1541152486013),
-					},
-				},
-				{
-					{
-						"color": "blue",
-						"ws":    float64(1541152485632),
-						"we":    float64(1541152487632),
-					}, {
-						"color": "red",
-						"ws":    float64(1541152485632),
-						"we":    float64(1541152487632),
-					},
-				},
-				{
-					{
-						"color": "blue",
-						"ws":    float64(1541152486442),
-						"we":    float64(1541152488442),
-					}, {
-						"color": "yellow",
-						"ws":    float64(1541152486442),
-						"we":    float64(1541152488442),
-					},
-				},
-				{
-					{
-						"color": "blue",
-						"ws":    float64(1541152487252),
-						"we":    float64(1541152489252),
-					},
-					{
-						"color": "red",
-						"ws":    float64(1541152487252),
-						"we":    float64(1541152489252),
-					},
-					{
-						"color": "yellow",
-						"ws":    float64(1541152487252),
-						"we":    float64(1541152489252),
-					},
-				},
+				{{
+					"color": "red",
+					"ws":    float64(1541152484013),
+					"we":    float64(1541152486013),
+				}}, {{
+					"color": "blue",
+					"ws":    float64(1541152485632),
+					"we":    float64(1541152487632),
+				}, {
+					"color": "red",
+					"ws":    float64(1541152485632),
+					"we":    float64(1541152487632),
+				}}, {{
+					"color": "blue",
+					"ws":    float64(1541152487252),
+					"we":    float64(1541152489252),
+				}, {
+					"color": "red",
+					"ws":    float64(1541152487252),
+					"we":    float64(1541152489252),
+				}, {
+					"color": "yellow",
+					"ws":    float64(1541152487252),
+					"we":    float64(1541152489252),
+				}},
 			},
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(0),
 				"op_6_project_0_process_latency_us": int64(0),
-				"op_6_project_0_records_in_total":   int64(4),
-				"op_6_project_0_records_out_total":  int64(4),
+				"op_6_project_0_records_in_total":   int64(3),
+				"op_6_project_0_records_out_total":  int64(3),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(4),
-				"sink_mockSink_0_records_out_total": int64(4),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),
@@ -1091,17 +1011,17 @@ func TestEventWindow(t *testing.T) {
 				"op_3_window_0_exceptions_total":   int64(0),
 				"op_3_window_0_process_latency_us": int64(0),
 				"op_3_window_0_records_in_total":   int64(4),
-				"op_3_window_0_records_out_total":  int64(4),
+				"op_3_window_0_records_out_total":  int64(3),
 
 				"op_4_aggregate_0_exceptions_total":   int64(0),
 				"op_4_aggregate_0_process_latency_us": int64(0),
-				"op_4_aggregate_0_records_in_total":   int64(4),
-				"op_4_aggregate_0_records_out_total":  int64(4),
+				"op_4_aggregate_0_records_in_total":   int64(3),
+				"op_4_aggregate_0_records_out_total":  int64(3),
 
 				"op_5_order_0_exceptions_total":   int64(0),
 				"op_5_order_0_process_latency_us": int64(0),
-				"op_5_order_0_records_in_total":   int64(4),
-				"op_5_order_0_records_out_total":  int64(4),
+				"op_5_order_0_records_in_total":   int64(3),
+				"op_5_order_0_records_out_total":  int64(3),
 			},
 		},
 		{