Bladeren bron

feat(func): add rule_id func

Signed-off-by: carlclone <906561974@qq.com>
carlclone 2 jaren geleden
bovenliggende
commit
555b705a82

+ 1 - 0
docs/en_US/sqls/built-in_functions.md

@@ -239,6 +239,7 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 | cardinality  | cardinality(col1)                    | The number of members in the group. The null value is 0.                                                                                                                                                                                                                                                                                                                                          |
 | newuuid      | newuuid()                            | Returns a random 16-byte UUID.                                                                                                                                                                                                                                                                                                                                                                    |
 | tstamp       | tstamp()                             | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970                                                                                                                                                                                                                                                                            |
+| rule_id       | rule_id()                           | Returns the ID of the currently matched rule            |
 | mqtt         | mqtt(topic)                          | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src1.topic)`<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as `mqtt(src2.messageid)` |
 | meta         | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
 | window_start | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |

+ 2 - 1
docs/zh_CN/sqls/built-in_functions.md

@@ -235,7 +235,8 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 | coalesce     | coalesce(expr1, expr2, ...)          | 返回第一个非空参数,如果所有参数都是null,则返回null |
 | cardinality  | cardinality(col1)                    | 组中成员的数量。空值为0。                                                                                                                                                                     |
 | newuuid      | newuuid()                            | 返回一个随机的16字节 UUID。                                                                                                                                                                 |
-| tstamp       | tstamp()                             | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。                                                                                                                                       |
+| tstamp       | tstamp()                             | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。    
+| rule_id       | rule_id()                           | 返回当前匹配到的规则的ID。    |
 | mqtt         | mqtt(topic)                          | 返回指定键的 MQTT 元数据。 当前支持的键包括<br />-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src1.topic)`<br />- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src2.messageid)`                  |
 | meta         | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | window_start | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |

+ 23 - 0
etc/functions/internal.json

@@ -3388,6 +3388,29 @@
 			}
 		}
 	}, {
+		"name": "rule_id",
+		"example": "rule_id()",
+		"hint": {
+			"en_US": "Returns the ID of the currently matched rule",
+			"zh_CN": "返回当前匹配到的规则的ID。"
+		},
+		"args": [],
+		"return": {
+			"type": "string",
+			"hint": {
+				"en_US": "ID of currently matched rule",
+				"zh_CN": "当前匹配到的规则的ID"
+			}
+		},
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Rule ID",
+				"zh_CN": "规则 ID"
+			}
+		}
+	}, {
 		"name": "meta",
 		"example": "meta(topic)",
 		"hint": {

+ 7 - 0
internal/binder/function/funcs_misc.go

@@ -476,6 +476,13 @@ func registerMiscFunc() {
 			return nil
 		},
 	}
+	builtins["rule_id"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			return ctx.GetRuleId(), true
+		},
+		val: ValidateNoArg,
+	}
 	builtins["meta"] = builtinFunc{
 		fType: ast.FuncTypeScalar,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 12 - 1
internal/topo/operator/misc_func_test.go

@@ -245,11 +245,22 @@ func TestMiscFunc_Apply1(t *testing.T) {
 				"a": cast.TimeFromUnixMilli(1.62000273e+12),
 			}},
 		},
+		{
+			sql: "SELECT rule_id() AS rule_id FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{},
+			},
+			result: []map[string]interface{}{{
+				"rule_id": "rule0",
+			}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	ctx = ctx.WithMeta("rule0", "op1", &state.MemoryStore{}).(*context.DefaultContext)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
@@ -257,7 +268,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		}
 		pp := &ProjectOp{}
 		parseStmt(pp, stmt.Fields)
-		fv, afv := xsql.NewFunctionValuersForOp(nil)
+		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		opResult := pp.Apply(ctx, tt.data, fv, afv)
 		result, err := parseResult(opResult, pp.IsAggregate)
 		if err != nil {

+ 16 - 1
internal/xsql/parser_test.go

@@ -1523,7 +1523,22 @@ func TestParser_ParseStatement(t *testing.T) {
 			},
 			err: "",
 		},
-
+		{
+			s: `select rule_id() as rule_id from demo`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr: &ast.Call{
+							Name: "rule_id",
+							Args: nil,
+						},
+						Name:  "rule_id",
+						AName: "rule_id"},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "demo"}},
+			},
+			err: "",
+		},
 		{
 			s:    "SELECT `half FROM tb",
 			stmt: nil,