فهرست منبع

feat(func): latest function

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 سال پیش
والد
کامیت
2b4507b9f9

+ 6 - 5
docs/en_US/sqls/built-in_functions.md

@@ -172,11 +172,12 @@ The syntax is like:
 AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
 ```
 
-| Function     | Example                              | Description                                                                                                                                                                                                                                                                                                                                                                                       |
-|--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| lag          | lag(expr, [offset], [default value]) | Return the former result of expression at offset, if not found, return the default value specified , if default value not set, return nil. if offset and default value not specified, offset is 1 and default value is nil                                                                                                                                                                        |
-| changed_col  | changed_col(true, col)               | Return the column value if it has changed from the last execution.                                                                                                                                                                                                                                                                                                                                |
-| had_changed  | had_changed(true, expr1, expr2, ...) | Return if any of the columns had changed since the last run. The expression could be * to easily detect the change status of all columns.                                                                                                                                                                                                                                                         |
+| Function    | Example                              | Description                                                                                                                                                                                                                |
+|-------------|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lag         | lag(expr, [offset], [default value]) | Return the former result of expression at offset, if not found, return the default value specified , if default value not set, return nil. if offset and default value not specified, offset is 1 and default value is nil |
+| latest      | latest(expr, [default value])        | Return the lastest non null value of the expression. If not found, return the default value specified , if default value not set, return nil.                                                                              |
+| changed_col | changed_col(true, col)               | Return the column value if it has changed from the last execution.                                                                                                                                                         |
+| had_changed | had_changed(true, expr1, expr2, ...) | Return if any of the columns had changed since the last run. The expression could be * to easily detect the change status of all columns.                                                                                  |
 
 Example function call to get the previous temperature value:
 

+ 1 - 0
docs/zh_CN/sqls/built-in_functions.md

@@ -176,6 +176,7 @@ AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
 | Function    | Example                              | Description                                                                                        |
 |-------------|--------------------------------------|----------------------------------------------------------------------------------------------------|
 | lag         | lag(expr, [offset], [default value]) | 返回表达式前一个值在偏移 offset 处的结果,如果没有找到,则返回默认值,如果没有指定默认值则返回 nil。如果除 expression 外其余参数均未指定,偏移量默认为 1,默认值为 nil |
+| latest      | latest(expr, [default value])        | 返回表达式最新的非空值。如果没有找到,则返回默认值。否则,返回 nil 。                                                              |
 | changed_col | changed_col(true, col)               | 返回列的相比上次执行后的变化值。若未变化则返回 null 。                                                                     |
 | had_changed | had_changed(true, expr1, expr2, ...) | 返回是否上次运行后列的值有变化。 其参数可以为 * 以方便地监测所有列。                                                               |
 

+ 36 - 0
internal/binder/function/funcs_analytic.go

@@ -174,4 +174,40 @@ func registerAnalyticFunc() {
 			return nil
 		},
 	}
+
+	builtins["latest"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			l := len(args) - 1
+			key := args[l].(string)
+			if l != 1 && l != 2 {
+				return fmt.Errorf("expect one or two args but got %d", l), false
+			}
+			if args[0] == nil {
+				v, err := ctx.GetState(key)
+				if err != nil {
+					return fmt.Errorf("error getting state for %s: %v", key, err), false
+				}
+				if v == nil {
+					if l == 2 {
+						return args[1], true
+					} else {
+						return nil, true
+					}
+				} else {
+					return v, true
+				}
+			} else {
+				ctx.PutState(key, args[0])
+				return args[0], true
+			}
+		},
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			l := len(args)
+			if l != 1 && l != 2 {
+				return fmt.Errorf("expect one or two args but got %d", l)
+			}
+			return nil
+		},
+	}
 }

+ 121 - 0
internal/binder/function/funcs_analytic_test.go

@@ -816,3 +816,124 @@ func TestLagExecIndex(t *testing.T) {
 		}
 	}
 }
+
+func TestLatestExec(t *testing.T) {
+	f, ok := builtins["latest"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args:   []interface{}{"self"},
+			result: fmt.Errorf("expect one or two args but got 0"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"self",
+			},
+			result: "foo",
+		},
+		{ // 2
+			args: []interface{}{
+				nil,
+				"self",
+			},
+			result: "foo",
+		},
+		{ // 3
+			args: []interface{}{
+				"bar",
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				nil,
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"self",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestLatestPartition(t *testing.T) {
+	f, ok := builtins["latest"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args:   []interface{}{"self"},
+			result: fmt.Errorf("expect one or two args but got 0"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"2",
+			},
+			result: "foo",
+		},
+		{ // 2
+			args: []interface{}{
+				nil,
+				"dd",
+				"1",
+			},
+			result: "dd",
+		},
+		{ // 3
+			args: []interface{}{
+				"bar",
+				"1",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				nil,
+				"2",
+			},
+			result: "foo",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"1",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}

+ 1 - 0
internal/binder/function/function.go

@@ -51,6 +51,7 @@ var analyticFuncs = map[string]struct{}{
 	"lag":         {},
 	"changed_col": {},
 	"had_changed": {},
+	"latest":      {},
 }
 
 const AnalyticPrefix = "$$a"