Преглед на файлове

feat(topo): add context function to parse dynamic properties

Also refactor a global jsonpath func and reuse it in function call

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang преди 3 години
родител
ревизия
6ee1e31c76

+ 12 - 0
docs/en_US/extension/native/overview.md

@@ -52,4 +52,16 @@ In the plugin source code, developers can access the dependencies of file system
 
 ```go
 ctx.GetRootPath()
+```
+
+## Parse dynamic properties
+
+For customized sink plugins, users may still want to support [dynamic properties](../../rules/overview.md#dynamic-properties) like the built-in ones. 
+
+In the context object, a function `ParseDynamicProp` is provided to support the parsing of the dynamic property syntax. In the customized sink, developers can specify some properties to be dynamic according to the business logic. And in the plugin code, use this function to parse the user input in the collect function or elsewhere.
+
+```go
+// Parse the prop of jsonpath syntax against the current data.
+value, err := ctx.ParseDynamicProp(s.prop, data)
+// Use the parsed value for the following business logic.
 ```

+ 19 - 0
docs/en_US/rules/overview.md

@@ -166,3 +166,22 @@ eKuiper extends several functions that can be used in data template.
 
 - (deprecated)`json para1`: The `json` function is used for convert the map content to a JSON string. Use`toJson` from sprig instead.
 - (deprecated)`base64 para1`: The `base64` function is used for encoding parameter value to a base64 string. Convert the pramater to string type and use `b64enc` from sprig instead.
+
+### Dynamic properties
+
+In the sink, it is common to fetch a property value from the result data to achieve dynamic output. For example, to write data into a dynamic topic of mqtt. The dynamic properties will all follow the json path syntax. In below example, the sink topic is gotten from the selected topic by using jsonpath syntax.
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT topic FROM demo",
+  "actions": [{
+    "mqtt": {
+      "sendSingle": true,
+      "topic": "$.topic"
+    }
+  }]
+}
+```
+
+In the above example, `sendSingle` property is used, so the sink data is a map by default. If not using `sendSingle`, you can get the topic by index with jsonapth `$[0].topic`.

+ 7 - 5
docs/zh_CN/extension/native/overview.md

@@ -67,10 +67,12 @@ func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionConte
 ctx.GetRootPath()
 ```
 
-## 外部函数扩展
+## 解析动态属性
 
-提供一种配置的方式,使得 eKuiper 可以使用 SQL 以函数的方式直接调用外部服务,包括各种 rpc 服务, http 服务等。该方式将可大提高 eKuiper 扩展的易用性。外部函数将作为插件系统的补充,仅在性能要求较高的情况下才建议使用插件
+在自定义的 sink 插件中,用户可能仍然想要像内置的 sink 一样支持[动态属性](../../rules/overview.md#动态属性)。 我们在 context 对象中提供了 `ParseDynamicProp` 方法使得开发者可以方便地解析动态属性并应用于插件中。开发组应当根据业务逻辑,设计那些属性支持动态值。然后在代码编写时,使用此方法解析用户传入的属性值
 
-以 getFeature 函数为例,假设有 AI 服务基于 grpc 提供getFeature 服务。则可在 eKuiper 配置之后,使用 `SELECT getFeature(self) from demo` 的方式,无需定制插件而调用该 AI 服务。
-
-详细配置方法,请参考[外部函数](../external/external_func.md)。
+```go
+// Parse the prop of jsonpath syntax against the current data.
+value, err := ctx.ParseDynamicProp(s.prop, data)
+// Use the parsed value for the following business logic.
+```

+ 18 - 0
docs/zh_CN/rules/overview.md

@@ -169,3 +169,21 @@ eKuiper 扩展了几个可以在模版中使用的函数。
 - (deprecated)`json para1`: `json` 函数用于将 map 内容转换为 JSON 字符串。本函数已弃用,建议使用 sprig 扩展的 `toJson` 函数。
 - (deprecated)`base64 para1`: `base64` 函数用于将参数值编码为 base64 字符串。本函数已弃用,建议将参数转换为 string 类型后,使用 sprig 扩展的 `b64enc` 函数。
 
+### 动态属性
+
+有些情况下,用户需要按照数据把结果发送到不同的目标中。例如,根据收到的数据,把计算结果发到不同的 mqtt 主题中。使用基于 jsonpath 格式的动态属性,可以实现这样的功能。在以下的例子中,目标的 topic 属性是一个 jsonpath 格式的字符串从而在运行时会将消息发送到动态的主题中。 
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT topic FROM demo",
+  "actions": [{
+    "mqtt": {
+      "sendSingle": true,
+      "topic": "$.topic"
+    }
+  }]
+}
+```
+
+需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 `$[0].topic`。

+ 3 - 68
internal/binder/function/funcs_misc.go

@@ -15,18 +15,15 @@
 package function
 
 import (
-	"context"
 	"crypto/md5"
 	"crypto/sha1"
 	"crypto/sha256"
 	"crypto/sha512"
 	b64 "encoding/base64"
-	"encoding/json"
 	"fmt"
-	"github.com/PaesslerAG/gval"
-	"github.com/PaesslerAG/jsonpath"
 	"github.com/google/uuid"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"hash"
 	"io"
@@ -259,34 +256,8 @@ func otherCall(name string, args []interface{}) (interface{}, bool) {
 	}
 }
 
-func jsonCall(name string, args []interface{}) (interface{}, bool) {
-	var input interface{}
-	at := reflect.TypeOf(args[0])
-	if at != nil {
-		switch at.Kind() {
-		case reflect.Map:
-			input = convertToInterfaceArr(args[0].(map[string]interface{}))
-		case reflect.Slice:
-			input = convertSlice(args[0])
-		case reflect.String:
-			v, _ := args[0].(string)
-			err := json.Unmarshal([]byte(v), &input)
-			if err != nil {
-				return fmt.Errorf("%s function error: the first argument '%v' is not a valid json string", name, args[0]), false
-			}
-		default:
-			return fmt.Errorf("%s function error: the first argument must be a map but got %v", name, args[0]), false
-		}
-	} else {
-		return fmt.Errorf("%s function error: the first argument must be a map but got nil", name), false
-	}
-
-	builder := gval.Full(jsonpath.PlaceholderExtension())
-	path, err := builder.NewEvaluable(args[1].(string))
-	if err != nil {
-		return fmt.Errorf("%s function error: %s", name, err), false
-	}
-	result, err := path(context.Background(), input)
+func jsonCall(ctx api.StreamContext, name string, args []interface{}) (interface{}, bool) {
+	result, err := ctx.ParseDynamicProp(args[1].(string), args[0])
 	if err != nil {
 		if name == "json_path_exists" {
 			return false, true
@@ -317,39 +288,3 @@ func jsonCall(name string, args []interface{}) (interface{}, bool) {
 	}
 	return fmt.Errorf("invalid function name: %s", name), false
 }
-
-func convertToInterfaceArr(orig map[string]interface{}) map[string]interface{} {
-	result := make(map[string]interface{})
-	for k, v := range orig {
-		vt := reflect.TypeOf(v)
-		if vt == nil {
-			result[k] = nil
-			continue
-		}
-		switch vt.Kind() {
-		case reflect.Slice:
-			result[k] = convertSlice(v)
-		case reflect.Map:
-			result[k] = convertToInterfaceArr(v.(map[string]interface{}))
-		default:
-			result[k] = v
-		}
-	}
-	return result
-}
-
-func convertSlice(v interface{}) []interface{} {
-	value := reflect.ValueOf(v)
-	tempArr := make([]interface{}, value.Len())
-	for i := 0; i < value.Len(); i++ {
-		item := value.Index(i)
-		if item.Kind() == reflect.Map {
-			tempArr[i] = convertToInterfaceArr(item.Interface().(map[string]interface{}))
-		} else if item.Kind() == reflect.Slice {
-			tempArr[i] = convertSlice(item.Interface())
-		} else {
-			tempArr[i] = item.Interface()
-		}
-	}
-	return tempArr
-}

+ 2 - 2
internal/binder/function/function.go

@@ -126,7 +126,7 @@ func (f *funcExecutor) Exec(_ []interface{}, _ api.FunctionContext) (interface{}
 	return fmt.Errorf("unknow name"), false
 }
 
-func (f *funcExecutor) ExecWithName(args []interface{}, _ api.FunctionContext, name string) (interface{}, bool) {
+func (f *funcExecutor) ExecWithName(args []interface{}, ctx api.FunctionContext, name string) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	switch getFuncType(lowerName) {
 	case AggFunc:
@@ -140,7 +140,7 @@ func (f *funcExecutor) ExecWithName(args []interface{}, _ api.FunctionContext, n
 	case HashFunc:
 		return hashCall(lowerName, args)
 	case JsonFunc:
-		return jsonCall(lowerName, args)
+		return jsonCall(ctx, lowerName, args)
 	case OtherFunc:
 		return otherCall(lowerName, args)
 	}

+ 67 - 0
internal/conf/jsonpath_eval.go

@@ -0,0 +1,67 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conf
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/PaesslerAG/gval"
+	"github.com/PaesslerAG/jsonpath"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"reflect"
+)
+
+var builder = gval.Full(jsonpath.PlaceholderExtension())
+
+type JsonPathEval interface {
+	Eval(data interface{}) (interface{}, error)
+}
+
+type gvalPathEval struct {
+	valuer gval.Evaluable
+}
+
+func (e *gvalPathEval) Eval(data interface{}) (interface{}, error) {
+	var input interface{}
+	at := reflect.TypeOf(data)
+	if at != nil {
+		switch at.Kind() {
+		case reflect.Map:
+			input = cast.ConvertToInterfaceArr(data.(map[string]interface{}))
+		case reflect.Slice:
+			input = cast.ConvertSlice(data)
+		case reflect.String:
+			v, _ := data.(string)
+			err := json.Unmarshal([]byte(v), &input)
+			if err != nil {
+				return nil, fmt.Errorf("data '%v' is not a valid json string", data)
+			}
+		default:
+			return nil, fmt.Errorf("invalid data %v for jsonpath", data)
+		}
+	} else {
+		return nil, fmt.Errorf("invalid data nil for jsonpath")
+	}
+	return e.valuer(context.Background(), input)
+}
+
+func GetJsonPathEval(jsonpath string) (JsonPathEval, error) {
+	e, err := builder.NewEvaluable(jsonpath)
+	if err != nil {
+		return nil, err
+	}
+	return &gvalPathEval{valuer: e}, nil
+}

+ 31 - 6
internal/topo/context/default.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/sirupsen/logrus"
+	"strings"
 	"sync"
 	"time"
 )
@@ -38,6 +39,8 @@ type DefaultContext struct {
 	store    api.Store
 	state    *sync.Map
 	snapshot map[string]interface{}
+	// cache
+	jsonEvalReg sync.Map
 }
 
 func Background() *DefaultContext {
@@ -106,18 +109,40 @@ func (c *DefaultContext) SetError(err error) {
 	c.err = err
 }
 
+func (c *DefaultContext) ParseDynamicProp(prop string, data interface{}) (interface{}, error) {
+	// If not a json path, just return itself
+	if !strings.HasPrefix(prop, "$") {
+		return prop, nil
+	}
+	var (
+		je  conf.JsonPathEval
+		err error
+	)
+	if raw, ok := c.jsonEvalReg.Load(prop); ok {
+		je = raw.(conf.JsonPathEval)
+	} else {
+		je, err = conf.GetJsonPathEval(prop)
+		if err != nil {
+			return nil, err
+		}
+		c.jsonEvalReg.Store(prop, je)
+	}
+	return je.Eval(data)
+}
+
 func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
 	s, err := store.GetOpState(opId)
 	if err != nil {
 		c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
 	}
 	return &DefaultContext{
-		ruleId:     ruleId,
-		opId:       opId,
-		instanceId: 0,
-		ctx:        c.ctx,
-		store:      store,
-		state:      s,
+		ruleId:      ruleId,
+		opId:        opId,
+		instanceId:  0,
+		ctx:         c.ctx,
+		store:       store,
+		state:       s,
+		jsonEvalReg: sync.Map{},
 	}
 }
 

+ 92 - 0
internal/topo/context/default_test.go

@@ -15,6 +15,7 @@
 package context
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
@@ -106,3 +107,94 @@ func cleanStateData() {
 		conf.Log.Error(err)
 	}
 }
+
+func TestDynamicProp(t *testing.T) {
+	var tests = []struct {
+		j string
+		v []interface{} // values
+		r []interface{} // parsed results
+	}{
+		{
+			j: "$.a",
+			v: []interface{}{
+				map[string]interface{}{
+					"a": 123,
+					"b": "dafds",
+				},
+				map[string]interface{}{
+					"a": "single",
+					"c": 20.2,
+				},
+				map[string]interface{}{
+					"b": "b",
+					"c": "c",
+				},
+			},
+			r: []interface{}{
+				123,
+				"single",
+				nil,
+			},
+		}, {
+			j: "$[0].a",
+			v: []interface{}{
+				[]map[string]interface{}{{
+					"a": 123,
+					"b": "dafds",
+				}},
+				[]map[string]interface{}{},
+				[]map[string]interface{}{
+					{
+						"a": "single",
+						"c": 20.2,
+					},
+					{
+						"b": "b",
+						"c": "c",
+					},
+				},
+			},
+			r: []interface{}{
+				123,
+				nil,
+				"single",
+			},
+		}, {
+			j: "a",
+			v: []interface{}{
+				map[string]interface{}{
+					"a": 123,
+					"b": "dafds",
+				},
+				map[string]interface{}{
+					"a": "single",
+					"c": 20.2,
+				},
+				map[string]interface{}{
+					"b": "b",
+					"c": "c",
+				},
+			},
+			r: []interface{}{
+				"a",
+				"a",
+				"a",
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	ctx := Background().WithMeta("testStateRule", "op1", &state.MemoryStore{})
+	for i, tt := range tests {
+		var result []interface{}
+		for _, v := range tt.v {
+			prop, err := ctx.ParseDynamicProp(tt.j, v)
+			if err != nil {
+				fmt.Printf("%d:%s parse %v error\n", i, tt.j, v)
+			}
+			result = append(result, prop)
+		}
+		if !reflect.DeepEqual(tt.r, result) {
+			t.Errorf("%d. %s\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.j, tt.r, result)
+		}
+	}
+}

+ 1 - 1
internal/topo/operator/misc_func_test.go

@@ -833,7 +833,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectOp{Fields: stmt.Fields}
-		fv, afv := xsql.NewFunctionValuersForOp(nil)
+		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		switch rt := result.(type) {
 		case []byte:

+ 5 - 1
pkg/api/stream.go

@@ -143,14 +143,18 @@ type StreamContext interface {
 	WithInstance(instanceId int) StreamContext
 	WithCancel() (StreamContext, context.CancelFunc)
 	SetError(e error)
-	//State handling
+	// State handling
 	IncrCounter(key string, amount int) error
 	GetCounter(key string) (int, error)
 	PutState(key string, value interface{}) error
 	GetState(key string) (interface{}, error)
 	DeleteState(key string) error
+	// Connection related methods
 	GetConnection(connectSelector string) (interface{}, error)
 	ReleaseConnection(connectSelector string)
+	// Properties processing, prop is a json path
+	ParseDynamicProp(prop string, data interface{}) (interface{}, error)
+	//TransformOutput(data []map[string]interface{}) interface{}
 }
 
 type Operator interface {

+ 36 - 0
pkg/cast/cast.go

@@ -1040,3 +1040,39 @@ func isIntegral64(val float64) bool {
 func isIntegral32(val float32) bool {
 	return val == float32(int(val))
 }
+
+func ConvertToInterfaceArr(orig map[string]interface{}) map[string]interface{} {
+	result := make(map[string]interface{})
+	for k, v := range orig {
+		vt := reflect.TypeOf(v)
+		if vt == nil {
+			result[k] = nil
+			continue
+		}
+		switch vt.Kind() {
+		case reflect.Slice:
+			result[k] = ConvertSlice(v)
+		case reflect.Map:
+			result[k] = ConvertToInterfaceArr(v.(map[string]interface{}))
+		default:
+			result[k] = v
+		}
+	}
+	return result
+}
+
+func ConvertSlice(v interface{}) []interface{} {
+	value := reflect.ValueOf(v)
+	tempArr := make([]interface{}, value.Len())
+	for i := 0; i < value.Len(); i++ {
+		item := value.Index(i)
+		if item.Kind() == reflect.Map {
+			tempArr[i] = ConvertToInterfaceArr(item.Interface().(map[string]interface{}))
+		} else if item.Kind() == reflect.Slice {
+			tempArr[i] = ConvertSlice(item.Interface())
+		} else {
+			tempArr[i] = item.Interface()
+		}
+	}
+	return tempArr
+}