소스 검색

feat(func): add trigger related meta functions (#2028)

* feat(func): add trigger related meta functions

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* test: add tests for newly global state func

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(func): add event_time func

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

---------

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 년 전
부모
커밋
8fd3c37074
32개의 변경된 파일918개의 추가작업 그리고 144개의 파일을 삭제
  1. 26 0
      docs/en_US/sqls/functions/aggregate_functions.md
  2. 26 0
      docs/en_US/sqls/functions/other_functions.md
  3. 19 0
      docs/zh_CN/sqls/functions/aggregate_functions.md
  4. 20 0
      docs/zh_CN/sqls/functions/other_functions.md
  5. 8 0
      internal/binder/function/binder.go
  6. 106 0
      internal/binder/function/funcs_global_state.go
  7. 124 0
      internal/binder/function/funcs_global_state_test.go
  8. 5 0
      internal/binder/function/funcs_misc.go
  9. 1 1
      internal/binder/function/funcs_misc_test.go
  10. 1 0
      internal/binder/function/function.go
  11. 6 2
      internal/topo/operator/filter_operator.go
  12. 8 2
      internal/topo/operator/having_operator.go
  13. 1 1
      internal/topo/operator/math_func_test.go
  14. 4 4
      internal/topo/operator/project_test.go
  15. 9 0
      internal/topo/planner/analyzer.go
  16. 4 0
      internal/topo/planner/analyzer_test.go
  17. 42 3
      internal/topo/planner/filterPlan.go
  18. 146 0
      internal/topo/planner/filterPlan_test.go
  19. 42 3
      internal/topo/planner/havingPlan.go
  20. 134 0
      internal/topo/planner/havingPlan_test.go
  21. 5 3
      internal/topo/planner/planner.go
  22. 1 1
      internal/topo/planner/planner_graph.go
  23. 8 13
      internal/topo/planner/watermark_plan.go
  24. 3 3
      internal/topo/topotest/mock_topo.go
  25. 62 34
      internal/topo/topotest/rule_test.go
  26. 50 63
      internal/topo/topotest/window_rule_test.go
  27. 2 2
      internal/xsql/checkAgg.go
  28. 2 0
      internal/xsql/collection.go
  29. 2 2
      internal/xsql/parser_agg_test.go
  30. 10 1
      internal/xsql/row.go
  31. 40 5
      internal/xsql/valuer.go
  32. 1 1
      pkg/ast/expr.go

+ 26 - 0
docs/en_US/sqls/functions/aggregate_functions.md

@@ -222,3 +222,29 @@ percentile_disc(col, percentile)
 Returns the percentile value based on a discrete distribution of expression in the group, usually a window. The first
 argument is the column as the key to percentile_disc. The second argument is the percentile of the value that you want
 to find. The percentile must be a constant between 0.0 and 1.0.
+
+## LAST_AGG_HIT_COUNT
+
+```text
+last_agg_hit_count()
+```
+
+Returns the number of times the function had been called and passed.
+The function is usually used to get the accumulated trigger count of an aggregate rule.
+If the function is used in `HAVING` clause, it will only update the count when the condition is true.
+
+To use the similar functionality in a non-aggregate rule,
+use the [last_hit_count](./other_functions.md#lasthitcount) function.
+
+## LAST_AGG_HIT_TIME
+
+```text
+last_agg_hit_time()
+```
+
+Returns the int64 timestamp of the last **event** time the function had been called and passed.
+The function is usually used to get the last trigger time of an aggregate rule.
+If the function is used in `HAVING` clause, it will only update the timestamp when the condition is true.
+
+To use the similar functionality in a non-aggregate rule,
+use the [last_hit_time](./other_functions.md#lasthittime) function.

+ 26 - 0
docs/en_US/sqls/functions/other_functions.md

@@ -64,6 +64,32 @@ Returns the meta-data of a specified key. The key could be:
 - A key to refer to nested field for multi level metadata, such as `meta(src1.reading.device.name)`. This assumes
   reading is map structure metadata.
 
+## LAST_HIT_COUNT
+
+```text
+last_hit_count()
+```
+
+Returns the number of times the function had been called and passed.
+The function is usually used to get the accumulated trigger count of a continuous rule.
+If the function is used in `WHERE` clause, it will only update the count when the condition is true.
+
+Notice that, this function is not supported in aggregate rule except using in `WHEN` clause of a sliding window.
+To get the hit count of an aggregate rule, use [last_agg_hit_count](./aggregate_functions.md#lastagghitcount) instead.
+
+## LAST_HIT_TIME
+
+```text
+last_hit_time()
+```
+
+Returns the int64 timestamp of the last **event** time the function had been called and passed.
+The function is usually used to get the last trigger time of a continuous rule.
+If the function is used in `WHERE` clause, it will only update the timestamp when the condition is true.
+
+Notice that, this function is not supported in aggregate rule except using in `WHEN` clause of a sliding window.
+To get the hit time of an aggregate rule, use [last_agg_hit_time](./aggregate_functions.md#lastagghittime) instead.
+
 ## WINDOW_START
 
 ```text

+ 19 - 0
docs/zh_CN/sqls/functions/aggregate_functions.md

@@ -188,3 +188,22 @@ percentile_disc(col, 0.5)
 
 返回组中所有值的指定百分位数。空值不参与计算。其中,第一个参数指定用于计算百分位数的列;第二个参数指定百分位数的值,取值范围为
 0.0 ~ 1.0 。
+
+## LAST_AGG_HIT_COUNT
+
+```text
+last_agg_hit_count()
+```
+
+返回该函数的总命中次数。通常用于获取聚合规则的累计触发次数。如果在 `HAVING` 子句中使用,只有当条件为真时才会更新计数。
+若要在非聚合规则中实现类似功能,请使用 [last_hit_count](./other_functions.md#lasthitcount)。
+
+## LAST_AGG_HIT_TIME
+
+```text
+last_agg_hit_time()
+```
+
+返回该函数最后一次命中时的 int64 格式时间戳。通常用于获取聚合规则的最后一次触发时间。如果在 `HAVING`
+子句中使用,只有当条件为真时才会更新时间戳。
+若要在非聚合规则中实现类似功能,请使用 [last_hit_time](./other_functions.md#lasthittime)。

+ 20 - 0
docs/zh_CN/sqls/functions/other_functions.md

@@ -63,6 +63,26 @@ meta(key)
 
 返回指定键的元数据。
 
+## LAST_HIT_COUNT
+
+```text
+last_hit_count()
+```
+
+返回该函数的总命中次数。通常用于获取聚合规则的累计触发次数。如果在 `WHERE` 子句中使用,只有当条件为真时才会更新计数。
+该函数仅可用于非聚合规则中或 Sliding Window
+的条件中。若要在聚合规则中实现类似功能,请使用 [last_agg_hit_count](./aggregate_functions.md#lastagghitcount)。
+
+## LAST_HIT_TIME
+
+```text
+last_hit_time()
+```
+
+返回该函数最后一次命中时的 int64 格式时间戳。通常用于获取聚合规则的最后一次触发时间。如果在 `WHERE` 子句中使用,只有当条件为真时才会更新时间戳。
+该函数仅可用于非聚合规则中或 Sliding Window
+的条件中。若要在聚合规则中实现类似功能,请使用 [last_agg_hit_time](./aggregate_functions.md#lastagghittime)。
+
 ## WINDOW_START
 
 ```text

+ 8 - 0
internal/binder/function/binder.go

@@ -113,6 +113,14 @@ func IsAggFunc(funcName string) bool {
 	return false
 }
 
+// NoAggFunc returns true if the function CANNOT be used in an aggregate query
+func NoAggFunc(funcName string) bool {
+	if funcName == "last_hit_count" || funcName == "last_hit_time" {
+		return true
+	}
+	return false
+}
+
 func GetFuncType(funcName string) ast.FuncType {
 	f, _ := Function(funcName)
 	if f != nil {

+ 106 - 0
internal/binder/function/funcs_global_state.go

@@ -0,0 +1,106 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+const (
+	countKey    = "$$last_hit_count"
+	timeKey     = "$$last_hit_time"
+	aggCountKey = "$$last_agg_hit_count"
+	aggTimeKey  = "$$last_agg_hit_time"
+)
+
+func registerGlobalStateFunc() {
+	builtins["last_hit_count"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			doUpdate := args[0].(bool)
+			lv, err := ctx.GetCounter(countKey)
+			if err != nil {
+				return err, false
+			}
+			if doUpdate {
+				err := ctx.IncrCounter(countKey, 1)
+				if err != nil {
+					return nil, false
+				}
+			}
+			return lv, true
+		},
+		val: ValidateNoArg,
+	}
+	builtins["last_hit_time"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			args0 := args[0].(bool)
+			args1 := args[1].(int64)
+
+			lv, err := ctx.GetState(timeKey)
+			if err != nil {
+				return err, false
+			}
+			if args0 {
+				err := ctx.PutState(timeKey, args1)
+				if err != nil {
+					return nil, false
+				}
+			}
+			return lv, true
+		},
+		val: ValidateNoArg,
+	}
+	builtins["last_agg_hit_count"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			doUpdate := args[0].(bool)
+			lv, err := ctx.GetCounter(aggCountKey)
+			if err != nil {
+				return err, false
+			}
+			if doUpdate {
+				err := ctx.IncrCounter(aggCountKey, 1)
+				if err != nil {
+					return nil, false
+				}
+			}
+			return lv, true
+		},
+		val: ValidateNoArg,
+	}
+	builtins["last_agg_hit_time"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			args0 := args[0].(bool)
+			args1 := args[1].(int64)
+
+			lv, err := ctx.GetState(aggTimeKey)
+			if err != nil {
+				return err, false
+			}
+			if args0 {
+				err := ctx.PutState(aggTimeKey, args1)
+				if err != nil {
+					return nil, false
+				}
+			}
+			return lv, true
+		},
+		val: ValidateNoArg,
+	}
+}

+ 124 - 0
internal/binder/function/funcs_global_state_test.go

@@ -0,0 +1,124 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func TestHitFuncs(t *testing.T) {
+	f1, ok := builtins["last_hit_count"]
+	if !ok {
+		t.Fatal("builtin last_hit_count not found")
+	}
+	f2, ok := builtins["last_hit_time"]
+	if !ok {
+		t.Fatal("builtin last_hit_time not found")
+	}
+	f3, ok := builtins["last_agg_hit_count"]
+	if !ok {
+		t.Fatal("builtin last_agg_hit_count not found")
+	}
+	f4, ok := builtins["last_agg_hit_time"]
+	if !ok {
+		t.Fatal("builtin last_agg_hit_time not found")
+	}
+	funcs := []builtinFunc{f1, f2, f3, f4}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	tests := []struct {
+		name   string
+		args   []any
+		result []any
+	}{
+		{
+			name: "first hit",
+			args: []any{
+				true,
+				int64(10100),
+			},
+			result: []any{
+				0, nil, 0, nil,
+			},
+		},
+		{
+			name: "second hit",
+			args: []any{
+				true,
+				int64(10200),
+			},
+			result: []any{
+				1, int64(10100), 1, int64(10100),
+			},
+		},
+		{
+			name: "third hit but not update",
+			args: []any{
+				false,
+				int64(10300),
+			},
+			result: []any{
+				2, int64(10200), 2, int64(10200),
+			},
+		},
+		{
+			name: "fourth hit",
+			args: []any{
+				true,
+				int64(10400),
+			},
+			result: []any{
+				2, int64(10200), 2, int64(10200),
+			},
+		},
+		{
+			name: "fifth hit, no update",
+			args: []any{
+				false,
+				int64(10500),
+			},
+			result: []any{
+				3, int64(10400), 3, int64(10400),
+			},
+		},
+		{
+			name: "sixth hit",
+			args: []any{
+				true,
+				int64(10600),
+			},
+			result: []any{
+				3, int64(10400), 3, int64(10400),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			for i, re := range tt.result {
+				result, _ := funcs[i].exec(fctx, tt.args)
+				assert.Equal(t, re, result, "failed on %d", i)
+			}
+		})
+	}
+}

+ 5 - 0
internal/binder/function/funcs_misc.go

@@ -493,6 +493,11 @@ func registerMiscFunc() {
 		exec:  nil, // directly return in the valuer
 		val:   ValidateNoArg,
 	}
+	builtins["event_time"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec:  nil, // directly return in the valuer
+		val:   ValidateNoArg,
+	}
 	builtins["object_construct"] = builtinFunc{
 		fType: ast.FuncTypeScalar,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 1 - 1
internal/binder/function/funcs_misc_test.go

@@ -431,7 +431,7 @@ func TestMiscFuncNil(t *testing.T) {
 	registerMiscFunc()
 	for name, function := range builtins {
 		switch name {
-		case "compress", "decompress", "newuuid", "tstamp", "rule_id", "window_start", "window_end",
+		case "compress", "decompress", "newuuid", "tstamp", "rule_id", "window_start", "window_end", "event_time",
 			"json_path_query", "json_path_query_first", "coalesce", "meta", "json_path_exists":
 			continue
 		case "isnull":

+ 1 - 0
internal/binder/function/function.go

@@ -52,6 +52,7 @@ func init() {
 	registerSetReturningFunc()
 	registerArrayFunc()
 	registerObjectFunc()
+	registerGlobalStateFunc()
 }
 
 //var funcWithAsteriskSupportMap = map[string]string{

+ 6 - 2
internal/topo/operator/filter_operator.go

@@ -23,7 +23,8 @@ import (
 )
 
 type FilterOp struct {
-	Condition ast.Expr
+	Condition  ast.Expr
+	StateFuncs []*ast.Call
 }
 
 // Apply
@@ -31,7 +32,7 @@ type FilterOp struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuples from windowOp | xsql.JoinTuples from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuples | xsql.JoinTuples
  */
-func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("filter plan receive %v", data)
 	switch input := data.(type) {
@@ -45,6 +46,9 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 			return fmt.Errorf("run Where error: %s", r)
 		case bool:
 			if r {
+				for _, f := range p.StateFuncs {
+					_ = ve.Eval(f)
+				}
 				return input
 			}
 		case nil: // nil is false

+ 8 - 2
internal/topo/operator/having_operator.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -23,7 +23,8 @@ import (
 )
 
 type HavingOp struct {
-	Condition ast.Expr
+	Condition  ast.Expr
+	StateFuncs []*ast.Call
 }
 
 func (p *HavingOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
@@ -54,6 +55,11 @@ func (p *HavingOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 			return err
 		}
 		if len(groups) > 0 {
+			// update trigger
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(fv)}
+			for _, f := range p.StateFuncs {
+				_ = ve.Eval(f)
+			}
 			switch gi := input.(type) {
 			case *xsql.GroupedTuplesSet:
 				return gi.Filter(groups)

+ 1 - 1
internal/topo/operator/math_func_test.go

@@ -490,7 +490,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 			t.Errorf("%d: found error %q", i, err)
 			continue
 		}
-		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.WithAggFields(stmt)}
 		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)

+ 4 - 4
internal/topo/operator/project_test.go

@@ -617,7 +617,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			t.Errorf("parse sql error: %s", err)
 			continue
 		}
-		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.WithAggFields(stmt)}
 		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)
@@ -1221,7 +1221,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.WithAggFields(stmt)}
 		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)
@@ -1429,7 +1429,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.WithAggFields(stmt)}
 		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)
@@ -2386,7 +2386,7 @@ func TestProjectPlanError(t *testing.T) {
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
-		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.WithAggFields(stmt)}
 		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)

+ 9 - 0
internal/topo/planner/analyzer.go

@@ -180,6 +180,7 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
 }
 
 func validate(s *ast.SelectStatement) (err error) {
+	isAggStmt := false
 	if xsql.IsAggregate(s.Condition) {
 		return fmt.Errorf("Not allowed to call aggregate functions in WHERE clause.")
 	}
@@ -187,10 +188,14 @@ func validate(s *ast.SelectStatement) (err error) {
 		return fmt.Errorf("Not allowed to call non-aggregate functions in HAVING clause.")
 	}
 	for _, d := range s.Dimensions {
+		isAggStmt = true
 		if xsql.IsAggregate(d.Expr) {
 			return fmt.Errorf("Not allowed to call aggregate functions in GROUP BY clause.")
 		}
 	}
+	if s.Joins != nil {
+		isAggStmt = true
+	}
 	ast.WalkFunc(s, func(n ast.Node) bool {
 		switch f := n.(type) {
 		case *ast.Call:
@@ -204,6 +209,10 @@ func validate(s *ast.SelectStatement) (err error) {
 					}
 				}
 			}
+			if isAggStmt && function.NoAggFunc(f.Name) {
+				err = fmt.Errorf("function %s is not allowed in an aggregate query", f.Name)
+				return false
+			}
 		}
 		return true
 	})

+ 4 - 0
internal/topo/planner/analyzer_test.go

@@ -128,6 +128,10 @@ var tests = []struct {
 		sql: `SELECT collect(*)[0] as last FROM src1 GROUP BY SlidingWindow(ss,5) HAVING last.temp > 30`,
 		r:   newErrorStruct(""),
 	},
+	{ // 17
+		sql: `SELECT last_hit_time() FROM src1 GROUP BY SlidingWindow(ss,5) HAVING last_agg_hit_count() < 3`,
+		r:   newErrorStruct("function last_hit_time is not allowed in an aggregate query"),
+	},
 }
 
 func Test_validation(t *testing.T) {

+ 42 - 3
internal/topo/planner/filterPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -14,11 +14,15 @@
 
 package planner
 
-import "github.com/lf-edge/ekuiper/pkg/ast"
+import (
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
 
 type FilterPlan struct {
 	baseLogicalPlan
-	condition ast.Expr
+	condition  ast.Expr
+	stateFuncs []*ast.Call
 }
 
 func (p FilterPlan) Init() *FilterPlan {
@@ -51,3 +55,38 @@ func (p *FilterPlan) PruneColumns(fields []ast.Expr) error {
 	f := getFields(p.condition)
 	return p.baseLogicalPlan.PruneColumns(append(fields, f...))
 }
+
+func (p *FilterPlan) ExtractStateFunc() {
+	aliases := make(map[string]ast.Expr)
+	ast.WalkFunc(p.condition, func(n ast.Node) bool {
+		switch f := n.(type) {
+		case *ast.Call:
+			p.transform(f)
+		case *ast.FieldRef:
+			if f.AliasRef != nil {
+				aliases[f.Name] = f.AliasRef.Expression
+			}
+		}
+		return true
+	})
+	for _, ex := range aliases {
+		ast.WalkFunc(ex, func(n ast.Node) bool {
+			switch f := n.(type) {
+			case *ast.Call:
+				p.transform(f)
+			}
+			return true
+		})
+	}
+}
+
+func (p *FilterPlan) transform(f *ast.Call) {
+	if _, ok := xsql.ImplicitStateFuncs[f.Name]; ok {
+		f.Cached = true
+		p.stateFuncs = append(p.stateFuncs, &ast.Call{
+			Name:     f.Name,
+			FuncId:   f.FuncId,
+			FuncType: f.FuncType,
+		})
+	}
+}

+ 146 - 0
internal/topo/planner/filterPlan_test.go

@@ -0,0 +1,146 @@
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package planner
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+func TestFilterPlan_ExtractStateFunc(t *testing.T) {
+	tests := []struct {
+		name         string
+		condition    ast.Expr
+		newCondition ast.Expr
+		stateFuncs   []*ast.Call
+	}{
+		{
+			name: "test extract one",
+			condition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.FieldRef{
+						Name: "a",
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name: "last_hit_count",
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.FieldRef{
+						Name: "a",
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name:   "last_hit_count",
+					Cached: true,
+				},
+			},
+			stateFuncs: []*ast.Call{
+				{
+					Name: "last_hit_count",
+				},
+			},
+		},
+		{
+			name: "test extract multiple",
+			condition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.Call{
+						Name: "last_hit_time",
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name: "last_hit_count",
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.Call{
+						Name:   "last_hit_time",
+						Cached: true,
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name:   "last_hit_count",
+					Cached: true,
+				},
+			},
+			stateFuncs: []*ast.Call{
+				{
+					Name: "last_hit_time",
+				}, {
+					Name: "last_hit_count",
+				},
+			},
+		},
+		{
+			name: "test extract none",
+			condition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name: "event_time",
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name: "event_time",
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			stateFuncs: nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			p := &FilterPlan{
+				condition: tt.condition,
+			}
+			p.ExtractStateFunc()
+			assert.Equal(t, tt.newCondition, p.condition)
+			assert.Equal(t, tt.stateFuncs, p.stateFuncs)
+		})
+	}
+}

+ 42 - 3
internal/topo/planner/havingPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -14,11 +14,15 @@
 
 package planner
 
-import "github.com/lf-edge/ekuiper/pkg/ast"
+import (
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
 
 type HavingPlan struct {
 	baseLogicalPlan
-	condition ast.Expr
+	condition  ast.Expr
+	stateFuncs []*ast.Call
 }
 
 func (p HavingPlan) Init() *HavingPlan {
@@ -30,3 +34,38 @@ func (p *HavingPlan) PruneColumns(fields []ast.Expr) error {
 	f := getFields(p.condition)
 	return p.baseLogicalPlan.PruneColumns(append(fields, f...))
 }
+
+func (p *HavingPlan) ExtractStateFunc() {
+	aliases := make(map[string]ast.Expr)
+	ast.WalkFunc(p.condition, func(n ast.Node) bool {
+		switch f := n.(type) {
+		case *ast.Call:
+			p.transform(f)
+		case *ast.FieldRef:
+			if f.AliasRef != nil {
+				aliases[f.Name] = f.AliasRef.Expression
+			}
+		}
+		return true
+	})
+	for _, ex := range aliases {
+		ast.WalkFunc(ex, func(n ast.Node) bool {
+			switch f := n.(type) {
+			case *ast.Call:
+				p.transform(f)
+			}
+			return true
+		})
+	}
+}
+
+func (p *HavingPlan) transform(f *ast.Call) {
+	if _, ok := xsql.ImplicitStateFuncs[f.Name]; ok {
+		f.Cached = true
+		p.stateFuncs = append(p.stateFuncs, &ast.Call{
+			Name:     f.Name,
+			FuncId:   f.FuncId,
+			FuncType: f.FuncType,
+		})
+	}
+}

+ 134 - 0
internal/topo/planner/havingPlan_test.go

@@ -0,0 +1,134 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package planner
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+func TestHavingPlan_ExtractStateFunc(t *testing.T) {
+	tests := []struct {
+		name         string
+		condition    ast.Expr
+		newCondition ast.Expr
+		stateFuncs   []*ast.Call
+	}{
+		{
+			name: "test extract one",
+			condition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name: "last_agg_hit_count",
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name:   "last_agg_hit_count",
+					Cached: true,
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			stateFuncs: []*ast.Call{
+				{
+					Name: "last_agg_hit_count",
+				},
+			},
+		},
+		{
+			name: "test extract multiple",
+			condition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.Call{
+						Name: "last_agg_hit_time",
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name: "last_agg_hit_count",
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.AND,
+				LHS: &ast.BinaryExpr{
+					OP: ast.GT,
+					LHS: &ast.Call{
+						Name:   "last_agg_hit_time",
+						Cached: true,
+					},
+					RHS: &ast.IntegerLiteral{
+						Val: 1,
+					},
+				},
+				RHS: &ast.Call{
+					Name:   "last_agg_hit_count",
+					Cached: true,
+				},
+			},
+			stateFuncs: []*ast.Call{
+				{
+					Name: "last_agg_hit_time",
+				}, {
+					Name: "last_agg_hit_count",
+				},
+			},
+		},
+		{
+			name: "test extract none",
+			condition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name: "event_time",
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			newCondition: &ast.BinaryExpr{
+				OP: ast.GT,
+				LHS: &ast.Call{
+					Name: "event_time",
+				},
+				RHS: &ast.IntegerLiteral{
+					Val: 1,
+				},
+			},
+			stateFuncs: nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			p := &HavingPlan{
+				condition: tt.condition,
+			}
+			p.ExtractStateFunc()
+			assert.Equal(t, tt.newCondition, p.condition)
+			assert.Equal(t, tt.stateFuncs, p.stateFuncs)
+		})
+	}
+}

+ 5 - 3
internal/topo/planner/planner.go

@@ -165,11 +165,13 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *JoinPlan:
 		op = Transform(&operator.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options)
 	case *FilterPlan:
-		op = Transform(&operator.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options)
+		t.ExtractStateFunc()
+		op = Transform(&operator.FilterOp{Condition: t.condition, StateFuncs: t.stateFuncs}, fmt.Sprintf("%d_filter", newIndex), options)
 	case *AggregatePlan:
 		op = Transform(&operator.AggregateOp{Dimensions: t.dimensions}, fmt.Sprintf("%d_aggregate", newIndex), options)
 	case *HavingPlan:
-		op = Transform(&operator.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options)
+		t.ExtractStateFunc()
+		op = Transform(&operator.HavingOp{Condition: t.condition, StateFuncs: t.stateFuncs}, fmt.Sprintf("%d_having", newIndex), options)
 	case *OrderPlan:
 		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
@@ -462,7 +464,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 	if stmt.Fields != nil {
 		p = ProjectPlan{
 			fields:      stmt.Fields,
-			isAggregate: xsql.IsAggStatement(stmt),
+			isAggregate: xsql.WithAggFields(stmt),
 			sendMeta:    opt.SendMetaToSink,
 		}.Init()
 		p.SetChildren(children)

+ 1 - 1
internal/topo/planner/planner_graph.go

@@ -690,7 +690,7 @@ func parsePick(props map[string]interface{}, sourceNames []string) (*operator.Pr
 	}
 	t := ProjectPlan{
 		fields:      stmt.Fields,
-		isAggregate: xsql.IsAggStatement(stmt),
+		isAggregate: xsql.WithAggFields(stmt),
 	}.Init()
 	return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
 }

+ 8 - 13
internal/topo/planner/watermark_plan.go

@@ -27,19 +27,14 @@ func (p WatermarkPlan) Init() *WatermarkPlan {
 	return &p
 }
 
-// PushDownPredicate Push down all the conditions to the data source.
-// The condition here must be safe to push down or it will be catched by above planner, such as countWindow planner.
+// PushDownPredicate watermark plan can not push down predicate. It must receive all tuples to process watermark
 func (p *WatermarkPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
-	if len(p.children) == 0 {
-		return condition, p.self
+	if condition != nil {
+		f := FilterPlan{
+			condition: condition,
+		}.Init()
+		f.SetChildren([]LogicalPlan{p})
+		return nil, f
 	}
-	rest := condition
-	for i, child := range p.children {
-		if _, ok := child.(*DataSourcePlan); ok {
-			var newChild LogicalPlan
-			rest, newChild = child.PushDownPredicate(rest)
-			p.children[i] = newChild
-		}
-	}
-	return rest, p.self
+	return nil, p.self
 }

+ 3 - 3
internal/topo/topotest/mock_topo.go

@@ -22,6 +22,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/testx"
@@ -159,9 +161,7 @@ func compareResult(t *testing.T, mockSink *mocknode.MockSink, resultFunc func(re
 	results := mockSink.GetResults()
 	maps := resultFunc(results)
 
-	if !reflect.DeepEqual(tt.R, maps) {
-		t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Sql, tt.R, maps)
-	}
+	assert.Equal(t, tt.R, maps)
 	if err := CompareMetrics(tp, tt.M); err != nil {
 		t.Errorf("%d. %q\n\nmetrics mismatch:\n\n%s\n\n", i, tt.Sql, err)
 	}

+ 62 - 34
internal/topo/topotest/rule_test.go

@@ -230,37 +230,42 @@ func TestSingleSQL(t *testing.T) {
 		},
 		{
 			Name: `TestSingleSQLRule1`,
-			Sql:  `SELECT *, upper(color) FROM demo`,
+			Sql:  `SELECT *, upper(color), event_time() FROM demo`,
 			R: [][]map[string]interface{}{
 				{{
-					"color": "red",
-					"size":  float64(3),
-					"ts":    float64(1541152486013),
-					"upper": "RED",
+					"color":      "red",
+					"size":       float64(3),
+					"ts":         float64(1541152486013),
+					"upper":      "RED",
+					"event_time": float64(1541152486013),
 				}},
 				{{
-					"color": "blue",
-					"size":  float64(6),
-					"ts":    float64(1541152486822),
-					"upper": "BLUE",
+					"color":      "blue",
+					"size":       float64(6),
+					"ts":         float64(1541152486822),
+					"upper":      "BLUE",
+					"event_time": float64(1541152486822),
 				}},
 				{{
-					"color": "blue",
-					"size":  float64(2),
-					"ts":    float64(1541152487632),
-					"upper": "BLUE",
+					"color":      "blue",
+					"size":       float64(2),
+					"ts":         float64(1541152487632),
+					"upper":      "BLUE",
+					"event_time": float64(1541152487632),
 				}},
 				{{
-					"color": "yellow",
-					"size":  float64(4),
-					"ts":    float64(1541152488442),
-					"upper": "YELLOW",
+					"color":      "yellow",
+					"size":       float64(4),
+					"ts":         float64(1541152488442),
+					"upper":      "YELLOW",
+					"event_time": float64(1541152488442),
 				}},
 				{{
-					"color": "red",
-					"size":  float64(1),
-					"ts":    float64(1541152489252),
-					"upper": "RED",
+					"color":      "red",
+					"size":       float64(1),
+					"ts":         float64(1541152489252),
+					"upper":      "RED",
+					"event_time": float64(1541152489252),
 				}},
 			},
 			M: map[string]interface{}{
@@ -287,15 +292,17 @@ func TestSingleSQL(t *testing.T) {
 		},
 		{
 			Name: `TestSingleSQLRule2`,
-			Sql:  `SELECT color, ts FROM demo where size > 3`,
+			Sql:  `SELECT color, ts, last_hit_count() + 1 as lc FROM demo where size > 3`,
 			R: [][]map[string]interface{}{
 				{{
 					"color": "blue",
 					"ts":    float64(1541152486822),
+					"lc":    float64(1),
 				}},
 				{{
 					"color": "yellow",
 					"ts":    float64(1541152488442),
+					"lc":    float64(2),
 				}},
 			},
 			M: map[string]interface{}{
@@ -392,7 +399,7 @@ func TestSingleSQL(t *testing.T) {
 		},
 		{
 			Name: `TestSingleSQLRule5`,
-			Sql:  `SELECT meta(topic) as m, ts FROM demo`,
+			Sql:  `SELECT meta(topic) as m, ts FROM demo WHERE last_hit_count() < 4`,
 			R: [][]map[string]interface{}{
 				{{
 					"m":  "mock",
@@ -410,20 +417,11 @@ func TestSingleSQL(t *testing.T) {
 					"m":  "mock",
 					"ts": float64(1541152488442),
 				}},
-				{{
-					"m":  "mock",
-					"ts": float64(1541152489252),
-				}},
 			},
 			M: map[string]interface{}{
-				"op_2_project_0_exceptions_total":   int64(0),
-				"op_2_project_0_process_latency_us": int64(0),
-				"op_2_project_0_records_in_total":   int64(5),
-				"op_2_project_0_records_out_total":  int64(5),
-
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(4),
+				"sink_mockSink_0_records_out_total": int64(4),
 
 				"source_demo_0_exceptions_total":  int64(0),
 				"source_demo_0_records_in_total":  int64(5),
@@ -972,6 +970,36 @@ func TestSingleSQLWithEventTime(t *testing.T) {
 			},
 		},
 		{
+			Name: `TestStateFunc`,
+			Sql:  `SELECT *, last_hit_time() as lt, last_hit_count() as lc, event_time() as et FROM demoE WHERE size < 3 AND lc < 2`,
+			R: [][]map[string]interface{}{
+				{{
+					"color": "blue",
+					"size":  float64(2),
+					"ts":    float64(1541152487632),
+					"lc":    float64(0),
+					"et":    float64(1541152487632),
+				}},
+				{{
+					"color": "red",
+					"size":  float64(1),
+					"ts":    float64(1541152489252),
+					"lc":    float64(1),
+					"lt":    float64(1541152487632),
+					"et":    float64(1541152489252),
+				}},
+			},
+			M: map[string]interface{}{
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
+
+				"source_demoE_0_exceptions_total":  int64(0),
+				"source_demoE_0_records_in_total":  int64(6),
+				"source_demoE_0_records_out_total": int64(6),
+			},
+		},
+		{
 			Name: `TestChanged`,
 			Sql:  "SELECT changed_cols(\"tt_\", true, color, size) FROM demoE",
 			R: [][]map[string]interface{}{

+ 50 - 63
internal/topo/topotest/window_rule_test.go

@@ -44,47 +44,56 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule1`,
-			Sql:  `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
+			Sql:  `SELECT *, event_time() as et FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			R: [][]map[string]interface{}{
 				{{
 					"color": "red",
 					"size":  float64(3),
 					"ts":    float64(1541152486013),
+					"et":    float64(1541152486013),
 				}, {
 					"color": "blue",
 					"size":  float64(6),
 					"ts":    float64(1541152486822),
+					"et":    float64(1541152486822),
 				}},
 				{{
 					"color": "red",
 					"size":  float64(3),
 					"ts":    float64(1541152486013),
+					"et":    float64(1541152486013),
 				}, {
 					"color": "blue",
 					"size":  float64(6),
 					"ts":    float64(1541152486822),
+					"et":    float64(1541152486822),
 				}, {
 					"color": "blue",
 					"size":  float64(2),
 					"ts":    float64(1541152487632),
+					"et":    float64(1541152487632),
 				}},
 				{{
 					"color": "blue",
 					"size":  float64(2),
 					"ts":    float64(1541152487632),
+					"et":    float64(1541152487632),
 				}, {
 					"color": "yellow",
 					"size":  float64(4),
 					"ts":    float64(1541152488442),
+					"et":    float64(1541152488442),
 				}},
 				{{
 					"color": "yellow",
 					"size":  float64(4),
 					"ts":    float64(1541152488442),
+					"et":    float64(1541152488442),
 				}, {
 					"color": "red",
 					"size":  float64(1),
 					"ts":    float64(1541152489252),
+					"et":    float64(1541152489252),
 				}},
 			},
 			M: map[string]interface{}{
@@ -152,7 +161,7 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule3`,
-			Sql:  `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1)`,
+			Sql:  `SELECT color, temp, demo.ts as ts1, demo1.ts as ts2, demo.ts - demo1.ts as diff FROM demo INNER JOIN demo1 ON ts1 = ts2 GROUP BY SlidingWindow(ss, 1) HAVING last_agg_hit_count() < 7`,
 			R: [][]map[string]interface{}{
 				{{
 					"color": "red",
@@ -202,29 +211,12 @@ func TestWindow(t *testing.T) {
 					"ts1":   float64(1541152488442),
 					"ts2":   float64(1541152488442),
 					"diff":  float64(0),
-				}}, {{
-					"color": "yellow",
-					"temp":  27.4,
-					"ts1":   float64(1541152488442),
-					"ts2":   float64(1541152488442),
-					"diff":  float64(0),
-				}, {
-					"color": "red",
-					"temp":  25.5,
-					"ts1":   float64(1541152489252),
-					"ts2":   float64(1541152489252),
-					"diff":  float64(0),
 				}},
 			},
 			M: map[string]interface{}{
-				"op_5_project_0_exceptions_total":   int64(0),
-				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(8),
-				"op_5_project_0_records_out_total":  int64(8),
-
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(8),
-				"sink_mockSink_0_records_out_total": int64(8),
+				"sink_mockSink_0_records_in_total":  int64(7),
+				"sink_mockSink_0_records_out_total": int64(7),
 
 				"source_demo_0_exceptions_total":  int64(0),
 				"source_demo_0_records_in_total":  int64(5),
@@ -250,8 +242,9 @@ func TestWindow(t *testing.T) {
 					"source_demo":  {"op_3_window"},
 					"source_demo1": {"op_3_window"},
 					"op_3_window":  {"op_4_join"},
-					"op_4_join":    {"op_5_project"},
-					"op_5_project": {"sink_mockSink"},
+					"op_4_join":    {"op_5_having"},
+					"op_5_having":  {"op_6_project"},
+					"op_6_project": {"sink_mockSink"},
 				},
 			},
 		},
@@ -323,24 +316,28 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule5`,
-			Sql:  `SELECT count(temp), window_start() as ws, window_end() FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
+			Sql:  `SELECT count(temp), window_start() as ws, window_end(), event_time() as et FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
 			R: [][]map[string]interface{}{
 				{{
 					"count":      float64(2),
 					"ws":         float64(1541152486013),
 					"window_end": float64(1541152487823), // timeout
+					"et":         float64(1541152487823),
 				}}, {{
 					"count":      float64(3),
 					"ws":         float64(1541152487932),
 					"window_end": float64(1541152490000), // tick
+					"et":         float64(1541152490000),
 				}}, {{
 					"count":      float64(5),
 					"ws":         float64(1541152490000),
 					"window_end": float64(1541152494000), // tick
+					"et":         float64(1541152494000),
 				}}, {{
 					"count":      float64(1),
 					"ws":         float64(1541152494000),
 					"window_end": float64(1541152495112), // timeout
+					"et":         float64(1541152495112),
 				}},
 			},
 			M: map[string]interface{}{
@@ -365,48 +362,56 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule6`,
-			Sql:  `SELECT window_end(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
+			Sql:  `SELECT window_end(), event_time(), sum(temp) as temp, count(color) as c, window_start() FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
 			R: [][]map[string]interface{}{
 				{{
 					"temp":         25.5,
 					"c":            float64(1),
 					"window_start": float64(1541152485115),
 					"window_end":   float64(1541152486115),
+					"event_time":   float64(1541152486115),
 				}}, {{
 					"temp":         25.5,
 					"c":            float64(1),
 					"window_start": float64(1541152485822),
 					"window_end":   float64(1541152486822),
+					"event_time":   float64(1541152486822),
 				}}, {{
 					"temp":         25.5,
 					"c":            float64(1),
 					"window_start": float64(1541152485903),
 					"window_end":   float64(1541152486903),
+					"event_time":   float64(1541152486903),
 				}}, {{
 					"temp":         28.1,
 					"c":            float64(1),
 					"window_start": float64(1541152486702),
 					"window_end":   float64(1541152487702),
+					"event_time":   float64(1541152487702),
 				}}, {{
 					"temp":         28.1,
 					"c":            float64(1),
 					"window_start": float64(1541152487442),
 					"window_end":   float64(1541152488442),
+					"event_time":   float64(1541152488442),
 				}}, {{
 					"temp":         55.5,
 					"c":            float64(2),
 					"window_start": float64(1541152487605),
 					"window_end":   float64(1541152488605),
+					"event_time":   float64(1541152488605),
 				}}, {{
 					"temp":         27.4,
 					"c":            float64(1),
 					"window_start": float64(1541152488252),
 					"window_end":   float64(1541152489252),
+					"event_time":   float64(1541152489252),
 				}}, {{
 					"temp":         52.9,
 					"c":            float64(2),
 					"window_start": float64(1541152488305),
 					"window_end":   float64(1541152489305),
+					"event_time":   float64(1541152489305),
 				}},
 			},
 			M: map[string]interface{}{
@@ -494,7 +499,7 @@ func TestWindow(t *testing.T) {
 		},
 		{
 			Name: `TestWindowRule8`,
-			Sql:  `SELECT color, window_end(), ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
+			Sql:  `SELECT color, window_end(), event_time() as et, ts, count(*) as c, window_start() FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
 			R: [][]map[string]interface{}{
 				{{
 					"color":        "red",
@@ -502,6 +507,7 @@ func TestWindow(t *testing.T) {
 					"c":            float64(2),
 					"window_start": float64(1541152486000),
 					"window_end":   float64(1541152487000),
+					"et":           float64(1541152487000),
 				}},
 			},
 			M: map[string]interface{}{
@@ -814,55 +820,36 @@ func TestEventWindow(t *testing.T) {
 		},
 		{
 			Name: `TestEventWindowRule1`,
-			Sql:  `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
+			Sql:  `SELECT count(*), last_agg_hit_time() as lt, last_agg_hit_count() as lc, event_time() as et FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1) HAVING lc < 4`,
 			R: [][]map[string]interface{}{
 				{{
-					"color": "red",
-					"size":  float64(3),
-					"ts":    float64(1541152486013),
+					"count": float64(1),
+					"lc":    float64(0),
+					"et":    float64(1541152487000),
 				}},
 				{{
-					"color": "red",
-					"size":  float64(3),
-					"ts":    float64(1541152486013),
-				}, {
-					"color": "blue",
-					"size":  float64(2),
-					"ts":    float64(1541152487632),
+					"count": float64(2),
+					"lc":    float64(1),
+					"lt":    float64(1541152487000),
+					"et":    float64(1541152488000),
 				}},
 				{{
-					"color": "blue",
-					"size":  float64(2),
-					"ts":    float64(1541152487632),
-				}, {
-					"color": "yellow",
-					"size":  float64(4),
-					"ts":    float64(1541152488442),
-				}},
-				{{
-					"color": "yellow",
-					"size":  float64(4),
-					"ts":    float64(1541152488442),
-				}, {
-					"color": "red",
-					"size":  float64(1),
-					"ts":    float64(1541152489252),
+					"count": float64(2),
+					"lc":    float64(2),
+					"lt":    float64(1541152488000),
+					"et":    float64(1541152489000),
 				}},
 				{{
-					"color": "red",
-					"size":  float64(1),
-					"ts":    float64(1541152489252),
+					"count": float64(2),
+					"lc":    float64(3),
+					"lt":    float64(1541152489000),
+					"et":    float64(1541152490000),
 				}},
 			},
 			M: map[string]interface{}{
-				"op_4_project_0_exceptions_total":   int64(0),
-				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(5),
-				"op_4_project_0_records_out_total":  int64(5),
-
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(5),
-				"sink_mockSink_0_records_out_total": int64(5),
+				"sink_mockSink_0_records_in_total":  int64(4),
+				"sink_mockSink_0_records_out_total": int64(4),
 
 				"source_demoE_0_exceptions_total":  int64(0),
 				"source_demoE_0_records_in_total":  int64(6),

+ 2 - 2
internal/xsql/checkAgg.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -53,7 +53,7 @@ func getOrCalculateAgg(f *ast.FieldRef) bool {
 	return false
 }
 
-func IsAggStatement(stmt *ast.SelectStatement) bool {
+func WithAggFields(stmt *ast.SelectStatement) bool {
 	if stmt.Dimensions != nil {
 		ds := stmt.Dimensions.GetGroups()
 		if ds != nil && len(ds) > 0 {

+ 2 - 0
internal/xsql/collection.go

@@ -534,6 +534,8 @@ func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
 		return r.windowStart, true
 	case "window_end":
 		return r.windowEnd, true
+	case "event_time":
+		return r.windowEnd, true
 	default:
 		return nil, false
 	}

+ 2 - 2
internal/xsql/parser_agg_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ func TestIsAggStatement(t *testing.T) {
 	for i, tt := range tests {
 		// fmt.Printf("Parsing SQL %q.\n", tt.s)
 		stmt, err := NewParser(strings.NewReader(tt.s)).Parse()
-		isAgg := IsAggStatement(stmt)
+		isAgg := WithAggFields(stmt)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" && (tt.agg != isAgg) {

+ 10 - 1
internal/xsql/row.go

@@ -26,7 +26,7 @@ import (
 // The tuple clone should be cheap.
 
 /*
- *  Interfaces definition
+ * Interfaces definition
  */
 
 type Wildcarder interface {
@@ -412,6 +412,15 @@ func (t *Tuple) IsWatermark() bool {
 	return false
 }
 
+func (t *Tuple) FuncValue(key string) (interface{}, bool) {
+	switch key {
+	case "event_time":
+		return t.Timestamp, true
+	default:
+		return nil, false
+	}
+}
+
 func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
 	cols = t.AffiliateRow.Pick(cols)
 	if !allWildcard && wildcardEmitters[t.Emitter] {

+ 40 - 5
internal/xsql/valuer.go

@@ -26,10 +26,21 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
-var implicitValueFuncs = map[string]bool{
-	"window_start": true,
-	"window_end":   true,
-}
+var (
+	// implicitValueFuncs is a set of functions that event implicitly passes the value.
+	implicitValueFuncs = map[string]bool{
+		"window_start": true,
+		"window_end":   true,
+		"event_time":   true,
+	}
+	// ImplicitStateFuncs is a set of functions that read/update global state implicitly.
+	ImplicitStateFuncs = map[string]bool{
+		"last_hit_time":      true,
+		"last_hit_count":     true,
+		"last_agg_hit_time":  true,
+		"last_agg_hit_count": true,
+	}
+)
 
 /*
  *  Valuer definitions
@@ -272,7 +283,7 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 		return &BracketEvalResult{Start: ii, End: ii}
 	case *ast.Call:
 		// The analytic functions are calculated prior to all ops, so just get the cached field value
-		if expr.Cached {
+		if expr.Cached && expr.CachedField != "" {
 			val, ok := v.Valuer.Value(expr.CachedField, "")
 			if ok {
 				return val
@@ -293,6 +304,30 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 					args []interface{}
 					ft   = expr.FuncType
 				)
+				if _, ok := ImplicitStateFuncs[expr.Name]; ok {
+					args = make([]interface{}, 1)
+					// This is the implicit arg set by the filter planner
+					// If set, it will only return the value, no updating the value
+					if expr.Cached {
+						args[0] = false
+					} else {
+						args[0] = true
+					}
+					if expr.Name == "last_hit_time" || expr.Name == "last_agg_hit_time" {
+						if vv, ok := v.Valuer.(FuncValuer); ok {
+							val, ok := vv.FuncValue("event_time")
+							if ok {
+								args = append(args, val)
+							} else {
+								return fmt.Errorf("call %s error: %v", expr.Name, val)
+							}
+						} else {
+							return fmt.Errorf("call %s error: %v", expr.Name, "cannot get current time")
+						}
+					}
+					val, _ := valuer.Call(expr.Name, expr.FuncId, args)
+					return val
+				}
 				if len(expr.Args) > 0 {
 					switch ft {
 					case ast.FuncTypeAgg:

+ 1 - 1
pkg/ast/expr.go

@@ -133,7 +133,7 @@ type Call struct {
 	Args     []Expr
 	// This is used for analytic functions.
 	// In planner, all analytic functions are planned to calculate in analytic_op which produce a new field.
-	// This cachedField cached the new field name and when evaluating, just return the field access evaluated value.
+	// This cachedField cached the new field name and when evaluating, just returned the field access evaluated value.
 	CachedField string
 	Cached      bool
 	Partition   *PartitionExpr