edgex_sink.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // +build edgex
  2. package sinks
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  7. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  8. "github.com/edgexfoundry/go-mod-core-contracts/models"
  9. "github.com/edgexfoundry/go-mod-messaging/messaging"
  10. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  11. "github.com/emqx/kuiper/common"
  12. "github.com/emqx/kuiper/xstream/api"
  13. )
  14. type EdgexMsgBusSink struct {
  15. protocol string
  16. host string
  17. port int
  18. ptype string
  19. topic string
  20. contentType string
  21. deviceName string
  22. metadata string
  23. optional map[string]string
  24. client messaging.MessageClient
  25. }
  26. func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
  27. ems.host = "*"
  28. ems.protocol = "tcp"
  29. ems.port = 5573
  30. ems.topic = "events"
  31. ems.contentType = "application/json"
  32. ems.ptype = messaging.ZeroMQ
  33. if host, ok := ps["host"]; ok {
  34. ems.host = host.(string)
  35. } else {
  36. common.Log.Infof("Not find host conf, will use default value '*'.")
  37. }
  38. if pro, ok := ps["protocol"]; ok {
  39. ems.protocol = pro.(string)
  40. } else {
  41. common.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
  42. }
  43. if port, ok := ps["port"]; ok {
  44. if pv, ok := port.(float64); ok {
  45. ems.port = int(pv)
  46. } else if pv, ok := port.(float32); ok {
  47. ems.port = int(pv)
  48. } else {
  49. common.Log.Infof("Not valid port value, will use default value '5563'.")
  50. }
  51. } else {
  52. common.Log.Infof("Not find port conf, will use default value '5563'.")
  53. }
  54. if topic, ok := ps["topic"]; ok {
  55. ems.topic = topic.(string)
  56. } else {
  57. common.Log.Infof("Not find topic conf, will use default value 'events'.")
  58. }
  59. if contentType, ok := ps["contentType"]; ok {
  60. ems.contentType = contentType.(string)
  61. } else {
  62. common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
  63. }
  64. if ptype, ok := ps["type"]; ok {
  65. ems.ptype = ptype.(string)
  66. if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.RedisStreams {
  67. common.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
  68. ems.ptype = messaging.ZeroMQ
  69. }
  70. }
  71. if dname, ok := ps["deviceName"]; ok {
  72. ems.deviceName = dname.(string)
  73. }
  74. if metadata, ok := ps["metadata"]; ok {
  75. ems.metadata = metadata.(string)
  76. }
  77. if optIntf, ok := ps["optional"]; ok {
  78. if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
  79. optional := make(map[string]string)
  80. for k, v := range opt {
  81. if sv, ok2 := v.(string); ok2 {
  82. optional[k] = sv
  83. } else {
  84. info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
  85. common.Log.Infof(info)
  86. return fmt.Errorf(info)
  87. }
  88. }
  89. ems.optional = optional
  90. }
  91. }
  92. return nil
  93. }
  94. func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
  95. log := ctx.GetLogger()
  96. conf := types.MessageBusConfig{
  97. PublishHost: types.HostInfo{
  98. Host: ems.host,
  99. Port: ems.port,
  100. Protocol: ems.protocol,
  101. },
  102. Type: ems.ptype,
  103. Optional: ems.optional,
  104. }
  105. log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
  106. if msgClient, err := messaging.NewMessageClient(conf); err != nil {
  107. return err
  108. } else {
  109. if ec := msgClient.Connect(); ec != nil {
  110. return ec
  111. } else {
  112. ems.client = msgClient
  113. }
  114. }
  115. return nil
  116. }
  117. func (ems *EdgexMsgBusSink) produceEvents(result []byte) (*models.Event, error) {
  118. var m []map[string]interface{}
  119. if err := json.Unmarshal(result, &m); err == nil {
  120. m1, f := ems.getMeta(m)
  121. var event = &models.Event{}
  122. if f {
  123. event.Device = m1.getStrVal("device")
  124. event.Created = m1.getIntVal("created")
  125. event.Modified = m1.getIntVal("modified")
  126. event.Origin = m1.getIntVal("origin")
  127. event.ID = m1.getStrVal("id")
  128. event.Pushed = m1.getIntVal("pushed")
  129. }
  130. //Override the devicename if user specified the value
  131. if ems.deviceName != "" {
  132. event.Device = ems.deviceName
  133. }
  134. for _, v := range m {
  135. for k1, v1 := range v {
  136. if k1 == ems.metadata {
  137. continue
  138. } else {
  139. value := fmt.Sprintf("%v", v1)
  140. r := models.Reading{Name: k1, Value: value}
  141. if m, ok := m1[k1]; ok {
  142. if mm, ok1 := m.(map[string]interface{}); ok1 {
  143. mm1 := meta(mm)
  144. r.Created = mm1.getIntVal("created")
  145. r.Device = mm1.getStrVal("device")
  146. r.Id = mm1.getStrVal("id")
  147. r.Modified = mm1.getIntVal("modified")
  148. r.Origin = mm1.getIntVal("origin")
  149. r.Pushed = mm1.getIntVal("pushed")
  150. }
  151. }
  152. event.Readings = append(event.Readings, r)
  153. }
  154. }
  155. }
  156. return event, nil
  157. } else {
  158. return nil, err
  159. }
  160. }
  161. type meta map[string]interface{}
  162. func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) (meta, bool) {
  163. if ems.metadata == "" {
  164. return nil, false
  165. }
  166. //Try to get the meta field
  167. for _, v := range result {
  168. if m, ok := v[ems.metadata]; ok {
  169. if m1, ok1 := m.(map[string]interface{}); ok1 {
  170. return meta(m1), true
  171. } else {
  172. common.Log.Infof("Specified a meta field, but the field does not contains any EdgeX metadata.")
  173. }
  174. }
  175. }
  176. return nil, false
  177. }
  178. func (m meta) getIntVal(k string) int64 {
  179. if v, ok := m[k]; ok {
  180. if v1, ok1 := v.(float64); ok1 {
  181. return int64(v1)
  182. }
  183. }
  184. return 0
  185. }
  186. func (m meta) getStrVal(k string) string {
  187. if v, ok := m[k]; ok {
  188. if v1, ok1 := v.(string); ok1 {
  189. return v1
  190. }
  191. }
  192. return ""
  193. }
  194. func (ems *EdgexMsgBusSink) getMetaValueAsMap(m meta, k string) map[string]interface{} {
  195. if v, ok := m[k]; ok {
  196. if v1, ok1 := v.(map[string]interface{}); ok1 {
  197. return v1
  198. }
  199. }
  200. return nil
  201. }
  202. func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
  203. logger := ctx.GetLogger()
  204. client := coredata.NewEventClient(local.New(""))
  205. if payload, ok := item.([]byte); ok {
  206. logger.Debugf("EdgeX message bus sink: %s\n", payload)
  207. evt, err := ems.produceEvents(payload)
  208. if err != nil {
  209. return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
  210. }
  211. data, err := client.MarshalEvent(*evt)
  212. if err != nil {
  213. return fmt.Errorf("unexpected error MarshalEvent %v", err)
  214. }
  215. env := types.NewMessageEnvelope([]byte(data), ctx)
  216. env.ContentType = ems.contentType
  217. if e := ems.client.Publish(env, ems.topic); e != nil {
  218. logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
  219. return e
  220. }
  221. } else {
  222. return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
  223. }
  224. return nil
  225. }
  226. func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
  227. logger := ctx.GetLogger()
  228. logger.Infof("Closing edgex sink")
  229. if ems.client != nil {
  230. if e := ems.client.Disconnect(); e != nil {
  231. return e
  232. }
  233. }
  234. return nil
  235. }