|
@@ -10,13 +10,13 @@ import (
|
|
|
"github.com/edgexfoundry/go-mod-core-contracts/models"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/messaging"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
|
|
|
- "github.com/emqx/kuiper/common"
|
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
|
+ "github.com/prometheus/common/log"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
)
|
|
|
|
|
|
-type EdgexZMQSource struct {
|
|
|
+type EdgexSource struct {
|
|
|
client messaging.MessageClient
|
|
|
vdc coredata.ValueDescriptorClient
|
|
|
device string
|
|
@@ -24,19 +24,18 @@ type EdgexZMQSource struct {
|
|
|
valueDescs map[string]string
|
|
|
}
|
|
|
|
|
|
-type EdgexConfig struct {
|
|
|
- Protocol string `json:"protocol"`
|
|
|
- Server string `json:"server"`
|
|
|
- Port int `json:"port"`
|
|
|
- Topic string `json:"topic"`
|
|
|
- ServiceServer string `json:"serviceServer"`
|
|
|
-}
|
|
|
-
|
|
|
-func (es *EdgexZMQSource) Configure(device string, props map[string]interface{}) error {
|
|
|
- cfg := &EdgexConfig{}
|
|
|
- err := common.MapToStruct(props, cfg)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("read properties %v fail with error: %v", props, err)
|
|
|
+func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
|
|
|
+ var protocol = "tcp";
|
|
|
+ if p, ok := props["protocol"]; ok {
|
|
|
+ protocol = p.(string)
|
|
|
+ }
|
|
|
+ var server = "localhost";
|
|
|
+ if s, ok := props["server"]; ok {
|
|
|
+ server = s.(string)
|
|
|
+ }
|
|
|
+ var port = 5570
|
|
|
+ if p, ok := props["port"]; ok {
|
|
|
+ port = p.(int)
|
|
|
}
|
|
|
|
|
|
if "" == strings.Trim(device, " ") {
|
|
@@ -49,6 +48,16 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
|
|
|
es.topic = tpc.(string)
|
|
|
}
|
|
|
|
|
|
+ var mbusType = messaging.ZeroMQ
|
|
|
+ if t, ok := props["type"]; ok {
|
|
|
+ mbusType = t.(string)
|
|
|
+ }
|
|
|
+
|
|
|
+ if messaging.ZeroMQ != strings.ToLower(mbusType) {
|
|
|
+ log.Info("Using MQTT message bus.")
|
|
|
+ mbusType = messaging.MQTT
|
|
|
+ }
|
|
|
+
|
|
|
if serviceServer, ok := props["serviceServer"]; ok {
|
|
|
es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
|
|
|
es.valueDescs = make(map[string]string)
|
|
@@ -56,7 +65,19 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
|
|
|
return fmt.Errorf("The service server cannot be empty.")
|
|
|
}
|
|
|
|
|
|
- mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: cfg.Protocol, Host: cfg.Server, Port: cfg.Port}, Type: messaging.ZeroMQ}
|
|
|
+ mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
|
|
|
+ var optional = make(map[string]string)
|
|
|
+ if ops, ok := props["optional"]; ok {
|
|
|
+ if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
|
|
|
+ for k, v := range ops1 {
|
|
|
+ k1 := k.(string)
|
|
|
+ v1 := v.(string)
|
|
|
+ optional[k1] = v1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mbconf.Optional = optional
|
|
|
+ }
|
|
|
+
|
|
|
if client, err := messaging.NewMessageClient(mbconf); err != nil {
|
|
|
return err
|
|
|
} else {
|
|
@@ -66,7 +87,7 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
|
|
|
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
|
|
|
+func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
|
|
|
log := ctx.GetLogger()
|
|
|
if err := es.client.Connect(); err != nil {
|
|
|
errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
|
|
@@ -114,10 +135,6 @@ func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.Source
|
|
|
log.Warnf("got an empty result, ignored")
|
|
|
}
|
|
|
}
|
|
|
- //if e := json.Unmarshal(env.Payload, &result); e != nil {
|
|
|
- // log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(env.Payload), e)
|
|
|
- // return
|
|
|
- //}
|
|
|
|
|
|
meta["CorrelationID"] = env.CorrelationID
|
|
|
select {
|
|
@@ -135,7 +152,7 @@ func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.Source
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
|
|
|
+func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
|
|
|
t, err := es.getType(r.Name, logger)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -144,19 +161,19 @@ func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interfa
|
|
|
logger.Debugf("name %s with type %s", r.Name, t)
|
|
|
v := r.Value
|
|
|
switch t {
|
|
|
- case "B", "BOOL":
|
|
|
+ case "B", "BOOL", "BOOLEAN":
|
|
|
if r, err := strconv.ParseBool(v); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
|
return r, nil
|
|
|
}
|
|
|
- case "I", "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64":
|
|
|
+ case "I", "INT", "INT8", "INT16", "INT32", "INT64", "UINT", "UINT8", "UINT16", "UINT32", "UINT64":
|
|
|
if r, err := strconv.Atoi(v); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
|
return r, nil
|
|
|
}
|
|
|
- case "F", "FLOAT16", "FLOAT32", "FLOAT64":
|
|
|
+ case "F", "FLOAT", "FLOAT16", "FLOAT32", "FLOAT64":
|
|
|
if r, err := strconv.ParseFloat(v, 64); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
@@ -177,7 +194,7 @@ func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interfa
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
|
|
|
+func (es *EdgexSource) fetchAllDataDescriptors() error {
|
|
|
if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
|
|
|
return err
|
|
|
} else {
|
|
@@ -188,7 +205,7 @@ func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error) {
|
|
|
+func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
|
|
|
if t, ok := es.valueDescs[id]; ok {
|
|
|
return t, nil
|
|
|
} else {
|
|
@@ -203,7 +220,7 @@ func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) Close(ctx api.StreamContext) error {
|
|
|
+func (es *EdgexSource) Close(ctx api.StreamContext) error {
|
|
|
if e := es.client.Disconnect(); e != nil {
|
|
|
return e
|
|
|
} else {
|