mqtt_sink.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package sink
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/emqx/kuiper/internal/conf"
  7. "github.com/emqx/kuiper/pkg/api"
  8. "github.com/emqx/kuiper/pkg/cast"
  9. "github.com/google/uuid"
  10. "strings"
  11. )
  12. type MQTTSink struct {
  13. srv string
  14. tpc string
  15. clientid string
  16. pVersion uint
  17. qos byte
  18. uName string
  19. password string
  20. certPath string
  21. pkeyPath string
  22. insecureSkipVerify bool
  23. retained bool
  24. conn MQTT.Client
  25. }
  26. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  27. srv, ok := ps["server"]
  28. if !ok {
  29. return fmt.Errorf("mqtt sink is missing property server")
  30. }
  31. tpc, ok := ps["topic"]
  32. if !ok {
  33. return fmt.Errorf("mqtt sink is missing property topic")
  34. }
  35. clientid, ok := ps["clientId"]
  36. if !ok {
  37. if uuid, err := uuid.NewUUID(); err != nil {
  38. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  39. } else {
  40. clientid = uuid.String()
  41. }
  42. }
  43. var pVersion uint = 3
  44. pVersionStr, ok := ps["protocolVersion"]
  45. if ok {
  46. v, _ := pVersionStr.(string)
  47. if v == "3.1" {
  48. pVersion = 3
  49. } else if v == "3.1.1" {
  50. pVersion = 4
  51. } else {
  52. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  53. }
  54. }
  55. var qos byte = 0
  56. if qosRec, ok := ps["qos"]; ok {
  57. if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
  58. qos = byte(v)
  59. }
  60. if qos != 0 && qos != 1 && qos != 2 {
  61. return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
  62. }
  63. }
  64. uName := ""
  65. un, ok := ps["username"]
  66. if ok {
  67. v, _ := un.(string)
  68. if strings.Trim(v, " ") != "" {
  69. uName = v
  70. }
  71. }
  72. password := ""
  73. pwd, ok := ps["password"]
  74. if ok {
  75. v, _ := pwd.(string)
  76. if strings.Trim(v, " ") != "" {
  77. password = v
  78. }
  79. }
  80. certPath := ""
  81. if cp, ok := ps["certificationPath"]; ok {
  82. if v, ok := cp.(string); ok {
  83. certPath = v
  84. }
  85. }
  86. pKeyPath := ""
  87. if pk, ok := ps["privateKeyPath"]; ok {
  88. if v, ok := pk.(string); ok {
  89. pKeyPath = v
  90. }
  91. }
  92. insecureSkipVerify := false
  93. if pk, ok := ps["insecureSkipVerify"]; ok {
  94. if v, ok := pk.(bool); ok {
  95. insecureSkipVerify = v
  96. }
  97. }
  98. retained := false
  99. if pk, ok := ps["retained"]; ok {
  100. if v, ok := pk.(bool); ok {
  101. retained = v
  102. }
  103. }
  104. ms.srv = srv.(string)
  105. ms.tpc = tpc.(string)
  106. ms.clientid = clientid.(string)
  107. ms.pVersion = pVersion
  108. ms.qos = qos
  109. ms.uName = uName
  110. ms.password = password
  111. ms.certPath = certPath
  112. ms.pkeyPath = pKeyPath
  113. ms.insecureSkipVerify = insecureSkipVerify
  114. ms.retained = retained
  115. return nil
  116. }
  117. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  118. log := ctx.GetLogger()
  119. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  120. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  121. if ms.certPath != "" || ms.pkeyPath != "" {
  122. log.Infof("Connect MQTT broker with certification and keys.")
  123. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  124. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  125. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  126. return err2
  127. } else {
  128. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  129. }
  130. } else {
  131. return err1
  132. }
  133. } else {
  134. return err
  135. }
  136. } else {
  137. log.Infof("Connect MQTT broker with username and password.")
  138. if ms.uName != "" {
  139. opts = opts.SetUsername(ms.uName)
  140. }
  141. if ms.password != "" {
  142. opts = opts.SetPassword(ms.password)
  143. }
  144. }
  145. opts.SetAutoReconnect(true)
  146. var reconn = false
  147. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  148. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  149. ms.conn = client
  150. reconn = true
  151. })
  152. opts.SetOnConnectHandler(func(client MQTT.Client) {
  153. if reconn {
  154. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  155. }
  156. })
  157. c := MQTT.NewClient(opts)
  158. if token := c.Connect(); token.Wait() && token.Error() != nil {
  159. return fmt.Errorf("Found error: %s", token.Error())
  160. }
  161. log.Infof("The connection to server %s was established successfully", ms.srv)
  162. ms.conn = c
  163. return nil
  164. }
  165. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  166. logger := ctx.GetLogger()
  167. c := ms.conn
  168. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  169. if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
  170. return fmt.Errorf("publish error: %s", token.Error())
  171. }
  172. return nil
  173. }
  174. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  175. logger := ctx.GetLogger()
  176. logger.Infof("Closing mqtt sink")
  177. if ms.conn != nil && ms.conn.IsConnected() {
  178. ms.conn.Disconnect(5000)
  179. }
  180. return nil
  181. }