edgex_source.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // +build edgex
  2. package extensions
  3. import (
  4. "context"
  5. "fmt"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  7. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  8. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  9. "github.com/edgexfoundry/go-mod-core-contracts/models"
  10. "github.com/edgexfoundry/go-mod-messaging/messaging"
  11. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  12. "github.com/emqx/kuiper/common"
  13. "github.com/emqx/kuiper/xstream/api"
  14. "strconv"
  15. "strings"
  16. )
  17. type EdgexSource struct {
  18. client messaging.MessageClient
  19. subscribed bool
  20. vdc coredata.ValueDescriptorClient
  21. topic string
  22. valueDescs map[string]string
  23. }
  24. func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
  25. var protocol = "tcp";
  26. if p, ok := props["protocol"]; ok {
  27. protocol = p.(string)
  28. }
  29. var server = "localhost";
  30. if s, ok := props["server"]; ok {
  31. server = s.(string)
  32. }
  33. var port = 5563
  34. if p, ok := props["port"]; ok {
  35. port = p.(int)
  36. }
  37. if tpc, ok := props["topic"]; ok {
  38. es.topic = tpc.(string)
  39. }
  40. var mbusType = messaging.ZeroMQ
  41. if t, ok := props["type"]; ok {
  42. mbusType = t.(string)
  43. if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT {
  44. return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
  45. }
  46. }
  47. if serviceServer, ok := props["serviceServer"]; ok {
  48. svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
  49. common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
  50. es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
  51. es.valueDescs = make(map[string]string)
  52. } else {
  53. return fmt.Errorf("The service server cannot be empty.")
  54. }
  55. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
  56. common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
  57. var optional = make(map[string]string)
  58. if ops, ok := props["optional"]; ok {
  59. if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
  60. for k, v := range ops1 {
  61. k1 := k.(string)
  62. v1 := v.(string)
  63. optional[k1] = v1
  64. }
  65. }
  66. mbconf.Optional = optional
  67. }
  68. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  69. return err
  70. } else {
  71. es.client = client
  72. return nil
  73. }
  74. }
  75. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  76. log := ctx.GetLogger()
  77. if err := es.client.Connect(); err != nil {
  78. info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  79. log.Errorf(info.Error())
  80. errCh <- info
  81. return
  82. }
  83. log.Infof("The connection to edgex messagebus is established successfully.")
  84. messages := make(chan types.MessageEnvelope)
  85. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  86. err := make(chan error)
  87. if e := es.client.Subscribe(topics, err); e != nil {
  88. log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  89. errCh <- e
  90. } else {
  91. es.subscribed = true
  92. log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
  93. for {
  94. select {
  95. case e1 := <-err:
  96. errCh <- e1
  97. return
  98. case env := <-messages:
  99. if strings.ToLower(env.ContentType) == "application/json" {
  100. e := models.Event{}
  101. if err := e.UnmarshalJSON(env.Payload); err != nil {
  102. log.Warnf("payload %s unmarshal fail: %v", env.Payload, err)
  103. } else {
  104. result := make(map[string]interface{})
  105. meta := make(map[string]interface{})
  106. log.Debugf("receive message from device %s", e.Device)
  107. for _, r := range e.Readings {
  108. if r.Name != "" {
  109. if v, err := es.getValue(r, log); err != nil {
  110. log.Warnf("fail to get value for %s: %v", r.Name, err)
  111. } else {
  112. result[strings.ToLower(r.Name)] = v
  113. }
  114. r_meta := map[string]interface{}{}
  115. r_meta["id"] = r.Id
  116. r_meta["created"] = r.Created
  117. r_meta["modified"] = r.Modified
  118. r_meta["origin"] = r.Origin
  119. r_meta["pushed"] = r.Pushed
  120. r_meta["device"] = r.Device
  121. meta[strings.ToLower(r.Name)] = r_meta
  122. } else {
  123. log.Warnf("The name of readings should not be empty!")
  124. }
  125. }
  126. if len(result) > 0 {
  127. meta["id"] = e.ID
  128. meta["pushed"] = e.Pushed
  129. meta["device"] = e.Device
  130. meta["created"] = e.Created
  131. meta["modified"] = e.Modified
  132. meta["origin"] = e.Origin
  133. meta["correlationid"] = env.CorrelationID
  134. select {
  135. case consumer <- api.NewDefaultSourceTuple(result, meta):
  136. log.Debugf("send data to device node")
  137. case <-ctx.Done():
  138. return
  139. }
  140. } else {
  141. log.Warnf("got an empty result, ignored")
  142. }
  143. }
  144. } else {
  145. log.Errorf("Unsupported data type %s.", env.ContentType)
  146. }
  147. }
  148. }
  149. }
  150. }
  151. func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  152. t, err := es.getType(r.Name, logger)
  153. if err != nil {
  154. return nil, err
  155. }
  156. t = strings.ToUpper(t)
  157. logger.Debugf("name %s with type %s", r.Name, t)
  158. v := r.Value
  159. switch t {
  160. case "BOOL":
  161. if r, err := strconv.ParseBool(v); err != nil {
  162. return nil, err
  163. } else {
  164. return r, nil
  165. }
  166. case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64":
  167. if r, err := strconv.Atoi(v); err != nil {
  168. return nil, err
  169. } else {
  170. return r, nil
  171. }
  172. case "FLOAT32", "FLOAT64":
  173. if r, err := strconv.ParseFloat(v, 64); err != nil {
  174. return nil, err
  175. } else {
  176. return r, nil
  177. }
  178. case "STRING":
  179. return v, nil
  180. default:
  181. logger.Warnf("unknown type %s return the string value", t)
  182. return v, nil
  183. }
  184. }
  185. func (es *EdgexSource) fetchAllDataDescriptors() error {
  186. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  187. return err
  188. } else {
  189. for _, vd := range vdArr {
  190. es.valueDescs[vd.Name] = vd.Type
  191. }
  192. if len(vdArr) == 0 {
  193. common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
  194. } else {
  195. common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
  196. for i, v := range vdArr {
  197. common.Log.Debugf("%d: %s - %s ", i, v.Name, v.Type)
  198. }
  199. }
  200. }
  201. return nil
  202. }
  203. func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
  204. if t, ok := es.valueDescs[id]; ok {
  205. return t, nil
  206. } else {
  207. if e := es.fetchAllDataDescriptors(); e != nil {
  208. return "", e
  209. }
  210. if t, ok := es.valueDescs[id]; ok {
  211. return t, nil
  212. } else {
  213. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  214. }
  215. }
  216. }
  217. func (es *EdgexSource) Close(ctx api.StreamContext) error {
  218. if es.subscribed {
  219. if e := es.client.Disconnect(); e != nil {
  220. return e
  221. }
  222. }
  223. return nil
  224. }