|
@@ -11,14 +11,15 @@ import (
|
|
|
"github.com/edgexfoundry/go-mod-core-contracts/models"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/messaging"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
|
|
|
+ "github.com/emqx/kuiper/common"
|
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
|
- "github.com/prometheus/common/log"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
)
|
|
|
|
|
|
type EdgexSource struct {
|
|
|
client messaging.MessageClient
|
|
|
+ subscribed bool
|
|
|
vdc coredata.ValueDescriptorClient
|
|
|
topic string
|
|
|
valueDescs map[string]string
|
|
@@ -48,18 +49,21 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
|
|
|
}
|
|
|
|
|
|
if messaging.ZeroMQ != strings.ToLower(mbusType) {
|
|
|
- log.Info("Using MQTT message bus.")
|
|
|
mbusType = messaging.MQTT
|
|
|
}
|
|
|
|
|
|
if serviceServer, ok := props["serviceServer"]; ok {
|
|
|
- es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
|
|
|
+ svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
|
|
|
+ common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
|
|
|
+ es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
|
|
|
es.valueDescs = make(map[string]string)
|
|
|
} else {
|
|
|
return fmt.Errorf("The service server cannot be empty.")
|
|
|
}
|
|
|
|
|
|
mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
|
|
|
+ common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
|
|
|
+
|
|
|
var optional = make(map[string]string)
|
|
|
if ops, ok := props["optional"]; ok {
|
|
|
if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
|
|
@@ -84,15 +88,18 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
|
|
|
func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
|
|
|
log := ctx.GetLogger()
|
|
|
if err := es.client.Connect(); err != nil {
|
|
|
- errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
|
|
|
+ errCh <- fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
|
|
|
}
|
|
|
+ log.Infof("The connection to edgex messagebus is established successfully.")
|
|
|
messages := make(chan types.MessageEnvelope)
|
|
|
topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
|
|
|
err := make(chan error)
|
|
|
if e := es.client.Subscribe(topics, err); e != nil {
|
|
|
- log.Errorf("Failed to subscribe to topic %s.\n", e)
|
|
|
+ log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
|
|
|
errCh <- e
|
|
|
} else {
|
|
|
+ es.subscribed = true
|
|
|
+ log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
|
|
|
for {
|
|
|
select {
|
|
|
case e1 := <-err:
|
|
@@ -196,6 +203,11 @@ func (es *EdgexSource) fetchAllDataDescriptors() error {
|
|
|
for _, vd := range vdArr {
|
|
|
es.valueDescs[vd.Id] = vd.Type
|
|
|
}
|
|
|
+ if len(vdArr) == 0 {
|
|
|
+ common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
|
|
|
+ } else {
|
|
|
+ common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
|
|
|
+ }
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
@@ -216,9 +228,10 @@ func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
|
|
|
}
|
|
|
|
|
|
func (es *EdgexSource) Close(ctx api.StreamContext) error {
|
|
|
- if e := es.client.Disconnect(); e != nil {
|
|
|
- return e
|
|
|
- } else {
|
|
|
- return nil
|
|
|
+ if es.subscribed {
|
|
|
+ if e := es.client.Disconnect(); e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
}
|
|
|
+ return nil
|
|
|
}
|