edgex_source.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package extensions
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  7. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  8. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  9. "github.com/edgexfoundry/go-mod-core-contracts/models"
  10. "github.com/edgexfoundry/go-mod-messaging/messaging"
  11. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  12. "github.com/emqx/kuiper/common"
  13. "github.com/emqx/kuiper/xstream/api"
  14. "strconv"
  15. "strings"
  16. )
  17. type EdgexZMQSource struct {
  18. client messaging.MessageClient
  19. vdc coredata.ValueDescriptorClient
  20. device string
  21. topic string
  22. valueDescs map[string]string
  23. }
  24. type EdgexConfig struct {
  25. Protocol string `json:"protocol"`
  26. Server string `json:"server"`
  27. Port int `json:"port"`
  28. Topic string `json:"topic"`
  29. ServiceServer string `json:"serviceServer"`
  30. }
  31. func (es *EdgexZMQSource) Configure(device string, props map[string]interface{}) error {
  32. cfg := &EdgexConfig{}
  33. err := common.MapToStruct(props, cfg)
  34. if err != nil {
  35. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  36. }
  37. if "" == strings.Trim(device, " ") {
  38. return fmt.Errorf("Device cannot be empty.")
  39. } else {
  40. es.device = device
  41. }
  42. if tpc, ok := props["topic"]; ok {
  43. es.topic = tpc.(string)
  44. }
  45. if serviceServer, ok := props["serviceServer"]; ok {
  46. es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
  47. es.valueDescs = make(map[string]string)
  48. } else {
  49. return fmt.Errorf("The service server cannot be empty.")
  50. }
  51. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: cfg.Protocol, Host: cfg.Server, Port: cfg.Port}, Type: messaging.ZeroMQ}
  52. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  53. return err
  54. } else {
  55. es.client = client
  56. return nil
  57. }
  58. }
  59. func (es *EdgexZMQSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  60. log := ctx.GetLogger()
  61. if err := es.client.Connect(); err != nil {
  62. errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
  63. }
  64. messages := make(chan types.MessageEnvelope)
  65. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  66. err := make(chan error)
  67. if e := es.client.Subscribe(topics, err); e != nil {
  68. log.Errorf("Failed to subscribe to topic %s.\n", e)
  69. errCh <- e
  70. } else {
  71. for {
  72. select {
  73. case e1 := <-err:
  74. errCh <- e1
  75. return
  76. case env := <-messages:
  77. if strings.ToLower(env.ContentType) == "application/json" {
  78. e := models.Event{}
  79. if err := e.UnmarshalJSON(env.Payload); err != nil {
  80. log.Warnf("payload %s unmarshal fail: %v", env.Payload, err)
  81. } else {
  82. result := make(map[string]interface{})
  83. meta := make(map[string]interface{})
  84. log.Debugf("receive message from device %s vs %s", e.Device, es.device)
  85. if e.Device == es.device {
  86. for _, r := range e.Readings {
  87. if r.Name != "" {
  88. if v, err := es.getValue(r, log); err != nil {
  89. log.Warnf("fail to get value for %s: %v", r.Name, err)
  90. } else {
  91. result[strings.ToLower(r.Name)] = v
  92. }
  93. }
  94. }
  95. if len(result) > 0 {
  96. meta["id"] = e.ID
  97. meta["pushed"] = e.Pushed
  98. meta["device"] = e.Device
  99. meta["created"] = e.Created
  100. meta["modified"] = e.Modified
  101. meta["origin"] = e.Origin
  102. } else {
  103. log.Warnf("got an empty result, ignored")
  104. }
  105. }
  106. //if e := json.Unmarshal(env.Payload, &result); e != nil {
  107. // log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(env.Payload), e)
  108. // return
  109. //}
  110. meta["CorrelationID"] = env.CorrelationID
  111. select {
  112. case consumer <- api.NewDefaultSourceTuple(result, meta):
  113. log.Debugf("send data to device node")
  114. case <-ctx.Done():
  115. return
  116. }
  117. }
  118. } else {
  119. log.Errorf("Unsupported data type %s.", env.ContentType)
  120. }
  121. }
  122. }
  123. }
  124. }
  125. func (es *EdgexZMQSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  126. t, err := es.getType(r.Name, logger)
  127. if err != nil {
  128. return nil, err
  129. }
  130. t = strings.ToUpper(t)
  131. logger.Debugf("name %s with type %s", r.Name, t)
  132. v := r.Value
  133. switch t {
  134. case "B", "BOOL":
  135. if r, err := strconv.ParseBool(v); err != nil {
  136. return nil, err
  137. } else {
  138. return r, nil
  139. }
  140. case "I", "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64":
  141. if r, err := strconv.Atoi(v); err != nil {
  142. return nil, err
  143. } else {
  144. return r, nil
  145. }
  146. case "F", "FLOAT16", "FLOAT32", "FLOAT64":
  147. if r, err := strconv.ParseFloat(v, 64); err != nil {
  148. return nil, err
  149. } else {
  150. return r, nil
  151. }
  152. case "S", "STRING":
  153. return v, nil
  154. case "J", "JSON":
  155. var a interface{}
  156. if err := json.Unmarshal([]byte(v), &a); err != nil {
  157. return nil, err
  158. } else {
  159. return a, nil
  160. }
  161. default:
  162. logger.Warnf("unknown type %s return the string value", t)
  163. return v, nil
  164. }
  165. }
  166. func (es *EdgexZMQSource) fetchAllDataDescriptors() error {
  167. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  168. return err
  169. } else {
  170. for _, vd := range vdArr {
  171. es.valueDescs[vd.Id] = vd.Type
  172. }
  173. }
  174. return nil
  175. }
  176. func (es *EdgexZMQSource) getType(id string, logger api.Logger) (string, error) {
  177. if t, ok := es.valueDescs[id]; ok {
  178. return t, nil
  179. } else {
  180. if e := es.fetchAllDataDescriptors(); e != nil {
  181. return "", e
  182. }
  183. if t, ok := es.valueDescs[id]; ok {
  184. return t, nil
  185. } else {
  186. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  187. }
  188. }
  189. }
  190. func (es *EdgexZMQSource) Close(ctx api.StreamContext) error {
  191. if e := es.client.Disconnect(); e != nil {
  192. return e
  193. } else {
  194. return nil
  195. }
  196. }