edgex_source.go 5.9 KB


  1. package extensions
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  7. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  8. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  9. "github.com/edgexfoundry/go-mod-core-contracts/models"
  10. "github.com/edgexfoundry/go-mod-messaging/messaging"
  11. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  12. "github.com/emqx/kuiper/xstream/api"
  13. "github.com/prometheus/common/log"
  14. "strconv"
  15. "strings"
  16. )
  17. type EdgexSource struct {
  18. client messaging.MessageClient
  19. vdc coredata.ValueDescriptorClient
  20. device string
  21. topic string
  22. valueDescs map[string]string
  23. }
  24. func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
  25. var protocol = "tcp";
  26. if p, ok := props["protocol"]; ok {
  27. protocol = p.(string)
  28. }
  29. var server = "localhost";
  30. if s, ok := props["server"]; ok {
  31. server = s.(string)
  32. }
  33. var port = 5570
  34. if p, ok := props["port"]; ok {
  35. port = p.(int)
  36. }
  37. if "" == strings.Trim(device, " ") {
  38. return fmt.Errorf("Device cannot be empty.")
  39. } else {
  40. es.device = device
  41. }
  42. if tpc, ok := props["topic"]; ok {
  43. es.topic = tpc.(string)
  44. }
  45. var mbusType = messaging.ZeroMQ
  46. if t, ok := props["type"]; ok {
  47. mbusType = t.(string)
  48. }
  49. if messaging.ZeroMQ != strings.ToLower(mbusType) {
  50. log.Info("Using MQTT message bus.")
  51. mbusType = messaging.MQTT
  52. }
  53. if serviceServer, ok := props["serviceServer"]; ok {
  54. es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
  55. es.valueDescs = make(map[string]string)
  56. } else {
  57. return fmt.Errorf("The service server cannot be empty.")
  58. }
  59. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
  60. var optional = make(map[string]string)
  61. if ops, ok := props["optional"]; ok {
  62. if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
  63. for k, v := range ops1 {
  64. k1 := k.(string)
  65. v1 := v.(string)
  66. optional[k1] = v1
  67. }
  68. }
  69. mbconf.Optional = optional
  70. }
  71. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  72. return err
  73. } else {
  74. es.client = client
  75. return nil
  76. }
  77. }
  78. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  79. log := ctx.GetLogger()
  80. if err := es.client.Connect(); err != nil {
  81. errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
  82. }
  83. messages := make(chan types.MessageEnvelope)
  84. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  85. err := make(chan error)
  86. if e := es.client.Subscribe(topics, err); e != nil {
  87. log.Errorf("Failed to subscribe to topic %s.\n", e)
  88. errCh <- e
  89. } else {
  90. for {
  91. select {
  92. case e1 := <-err:
  93. errCh <- e1
  94. return
  95. case env := <-messages:
  96. if strings.ToLower(env.ContentType) == "application/json" {
  97. e := models.Event{}
  98. if err := e.UnmarshalJSON(env.Payload); err != nil {
  99. log.Warnf("payload %s unmarshal fail: %v", env.Payload, err)
  100. } else {
  101. result := make(map[string]interface{})
  102. meta := make(map[string]interface{})
  103. log.Debugf("receive message from device %s vs %s", e.Device, es.device)
  104. if e.Device == es.device {
  105. for _, r := range e.Readings {
  106. if r.Name != "" {
  107. if v, err := es.getValue(r, log); err != nil {
  108. log.Warnf("fail to get value for %s: %v", r.Name, err)
  109. } else {
  110. result[strings.ToLower(r.Name)] = v
  111. }
  112. }
  113. }
  114. if len(result) > 0 {
  115. meta["id"] = e.ID
  116. meta["pushed"] = e.Pushed
  117. meta["device"] = e.Device
  118. meta["created"] = e.Created
  119. meta["modified"] = e.Modified
  120. meta["origin"] = e.Origin
  121. } else {
  122. log.Warnf("got an empty result, ignored")
  123. }
  124. }
  125. meta["CorrelationID"] = env.CorrelationID
  126. select {
  127. case consumer <- api.NewDefaultSourceTuple(result, meta):
  128. log.Debugf("send data to device node")
  129. case <-ctx.Done():
  130. return
  131. }
  132. }
  133. } else {
  134. log.Errorf("Unsupported data type %s.", env.ContentType)
  135. }
  136. }
  137. }
  138. }
  139. }
  140. func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  141. t, err := es.getType(r.Name, logger)
  142. if err != nil {
  143. return nil, err
  144. }
  145. t = strings.ToUpper(t)
  146. logger.Debugf("name %s with type %s", r.Name, t)
  147. v := r.Value
  148. switch t {
  149. case "B", "BOOL", "BOOLEAN":
  150. if r, err := strconv.ParseBool(v); err != nil {
  151. return nil, err
  152. } else {
  153. return r, nil
  154. }
  155. case "I", "INT", "INT8", "INT16", "INT32", "INT64", "UINT", "UINT8", "UINT16", "UINT32", "UINT64":
  156. if r, err := strconv.Atoi(v); err != nil {
  157. return nil, err
  158. } else {
  159. return r, nil
  160. }
  161. case "F", "FLOAT", "FLOAT16", "FLOAT32", "FLOAT64":
  162. if r, err := strconv.ParseFloat(v, 64); err != nil {
  163. return nil, err
  164. } else {
  165. return r, nil
  166. }
  167. case "S", "STRING":
  168. return v, nil
  169. case "J", "JSON":
  170. var a interface{}
  171. if err := json.Unmarshal([]byte(v), &a); err != nil {
  172. return nil, err
  173. } else {
  174. return a, nil
  175. }
  176. default:
  177. logger.Warnf("unknown type %s return the string value", t)
  178. return v, nil
  179. }
  180. }
  181. func (es *EdgexSource) fetchAllDataDescriptors() error {
  182. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  183. return err
  184. } else {
  185. for _, vd := range vdArr {
  186. es.valueDescs[vd.Id] = vd.Type
  187. }
  188. }
  189. return nil
  190. }
  191. func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
  192. if t, ok := es.valueDescs[id]; ok {
  193. return t, nil
  194. } else {
  195. if e := es.fetchAllDataDescriptors(); e != nil {
  196. return "", e
  197. }
  198. if t, ok := es.valueDescs[id]; ok {
  199. return t, nil
  200. } else {
  201. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  202. }
  203. }
  204. }
  205. func (es *EdgexSource) Close(ctx api.StreamContext) error {
  206. if e := es.client.Disconnect(); e != nil {
  207. return e
  208. } else {
  209. return nil
  210. }
  211. }