mqtt_sink.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/google/uuid"
  9. "strings"
  10. )
  11. type MQTTSink struct {
  12. srv string
  13. tpc string
  14. clientid string
  15. pVersion uint
  16. qos byte
  17. uName string
  18. password string
  19. certPath string
  20. pkeyPath string
  21. insecureSkipVerify bool
  22. retained bool
  23. conn MQTT.Client
  24. }
  25. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  26. srv, ok := ps["server"]
  27. if !ok {
  28. return fmt.Errorf("mqtt sink is missing property server")
  29. }
  30. tpc, ok := ps["topic"]
  31. if !ok {
  32. return fmt.Errorf("mqtt sink is missing property topic")
  33. }
  34. clientid, ok := ps["clientId"]
  35. if !ok {
  36. if uuid, err := uuid.NewUUID(); err != nil {
  37. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  38. } else {
  39. clientid = uuid.String()
  40. }
  41. }
  42. var pVersion uint = 3
  43. pVersionStr, ok := ps["protocolVersion"]
  44. if ok {
  45. v, _ := pVersionStr.(string)
  46. if v == "3.1" {
  47. pVersion = 3
  48. } else if v == "3.1.1" {
  49. pVersion = 4
  50. } else {
  51. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  52. }
  53. }
  54. var qos byte = 0
  55. if qosRec, ok := ps["qos"]; ok {
  56. if v, ok := qosRec.(int); ok {
  57. qos = byte(v)
  58. }
  59. if qos != 0 && qos != 1 && qos != 2 {
  60. return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
  61. }
  62. }
  63. uName := ""
  64. un, ok := ps["username"]
  65. if ok {
  66. v, _ := un.(string)
  67. if strings.Trim(v, " ") != "" {
  68. uName = v
  69. }
  70. }
  71. password := ""
  72. pwd, ok := ps["password"]
  73. if ok {
  74. v, _ := pwd.(string)
  75. if strings.Trim(v, " ") != "" {
  76. password = v
  77. }
  78. }
  79. certPath := ""
  80. if cp, ok := ps["certificationPath"]; ok {
  81. if v, ok := cp.(string); ok {
  82. certPath = v
  83. }
  84. }
  85. pKeyPath := ""
  86. if pk, ok := ps["privateKeyPath"]; ok {
  87. if v, ok := pk.(string); ok {
  88. pKeyPath = v
  89. }
  90. }
  91. insecureSkipVerify := false
  92. if pk, ok := ps["insecureSkipVerify"]; ok {
  93. if v, ok := pk.(bool); ok {
  94. insecureSkipVerify = v
  95. }
  96. }
  97. retained := false
  98. if pk, ok := ps["retained"]; ok {
  99. if v, ok := pk.(bool); ok {
  100. retained = v
  101. }
  102. }
  103. ms.srv = srv.(string)
  104. ms.tpc = tpc.(string)
  105. ms.clientid = clientid.(string)
  106. ms.pVersion = pVersion
  107. ms.qos = qos
  108. ms.uName = uName
  109. ms.password = password
  110. ms.certPath = certPath
  111. ms.pkeyPath = pKeyPath
  112. ms.insecureSkipVerify = insecureSkipVerify
  113. ms.retained = retained
  114. return nil
  115. }
  116. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  117. log := ctx.GetLogger()
  118. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  119. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  120. if ms.certPath != "" || ms.pkeyPath != "" {
  121. log.Infof("Connect MQTT broker with certification and keys.")
  122. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  123. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  124. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  125. return err2
  126. } else {
  127. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  128. }
  129. } else {
  130. return err1
  131. }
  132. } else {
  133. return err
  134. }
  135. } else {
  136. log.Infof("Connect MQTT broker with username and password.")
  137. if ms.uName != "" {
  138. opts = opts.SetUsername(ms.uName)
  139. }
  140. if ms.password != "" {
  141. opts = opts.SetPassword(ms.password)
  142. }
  143. }
  144. opts.SetAutoReconnect(true)
  145. var reconn = false
  146. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  147. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  148. ms.conn = client
  149. reconn = true
  150. })
  151. opts.SetOnConnectHandler(func(client MQTT.Client) {
  152. if reconn {
  153. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  154. }
  155. })
  156. c := MQTT.NewClient(opts)
  157. if token := c.Connect(); token.Wait() && token.Error() != nil {
  158. return fmt.Errorf("Found error: %s", token.Error())
  159. }
  160. log.Infof("The connection to server %s was established successfully", ms.srv)
  161. ms.conn = c
  162. return nil
  163. }
  164. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  165. logger := ctx.GetLogger()
  166. c := ms.conn
  167. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  168. if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
  169. return fmt.Errorf("publish error: %s", token.Error())
  170. }
  171. return nil
  172. }
  173. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  174. logger := ctx.GetLogger()
  175. logger.Infof("Closing mqtt sink")
  176. if ms.conn != nil && ms.conn.IsConnected() {
  177. ms.conn.Disconnect(5000)
  178. }
  179. return nil
  180. }