浏览代码

add edgex fvt scenarios etc

RockyJin 5 年之前
父节点
当前提交
b9605fffa1

二进制
edgex_pub


二进制
edgex_server


+ 4 - 1
etc/sources/edgex.yaml

@@ -5,7 +5,10 @@ default:
   port: 5570
   topic: events
   serviceServer: http://localhost:10080
-
+#  optional:
+#    ClientId: client1
+#    Username: user1
+#    Password: password
 #Override the global configurations
 demo_conf: #Conf_key
   protocol: tcp

+ 13 - 1
fvt_scripts/edgex/pub.go

@@ -38,7 +38,19 @@ func pubEventClientZeroMq() {
 					Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
 				var testReading2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name",
 					Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
-				testEvent.Readings = append(testEvent.Readings, testReading1, testReading2)
+
+				var r3 = models.Reading{Name:"b1"}
+				if i % 2 == 0 {
+					r3.Value = "true"
+				} else {
+					r3.Value = "false"
+				}
+
+				r4 := models.Reading{Name:"i1", Value:fmt.Sprintf("%d", i)}
+				r5 := models.Reading{Name:"f1", Value:fmt.Sprintf("%.2f", float64(i)/2.0)}
+				r6 := models.Reading{Name:"j1", Value:`{"field1" : "v1", "field2" : 2}`}
+
+				testEvent.Readings = append(testEvent.Readings, testReading1, testReading2, r3, r4, r5, r6)
 
 				data, err := client.MarshalEvent(testEvent)
 				if err != nil {

+ 33 - 6
fvt_scripts/edgex/valuedesc/vd_server.go

@@ -10,18 +10,29 @@ import (
 )
 
 const (
-	testValueDesciptorDescription1 = "Temperature descriptor1"
-	testValueDesciptorDescription2 = "Humidity descriptor2"
+	desc1 = "Temperature descriptor1"
+	desc2 = "Humidity descriptor2"
+	desc3 = "Boolean descriptor"
+	desc4 = "Int descriptor"
+	desc5 = "Float descriptor"
+	desc6 = "JSON descriptor"
+	desc7 = "String descriptor"
 )
 
 var testValueDescriptor1 = models.ValueDescriptor{Id: "Temperature", Created: 123, Modified: 123, Origin: 123, Name: "Temperature",
-	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%d", Type:"STRING",
+	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%f", Type:"F",
 	Labels: []string{"temp", "room temp"}, UomLabel: "F", MediaType: clients.ContentTypeJSON, FloatEncoding: "eNotation"}
 
 var testValueDescriptor2 = models.ValueDescriptor{Id: "Humidity", Created: 123, Modified: 123, Origin: 123, Name: "Humidity",
 	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%d", Type:"INT",
 	Labels: []string{"humi", "room humidity"}, UomLabel: "F", MediaType: clients.ContentTypeJSON, FloatEncoding: "eNotation"}
 
+var testValueDescriptor3 = models.ValueDescriptor{Id: "b1", Name: "b1", Formatting: "%t", Type:"Bool", MediaType: clients.ContentTypeJSON}
+var testValueDescriptor4 = models.ValueDescriptor{Id: "i1", Name: "i1", Formatting: "%d", Type:"i", MediaType: clients.ContentTypeJSON}
+var testValueDescriptor5 = models.ValueDescriptor{Id: "f1", Name: "f1", Formatting: "%f", Type:"FLOAT32", MediaType: clients.ContentTypeJSON}
+var testValueDescriptor6 = models.ValueDescriptor{Id: "j1", Name: "j1", Formatting: "%s", Type:"Json", MediaType: clients.ContentTypeJSON}
+var testValueDescriptor7 = models.ValueDescriptor{Id: "s1", Name: "s1", Formatting: "%s", Type:"string", MediaType: clients.ContentTypeJSON}
+
 func main() {
 	http.HandleFunc(clients.ApiValueDescriptorRoute, Hello)
 	if e := http.ListenAndServe(":10080", nil); e != nil {
@@ -31,11 +42,27 @@ func main() {
 
 func Hello(w http.ResponseWriter, req *http.Request) {
 	descriptor1 := testValueDescriptor1
-	descriptor1.Description = testValueDesciptorDescription1
+	descriptor1.Description = desc1
 
 	descriptor2 := testValueDescriptor2
-	descriptor2.Description = testValueDesciptorDescription2
-	descriptors := []models.ValueDescriptor{descriptor1, descriptor2}
+	descriptor2.Description = desc2
+
+	descriptor3 := testValueDescriptor3
+	descriptor3.Description = desc3
+
+	descriptor4 := testValueDescriptor4
+	descriptor4.Description = desc4
+
+	descriptor5 := testValueDescriptor5
+	descriptor5.Description = desc5
+
+	descriptor6 := testValueDescriptor6
+	descriptor6.Description = desc6
+
+	descriptor7 := testValueDescriptor7
+	descriptor7.Description = desc7
+
+	descriptors := []models.ValueDescriptor{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5, descriptor6, descriptor7}
 
 	data, err := json.Marshal(descriptors)
 	if err != nil {

+ 1 - 6
fvt_scripts/select_edgex_condition_rule.jmx

@@ -24,11 +24,6 @@
             <stringProp name="Argument.value">9081</stringProp>
             <stringProp name="Argument.metadata">=</stringProp>
           </elementProp>
-          <elementProp name="k_home" elementType="Argument">
-            <stringProp name="Argument.name">k_home</stringProp>
-            <stringProp name="Argument.value">${__property(base,,)}</stringProp>
-            <stringProp name="Argument.metadata">=</stringProp>
-          </elementProp>
           <elementProp name="fvt" elementType="Argument">
             <stringProp name="Argument.name">fvt</stringProp>
             <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
@@ -103,7 +98,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;demo\&quot; TYPE=\&quot;edgex\&quot;)&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo () WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;demo\&quot; TYPE=\&quot;edgex\&quot;)&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>

+ 45 - 28
xstream/extensions/edgex_source.go

@@ -10,13 +10,13 @@ import (
 	"github.com/edgexfoundry/go-mod-core-contracts/models"
 	"github.com/edgexfoundry/go-mod-messaging/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
-	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
+	"github.com/prometheus/common/log"
 	"strconv"
 	"strings"
 )
 
-type EdgexZMQSource struct {
+type EdgexSource struct {
 	client     messaging.MessageClient
 	vdc        coredata.ValueDescriptorClient
 	device     string
@@ -24,19 +24,18 @@ type EdgexZMQSource struct {
 	valueDescs map[string]string
 }
 
-type EdgexConfig struct {
-	Protocol      string `json:"protocol"`
-	Server        string `json:"server"`
-	Port          int    `json:"port"`
-	Topic         string `json:"topic"`
-	ServiceServer string `json:"serviceServer"`
-}
-
-func (es *EdgexZMQSource) Configure(device string, props map[string]interface{}) error {
-	cfg := &EdgexConfig{}
-	err := common.MapToStruct(props, cfg)
-	if err != nil {
-		return fmt.Errorf("read properties %v fail with error: %v", props, err)
+func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
+	var protocol = "tcp";
+	if p, ok := props["protocol"]; ok {
+		protocol = p.(string)
+	}
+	var server = "localhost";
+	if s, ok := props["server"]; ok {
+		server = s.(string)
+	}
+	var port = 5570
+	if p, ok := props["port"]; ok {
+		port = p.(int)
 	}
 
 	if "" == strings.Trim(device, " ") {
@@ -49,6 +48,16 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
 		es.topic = tpc.(string)
 	}
 
+	var mbusType = messaging.ZeroMQ
+	if t, ok := props["type"]; ok {
+		mbusType = t.(string)
+	}
+
+	if messaging.ZeroMQ != strings.ToLower(mbusType) {
+		log.Info("Using MQTT message bus.")
+		mbusType = messaging.MQTT
+	}
+
 	if serviceServer, ok := props["serviceServer"]; ok {
 		es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
 		es.valueDescs = make(map[string]string)
@@ -56,7 +65,19 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
 		return fmt.Errorf("The service server cannot be empty.")
 	}
 
-	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: cfg.Protocol, Host: cfg.Server, Port: cfg.Port}, Type: messaging.ZeroMQ}
+	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
+	var optional = make(map[string]string)
+	if ops, ok := props["optional"]; ok {
+		if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
+			for k, v := range ops1 {
+				k1 := k.(string)
+				v1 := v.(string)
+				optional[k1] = v1
+			}
+		}
+		mbconf.Optional = optional
+	}
+
 	if client, err := messaging.NewMessageClient(mbconf); err != nil {
 		return err
 	} else {
@@ -66,7 +87,7 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
 
 }
 
-func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	if err := es.client.Connect(); err != nil {
 		errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
@@ -114,10 +135,6 @@ func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.Source
 								log.Warnf("got an empty result, ignored")
 							}
 						}
-						//if e := json.Unmarshal(env.Payload, &result); e != nil {
-						//	log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(env.Payload), e)
-						//	return
-						//}
 
 						meta["CorrelationID"] = env.CorrelationID
 						select {
@@ -135,7 +152,7 @@ func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.Source
 	}
 }
 
-func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
+func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
 	t, err := es.getType(r.Name, logger)
 	if err != nil {
 		return nil, err
@@ -144,19 +161,19 @@ func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interfa
 	logger.Debugf("name %s with type %s", r.Name, t)
 	v := r.Value
 	switch t {
-	case "B", "BOOL":
+	case "B", "BOOL", "BOOLEAN":
 		if r, err := strconv.ParseBool(v); err != nil {
 			return nil, err
 		} else {
 			return r, nil
 		}
-	case "I", "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64":
+	case "I", "INT", "INT8", "INT16", "INT32", "INT64", "UINT", "UINT8", "UINT16", "UINT32", "UINT64":
 		if r, err := strconv.Atoi(v); err != nil {
 			return nil, err
 		} else {
 			return r, nil
 		}
-	case "F", "FLOAT16", "FLOAT32", "FLOAT64":
+	case "F", "FLOAT", "FLOAT16", "FLOAT32", "FLOAT64":
 		if r, err := strconv.ParseFloat(v, 64); err != nil {
 			return nil, err
 		} else {
@@ -177,7 +194,7 @@ func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interfa
 	}
 }
 
-func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
+func (es *EdgexSource) fetchAllDataDescriptors() error {
 	if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
 		return err
 	} else {
@@ -188,7 +205,7 @@ func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
 	return nil
 }
 
-func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error) {
+func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
 	if t, ok := es.valueDescs[id]; ok {
 		return t, nil
 	} else {
@@ -203,7 +220,7 @@ func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error)
 	}
 }
 
-func (es *EdgexZMQSource) Close(ctx api.StreamContext) error {
+func (es *EdgexSource) Close(ctx api.StreamContext) error {
 	if e := es.client.Disconnect(); e != nil {
 		return e
 	} else {

+ 1 - 1
xstream/nodes/source_node.go

@@ -153,7 +153,7 @@ func getSource(t string) (api.Source, error) {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
 	case "edgex":
-		s = &extensions.EdgexZMQSource{}
+		s = &extensions.EdgexSource{}
 	default:
 		nf, err := plugin_manager.GetPlugin(t, "sources")
 		if err != nil {