edgex_source.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package extensions
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "encoding/binary"
  7. "fmt"
  8. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  9. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  10. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  11. "github.com/edgexfoundry/go-mod-core-contracts/models"
  12. "github.com/edgexfoundry/go-mod-messaging/messaging"
  13. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  14. "github.com/emqx/kuiper/common"
  15. "github.com/emqx/kuiper/xstream/api"
  16. "strconv"
  17. "strings"
  18. )
  19. type EdgexSource struct {
  20. client messaging.MessageClient
  21. subscribed bool
  22. vdc coredata.ValueDescriptorClient
  23. topic string
  24. valueDescs map[string]string
  25. }
  26. func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
  27. var protocol = "tcp";
  28. if p, ok := props["protocol"]; ok {
  29. protocol = p.(string)
  30. }
  31. var server = "localhost";
  32. if s, ok := props["server"]; ok {
  33. server = s.(string)
  34. }
  35. var port = 5563
  36. if p, ok := props["port"]; ok {
  37. port = p.(int)
  38. }
  39. if tpc, ok := props["topic"]; ok {
  40. es.topic = tpc.(string)
  41. }
  42. var mbusType = messaging.ZeroMQ
  43. if t, ok := props["type"]; ok {
  44. mbusType = t.(string)
  45. if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT {
  46. return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
  47. }
  48. }
  49. if serviceServer, ok := props["serviceServer"]; ok {
  50. svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
  51. common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
  52. es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
  53. es.valueDescs = make(map[string]string)
  54. } else {
  55. return fmt.Errorf("The service server cannot be empty.")
  56. }
  57. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
  58. common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
  59. var optional = make(map[string]string)
  60. if ops, ok := props["optional"]; ok {
  61. if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
  62. for k, v := range ops1 {
  63. k1 := k.(string)
  64. v1 := v.(string)
  65. optional[k1] = v1
  66. }
  67. }
  68. mbconf.Optional = optional
  69. }
  70. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  71. return err
  72. } else {
  73. es.client = client
  74. return nil
  75. }
  76. }
  77. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  78. log := ctx.GetLogger()
  79. if err := es.client.Connect(); err != nil {
  80. info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  81. log.Errorf(info.Error())
  82. errCh <- info
  83. return
  84. }
  85. log.Infof("The connection to edgex messagebus is established successfully.")
  86. messages := make(chan types.MessageEnvelope)
  87. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  88. err := make(chan error)
  89. if e := es.client.Subscribe(topics, err); e != nil {
  90. log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  91. errCh <- e
  92. } else {
  93. es.subscribed = true
  94. log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
  95. for {
  96. select {
  97. case e1 := <-err:
  98. errCh <- e1
  99. return
  100. case env := <-messages:
  101. if strings.ToLower(env.ContentType) == "application/json" {
  102. e := models.Event{}
  103. if err := e.UnmarshalJSON(env.Payload); err != nil {
  104. len := len(env.Payload)
  105. if len > 200 {
  106. len = 200
  107. }
  108. log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(len - 1)], err)
  109. } else {
  110. result := make(map[string]interface{})
  111. meta := make(map[string]interface{})
  112. log.Debugf("receive message from device %s", e.Device)
  113. for _, r := range e.Readings {
  114. if r.Name != "" {
  115. if v, err := es.getValue(r, log); err != nil {
  116. log.Warnf("fail to get value for %s: %v", r.Name, err)
  117. } else {
  118. result[strings.ToLower(r.Name)] = v
  119. }
  120. r_meta := map[string]interface{}{}
  121. r_meta["id"] = r.Id
  122. r_meta["created"] = r.Created
  123. r_meta["modified"] = r.Modified
  124. r_meta["origin"] = r.Origin
  125. r_meta["pushed"] = r.Pushed
  126. r_meta["device"] = r.Device
  127. meta[strings.ToLower(r.Name)] = r_meta
  128. } else {
  129. log.Warnf("The name of readings should not be empty!")
  130. }
  131. }
  132. if len(result) > 0 {
  133. meta["id"] = e.ID
  134. meta["pushed"] = e.Pushed
  135. meta["device"] = e.Device
  136. meta["created"] = e.Created
  137. meta["modified"] = e.Modified
  138. meta["origin"] = e.Origin
  139. meta["correlationid"] = env.CorrelationID
  140. select {
  141. case consumer <- api.NewDefaultSourceTuple(result, meta):
  142. log.Debugf("send data to device node")
  143. case <-ctx.Done():
  144. return
  145. }
  146. } else {
  147. log.Warnf("got an empty result, ignored")
  148. }
  149. }
  150. } else {
  151. log.Errorf("Unsupported data type %s.", env.ContentType)
  152. }
  153. }
  154. }
  155. }
  156. }
  157. func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  158. t, err := es.getType(r.Name, logger)
  159. if err != nil {
  160. return nil, err
  161. }
  162. t = strings.ToUpper(t)
  163. logger.Debugf("name %s with type %s", r.Name, t)
  164. v := r.Value
  165. switch t {
  166. case "BOOL":
  167. if r, err := strconv.ParseBool(v); err != nil {
  168. return nil, err
  169. } else {
  170. return r, nil
  171. }
  172. case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32":
  173. if r, err := strconv.Atoi(v); err != nil {
  174. return nil, err
  175. } else {
  176. return r, nil
  177. }
  178. case "UINT64":
  179. if u64, err := strconv.ParseUint(v, 10, 64); err != nil {
  180. return nil, err
  181. } else {
  182. return u64, nil
  183. }
  184. case "FLOAT32", "FLOAT64":
  185. return es.getFloatValue(r, logger)
  186. case "STRING":
  187. return v, nil
  188. case "BINARY":
  189. return nil, fmt.Errorf("Unsupport for binary type, the value will be ignored.")
  190. default:
  191. logger.Warnf("unknown type %s return the string value", t)
  192. return v, nil
  193. }
  194. }
  195. func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
  196. if len(r.FloatEncoding) == 0 {
  197. if strings.Contains(r.Value, "==") {
  198. r.FloatEncoding = models.Base64Encoding
  199. } else {
  200. r.FloatEncoding = models.ENotation
  201. }
  202. }
  203. switch r.ValueType {
  204. case models.ValueTypeFloat32:
  205. var value float32
  206. switch r.FloatEncoding {
  207. case models.Base64Encoding:
  208. data, err := base64.StdEncoding.DecodeString(r.Value)
  209. if err != nil {
  210. return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", r.Value, err.Error())
  211. }
  212. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
  213. if err != nil {
  214. return false, fmt.Errorf("unable to decode float32 value bytes: %s", err.Error())
  215. }
  216. case models.ENotation:
  217. var err error
  218. var temp float64
  219. temp, err = strconv.ParseFloat(r.Value, 32)
  220. if err != nil {
  221. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  222. }
  223. value = float32(temp)
  224. default:
  225. return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", r.FloatEncoding)
  226. }
  227. return value, nil
  228. case models.ValueTypeFloat64:
  229. var value float64
  230. switch r.FloatEncoding {
  231. case models.Base64Encoding:
  232. data, err := base64.StdEncoding.DecodeString(r.Value)
  233. if err != nil {
  234. return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", r.Value, err.Error())
  235. }
  236. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
  237. if err != nil {
  238. return false, fmt.Errorf("unable to decode float64 value bytes: %s", err.Error())
  239. }
  240. return value, nil
  241. case models.ENotation:
  242. var err error
  243. value, err = strconv.ParseFloat(r.Value, 64)
  244. if err != nil {
  245. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  246. }
  247. return value, nil
  248. default:
  249. return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
  250. }
  251. }
  252. return nil, fmt.Errorf("unkown FloatEncoding type: %s", r.FloatEncoding)
  253. }
  254. func (es *EdgexSource) fetchAllDataDescriptors() error {
  255. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  256. return err
  257. } else {
  258. for _, vd := range vdArr {
  259. es.valueDescs[vd.Name] = vd.Type
  260. }
  261. if len(vdArr) == 0 {
  262. common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
  263. } else {
  264. common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
  265. for i, v := range vdArr {
  266. common.Log.Debugf("%d: %s - %s ", i, v.Name, v.Type)
  267. }
  268. }
  269. }
  270. return nil
  271. }
  272. func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
  273. if t, ok := es.valueDescs[id]; ok {
  274. return t, nil
  275. } else {
  276. if e := es.fetchAllDataDescriptors(); e != nil {
  277. return "", e
  278. }
  279. if t, ok := es.valueDescs[id]; ok {
  280. return t, nil
  281. } else {
  282. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  283. }
  284. }
  285. }
  286. func (es *EdgexSource) Close(ctx api.StreamContext) error {
  287. if es.subscribed {
  288. if e := es.client.Disconnect(); e != nil {
  289. return e
  290. }
  291. }
  292. return nil
  293. }