Browse Source

fix(edgex): add messageType for sink

- Add messageType for edgeX sink to support send message to message bus like a device
- Change the default configuration to match edgex v2

Closes: #947

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 years ago
parent
commit
79c955315b

+ 107 - 105
internal/topo/sink/edgex_sink.go

@@ -22,6 +22,7 @@ import (
 	"fmt"
 	v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
+	"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/requests"
 	"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -30,120 +31,92 @@ import (
 	"reflect"
 )
 
-type EdgexMsgBusSink struct {
-	protocol string
-	host     string
-	port     int
-	ptype    string
+type messageType string
 
-	topic       string
-	contentType string
+const (
+	MessageTypeEvent   messageType = "event"
+	MessageTypeRequest messageType = "request"
+)
 
-	deviceName  string
-	profileName string
-	sourceName  string
-	metadata    string
+type EdgexConf struct {
+	Protocol    string            `json:"protocol"`
+	Host        string            `json:"host"`
+	Port        int               `json:"port"`
+	Topic       string            `json:"topic"`
+	TopicPrefix string            `json:"topicPrefix"`
+	Type        string            `json:"type"`
+	MessageType messageType       `json:"messageType"`
+	ContentType string            `json:"contentType"`
+	DeviceName  string            `json:"deviceName"`
+	ProfileName string            `json:"profileName"`
+	SourceName  string            `json:"sourceName"`
+	Metadata    string            `json:"metadata"`
+	Optional    map[string]string `json:"optional"`
+}
 
-	optional map[string]string
-	client   messaging.MessageClient
+type EdgexMsgBusSink struct {
+	c *EdgexConf
+
+	topic  string
+	conf   *types.MessageBusConfig
+	client messaging.MessageClient
 }
 
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
-	ems.host = "*"
-	ems.protocol = "tcp"
-	ems.port = 5573
-	ems.topic = "events"
-	ems.contentType = "application/json"
-	ems.ptype = messaging.ZeroMQ
-
-	if host, ok := ps["host"]; ok {
-		ems.host = host.(string)
-	} else {
-		conf.Log.Infof("Not find host conf, will use default value '*'.")
-	}
-
-	if pro, ok := ps["protocol"]; ok {
-		ems.protocol = pro.(string)
-	} else {
-		conf.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
+	c := &EdgexConf{
+		Protocol:    "redis",
+		Host:        "localhost",
+		Port:        6379,
+		Type:        messaging.Redis,
+		MessageType: MessageTypeEvent,
+		ContentType: "application/json",
+		DeviceName:  "ekuiper",
+		ProfileName: "ekuiperProfile",
+		// SourceName is set in open as the rule id
 	}
-
-	if port, ok := ps["port"]; ok {
-		if pv, ok := port.(float64); ok {
-			ems.port = int(pv)
-		} else if pv, ok := port.(float32); ok {
-			ems.port = int(pv)
-		} else {
-			conf.Log.Infof("Not valid port value, will use default value '5563'.")
-		}
-
-	} else {
-		conf.Log.Infof("Not find port conf, will use default value '5563'.")
+	err := cast.MapToStruct(ps, c)
+	if err != nil {
+		return fmt.Errorf("read properties %v fail with error: %v", ps, err)
 	}
 
-	if topic, ok := ps["topic"]; ok {
-		ems.topic = topic.(string)
-	} else {
-		conf.Log.Infof("Not find topic conf, will use default value 'events'.")
+	if c.Port < 0 {
+		return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
 	}
 
-	if contentType, ok := ps["contentType"]; ok {
-		ems.contentType = contentType.(string)
-	} else {
-		conf.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
+	if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT && c.Type != messaging.Redis {
+		return fmt.Errorf("specified wrong type value %s", c.Type)
 	}
 
-	if ptype, ok := ps["type"]; ok {
-		ems.ptype = ptype.(string)
-		if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.Redis {
-			conf.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
-			ems.ptype = messaging.ZeroMQ
-		}
+	if c.MessageType != MessageTypeEvent && c.MessageType != MessageTypeRequest {
+		return fmt.Errorf("specified wrong messageType value %s", c.MessageType)
 	}
 
-	if dname, ok := ps["deviceName"]; ok {
-		ems.deviceName = dname.(string)
+	if c.MessageType == MessageTypeEvent && c.ContentType != "application/json" {
+		return fmt.Errorf("specified wrong contentType value %s: only 'application/json' is supported if messageType is event", c.ContentType)
 	}
 
-	if pname, ok := ps["profileName"]; ok {
-		ems.profileName = pname.(string)
+	if c.Topic != "" && c.TopicPrefix != "" {
+		return fmt.Errorf("not allow to specify both topic and topicPrefix, please set one only")
 	}
 
-	if metadata, ok := ps["metadata"]; ok {
-		ems.metadata = metadata.(string)
+	ems.c = c
+	ems.conf = &types.MessageBusConfig{
+		PublishHost: types.HostInfo{
+			Host:     c.Host,
+			Port:     c.Port,
+			Protocol: c.Protocol,
+		},
+		Type:     c.Type,
+		Optional: c.Optional,
 	}
 
-	if optIntf, ok := ps["optional"]; ok {
-		if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
-			optional := make(map[string]string)
-			for k, v := range opt {
-				if sv, ok2 := v.(string); ok2 {
-					optional[k] = sv
-				} else {
-					info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
-					conf.Log.Infof(info)
-					return fmt.Errorf(info)
-				}
-			}
-			ems.optional = optional
-		}
-	}
 	return nil
 }
 
 func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 	log := ctx.GetLogger()
-	conf := types.MessageBusConfig{
-		PublishHost: types.HostInfo{
-			Host:     ems.host,
-			Port:     ems.port,
-			Protocol: ems.protocol,
-		},
-		Type:     ems.ptype,
-		Optional: ems.optional,
-	}
-	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
-	if msgClient, err := messaging.NewMessageClient(conf); err != nil {
+	log.Infof("Using configuration for EdgeX message bus sink: %+v", ems.c)
+	if msgClient, err := messaging.NewMessageClient(*ems.conf); err != nil {
 		return err
 	} else {
 		if ec := msgClient.Connect(); ec != nil {
@@ -152,6 +125,19 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 			ems.client = msgClient
 		}
 	}
+	if ems.c.SourceName == "" {
+		ems.c.SourceName = ctx.GetRuleId()
+	}
+
+	if ems.c.Topic == "" && ems.c.TopicPrefix == "" {
+		ems.topic = "application"
+	} else if ems.c.Topic != "" {
+		ems.topic = ems.c.Topic
+	} else if ems.c.Metadata == "" { // If meta data are static, the "dynamic" topic is static
+		ems.topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, ems.c.DeviceName, ems.c.ProfileName, ems.c.SourceName)
+	} else {
+		ems.topic = "" // calculate dynamically
+	}
 	return nil
 }
 
@@ -161,19 +147,19 @@ func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, result []byte)
 		m1 := ems.getMeta(m)
 		event := m1.createEvent()
 		//Override the devicename if user specified the value
-		if ems.deviceName != "" {
-			event.DeviceName = ems.deviceName
+		if event.DeviceName == "" {
+			event.DeviceName = ems.c.DeviceName
 		}
-		if ems.profileName != "" {
-			event.ProfileName = ems.profileName
+		if event.ProfileName == "" {
+			event.ProfileName = ems.c.ProfileName
 		}
 		if event.SourceName == "" {
-			event.SourceName = ems.topic
+			event.SourceName = ems.c.SourceName
 		}
 		for _, v := range m {
 			for k1, v1 := range v {
 				// Ignore nil values
-				if k1 == ems.metadata || v1 == nil {
+				if k1 == ems.c.Metadata || v1 == nil {
 					continue
 				} else {
 					var (
@@ -381,12 +367,12 @@ func getValueByType(v interface{}, vt string) (interface{}, error) {
 }
 
 func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
-	if ems.metadata == "" {
+	if ems.c.Metadata == "" {
 		return newMetaFromMap(nil)
 	}
 	//Try to get the meta field
 	for _, v := range result {
-		if m, ok := v[ems.metadata]; ok {
+		if m, ok := v[ems.c.Metadata]; ok {
 			if m1, ok1 := m.(map[string]interface{}); ok1 {
 				return newMetaFromMap(m1)
 			} else {
@@ -405,14 +391,32 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 		if err != nil {
 			return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
 		}
-		data, err := json.Marshal(evt)
-		if err != nil {
-			return fmt.Errorf("unexpected error MarshalEvent %v", err)
+		var (
+			data  []byte
+			topic string
+		)
+		if ems.c.MessageType == MessageTypeRequest {
+			req := requests.NewAddEventRequest(*evt)
+			data, _, err = req.Encode()
+			if err != nil {
+				return fmt.Errorf("unexpected error encode event %v", err)
+			}
+		} else {
+			data, err = json.Marshal(evt)
+			if err != nil {
+				return fmt.Errorf("unexpected error MarshalEvent %v", err)
+			}
 		}
-		env := types.NewMessageEnvelope([]byte(data), ctx)
-		env.ContentType = ems.contentType
+		env := types.NewMessageEnvelope(data, ctx)
+		env.ContentType = ems.c.ContentType
 
-		if e := ems.client.Publish(env, ems.topic); e != nil {
+		if ems.topic == "" { // dynamic topic
+			topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, evt.DeviceName, evt.ProfileName, evt.SourceName)
+		} else {
+			topic = ems.topic
+		}
+
+		if e := ems.client.Publish(env, topic); e != nil {
 			logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
 			return e
 		}
@@ -481,9 +485,7 @@ type meta struct {
 
 func newMetaFromMap(m1 map[string]interface{}) *meta {
 	result := &meta{
-		eventMeta: eventMeta{
-			profileName: "kuiperProfile",
-		},
+		eventMeta: eventMeta{},
 	}
 	for k, v := range m1 {
 		switch k {

+ 239 - 32
internal/topo/sink/edgex_sink_test.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
+	"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -51,16 +52,180 @@ func compareReading(expected, actual dtos.BaseReading) bool {
 	return false
 }
 
+func TestConfigure(t *testing.T) {
+	var tests = []struct {
+		conf     map[string]interface{}
+		expected *EdgexConf
+		error    string
+	}{
+		{ // 0
+			conf: map[string]interface{}{
+				"metadata": "meta",
+			},
+			expected: &EdgexConf{
+				Protocol:    "redis",
+				Host:        "localhost",
+				Port:        6379,
+				Type:        messaging.Redis,
+				MessageType: MessageTypeEvent,
+				ContentType: "application/json",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				Metadata:    "meta",
+			},
+		},
+		{ // 1
+			conf: map[string]interface{}{
+				"type":        "redis",
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topic":       "ekuiperResult",
+				"deviceName":  "ekuiper",
+				"profileName": "ekuiper",
+				"sourceName":  "ekuiper",
+				"contentType": "application/json",
+			},
+			expected: &EdgexConf{
+				Protocol:    "redis",
+				Host:        "edgex-redis",
+				Port:        6379,
+				Type:        messaging.Redis,
+				MessageType: MessageTypeEvent,
+				ContentType: "application/json",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiper",
+				SourceName:  "ekuiper",
+				Topic:       "ekuiperResult",
+			},
+		},
+		{ // 2
+			conf: map[string]interface{}{
+				"protocol":    "tcp",
+				"host":        "127.0.0.1",
+				"port":        1883,
+				"topic":       "result",
+				"type":        "mqtt",
+				"metadata":    "edgex_meta",
+				"contentType": "application/json",
+				"optional": map[string]interface{}{
+					"ClientId": "edgex_message_bus_001",
+				},
+			},
+			expected: &EdgexConf{
+				Protocol:    "tcp",
+				Host:        "127.0.0.1",
+				Port:        1883,
+				Type:        messaging.MQTT,
+				MessageType: MessageTypeEvent,
+				ContentType: "application/json",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "",
+				Metadata:    "edgex_meta",
+				Topic:       "result",
+				Optional: map[string]string{
+					"ClientId": "edgex_message_bus_001",
+				},
+			},
+		}, { // 3
+			conf: map[string]interface{}{
+				"type":        "redis",
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topicPrefix": "edgex/events/device",
+				"messageType": "request",
+				"contentType": "application/json",
+			},
+			expected: &EdgexConf{
+				Protocol:    "redis",
+				Host:        "edgex-redis",
+				Port:        6379,
+				Type:        messaging.Redis,
+				MessageType: MessageTypeRequest,
+				ContentType: "application/json",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "",
+				TopicPrefix: "edgex/events/device",
+			},
+		}, { // 4
+			conf: map[string]interface{}{
+				"type":        "redis",
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topicPrefix": "edgex/events/device",
+				"messageType": "requests",
+				"contentType": "application/json",
+			},
+			error: "specified wrong messageType value requests",
+		}, { // 5
+			conf: map[string]interface{}{
+				"type":        20,
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topicPrefix": "edgex/events/device",
+				"messageType": "requests",
+				"contentType": "application/json",
+			},
+			error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: json: cannot unmarshal number into Go struct field EdgexConf.type of type string",
+		}, { // 6
+			conf: map[string]interface{}{
+				"type":        "redis",
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        -1,
+				"topicPrefix": "edgex/events/device",
+				"messageType": "requests",
+				"contentType": "application/json",
+			},
+			error: "specified wrong port value, expect positive integer but got -1",
+		}, { // 7
+			conf: map[string]interface{}{
+				"type":        "zmq",
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topicPrefix": "edgex/events/device",
+				"messageType": "requests",
+				"contentType": "application/json",
+			},
+			error: "specified wrong type value zmq",
+		}, { // 8
+			conf: map[string]interface{}{
+				"protocol":    "redis",
+				"host":        "edgex-redis",
+				"port":        6379,
+				"topicPrefix": "edgex/events/device",
+				"topic":       "requests",
+				"contentType": "application/json",
+			},
+			error: "not allow to specify both topic and topicPrefix, please set one only",
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, test := range tests {
+		ems := EdgexMsgBusSink{}
+		err := ems.Configure(test.conf)
+		if !reflect.DeepEqual(test.error, testx.Errstring(err)) {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, test.error, err)
+		} else if test.error == "" && !reflect.DeepEqual(test.expected, ems.c) {
+			t.Errorf("%d\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, test.expected, ems.c)
+		}
+	}
+}
+
 func TestProduceEvents(t1 *testing.T) {
 	var tests = []struct {
-		input       string
-		deviceName  string
-		profileName string
-		topic       string
-		expected    *dtos.Event
-		error       string
+		input    string
+		conf     map[string]interface{}
+		expected *dtos.Event
+		error    string
 	}{
-		{
+		{ // 0
 			input: `[
 						{"meta":{
 							"correlationid":"","deviceName":"demo","id":"","origin":3,
@@ -71,16 +236,20 @@ func TestProduceEvents(t1 *testing.T) {
 						{"humidity":100},
 						{"temperature":50}
 					]`,
+			conf: map[string]interface{}{
+				"metadata": "meta",
+			},
 			expected: &dtos.Event{
 				Id:          "",
 				DeviceName:  "demo",
-				ProfileName: "kuiperProfile",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "ruleTest",
 				Origin:      3,
 				Readings: []dtos.BaseReading{
 					{
 						ResourceName:  "humidity",
 						DeviceName:    "test device name1",
-						ProfileName:   "kuiperProfile",
+						ProfileName:   "ekuiperProfile",
 						Id:            "12",
 						Origin:        14,
 						ValueType:     v2.ValueTypeInt64,
@@ -100,7 +269,7 @@ func TestProduceEvents(t1 *testing.T) {
 			error: "",
 		},
 
-		{
+		{ // 1
 			input: `[
 						{"meta":{
 							"correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
@@ -111,6 +280,9 @@ func TestProduceEvents(t1 *testing.T) {
 						{"h1":100},
 						{"h2":null}
 					]`,
+			conf: map[string]interface{}{
+				"metadata": "meta",
+			},
 			expected: &dtos.Event{
 				Id:          "abc",
 				DeviceName:  "demo",
@@ -131,18 +303,31 @@ func TestProduceEvents(t1 *testing.T) {
 			error: "",
 		},
 
-		{
+		{ // 2
 			input: `[
 						{"meta": 50},
 						{"h1":100}
 					]`,
+			conf: map[string]interface{}{
+				"sourceName": "demo",
+			},
 			expected: &dtos.Event{
-				ProfileName: "kuiperProfile",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "demo",
 				Readings: []dtos.BaseReading{
 					{
+						ResourceName:  "meta",
+						SimpleReading: dtos.SimpleReading{Value: "5.000000e+01"},
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
+						ValueType:     v2.ValueTypeFloat64,
+					},
+					{
 						ResourceName:  "h1",
 						SimpleReading: dtos.SimpleReading{Value: "1.000000e+02"},
-						ProfileName:   "kuiperProfile",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
 						ValueType:     v2.ValueTypeFloat64,
 					},
 				},
@@ -150,7 +335,7 @@ func TestProduceEvents(t1 *testing.T) {
 			error: "",
 		},
 
-		{
+		{ // 3
 			input: `[
 						{"meta1": "newmeta"},
 						{"h1":true},
@@ -158,30 +343,36 @@ func TestProduceEvents(t1 *testing.T) {
 						{"fa":[1.1,2.2,3.3,4.4]}
 					]`,
 			expected: &dtos.Event{
-				ProfileName: "kuiperProfile",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "ruleTest",
 				Readings: []dtos.BaseReading{
 					{
 						ResourceName:  "meta1",
 						SimpleReading: dtos.SimpleReading{Value: "newmeta"},
-						ProfileName:   "kuiperProfile",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
 						ValueType:     v2.ValueTypeString,
 					},
 					{
 						ResourceName:  "h1",
 						SimpleReading: dtos.SimpleReading{Value: "true"},
-						ProfileName:   "kuiperProfile",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
 						ValueType:     v2.ValueTypeBool,
 					},
 					{
 						ResourceName:  "sa",
 						SimpleReading: dtos.SimpleReading{Value: "[\"1\",\"2\",\"3\",\"4\"]"},
-						ProfileName:   "kuiperProfile",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
 						ValueType:     v2.ValueTypeStringArray,
 					},
 					{
 						ResourceName:  "fa",
 						SimpleReading: dtos.SimpleReading{Value: "[1.100000e+00, 2.200000e+00, 3.300000e+00, 4.400000e+00]"},
-						ProfileName:   "kuiperProfile",
+						DeviceName:    "ekuiper",
+						ProfileName:   "ekuiperProfile",
 						ValueType:     v2.ValueTypeFloat64Array,
 					},
 				},
@@ -189,41 +380,50 @@ func TestProduceEvents(t1 *testing.T) {
 			error: "",
 		},
 
-		{
-			input:       `[]`,
-			deviceName:  "kuiper",
-			profileName: "kp",
-			topic:       "demo",
+		{ // 4
+			input: `[]`,
+			conf: map[string]interface{}{
+				"deviceName":  "kuiper",
+				"profileName": "kp",
+				"topic":       "demo",
+			},
 			expected: &dtos.Event{
 				ProfileName: "kp",
 				DeviceName:  "kuiper",
-				SourceName:  "demo",
+				SourceName:  "ruleTest",
 				Origin:      0,
 				Readings:    nil,
 			},
 			error: "",
 		},
-		{
+		{ // 5
 			input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
 			expected: &dtos.Event{
-				ProfileName: "kuiperProfile",
+				DeviceName:  "ekuiper",
+				ProfileName: "ekuiperProfile",
+				SourceName:  "ruleTest",
 				Origin:      0,
 				Readings:    nil,
 			},
 		},
-		{
+		{ // 6
 			input: `[
 						{"meta1": "newmeta"},
 						{"sa":"SGVsbG8gV29ybGQ="},
 						{"meta":{
-							"correlationid":"","profileName":"demoProfile","deviceName":"demo","sourceName":"demoSource","id":"abc","origin":3,"tags":{"auth":"admin"},
+							"correlationid":"","profileName":"demoProfile","deviceName":"demo","id":"abc","origin":3,"tags":{"auth":"admin"},
 							"sa":{"deviceName":"test device name1","id":"12","origin":14, "valueType":"Binary","mediaType":"application/css"}
 						}}
 					]`,
+			conf: map[string]interface{}{
+				"metadata":    "meta",
+				"profileName": "myprofile",
+				"sourceName":  "ds",
+			},
 			expected: &dtos.Event{
 				DeviceName:  "demo",
 				ProfileName: "demoProfile",
-				SourceName:  "demoSource",
+				SourceName:  "ds",
 				Origin:      3,
 				Tags:        map[string]string{"auth": "admin"},
 				Readings: []dtos.BaseReading{
@@ -251,9 +451,16 @@ func TestProduceEvents(t1 *testing.T) {
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	for i, t := range tests {
-		ems := EdgexMsgBusSink{deviceName: t.deviceName, profileName: t.profileName, topic: t.topic, metadata: "meta"}
+		ems := EdgexMsgBusSink{}
+		err := ems.Configure(t.conf)
+		if err != nil {
+			t1.Errorf("%d: configure error %v", i, err)
+			continue
+		}
+		if ems.c.SourceName == "" {
+			ems.c.SourceName = "ruleTest"
+		}
 		result, err := ems.produceEvents(ctx, []byte(t.input))
-
 		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {
 			t1.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, t.input, t.error, err)
 		} else if t.error == "" && !compareEvent(t.expected, result) {

+ 7 - 3
internal/topo/source/edgex_source.go

@@ -60,9 +60,9 @@ const (
 func (es *EdgexSource) Configure(_ string, props map[string]interface{}) error {
 	c := &EdgexConf{
 		Format:      message.FormatJson,
-		Protocol:    "tcp",
+		Protocol:    "redis",
 		Server:      "localhost",
-		Port:        5563,
+		Port:        6379,
 		Type:        messaging.Redis,
 		MessageType: MessageTypeEvent,
 	}
@@ -74,8 +74,12 @@ func (es *EdgexSource) Configure(_ string, props map[string]interface{}) error {
 		return fmt.Errorf("edgex source only supports `json` format")
 	}
 
+	if c.MessageType != MessageTypeEvent && c.MessageType != MessageTypeRequest {
+		return fmt.Errorf("specified wrong messageType value %s", c.MessageType)
+	}
+
 	if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT && c.Type != messaging.Redis {
-		return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", c.Type)
+		return fmt.Errorf("specified wrong type value %s", c.Type)
 	}
 
 	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: c.Protocol, Host: c.Server, Port: c.Port}, Type: c.Type}

+ 5 - 4
test/edgex_mqtt_sink_rule.jmx

@@ -165,6 +165,7 @@
         &quot;type&quot;: &quot;mqtt&quot;,&#xd;
         &quot;metadata&quot;: &quot;edgex_meta&quot;,&#xd;
         &quot;contentType&quot;: &quot;application/json&quot;,&#xd;
+        &quot;messageType&quot;: &quot;request&quot;,&#xd;
         &quot;optional&quot;: {&#xd;
         	&quot;ClientId&quot;: &quot;edgex_message_bus_001&quot;&#xd;
         }&#xd;
@@ -468,7 +469,7 @@
         </SystemSampler>
         <hashTree>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="device Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$.deviceName</stringProp>
+            <stringProp name="JSON_PATH">$.event.deviceName</stringProp>
             <stringProp name="EXPECTED_VALUE">demo1</stringProp>
             <boolProp name="JSONVALIDATION">true</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -477,7 +478,7 @@
           </JSONPathAssertion>
           <hashTree/>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_0 Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$.readings[0].value</stringProp>
+            <stringProp name="JSON_PATH">$.event.readings[0].value</stringProp>
             <stringProp name="EXPECTED_VALUE">30</stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -486,7 +487,7 @@
           </JSONPathAssertion>
           <hashTree/>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_1 Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$.readings[1].value</stringProp>
+            <stringProp name="JSON_PATH">$.event.readings[1].value</stringProp>
             <stringProp name="EXPECTED_VALUE">20</stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -495,7 +496,7 @@
           </JSONPathAssertion>
           <hashTree/>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_2 Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$.readings[2].value</stringProp>
+            <stringProp name="JSON_PATH">$.event.readings[2].value</stringProp>
             <stringProp name="EXPECTED_VALUE">20</stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>

+ 1 - 0
test/edgex_sink_rule.jmx

@@ -156,6 +156,7 @@
         &quot;protocol&quot;: &quot;tcp&quot;,&#xd;
         &quot;host&quot;: &quot;*&quot;,&#xd;
         &quot;port&quot;: 5571,&#xd;
+        &quot;type&quot;: &quot;zero&quot;,&#xd;
         &quot;topic&quot;: &quot;application&quot;,&#xd;
         &quot;metadata&quot;: &quot;metadt&quot;,&#xd;
         &quot;contentType&quot;: &quot;application/json&quot;&#xd;