瀏覽代碼

feat(func): add delay function

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父節點
當前提交
1e4f50bcf5

+ 13 - 12
docs/en_US/sqls/built-in_functions.md

@@ -232,18 +232,19 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 ```
 
 ## Other Functions
-| Function     | Example                              | Description                                                                                                                                                                                                                                                                                                                                                                                       |
-|--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| isNull       | isNull(col1)                         | Returns true if the argument is the Null value.                                                                                                                                                                                                                                                                                                                                                   |
-| coalesce     | coalesce(expr1, expr2, ...)          | Return the first non null value. If all expr are null,return nil. |
-| cardinality  | cardinality(col1)                    | The number of members in the group. The null value is 0.                                                                                                                                                                                                                                                                                                                                          |
-| newuuid      | newuuid()                            | Returns a random 16-byte UUID.                                                                                                                                                                                                                                                                                                                                                                    |
-| tstamp       | tstamp()                             | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970                                                                                                                                                                                                                                                                            |
-| rule_id       | rule_id()                           | Returns the ID of the currently matched rule            |
-| mqtt         | mqtt(topic)                          | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src1.topic)`<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src2.messageid)` |
-| meta         | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
-| window_start | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
-| window_end   | window_end()                         | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
+| Function     | Example                     | Description                                                                                                                                                                                                                                                                                                                                                                                       |
+|--------------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| isNull       | isNull(col1)                | Returns true if the argument is the Null value.                                                                                                                                                                                                                                                                                                                                                   |
+| coalesce     | coalesce(expr1, expr2, ...) | Return the first non null value. If all expr are null,return nil.                                                                                                                                                                                                                                                                                                                                 |
+| cardinality  | cardinality(col1)           | The number of members in the group. The null value is 0.                                                                                                                                                                                                                                                                                                                                          |
+| newuuid      | newuuid()                   | Returns a random 16-byte UUID.                                                                                                                                                                                                                                                                                                                                                                    |
+| tstamp       | tstamp()                    | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970                                                                                                                                                                                                                                                                            |
+| rule_id      | rule_id()                   | Returns the ID of the currently matched rule                                                                                                                                                                                                                                                                                                                                                      |
+| mqtt         | mqtt(topic)                 | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src1.topic)`<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src2.messageid)` |
+| meta         | meta(topic)                 | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
+| window_start | window_start()              | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
+| window_end   | window_end()                | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
+| delay        | delay(delayTime, returnVal) | Delay the execution of the rule for a specified time and then return the returnVal.                                                                                                                                                                                                                                                                                                               |
 
 ## Multiple Column Functions
 

+ 13 - 12
docs/zh_CN/sqls/built-in_functions.md

@@ -229,18 +229,19 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 ```
 
 ## 其它函数
-| 函数           | 示例                                   | 说明                                                                                                                                                                                |
-|--------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| isNull       | isNull(col1)                         | 如果参数为空值,则返回 true。                                                                                                                                                                 |
-| coalesce     | coalesce(expr1, expr2, ...)          | 返回第一个非空参数,如果所有参数都是null,则返回null |
-| cardinality  | cardinality(col1)                    | 组中成员的数量。空值为0。                                                                                                                                                                     |
-| newuuid      | newuuid()                            | 返回一个随机的16字节 UUID。                                                                                                                                                                 |
-| tstamp       | tstamp()                             | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。    
-| rule_id       | rule_id()                           | 返回当前匹配到的规则的ID。    |
-| mqtt         | mqtt(topic)                          | 返回指定键的 MQTT 元数据。 当前支持的键包括<br />-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src1.topic)`<br />- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src2.messageid)`                  |
-| meta         | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
-| window_start | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
-| window_end   | window_end()                         | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
+| 函数           | 示例                          | 说明                                                                                                                                                                                |
+|--------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| isNull       | isNull(col1)                | 如果参数为空值,则返回 true。                                                                                                                                                                 |
+| coalesce     | coalesce(expr1, expr2, ...) | 返回第一个非空参数,如果所有参数都是null,则返回null                                                                                                                                                    |
+| cardinality  | cardinality(col1)           | 组中成员的数量。空值为0。                                                                                                                                                                     |
+| newuuid      | newuuid()                   | 返回一个随机的16字节 UUID。                                                                                                                                                                 |
+| tstamp       | tstamp()                    | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。                                                                                                                                       |
+| rule_id      | rule_id()                   | 返回当前匹配到的规则的ID。                                                                                                                                                                    |
+| mqtt         | mqtt(topic)                 | 返回指定键的 MQTT 元数据。 当前支持的键包括<br />-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src1.topic)`<br />- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src2.messageid)`                  |
+| meta         | meta(topic)                 | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
+| window_start | window_start()              | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
+| window_end   | window_end()                | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
+| delay        | delay(delayTime, returnVal) | 延迟执行规则一段时间后返回第二个参数作为返回值。                                                                                                                                                          |
 
 ## 多列函数
 

+ 53 - 1
etc/functions/internal.json

@@ -3509,7 +3509,7 @@
 				"zh_CN": "窗口开始时间"
 			}
 		}
-	}, {
+	},{
 		"name": "window_end",
 		"example": "window_end()",
 		"hint": {
@@ -3532,5 +3532,57 @@
 				"zh_CN": "窗口结束时间"
 			}
 		}
+	},{
+		"name": "delay",
+		"example": "delay(time, returnVar)",
+		"hint": {
+			"en_US": "Delay the execution of the rule for a specified time.",
+			"zh_CN": "延迟执行规则一段时间。"
+		},
+		"args": [
+			{
+				"name": "time",
+				"optional": false,
+				"control": "field",
+				"type": "number",
+				"hint": {
+					"en_US": "The delay time in milliseconds, could be a literal or a field.",
+					"zh_CN": "延迟时间,单位为毫秒。可为字面量或者字段。"
+				},
+				"label": {
+					"en_US": "Delay Time",
+					"zh_CN": "延迟时间"
+				}
+			},
+			{
+				"name": "returnVar",
+				"optional": false,
+				"control": "field",
+				"type": "any",
+				"hint": {
+					"en_US": "The value to return after the delay.",
+					"zh_CN": "延迟后返回的值。"
+				},
+				"label": {
+					"en_US": "Return Value",
+					"zh_CN": "返回值"
+				}
+			}
+		],
+		"return": {
+			"type": "any",
+			"hint": {
+				"en_US": "delay return value",
+				"zh_CN": "延迟返回值"
+			}
+		},
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Delay",
+				"zh_CN": "延迟执行"
+			}
+		}
 	}]
 }

+ 22 - 2
internal/binder/function/funcs_misc.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"time"
 )
 
 func registerMiscFunc() {
@@ -606,7 +607,26 @@ func registerMiscFunc() {
 			return nil
 		},
 	}
-
+	builtins["delay"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			d, err := cast.ToInt(args[0], cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return err, false
+			}
+			time.Sleep(time.Duration(d) * time.Millisecond)
+			return args[1], true
+		},
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			if err := ValidateLen(2, len(args)); err != nil {
+				return err
+			}
+			if ast.IsStringArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
+				return ProduceErrInfo(0, "number - float or int")
+			}
+			return nil
+		},
+	}
 }
 
 func round(num float64) int {

+ 51 - 1
internal/binder/function/funcs_misc_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import (
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"reflect"
 	"testing"
 )
@@ -229,3 +230,52 @@ func TestFromJson(t *testing.T) {
 		}
 	}
 }
+
+func TestDelay(t *testing.T) {
+	f, ok := builtins["delay"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+
+	err := f.val(fctx, []ast.Expr{&ast.StringLiteral{Val: "abc"}})
+	if err == nil {
+		t.Fatal("expect error")
+	}
+	err = f.val(fctx, []ast.Expr{&ast.StringLiteral{Val: "1s"}, &ast.StringLiteral{Val: "1s"}})
+	if err == nil {
+		t.Fatal("expect error")
+	}
+	err = f.val(fctx, []ast.Expr{&ast.IntegerLiteral{Val: 1000}, &ast.StringLiteral{Val: "1s"}})
+	if err != nil {
+		t.Fatal("expect no error")
+	}
+
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				10,
+				"bar",
+			},
+			result: "bar",
+		}, { // 1
+			args: []interface{}{
+				"bar",
+				"bar",
+			},
+			result: fmt.Errorf("cannot convert string(bar) to int"),
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}