123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- // +build edgex
- package sinks
- import (
- "encoding/json"
- "fmt"
- "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
- "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
- "github.com/edgexfoundry/go-mod-core-contracts/models"
- "github.com/edgexfoundry/go-mod-messaging/messaging"
- "github.com/edgexfoundry/go-mod-messaging/pkg/types"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xstream/api"
- )
- type EdgexMsgBusSink struct {
- protocol string
- host string
- port int
- ptype string
- topic string
- contentType string
- deviceName string
- metadata string
- optional map[string]string
- client messaging.MessageClient
- }
- func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
- ems.host = "*"
- ems.protocol = "tcp"
- ems.port = 5573
- ems.topic = "events"
- ems.contentType = "application/json"
- ems.ptype = messaging.ZeroMQ
- if host, ok := ps["host"]; ok {
- ems.host = host.(string)
- } else {
- common.Log.Infof("Not find host conf, will use default value '*'.")
- }
- if pro, ok := ps["protocol"]; ok {
- ems.protocol = pro.(string)
- } else {
- common.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
- }
- if port, ok := ps["port"]; ok {
- if pv, ok := port.(float64); ok {
- ems.port = int(pv)
- } else if pv, ok := port.(float32); ok {
- ems.port = int(pv)
- } else {
- common.Log.Infof("Not valid port value, will use default value '5563'.")
- }
- } else {
- common.Log.Infof("Not find port conf, will use default value '5563'.")
- }
- if topic, ok := ps["topic"]; ok {
- ems.topic = topic.(string)
- } else {
- common.Log.Infof("Not find topic conf, will use default value 'events'.")
- }
- if contentType, ok := ps["contentType"]; ok {
- ems.contentType = contentType.(string)
- } else {
- common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
- }
- if ptype, ok := ps["type"]; ok {
- ems.ptype = ptype.(string)
- if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.RedisStreams {
- common.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
- ems.ptype = messaging.ZeroMQ
- }
- }
- if dname, ok := ps["deviceName"]; ok {
- ems.deviceName = dname.(string)
- }
- if metadata, ok := ps["metadata"]; ok {
- ems.metadata = metadata.(string)
- }
- if optIntf, ok := ps["optional"]; ok {
- if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
- optional := make(map[string]string)
- for k, v := range opt {
- if sv, ok2 := v.(string); ok2 {
- optional[k] = sv
- } else {
- info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
- common.Log.Infof(info)
- return fmt.Errorf(info)
- }
- }
- ems.optional = optional
- }
- }
- return nil
- }
- func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
- log := ctx.GetLogger()
- conf := types.MessageBusConfig{
- PublishHost: types.HostInfo{
- Host: ems.host,
- Port: ems.port,
- Protocol: ems.protocol,
- },
- Type: ems.ptype,
- Optional: ems.optional,
- }
- log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
- if msgClient, err := messaging.NewMessageClient(conf); err != nil {
- return err
- } else {
- if ec := msgClient.Connect(); ec != nil {
- return ec
- } else {
- ems.client = msgClient
- }
- }
- return nil
- }
- func (ems *EdgexMsgBusSink) produceEvents(result []byte) (*models.Event, error) {
- var m []map[string]interface{}
- if err := json.Unmarshal(result, &m); err == nil {
- m1, f := ems.getMeta(m)
- var event = &models.Event{}
- if f {
- event.Device = m1.getStrVal("device")
- event.Created = m1.getIntVal("created")
- event.Modified = m1.getIntVal("modified")
- event.Origin = m1.getIntVal("origin")
- event.ID = m1.getStrVal("id")
- event.Pushed = m1.getIntVal("pushed")
- }
- //Override the devicename if user specified the value
- if ems.deviceName != "" {
- event.Device = ems.deviceName
- }
- for _, v := range m {
- for k1, v1 := range v {
- if k1 == ems.metadata {
- continue
- } else {
- value := fmt.Sprintf("%v", v1)
- r := models.Reading{Name: k1, Value: value}
- if m, ok := m1[k1]; ok {
- if mm, ok1 := m.(map[string]interface{}); ok1 {
- mm1 := meta(mm)
- r.Created = mm1.getIntVal("created")
- r.Device = mm1.getStrVal("device")
- r.Id = mm1.getStrVal("id")
- r.Modified = mm1.getIntVal("modified")
- r.Origin = mm1.getIntVal("origin")
- r.Pushed = mm1.getIntVal("pushed")
- }
- }
- event.Readings = append(event.Readings, r)
- }
- }
- }
- return event, nil
- } else {
- return nil, err
- }
- }
- type meta map[string]interface{}
- func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) (meta, bool) {
- if ems.metadata == "" {
- return nil, false
- }
- //Try to get the meta field
- for _, v := range result {
- if m, ok := v[ems.metadata]; ok {
- if m1, ok1 := m.(map[string]interface{}); ok1 {
- return meta(m1), true
- } else {
- common.Log.Infof("Specified a meta field, but the field does not contains any EdgeX metadata.")
- }
- }
- }
- return nil, false
- }
- func (m meta) getIntVal(k string) int64 {
- if v, ok := m[k]; ok {
- if v1, ok1 := v.(float64); ok1 {
- return int64(v1)
- }
- }
- return 0
- }
- func (m meta) getStrVal(k string) string {
- if v, ok := m[k]; ok {
- if v1, ok1 := v.(string); ok1 {
- return v1
- }
- }
- return ""
- }
- func (ems *EdgexMsgBusSink) getMetaValueAsMap(m meta, k string) map[string]interface{} {
- if v, ok := m[k]; ok {
- if v1, ok1 := v.(map[string]interface{}); ok1 {
- return v1
- }
- }
- return nil
- }
- func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
- logger := ctx.GetLogger()
- client := coredata.NewEventClient(local.New(""))
- if payload, ok := item.([]byte); ok {
- logger.Debugf("EdgeX message bus sink: %s\n", payload)
- evt, err := ems.produceEvents(payload)
- if err != nil {
- return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
- }
- data, err := client.MarshalEvent(*evt)
- if err != nil {
- return fmt.Errorf("unexpected error MarshalEvent %v", err)
- }
- env := types.NewMessageEnvelope([]byte(data), ctx)
- env.ContentType = ems.contentType
- if e := ems.client.Publish(env, ems.topic); e != nil {
- logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
- return e
- }
- } else {
- return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
- }
- return nil
- }
- func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
- logger := ctx.GetLogger()
- logger.Infof("Closing edgex sink")
- if ems.client != nil {
- if e := ems.client.Disconnect(); e != nil {
- return e
- }
- }
- return nil
- }
|