Преглед на файлове

refactor(edgex): edgex sink to deal with map directly

- Explicitly warn un-support of data template
- Remove json unmarshal from the sink node

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Signed-off-by: Jiyong <huangjy@emqx.io>
Jiyong Huang преди 3 години
родител
ревизия
8d5c4c32fe
променени са 2 файла, в които са добавени 100 реда и са изтрити 90 реда
  1. 96 89
      internal/topo/sink/edgex_sink.go
  2. 4 1
      internal/topo/sink/edgex_sink_test.go

+ 96 - 89
internal/topo/sink/edgex_sink.go

@@ -55,6 +55,7 @@ type EdgexConf struct {
 	Metadata           string            `json:"metadata"`
 	Optional           map[string]string `json:"optional"`
 	ConnectionSelector string            `json:"connectionSelector"`
+	DataTemplate       string            `json:"dataTemplate"`
 }
 
 type EdgexMsgBusSink struct {
@@ -100,7 +101,6 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 			return fmt.Errorf("specified wrong contentType value %s: only 'application/json' is supported if messageType is event", c.ContentType)
 		}
 	} else {
-
 		if c.Host == "" {
 			c.Host = defaultServer
 		}
@@ -133,6 +133,9 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 	if c.Topic != "" && c.TopicPrefix != "" {
 		return fmt.Errorf("not allow to specify both topic and topicPrefix, please set one only")
 	}
+	if c.DataTemplate != "" {
+		conf.Log.Warn("edgex sink does not support dataTemplate, just ignore")
+	}
 
 	ems.c = c
 	ems.conf = &types.MessageBusConfig{
@@ -195,65 +198,61 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 	return nil
 }
 
-func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, result []byte) (*dtos.Event, error) {
-	var m []map[string]interface{}
-	if err := json.Unmarshal(result, &m); err == nil {
-		m1 := ems.getMeta(m)
-		event := m1.createEvent()
-		//Override the devicename if user specified the value
-		if event.DeviceName == "" {
-			event.DeviceName = ems.c.DeviceName
-		}
-		if event.ProfileName == "" {
-			event.ProfileName = ems.c.ProfileName
-		}
-		if event.SourceName == "" {
-			event.SourceName = ems.c.SourceName
-		}
-		for _, v := range m {
-			for k1, v1 := range v {
-				// Ignore nil values
-				if k1 == ems.c.Metadata || v1 == nil {
-					continue
+func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, m []map[string]interface{}) (*dtos.Event, error) {
+	m1 := ems.getMeta(m)
+	event := m1.createEvent()
+	//Override the devicename if user specified the value
+	if event.DeviceName == "" {
+		event.DeviceName = ems.c.DeviceName
+	}
+	if event.ProfileName == "" {
+		event.ProfileName = ems.c.ProfileName
+	}
+	if event.SourceName == "" {
+		event.SourceName = ems.c.SourceName
+	}
+	for _, v := range m {
+		for k1, v1 := range v {
+			// Ignore nil values
+			if k1 == ems.c.Metadata || v1 == nil {
+				continue
+			} else {
+				var (
+					vt  string
+					vv  interface{}
+					err error
+				)
+				mm1 := m1.readingMeta(ctx, k1)
+				if mm1 != nil && mm1.valueType != nil {
+					vt = *mm1.valueType
+					vv, err = getValueByType(v1, vt)
 				} else {
-					var (
-						vt string
-						vv interface{}
-					)
-					mm1 := m1.readingMeta(ctx, k1)
-					if mm1 != nil && mm1.valueType != nil {
-						vt = *mm1.valueType
-						vv, err = getValueByType(v1, vt)
-					} else {
-						vt, vv, err = getValueType(v1)
-					}
-					if err != nil {
-						ctx.GetLogger().Errorf("%v", err)
-						continue
-					}
-					switch vt {
-					case v2.ValueTypeBinary:
-						// default media type
-						event.AddBinaryReading(k1, vv.([]byte), "application/text")
-					default:
-						err = event.AddSimpleReading(k1, vt, vv)
-					}
+					vt, vv, err = getValueType(v1)
+				}
+				if err != nil {
+					ctx.GetLogger().Errorf("%v", err)
+					continue
+				}
+				switch vt {
+				case v2.ValueTypeBinary:
+					// default media type
+					event.AddBinaryReading(k1, vv.([]byte), "application/text")
+				default:
+					err = event.AddSimpleReading(k1, vt, vv)
+				}
 
-					if err != nil {
-						ctx.GetLogger().Errorf("%v", err)
-						continue
-					}
-					r := event.Readings[len(event.Readings)-1]
-					if mm1 != nil {
-						event.Readings[len(event.Readings)-1] = mm1.decorate(&r)
-					}
+				if err != nil {
+					ctx.GetLogger().Errorf("%v", err)
+					continue
+				}
+				r := event.Readings[len(event.Readings)-1]
+				if mm1 != nil {
+					event.Readings[len(event.Readings)-1] = mm1.decorate(&r)
 				}
 			}
 		}
-		return event, nil
-	} else {
-		return nil, err
 	}
+	return event, nil
 }
 
 func getValueType(v interface{}) (string, interface{}, error) {
@@ -270,7 +269,7 @@ func getValueType(v interface{}) (string, interface{}, error) {
 	case reflect.Int64:
 		return v2.ValueTypeInt64, v, nil
 	case reflect.Int:
-		return v2.ValueTypeInt64, v, nil
+		return v2.ValueTypeInt64, int64(v.(int)), nil
 	case reflect.Float64:
 		return v2.ValueTypeFloat64, v, nil
 	case reflect.Slice:
@@ -437,47 +436,55 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
 	return newMetaFromMap(nil)
 }
 
-func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, _ interface{}) error {
+func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
-	if payload, _, err := ctx.TransformOutput(); err == nil {
-		logger.Debugf("EdgeX message bus sink: %s\n", payload)
-		evt, err := ems.produceEvents(ctx, payload)
+	var (
+		evt *dtos.Event
+		err error
+	)
+	switch payload := item.(type) {
+	case map[string]interface{}:
+		evt, err = ems.produceEvents(ctx, []map[string]interface{}{payload})
+	case []map[string]interface{}:
+		evt, err = ems.produceEvents(ctx, payload)
+	default:
+		// impossible
+		return fmt.Errorf("receive invalid data %v", item)
+	}
+	if err != nil {
+		return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
+	}
+	var (
+		data  []byte
+		topic string
+	)
+	if ems.c.MessageType == MessageTypeRequest {
+		req := requests.NewAddEventRequest(*evt)
+		data, _, err = req.Encode()
 		if err != nil {
-			return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
-		}
-		var (
-			data  []byte
-			topic string
-		)
-		if ems.c.MessageType == MessageTypeRequest {
-			req := requests.NewAddEventRequest(*evt)
-			data, _, err = req.Encode()
-			if err != nil {
-				return fmt.Errorf("unexpected error encode event %v", err)
-			}
-		} else {
-			data, err = json.Marshal(evt)
-			if err != nil {
-				return fmt.Errorf("unexpected error MarshalEvent %v", err)
-			}
+			return fmt.Errorf("unexpected error encode event %v", err)
 		}
-		env := types.NewMessageEnvelope(data, ctx)
-		env.ContentType = ems.c.ContentType
-
-		if ems.topic == "" { // dynamic topic
-			topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, evt.ProfileName, evt.DeviceName, evt.SourceName)
-		} else {
-			topic = ems.topic
+	} else {
+		data, err = json.Marshal(evt)
+		if err != nil {
+			return fmt.Errorf("unexpected error MarshalEvent %v", err)
 		}
+	}
+	env := types.NewMessageEnvelope(data, ctx)
+	env.ContentType = ems.c.ContentType
 
-		if e := ems.client.Publish(env, topic); e != nil {
-			logger.Errorf("%s: found error %s when publish to EdgeX message bus.\n", e)
-			return fmt.Errorf("%s:%s", errorx.IOErr, e.Error())
-		}
-		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
+	if ems.topic == "" { // dynamic topic
+		topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, evt.ProfileName, evt.DeviceName, evt.SourceName)
 	} else {
-		return fmt.Errorf("Unkown type of data %v, the message cannot be published.\n", err)
+		topic = ems.topic
 	}
+
+	if e := ems.client.Publish(env, topic); e != nil {
+		logger.Errorf("%s: found error %s when publish to EdgeX message bus.\n", e)
+		return fmt.Errorf("%s:%s", errorx.IOErr, e.Error())
+	}
+	logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
+
 	return nil
 }
 

+ 4 - 1
internal/topo/sink/edgex_sink_test.go

@@ -17,6 +17,7 @@
 package sink
 
 import (
+	"encoding/json"
 	"fmt"
 	v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
@@ -460,7 +461,9 @@ func TestProduceEvents(t1 *testing.T) {
 		if ems.c.SourceName == "" {
 			ems.c.SourceName = "ruleTest"
 		}
-		result, err := ems.produceEvents(ctx, []byte(t.input))
+		var payload []map[string]interface{}
+		json.Unmarshal([]byte(t.input), &payload)
+		result, err := ems.produceEvents(ctx, payload)
 		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
 			t1.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, t.input, t.error, err)
 		} else if t.error == "" && !compareEvent(t.expected, result) {