edgex_source.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. // +build edgex
  2. package extensions
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/base64"
  7. "encoding/binary"
  8. "fmt"
  9. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  10. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  11. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  12. "github.com/edgexfoundry/go-mod-core-contracts/models"
  13. "github.com/edgexfoundry/go-mod-messaging/messaging"
  14. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  15. "github.com/emqx/kuiper/common"
  16. "github.com/emqx/kuiper/xstream/api"
  17. "strconv"
  18. "strings"
  19. )
  20. type EdgexSource struct {
  21. client messaging.MessageClient
  22. subscribed bool
  23. vdc coredata.ValueDescriptorClient
  24. topic string
  25. valueDescs map[string]string
  26. }
  27. func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
  28. var protocol = "tcp"
  29. if p, ok := props["protocol"]; ok {
  30. protocol = p.(string)
  31. }
  32. var server = "localhost"
  33. if s, ok := props["server"]; ok {
  34. server = s.(string)
  35. }
  36. var port = 5563
  37. if p, ok := props["port"]; ok {
  38. port = p.(int)
  39. }
  40. if tpc, ok := props["topic"]; ok {
  41. es.topic = tpc.(string)
  42. }
  43. var mbusType = messaging.ZeroMQ
  44. if t, ok := props["type"]; ok {
  45. mbusType = t.(string)
  46. if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT {
  47. return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
  48. }
  49. }
  50. if serviceServer, ok := props["serviceServer"]; ok {
  51. svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
  52. common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
  53. es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
  54. es.valueDescs = make(map[string]string)
  55. } else {
  56. return fmt.Errorf("The service server cannot be empty.")
  57. }
  58. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
  59. var optional = make(map[string]string)
  60. if ops, ok := props["optional"]; ok {
  61. if ops1, ok1 := ops.(map[string]interface{}); ok1 {
  62. for k, v := range ops1 {
  63. if cv, ok := CastToString(v); ok {
  64. optional[k] = cv
  65. } else {
  66. common.Log.Infof("Cannot convert configuration %s: %s to string type.\n", k, v)
  67. }
  68. }
  69. }
  70. mbconf.Optional = optional
  71. }
  72. common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
  73. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  74. return err
  75. } else {
  76. es.client = client
  77. return nil
  78. }
  79. }
  80. func castToString(v interface{}) (result string, ok bool) {
  81. switch v := v.(type) {
  82. case int:
  83. return strconv.Itoa(v), true
  84. case string:
  85. return v, true
  86. case bool:
  87. return strconv.FormatBool(v), true
  88. case float64, float32:
  89. return fmt.Sprintf("%.2f", v), true
  90. default:
  91. return "", false
  92. }
  93. }
  94. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  95. log := ctx.GetLogger()
  96. if err := es.client.Connect(); err != nil {
  97. info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  98. log.Errorf(info.Error())
  99. errCh <- info
  100. return
  101. }
  102. log.Infof("The connection to edgex messagebus is established successfully.")
  103. messages := make(chan types.MessageEnvelope)
  104. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  105. err := make(chan error)
  106. if e := es.client.Subscribe(topics, err); e != nil {
  107. log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  108. errCh <- e
  109. } else {
  110. es.subscribed = true
  111. log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
  112. for {
  113. select {
  114. case e1 := <-err:
  115. errCh <- e1
  116. return
  117. case env := <-messages:
  118. if strings.ToLower(env.ContentType) == "application/json" {
  119. e := models.Event{}
  120. if err := e.UnmarshalJSON(env.Payload); err != nil {
  121. len := len(env.Payload)
  122. if len > 200 {
  123. len = 200
  124. }
  125. log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(len-1)], err)
  126. } else {
  127. result := make(map[string]interface{})
  128. meta := make(map[string]interface{})
  129. log.Debugf("receive message %s from device %s", env.Payload, e.Device)
  130. for _, r := range e.Readings {
  131. if r.Name != "" {
  132. if v, err := es.getValue(r, log); err != nil {
  133. log.Warnf("fail to get value for %s: %v", r.Name, err)
  134. } else {
  135. result[r.Name] = v
  136. }
  137. r_meta := map[string]interface{}{}
  138. r_meta["id"] = r.Id
  139. r_meta["created"] = r.Created
  140. r_meta["modified"] = r.Modified
  141. r_meta["origin"] = r.Origin
  142. r_meta["pushed"] = r.Pushed
  143. r_meta["device"] = r.Device
  144. meta[r.Name] = r_meta
  145. } else {
  146. log.Warnf("The name of readings should not be empty!")
  147. }
  148. }
  149. if len(result) > 0 {
  150. meta["id"] = e.ID
  151. meta["pushed"] = e.Pushed
  152. meta["device"] = e.Device
  153. meta["created"] = e.Created
  154. meta["modified"] = e.Modified
  155. meta["origin"] = e.Origin
  156. meta["correlationid"] = env.CorrelationID
  157. select {
  158. case consumer <- api.NewDefaultSourceTuple(result, meta):
  159. log.Debugf("send data to device node")
  160. case <-ctx.Done():
  161. return
  162. }
  163. } else {
  164. log.Warnf("No readings are processed for the event, so ignore it.")
  165. }
  166. }
  167. } else {
  168. log.Errorf("Unsupported data type %s.", env.ContentType)
  169. }
  170. }
  171. }
  172. }
  173. }
  174. func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  175. t, err := es.getType(r.Name, logger)
  176. var ot = t
  177. if err != nil {
  178. return nil, err
  179. }
  180. t = strings.ToUpper(t)
  181. logger.Debugf("name %s with type %s", r.Name, t)
  182. v := r.Value
  183. switch t {
  184. case "BOOL":
  185. if r, err := strconv.ParseBool(v); err != nil {
  186. return nil, err
  187. } else {
  188. return r, nil
  189. }
  190. case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32":
  191. if r, err := strconv.Atoi(v); err != nil {
  192. return nil, err
  193. } else {
  194. return r, nil
  195. }
  196. case "UINT64":
  197. if u64, err := strconv.ParseUint(v, 10, 64); err != nil {
  198. return nil, err
  199. } else {
  200. return u64, nil
  201. }
  202. case "FLOAT32", "FLOAT64":
  203. if r.ValueType == "" {
  204. r.ValueType = ot
  205. }
  206. return es.getFloatValue(r, logger)
  207. case "STRING":
  208. return v, nil
  209. case "BINARY":
  210. return nil, fmt.Errorf("Unsupport for binary type, the value will be ignored.")
  211. default:
  212. logger.Warnf("Not supported type %s, and processed as string value", t)
  213. return v, nil
  214. }
  215. }
  216. func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
  217. if len(r.FloatEncoding) == 0 {
  218. if strings.Contains(r.Value, "=") {
  219. r.FloatEncoding = models.Base64Encoding
  220. } else {
  221. r.FloatEncoding = models.ENotation
  222. }
  223. }
  224. switch strings.ToLower(r.ValueType) {
  225. case strings.ToLower(models.ValueTypeFloat32):
  226. var value float64
  227. switch r.FloatEncoding {
  228. case models.Base64Encoding:
  229. data, err := base64.StdEncoding.DecodeString(r.Value)
  230. if err != nil {
  231. return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", r.Value, err.Error())
  232. }
  233. var value1 float32
  234. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value1)
  235. if err != nil {
  236. return false, fmt.Errorf("unable to decode float32 value bytes: %s", err.Error())
  237. }
  238. value = float64(value1)
  239. case models.ENotation:
  240. var err error
  241. var temp float64
  242. temp, err = strconv.ParseFloat(r.Value, 64)
  243. if err != nil {
  244. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  245. }
  246. value = float64(temp)
  247. default:
  248. return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", r.FloatEncoding)
  249. }
  250. return value, nil
  251. case strings.ToLower(models.ValueTypeFloat64):
  252. var value float64
  253. switch r.FloatEncoding {
  254. case models.Base64Encoding:
  255. data, err := base64.StdEncoding.DecodeString(r.Value)
  256. if err != nil {
  257. return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", r.Value, err.Error())
  258. }
  259. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
  260. if err != nil {
  261. return false, fmt.Errorf("unable to decode float64 value bytes: %s", err.Error())
  262. }
  263. return value, nil
  264. case models.ENotation:
  265. var err error
  266. value, err = strconv.ParseFloat(r.Value, 64)
  267. if err != nil {
  268. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  269. }
  270. return value, nil
  271. default:
  272. return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
  273. }
  274. default:
  275. return nil, fmt.Errorf("unkown value type: %s, reading:%v", r.ValueType, r)
  276. }
  277. }
  278. func (es *EdgexSource) fetchAllDataDescriptors() error {
  279. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  280. return err
  281. } else {
  282. for _, vd := range vdArr {
  283. es.valueDescs[vd.Name] = vd.Type
  284. }
  285. if len(vdArr) == 0 {
  286. common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
  287. } else {
  288. common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
  289. for i, v := range vdArr {
  290. common.Log.Debugf("%d: %s - %s ", i, v.Name, v.Type)
  291. }
  292. }
  293. }
  294. return nil
  295. }
  296. func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
  297. if t, ok := es.valueDescs[id]; ok {
  298. return t, nil
  299. } else {
  300. if e := es.fetchAllDataDescriptors(); e != nil {
  301. return "", e
  302. }
  303. if t, ok := es.valueDescs[id]; ok {
  304. return t, nil
  305. } else {
  306. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  307. }
  308. }
  309. }
  310. func (es *EdgexSource) Close(ctx api.StreamContext) error {
  311. if es.subscribed {
  312. if e := es.client.Disconnect(); e != nil {
  313. return e
  314. }
  315. }
  316. return nil
  317. }