edgex_source.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // +build edgex
  2. package source
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  7. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  8. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  9. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  10. "github.com/emqx/kuiper/internal/conf"
  11. "github.com/emqx/kuiper/pkg/api"
  12. "github.com/emqx/kuiper/pkg/cast"
  13. "github.com/emqx/kuiper/pkg/message"
  14. "strconv"
  15. "strings"
  16. )
  17. type EdgexSource struct {
  18. client messaging.MessageClient
  19. subscribed bool
  20. topic string
  21. }
  22. func (es *EdgexSource) Configure(_ string, props map[string]interface{}) error {
  23. if f, ok := props["format"]; ok {
  24. if f != message.FormatJson {
  25. return fmt.Errorf("edgex source only supports `json` format")
  26. }
  27. }
  28. var protocol = "tcp"
  29. if p, ok := props["protocol"]; ok {
  30. protocol = p.(string)
  31. }
  32. var server = "localhost"
  33. if s, ok := props["server"]; ok {
  34. server = s.(string)
  35. }
  36. var port = 5563
  37. if p, ok := props["port"]; ok {
  38. port = p.(int)
  39. }
  40. if tpc, ok := props["topic"]; ok {
  41. es.topic = tpc.(string)
  42. }
  43. var mbusType = messaging.ZeroMQ
  44. if t, ok := props["type"]; ok {
  45. mbusType = t.(string)
  46. if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT && mbusType != messaging.Redis {
  47. return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
  48. }
  49. }
  50. mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
  51. var optional = make(map[string]string)
  52. if ops, ok := props["optional"]; ok {
  53. if ops1, ok1 := ops.(map[string]interface{}); ok1 {
  54. for k, v := range ops1 {
  55. if cv, err := cast.ToString(v, cast.CONVERT_ALL); err == nil {
  56. optional[k] = cv
  57. } else {
  58. conf.Log.Infof("Cannot convert configuration %s: %s to string type.\n", k, v)
  59. }
  60. }
  61. }
  62. mbconf.Optional = optional
  63. }
  64. printConf(mbconf)
  65. if client, err := messaging.NewMessageClient(mbconf); err != nil {
  66. return err
  67. } else {
  68. es.client = client
  69. return nil
  70. }
  71. }
  72. // Modify the copied conf to print no password.
  73. func printConf(mbconf types.MessageBusConfig) {
  74. var printableOptional = make(map[string]string)
  75. for k, v := range mbconf.Optional {
  76. if strings.ToLower(k) == "password" {
  77. printableOptional[k] = "*"
  78. } else {
  79. printableOptional[k] = v
  80. }
  81. }
  82. mbconf.Optional = printableOptional
  83. conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
  84. }
  85. func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  86. log := ctx.GetLogger()
  87. if err := es.client.Connect(); err != nil {
  88. info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  89. log.Errorf(info.Error())
  90. errCh <- info
  91. return
  92. }
  93. log.Infof("The connection to edgex messagebus is established successfully.")
  94. messages := make(chan types.MessageEnvelope)
  95. topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
  96. err := make(chan error)
  97. if e := es.client.Subscribe(topics, err); e != nil {
  98. log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  99. errCh <- e
  100. } else {
  101. es.subscribed = true
  102. log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
  103. for {
  104. select {
  105. case e1 := <-err:
  106. errCh <- e1
  107. return
  108. case env, ok := <-messages:
  109. if !ok { // the source is closed
  110. return
  111. }
  112. if strings.ToLower(env.ContentType) == "application/json" {
  113. e := &dtos.Event{}
  114. if err := json.Unmarshal(env.Payload, e); err != nil {
  115. l := len(env.Payload)
  116. if l > 200 {
  117. l = 200
  118. }
  119. log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(l-1)], err)
  120. } else {
  121. result := make(map[string]interface{})
  122. meta := make(map[string]interface{})
  123. log.Debugf("receive message %s from device %s", env.Payload, e.DeviceName)
  124. for _, r := range e.Readings {
  125. if r.ResourceName != "" {
  126. if v, err := es.getValue(r, log); err != nil {
  127. log.Warnf("fail to get value for %s: %v", r.ResourceName, err)
  128. } else {
  129. result[r.ResourceName] = v
  130. }
  131. r_meta := map[string]interface{}{}
  132. r_meta["id"] = r.Id
  133. //r_meta["created"] = r.Created
  134. //r_meta["modified"] = r.Modified
  135. r_meta["origin"] = r.Origin
  136. //r_meta["pushed"] = r.Pushed
  137. r_meta["deviceName"] = r.DeviceName
  138. r_meta["profileName"] = r.ProfileName
  139. r_meta["valueType"] = r.ValueType
  140. if r.MediaType != "" {
  141. r_meta["mediaType"] = r.MediaType
  142. }
  143. meta[r.ResourceName] = r_meta
  144. } else {
  145. log.Warnf("The name of readings should not be empty!")
  146. }
  147. }
  148. if len(result) > 0 {
  149. meta["id"] = e.Id
  150. //meta["pushed"] = e.Pushed
  151. meta["deviceName"] = e.DeviceName
  152. meta["profileName"] = e.ProfileName
  153. meta["sourceName"] = e.SourceName
  154. //meta["created"] = e.Created
  155. //meta["modified"] = e.Modified
  156. meta["origin"] = e.Origin
  157. meta["tags"] = e.Tags
  158. meta["correlationid"] = env.CorrelationID
  159. select {
  160. case consumer <- api.NewDefaultSourceTuple(result, meta):
  161. log.Debugf("send data to device node")
  162. case <-ctx.Done():
  163. return
  164. }
  165. } else {
  166. log.Warnf("No readings are processed for the event, so ignore it.")
  167. }
  168. }
  169. } else {
  170. log.Errorf("Unsupported data type %s.", env.ContentType)
  171. }
  172. }
  173. }
  174. }
  175. }
  176. func (es *EdgexSource) getValue(r dtos.BaseReading, logger api.Logger) (interface{}, error) {
  177. t := r.ValueType
  178. logger.Debugf("name %s with type %s", r.ResourceName, r.ValueType)
  179. v := r.Value
  180. switch t {
  181. case v2.ValueTypeBool:
  182. if r, err := strconv.ParseBool(v); err != nil {
  183. return nil, err
  184. } else {
  185. return r, nil
  186. }
  187. case v2.ValueTypeInt8, v2.ValueTypeInt16, v2.ValueTypeInt32, v2.ValueTypeInt64, v2.ValueTypeUint8, v2.ValueTypeUint16, v2.ValueTypeUint32:
  188. if r, err := strconv.Atoi(v); err != nil {
  189. return nil, err
  190. } else {
  191. return r, nil
  192. }
  193. case v2.ValueTypeUint64:
  194. if u64, err := strconv.ParseUint(v, 10, 64); err != nil {
  195. return nil, err
  196. } else {
  197. return u64, nil
  198. }
  199. case v2.ValueTypeFloat32:
  200. if r, err := strconv.ParseFloat(v, 32); err != nil {
  201. return nil, err
  202. } else {
  203. return r, nil
  204. }
  205. case v2.ValueTypeFloat64:
  206. if r, err := strconv.ParseFloat(v, 64); err != nil {
  207. return nil, err
  208. } else {
  209. return r, nil
  210. }
  211. case v2.ValueTypeString:
  212. return v, nil
  213. case v2.ValueTypeBoolArray:
  214. var val []bool
  215. if e := json.Unmarshal([]byte(v), &val); e == nil {
  216. return val, nil
  217. } else {
  218. return nil, e
  219. }
  220. case v2.ValueTypeInt8Array, v2.ValueTypeInt16Array, v2.ValueTypeInt32Array, v2.ValueTypeInt64Array, v2.ValueTypeUint8Array, v2.ValueTypeUint16Array, v2.ValueTypeUint32Array:
  221. var val []int
  222. if e := json.Unmarshal([]byte(v), &val); e == nil {
  223. return val, nil
  224. } else {
  225. return nil, e
  226. }
  227. case v2.ValueTypeUint64Array:
  228. var val []uint64
  229. if e := json.Unmarshal([]byte(v), &val); e == nil {
  230. return val, nil
  231. } else {
  232. return nil, e
  233. }
  234. case v2.ValueTypeFloat32Array:
  235. return convertFloatArray(v, 32)
  236. case v2.ValueTypeFloat64Array:
  237. return convertFloatArray(v, 64)
  238. case v2.ValueTypeStringArray:
  239. var val []string
  240. if e := json.Unmarshal([]byte(v), &val); e == nil {
  241. return val, nil
  242. } else {
  243. return nil, e
  244. }
  245. case v2.ValueTypeBinary:
  246. return r.BinaryValue, nil
  247. default:
  248. logger.Warnf("Not supported type %s, and processed as string value", t)
  249. return v, nil
  250. }
  251. }
  252. func convertFloatArray(v string, bitSize int) (interface{}, error) {
  253. var val1 []string
  254. if e := json.Unmarshal([]byte(v), &val1); e == nil {
  255. var ret []float64
  256. for _, v := range val1 {
  257. if fv, err := strconv.ParseFloat(v, bitSize); err != nil {
  258. return nil, err
  259. } else {
  260. ret = append(ret, fv)
  261. }
  262. }
  263. return ret, nil
  264. } else {
  265. var val []float64
  266. if e := json.Unmarshal([]byte(v), &val); e == nil {
  267. return val, nil
  268. } else {
  269. return nil, e
  270. }
  271. }
  272. }
  273. func (es *EdgexSource) Close(_ api.StreamContext) error {
  274. if es.subscribed {
  275. if e := es.client.Disconnect(); e != nil {
  276. return e
  277. }
  278. }
  279. return nil
  280. }