edgex_sink.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // +build edgex
  2. package sinks
  3. import (
  4. "fmt"
  5. "github.com/edgexfoundry/go-mod-messaging/messaging"
  6. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/api"
  9. )
  10. type EdgexMsgBusSink struct {
  11. protocol string
  12. host string
  13. port int
  14. ptype string
  15. topic string
  16. contentType string
  17. optional *OptionalConf
  18. client messaging.MessageClient
  19. }
  // OptionalConf carries the settings read from the "optional" map of the sink
  // properties. Each field stays empty when its key is absent.
  type OptionalConf struct {
  	// clientid is taken from the "clientid" key.
  	clientid string
  	// username is taken from the "username" key.
  	username string
  	// password is taken from the "password" key.
  	password string
  }
  25. func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
  26. ems.host = "*"
  27. ems.protocol = "tcp"
  28. ems.port = 5570
  29. ems.contentType = "application/json"
  30. ems.ptype = messaging.ZeroMQ
  31. if host, ok := ps["host"]; ok {
  32. ems.host = host.(string)
  33. } else {
  34. common.Log.Infof("Not find host conf, will use default value '*'.")
  35. }
  36. if pro, ok := ps["protocol"]; ok {
  37. ems.protocol = pro.(string)
  38. } else {
  39. common.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
  40. }
  41. if port, ok := ps["port"]; ok {
  42. if pv, ok := port.(float64); ok {
  43. ems.port = int(pv)
  44. } else if pv, ok := port.(float32); ok {
  45. ems.port = int(pv)
  46. } else {
  47. common.Log.Infof("Not valid port value, will use default value '5570'.")
  48. }
  49. } else {
  50. common.Log.Infof("Not find port conf, will use default value '5570'.")
  51. }
  52. if topic, ok := ps["topic"]; ok {
  53. ems.topic = topic.(string)
  54. } else {
  55. return fmt.Errorf("Topic must be specified.")
  56. }
  57. if contentType, ok := ps["contentType"]; ok {
  58. ems.contentType = contentType.(string)
  59. } else {
  60. common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
  61. }
  62. if optIntf, ok := ps["optional"]; ok {
  63. if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
  64. optional := &OptionalConf{}
  65. ems.optional = optional
  66. if cid, ok2 := opt["clientid"]; ok2 {
  67. optional.clientid = cid.(string)
  68. }
  69. if uname, ok2 := opt["username"]; ok2 {
  70. optional.username = uname.(string)
  71. }
  72. if password, ok2 := opt["password"]; ok2 {
  73. optional.password = password.(string)
  74. }
  75. }
  76. }
  77. return nil
  78. }
  79. func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
  80. log := ctx.GetLogger()
  81. conf := types.MessageBusConfig{
  82. PublishHost: types.HostInfo{
  83. Host: ems.host,
  84. Port: ems.port,
  85. Protocol: ems.protocol,
  86. },
  87. Type: ems.ptype,
  88. }
  89. log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
  90. if msgClient, err := messaging.NewMessageClient(conf); err != nil {
  91. return err
  92. } else {
  93. if ec := msgClient.Connect(); ec != nil {
  94. return ec
  95. } else {
  96. ems.client = msgClient
  97. }
  98. }
  99. return nil
  100. }
  101. func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
  102. logger := ctx.GetLogger()
  103. if payload, ok := item.([]byte); ok {
  104. logger.Debugf("EdgeX message bus sink: %s\n", payload)
  105. env := types.NewMessageEnvelope(payload, ctx)
  106. env.ContentType = ems.contentType
  107. if e := ems.client.Publish(env, ems.topic); e != nil {
  108. logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
  109. return e
  110. }
  111. } else {
  112. return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
  113. }
  114. return nil
  115. }
  116. func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
  117. logger := ctx.GetLogger()
  118. logger.Infof("Closing edgex sink")
  119. if ems.client != nil {
  120. if e := ems.client.Disconnect(); e != nil {
  121. return e
  122. }
  123. }
  124. return nil
  125. }