瀏覽代碼

feat(func): add rule_start() (#2143)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 年之前
父節點
當前提交
7c51f52b6f

+ 8 - 0
docs/en_US/sqls/functions/other_functions.md

@@ -54,6 +54,14 @@ rule_id()
 
 
 Returns the ID of the currently matched rule.
 Returns the ID of the currently matched rule.
 
 
+## RULE_START
+
+```text
+rule_start()
+```
+
+Returns the rule start timestamp in int64 format.
+
 ## MQTT
 ## MQTT
 
 
 ```text
 ```text

+ 8 - 0
docs/zh_CN/sqls/functions/other_functions.md

@@ -51,6 +51,14 @@ rule_id()
 
 
 返回当前匹配到的规则的ID。
 返回当前匹配到的规则的ID。
 
 
+## RULE_START
+
+```text
+rule_start()
+```
+
+返回规则开始运行的时间戳,格式为 int64。
+
 ## MQTT
 ## MQTT
 
 
 ```text
 ```text

+ 8 - 0
internal/binder/function/funcs_misc.go

@@ -33,6 +33,7 @@ import (
 
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/keyedstate"
 	"github.com/lf-edge/ekuiper/internal/keyedstate"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -438,6 +439,13 @@ func registerMiscFunc() {
 		},
 		},
 		val: ValidateNoArg,
 		val: ValidateNoArg,
 	}
 	}
+	builtins["rule_start"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			return ctx.Value(context.RuleStartKey), true
+		},
+		val: ValidateNoArg,
+	}
 	builtins["meta"] = builtinFunc{
 	builtins["meta"] = builtinFunc{
 		fType: ast.FuncTypeScalar,
 		fType: ast.FuncTypeScalar,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 1 - 1
internal/binder/function/funcs_misc_test.go

@@ -555,7 +555,7 @@ func TestMiscFuncNil(t *testing.T) {
 	registerMiscFunc()
 	registerMiscFunc()
 	for name, function := range builtins {
 	for name, function := range builtins {
 		switch name {
 		switch name {
-		case "compress", "decompress", "newuuid", "tstamp", "rule_id", "window_start", "window_end", "event_time",
+		case "compress", "decompress", "newuuid", "tstamp", "rule_id", "rule_start", "window_start", "window_end", "event_time",
 			"json_path_query", "json_path_query_first", "coalesce", "meta", "json_path_exists":
 			"json_path_query", "json_path_query_first", "coalesce", "meta", "json_path_exists":
 			continue
 			continue
 		case "isnull":
 		case "isnull":

+ 5 - 2
internal/topo/context/default.go

@@ -31,7 +31,10 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 )
 
 
-const LoggerKey = "$$logger"
+const (
+	LoggerKey    = "$$logger"
+	RuleStartKey = "$$ruleStart"
+)
 
 
 type DefaultContext struct {
 type DefaultContext struct {
 	ruleId     string
 	ruleId     string
@@ -60,7 +63,7 @@ func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
 	return parent
 	return parent
 }
 }
 
 
-// Implement context interface
+// Deadline Implement context interface
 func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
 func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
 	return c.ctx.Deadline()
 	return c.ctx.Deadline()
 }
 }

+ 4 - 2
internal/topo/operator/misc_func_test.go

@@ -247,13 +247,14 @@ func TestMiscFunc_Apply1(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		{
 		{
-			sql: "SELECT rule_id() AS rule_id FROM test",
+			sql: "SELECT rule_id() AS rule_id, rule_start() as rule_start FROM test",
 			data: &xsql.Tuple{
 			data: &xsql.Tuple{
 				Emitter: "test",
 				Emitter: "test",
 				Message: xsql.Message{},
 				Message: xsql.Message{},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"rule_id": "rule0",
+				"rule_id":    "rule0",
+				"rule_start": 12345,
 			}},
 			}},
 		},
 		},
 	}
 	}
@@ -261,6 +262,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
 	contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	ctx = context.WithValue(ctx, context.RuleStartKey, 12345)
 	ctx = ctx.WithMeta("rule0", "op1", &state.MemoryStore{}).(*context.DefaultContext)
 	ctx = ctx.WithMeta("rule0", "op1", &state.MemoryStore{}).(*context.DefaultContext)
 	for i, tt := range tests {
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()

+ 1 - 0
internal/topo/topo.go

@@ -159,6 +159,7 @@ func (s *Topo) prepareContext() {
 			}
 			}
 		}
 		}
 		ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 		ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+		ctx = kctx.WithValue(ctx, kctx.RuleStartKey, conf.GetNowInMilli())
 		s.ctx, s.cancel = ctx.WithCancel()
 		s.ctx, s.cancel = ctx.WithCancel()
 	}
 	}
 }
 }