|
@@ -1,8 +1,12 @@
|
|
|
package extensions
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "github.com/edgexfoundry/go-mod-core-contracts/clients"
|
|
|
+ "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
|
|
|
+ "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
|
|
|
"github.com/edgexfoundry/go-mod-core-contracts/models"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/messaging"
|
|
|
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
|
|
@@ -13,17 +17,19 @@ import (
|
|
|
)
|
|
|
|
|
|
type EdgexZMQSource struct {
|
|
|
- client messaging.MessageClient
|
|
|
+ client messaging.MessageClient
|
|
|
+ vdc coredata.ValueDescriptorClient
|
|
|
device string
|
|
|
topic string
|
|
|
valueDescs map[string]string
|
|
|
}
|
|
|
|
|
|
type EdgexConfig struct {
|
|
|
- Protocol string `json:"protocol"`
|
|
|
- Server string `json:"server"`
|
|
|
- Port int `json:"port"`
|
|
|
- Topic string `json:"topic"`
|
|
|
+ Protocol string `json:"protocol"`
|
|
|
+ Server string `json:"server"`
|
|
|
+ Port int `json:"port"`
|
|
|
+ Topic string `json:"topic"`
|
|
|
+ ServiceServer string `json:"serviceServer"`
|
|
|
}
|
|
|
|
|
|
func (es *EdgexZMQSource) Configure(device string, props map[string]interface{}) error {
|
|
@@ -43,6 +49,13 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
|
|
|
es.topic = tpc.(string)
|
|
|
}
|
|
|
|
|
|
+ if serviceServer, ok := props["serviceServer"]; ok {
|
|
|
+ es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
|
|
|
+ es.valueDescs = make(map[string]string)
|
|
|
+ } else {
|
|
|
+ return fmt.Errorf("The service server cannot be empty.")
|
|
|
+ }
|
|
|
+
|
|
|
mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: cfg.Protocol, Host: cfg.Server, Port: cfg.Port}, Type: messaging.ZeroMQ}
|
|
|
if client, err := messaging.NewMessageClient(mbconf); err != nil {
|
|
|
return err
|
|
@@ -50,6 +63,7 @@ func (es *EdgexZMQSource) Configure(device string, props map[string]interface{})
|
|
|
es.client = client
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
|
|
@@ -121,7 +135,7 @@ func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.Source
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interface{}, error){
|
|
|
+func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
|
|
|
t, err := es.getType(r.Name, logger)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -163,28 +177,30 @@ func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interfa
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (s *EdgexZMQSource) getType(id string, logger api.Logger) (string, error) {
|
|
|
- return "string", nil
|
|
|
- //if t, ok := s.valueDescs[id]; ok {
|
|
|
- // return t, nil
|
|
|
- //}
|
|
|
- //else {
|
|
|
-
|
|
|
- //url := s.coreDataUrl + "/valuedescriptor/name/" + id
|
|
|
- //logger.Debugf("get %s", url)
|
|
|
- //response, err := http.Get(url)
|
|
|
- //if err != nil {
|
|
|
- // return "", fmt.Errorf("fail to get type for value %s: %v", id, err)
|
|
|
- //}
|
|
|
- //defer response.Body.Close()
|
|
|
- //logger.Debugf("response %s", response.Body)
|
|
|
- //vd := models.ValueDescriptor{}
|
|
|
- //if err := json.NewDecoder(response.Body).Decode(&vd); err != nil {
|
|
|
- // return "", fmt.Errorf("fail to decode value descriptor value %s: %v", id, err)
|
|
|
- //}
|
|
|
- //s.valueDescs[id] = vd.Type
|
|
|
- //return vd.Type, nil
|
|
|
- //}
|
|
|
+func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
|
|
|
+ if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
|
|
|
+ return err
|
|
|
+ } else {
|
|
|
+ for _, vd := range vdArr {
|
|
|
+ es.valueDescs[vd.Id] = vd.Type
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error) {
|
|
|
+ if t, ok := es.valueDescs[id]; ok {
|
|
|
+ return t, nil
|
|
|
+ } else {
|
|
|
+ if e := es.fetchAllDataDescriptors(); e != nil {
|
|
|
+ return "", e
|
|
|
+ }
|
|
|
+ if t, ok := es.valueDescs[id]; ok {
|
|
|
+ return t, nil
|
|
|
+ } else {
|
|
|
+ return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (es *EdgexZMQSource) Close(ctx api.StreamContext) error {
|