edgex_source.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. // +build edgex
  2. package extensions
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/base64"
  7. "encoding/binary"
  8. "encoding/json"
  9. "fmt"
  10. "github.com/edgexfoundry/go-mod-core-contracts/clients"
  11. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  12. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  13. "github.com/edgexfoundry/go-mod-core-contracts/models"
  14. "github.com/edgexfoundry/go-mod-messaging/messaging"
  15. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  16. "github.com/emqx/kuiper/common"
  17. "github.com/emqx/kuiper/xstream/api"
  18. "strconv"
  19. "strings"
  20. )
  21. type EdgexSource struct {
  22. client messaging.MessageClient
  23. subscribed bool
  24. vdc coredata.ValueDescriptorClient
  25. topic string
  26. valueDescs map[string]string
  27. }
  28. func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
  29. if f, ok := props["format"]; ok {
  30. if f != common.FORMAT_JSON {
  31. return fmt.Errorf("edgex source only supports `json` format")
  32. }
  33. }
  34. var protocol = "tcp"
  35. if p, ok := props["protocol"]; ok {
  36. protocol = p.(string)
  37. }
  38. var server = "localhost"
  39. if s, ok := props["server"]; ok {
  40. server = s.(string)
  41. }
  42. var port = 5563
  43. if p, ok := props["port"]; ok {
  44. port = p.(int)
  45. }
  46. if tpc, ok := props["topic"]; ok {
  47. es.topic = tpc.(string)
  48. }
  49. var mbusType = messaging.ZeroMQ
  50. if t, ok := props["type"]; ok {
  51. mbusType = t.(string)
  52. if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT && mbusType != messaging.RedisStreams {
  53. return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
  54. }
  55. }
  56. if serviceServer, ok := props["serviceServer"]; ok {
  57. svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
  58. common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
  59. es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
  60. es.valueDescs = make(map[string]string)
  61. } else {
  62. return fmt.Errorf("The service server cannot be empty.")
  63. }
  64. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
  65. var optional = make(map[string]string)
  66. if ops, ok := props["optional"]; ok {
  67. if ops1, ok1 := ops.(map[string]interface{}); ok1 {
  68. for k, v := range ops1 {
  69. if cv, ok := CastToString(v); ok {
  70. optional[k] = cv
  71. } else {
  72. common.Log.Infof("Cannot convert configuration %s: %s to string type.\n", k, v)
  73. }
  74. }
  75. }
  76. mbconf.Optional = optional
  77. }
  78. common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
  79. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  80. return err
  81. } else {
  82. es.client = client
  83. return nil
  84. }
  85. }
  // castToString renders a scalar configuration value as a string.
  //
  // int, string, bool, float32 and float64 are supported; floats are formatted
  // with two decimal places. For any other type (including nil) it returns
  // "" and false.
  func castToString(v interface{}) (result string, ok bool) {
  	switch val := v.(type) {
  	case string:
  		result, ok = val, true
  	case bool:
  		result, ok = strconv.FormatBool(val), true
  	case int:
  		result, ok = strconv.Itoa(val), true
  	case float32, float64:
  		// val is still an interface value here; fmt handles both widths.
  		result, ok = fmt.Sprintf("%.2f", val), true
  	}
  	return result, ok
  }
  100. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  101. log := ctx.GetLogger()
  102. if err := es.client.Connect(); err != nil {
  103. info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  104. log.Errorf(info.Error())
  105. errCh <- info
  106. return
  107. }
  108. log.Infof("The connection to edgex messagebus is established successfully.")
  109. messages := make(chan types.MessageEnvelope)
  110. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  111. err := make(chan error)
  112. if e := es.client.Subscribe(topics, err); e != nil {
  113. log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  114. errCh <- e
  115. } else {
  116. es.subscribed = true
  117. log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
  118. for {
  119. select {
  120. case e1 := <-err:
  121. errCh <- e1
  122. return
  123. case env := <-messages:
  124. if strings.ToLower(env.ContentType) == "application/json" {
  125. e := models.Event{}
  126. if err := e.UnmarshalJSON(env.Payload); err != nil {
  127. len := len(env.Payload)
  128. if len > 200 {
  129. len = 200
  130. }
  131. log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(len-1)], err)
  132. } else {
  133. result := make(map[string]interface{})
  134. meta := make(map[string]interface{})
  135. log.Debugf("receive message %s from device %s", env.Payload, e.Device)
  136. for _, r := range e.Readings {
  137. if r.Name != "" {
  138. if v, err := es.getValue(r, log); err != nil {
  139. log.Warnf("fail to get value for %s: %v", r.Name, err)
  140. } else {
  141. result[r.Name] = v
  142. }
  143. r_meta := map[string]interface{}{}
  144. r_meta["id"] = r.Id
  145. r_meta["created"] = r.Created
  146. r_meta["modified"] = r.Modified
  147. r_meta["origin"] = r.Origin
  148. r_meta["pushed"] = r.Pushed
  149. r_meta["device"] = r.Device
  150. meta[r.Name] = r_meta
  151. } else {
  152. log.Warnf("The name of readings should not be empty!")
  153. }
  154. }
  155. if len(result) > 0 {
  156. meta["id"] = e.ID
  157. meta["pushed"] = e.Pushed
  158. meta["device"] = e.Device
  159. meta["created"] = e.Created
  160. meta["modified"] = e.Modified
  161. meta["origin"] = e.Origin
  162. meta["correlationid"] = env.CorrelationID
  163. select {
  164. case consumer <- api.NewDefaultSourceTuple(result, meta):
  165. log.Debugf("send data to device node")
  166. case <-ctx.Done():
  167. return
  168. }
  169. } else {
  170. log.Warnf("No readings are processed for the event, so ignore it.")
  171. }
  172. }
  173. } else {
  174. log.Errorf("Unsupported data type %s.", env.ContentType)
  175. }
  176. }
  177. }
  178. }
  179. }
  180. func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
  181. t, err := es.getType(r.Name, logger)
  182. var ot = t
  183. if err != nil {
  184. return nil, err
  185. }
  186. t = strings.ToUpper(t)
  187. logger.Debugf("name %s with type %s", r.Name, t)
  188. v := r.Value
  189. switch t {
  190. case "BOOL":
  191. if r, err := strconv.ParseBool(v); err != nil {
  192. return nil, err
  193. } else {
  194. return r, nil
  195. }
  196. case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32":
  197. if r, err := strconv.Atoi(v); err != nil {
  198. return nil, err
  199. } else {
  200. return r, nil
  201. }
  202. case "UINT64":
  203. if u64, err := strconv.ParseUint(v, 10, 64); err != nil {
  204. return nil, err
  205. } else {
  206. return u64, nil
  207. }
  208. case "FLOAT32", "FLOAT64":
  209. if r.ValueType == "" {
  210. r.ValueType = ot
  211. }
  212. return es.getFloatValue(r.FloatEncoding, r.Value, r.ValueType, logger)
  213. case "STRING":
  214. return v, nil
  215. case "BOOLARRAY":
  216. var val []bool
  217. if e := json.Unmarshal([]byte(v), &val); e == nil {
  218. return val, nil
  219. } else {
  220. return nil, e
  221. }
  222. case "UINT8ARRAY", "UINT16ARRAY", "UINT32ARRAY", "INT8ARRAY", "INT16ARRAY", "INT32ARRAY", "INT64ARRAY":
  223. var val []int
  224. if e := json.Unmarshal([]byte(v), &val); e == nil {
  225. return val, nil
  226. } else {
  227. return nil, e
  228. }
  229. case "UINT64ARRAY":
  230. var val []uint64
  231. if e := json.Unmarshal([]byte(v), &val); e == nil {
  232. return val, nil
  233. } else {
  234. return nil, e
  235. }
  236. case "FLOAT32ARRAY", "FLOAT64ARRAY":
  237. if r.ValueType == "" {
  238. r.ValueType = ot
  239. }
  240. var val1 []string
  241. if e := json.Unmarshal([]byte(v), &val1); e == nil {
  242. ret := []float64{}
  243. for _, v := range val1 {
  244. if fv, err := es.getFloatValue(r.FloatEncoding, v, r.ValueType, logger); err != nil {
  245. return nil, err
  246. } else {
  247. if f, ok := fv.(float64); ok {
  248. ret = append(ret, f)
  249. } else {
  250. return nil, fmt.Errorf("The %v is not a float64 type.", f)
  251. }
  252. }
  253. }
  254. return ret, nil
  255. } else {
  256. var val []float64
  257. ret := []float64{}
  258. if e := json.Unmarshal([]byte(v), &val); e == nil {
  259. for _, v := range val {
  260. ret = append(ret, v)
  261. }
  262. return ret, nil
  263. } else {
  264. return nil, e
  265. }
  266. }
  267. case "BINARY":
  268. return nil, fmt.Errorf("Unsupport for binary type, the value will be ignored.")
  269. default:
  270. logger.Warnf("Not supported type %s, and processed as string value", t)
  271. return v, nil
  272. }
  273. }
  274. func (es *EdgexSource) getFloatValue(FloatEncoding string, Value string, ValueType string, logger api.Logger) (interface{}, error) {
  275. if len(FloatEncoding) == 0 {
  276. if strings.Contains(Value, "=") {
  277. FloatEncoding = models.Base64Encoding
  278. } else {
  279. FloatEncoding = models.ENotation
  280. }
  281. }
  282. switch strings.ToLower(ValueType) {
  283. case strings.ToLower(models.ValueTypeFloat32), strings.ToLower(models.ValueTypeFloat32Array):
  284. var value float64
  285. switch FloatEncoding {
  286. case models.Base64Encoding:
  287. data, err := base64.StdEncoding.DecodeString(Value)
  288. if err != nil {
  289. return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", Value, err.Error())
  290. }
  291. var value1 float32
  292. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value1)
  293. if err != nil {
  294. return false, fmt.Errorf("unable to decode float32 value bytes: %s", err.Error())
  295. }
  296. value = float64(value1)
  297. case models.ENotation:
  298. var err error
  299. var temp float64
  300. temp, err = strconv.ParseFloat(Value, 64)
  301. if err != nil {
  302. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  303. }
  304. value = float64(temp)
  305. default:
  306. return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", FloatEncoding)
  307. }
  308. return value, nil
  309. case strings.ToLower(models.ValueTypeFloat64), strings.ToLower(models.ValueTypeFloat64Array):
  310. var value float64
  311. switch FloatEncoding {
  312. case models.Base64Encoding:
  313. data, err := base64.StdEncoding.DecodeString(Value)
  314. if err != nil {
  315. return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", Value, err.Error())
  316. }
  317. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
  318. if err != nil {
  319. return false, fmt.Errorf("unable to decode float64 value bytes: %s", err.Error())
  320. }
  321. return value, nil
  322. case models.ENotation:
  323. var err error
  324. value, err = strconv.ParseFloat(Value, 64)
  325. if err != nil {
  326. return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
  327. }
  328. return value, nil
  329. default:
  330. return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", FloatEncoding)
  331. }
  332. default:
  333. return nil, fmt.Errorf("unkown value type: %s", ValueType)
  334. }
  335. }
  336. func (es *EdgexSource) fetchAllDataDescriptors() error {
  337. if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
  338. return err
  339. } else {
  340. for _, vd := range vdArr {
  341. es.valueDescs[vd.Name] = vd.Type
  342. }
  343. if len(vdArr) == 0 {
  344. common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
  345. } else {
  346. common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
  347. for i, v := range vdArr {
  348. common.Log.Debugf("%d: %s - %s ", i, v.Name, v.Type)
  349. }
  350. }
  351. }
  352. return nil
  353. }
  354. func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
  355. if t, ok := es.valueDescs[id]; ok {
  356. return t, nil
  357. } else {
  358. if e := es.fetchAllDataDescriptors(); e != nil {
  359. return "", e
  360. }
  361. if t, ok := es.valueDescs[id]; ok {
  362. return t, nil
  363. } else {
  364. return "", fmt.Errorf("cannot find type info for %s in value descriptor.", id)
  365. }
  366. }
  367. }
  368. func (es *EdgexSource) Close(ctx api.StreamContext) error {
  369. if es.subscribed {
  370. if e := es.client.Disconnect(); e != nil {
  371. return e
  372. }
  373. }
  374. return nil
  375. }