Przeglądaj źródła

feat(edgex): support dataTemplate

Closes #1285

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 lat temu
rodzic
commit
bb274be6f0

+ 3 - 2
docs/en_US/rules/sinks/builtin/edgex.md

@@ -39,6 +39,8 @@ Below optional configurations are supported, please check MQTT specification for
   - KeyPEMBlock
   - SkipCertVerify
 
+Notice that, the edgex action can support data templates to vary the result format, but the result of the data template must be in the object form of a JSON string, e.g. `"{\"key\":\"{{.key}}\"}"`. JSON strings in the form of arrays or non-JSON strings are not supported.
+
 ## Send to various targets
 
 By setting the combination of the properties, we can send the result to various EdgeX message bus settings.
@@ -270,5 +272,4 @@ Please notice that,
 - For the reading that can NOT be found in original message,  the metadata will not be set.  Such as metadata of `t1` in the sample will fill with default value that generated by eKuiper. 
 - If your SQL has aggregated function, then it does not make sense to keep these metadata, but eKuiper will still fill with metadata from a particular message in the time window. For example, with following SQL, 
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
-In this case, there are possibly several messages in the window, the metadata value for `temperature` will be filled with value from 1st message that received from bus.
-
+In this case, there are possibly several messages in the window, the metadata value for `temperature` will be filled with value from 1st message that received from bus.

+ 2 - 0
docs/zh_CN/rules/sinks/builtin/edgex.md

@@ -39,6 +39,8 @@
   - KeyPEMBlock
   - SkipCertVerify
 
+EdgeX 动作可支持数据模板对结果格式进行变化,但是数据模板的结果必须为 JSON 字符串的 object 形式,例如 `"{\"key\":\"{{.key}}\"}"`。数组形式的 JSON 字符串或者非 JSON 字符串都不支持。
+
 ## 发送到各种目标
 
 通过设置不同的属性组合,我们可以将结果采用不同的格式发送到不同的 EdgeX 消息总线设置中。

+ 25 - 18
internal/topo/sink/edgex_sink.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -84,9 +84,6 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 	if c.Topic != "" && c.TopicPrefix != "" {
 		return fmt.Errorf("not allow to specify both topic and topicPrefix, please set one only")
 	}
-	if c.DataTemplate != "" {
-		conf.Log.Warn("edgex sink does not support dataTemplate, just ignore")
-	}
 	ems.c = c
 	ems.config = ps
 
@@ -120,7 +117,29 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 	return nil
 }
 
-func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, m []map[string]interface{}) (*dtos.Event, error) {
+func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{}) (*dtos.Event, error) {
+	if ems.c.DataTemplate != "" {
+		jsonBytes, _, err := ctx.TransformOutput(item)
+		if err != nil {
+			return nil, err
+		}
+		tm := make(map[string]interface{})
+		err = json.Unmarshal(jsonBytes, &tm)
+		if err != nil {
+			return nil, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
+		}
+		item = tm
+	}
+	var m []map[string]interface{}
+	switch payload := item.(type) {
+	case map[string]interface{}:
+		m = []map[string]interface{}{payload}
+	case []map[string]interface{}:
+		m = payload
+	default:
+		// impossible
+		return nil, fmt.Errorf("receive invalid data %v", item)
+	}
 	m1 := ems.getMeta(m)
 	event := m1.createEvent()
 	//Override the devicename if user specified the value
@@ -460,19 +479,7 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
 
 func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
-	var (
-		evt *dtos.Event
-		err error
-	)
-	switch payload := item.(type) {
-	case map[string]interface{}:
-		evt, err = ems.produceEvents(ctx, []map[string]interface{}{payload})
-	case []map[string]interface{}:
-		evt, err = ems.produceEvents(ctx, payload)
-	default:
-		// impossible
-		return fmt.Errorf("receive invalid data %v", item)
-	}
+	evt, err := ems.produceEvents(ctx, item)
 	if err != nil {
 		return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
 	}

+ 114 - 3
internal/topo/sink/edgex_sink_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"reflect"
 	"testing"
 )
@@ -262,8 +264,7 @@ func TestProduceEvents(t1 *testing.T) {
 
 		{ // 2
 			input: `[
-						{"meta": 50},
-						{"h1":100}
+						{"meta": 50,"h1":100}
 					]`,
 			conf: map[string]interface{}{
 				"sourceName": "demo",
@@ -488,3 +489,113 @@ func TestProduceEvents(t1 *testing.T) {
 		}
 	}
 }
+
+func TestEdgeXTemplate_Apply(t1 *testing.T) {
+	var tests = []struct {
+		input    string
+		conf     map[string]interface{}
+		expected *dtos.Event
+		error    string
+	}{
+		{ // 0
+			input: `[{"meta":{
+							"correlationid":"","deviceName":"demo","id":"","origin":3,
+							"humidity":{"deviceName":"test device name1","id":"12","origin":14,"valueType":"Int64"},
+							"temperature":{"deviceName":"test device name2","id":"22","origin":24}
+							},
+						"humidity":100,
+						"temperature":50}
+					]`,
+			conf: map[string]interface{}{
+				"metadata": "meta",
+
+				"dataTemplate": `{"wrapper":"w1","ab":"{{.humidity}}"}`,
+			},
+			expected: &dtos.Event{
+				Id:          "",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "ruleTest",
+				Origin:      0,
+				Readings: []dtos.BaseReading{
+					{
+						ResourceName:  "wrapper",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
+						Id:            "",
+						Origin:        0,
+						ValueType:     v2.ValueTypeString,
+						SimpleReading: dtos.SimpleReading{Value: "w1"},
+					},
+					{
+						ResourceName:  "ab",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
+						Id:            "",
+						Origin:        0,
+						ValueType:     v2.ValueTypeString,
+						SimpleReading: dtos.SimpleReading{Value: "100"},
+					},
+				},
+			},
+			error: "",
+		}, {
+			input: `[{"json":"{\"a\":24,\"b\":\"c\"}"}]`,
+			conf: map[string]interface{}{
+				"dataTemplate": `{{.json}}`,
+			},
+			expected: &dtos.Event{
+				Id:          "",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "ruleTest",
+				Origin:      0,
+				Readings: []dtos.BaseReading{
+					{
+						ResourceName:  "a",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
+						Id:            "",
+						Origin:        0,
+						ValueType:     v2.ValueTypeFloat64,
+						SimpleReading: dtos.SimpleReading{Value: "2.400000e+01"},
+					},
+					{
+						ResourceName:  "b",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
+						Id:            "",
+						Origin:        0,
+						ValueType:     v2.ValueTypeString,
+						SimpleReading: dtos.SimpleReading{Value: "c"},
+					},
+				},
+			},
+			error: "",
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, t := range tests {
+		ems := EdgexMsgBusSink{}
+		err := ems.Configure(t.conf)
+		if err != nil {
+			t1.Errorf("%d: configure error %v", i, err)
+			continue
+		}
+		if ems.c.SourceName == "" {
+			ems.c.SourceName = "ruleTest"
+		}
+		var payload []map[string]interface{}
+		json.Unmarshal([]byte(t.input), &payload)
+		dt := t.conf["dataTemplate"]
+		tf, _ := transform.GenTransform(cast.ToStringAlways(dt))
+		vCtx := context.WithValue(ctx, context.TransKey, tf)
+		result, err := ems.produceEvents(vCtx, payload[0])
+		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
+			t1.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, t.input, t.error, err)
+		} else if t.error == "" && !compareEvent(t.expected, result) {
+			t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.input, t.expected, result)
+		}
+	}
+}