mqtt_sink.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/google/uuid"
  9. "strings"
  10. )
  11. type MQTTSink struct {
  12. srv string
  13. tpc string
  14. clientid string
  15. pVersion uint
  16. uName string
  17. password string
  18. certPath string
  19. pkeyPath string
  20. insecureSkipVerify bool
  21. conn MQTT.Client
  22. }
  23. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  24. srv, ok := ps["server"]
  25. if !ok {
  26. return fmt.Errorf("mqtt sink is missing property server")
  27. }
  28. tpc, ok := ps["topic"]
  29. if !ok {
  30. return fmt.Errorf("mqtt sink is missing property topic")
  31. }
  32. clientid, ok := ps["clientId"]
  33. if !ok {
  34. if uuid, err := uuid.NewUUID(); err != nil {
  35. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  36. } else {
  37. clientid = uuid.String()
  38. }
  39. }
  40. var pVersion uint = 3
  41. pVersionStr, ok := ps["protocolVersion"]
  42. if ok {
  43. v, _ := pVersionStr.(string)
  44. if v == "3.1" {
  45. pVersion = 3
  46. } else if v == "3.1.1" {
  47. pVersion = 4
  48. } else {
  49. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  50. }
  51. }
  52. uName := ""
  53. un, ok := ps["username"]
  54. if ok {
  55. v, _ := un.(string)
  56. if strings.Trim(v, " ") != "" {
  57. uName = v
  58. }
  59. }
  60. password := ""
  61. pwd, ok := ps["password"]
  62. if ok {
  63. v, _ := pwd.(string)
  64. if strings.Trim(v, " ") != "" {
  65. password = v
  66. }
  67. }
  68. certPath := ""
  69. if cp, ok := ps["certificationPath"]; ok {
  70. if v, ok := cp.(string); ok {
  71. certPath = v
  72. }
  73. }
  74. pKeyPath := ""
  75. if pk, ok := ps["privateKeyPath"]; ok {
  76. if v, ok := pk.(string); ok {
  77. pKeyPath = v
  78. }
  79. }
  80. insecureSkipVerify := false
  81. if pk, ok := ps["insecureSkipVerify"]; ok {
  82. if v, ok := pk.(bool); ok {
  83. insecureSkipVerify = v
  84. }
  85. }
  86. ms.srv = srv.(string)
  87. ms.tpc = tpc.(string)
  88. ms.clientid = clientid.(string)
  89. ms.pVersion = pVersion
  90. ms.uName = uName
  91. ms.password = password
  92. ms.certPath = certPath
  93. ms.pkeyPath = pKeyPath
  94. ms.insecureSkipVerify = insecureSkipVerify
  95. return nil
  96. }
  97. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  98. log := ctx.GetLogger()
  99. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  100. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  101. if ms.certPath != "" || ms.pkeyPath != "" {
  102. log.Infof("Connect MQTT broker with certification and keys.")
  103. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  104. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  105. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  106. return err2
  107. } else {
  108. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  109. }
  110. } else {
  111. return err1
  112. }
  113. } else {
  114. return err
  115. }
  116. } else {
  117. log.Infof("Connect MQTT broker with username and password.")
  118. if ms.uName != "" {
  119. opts = opts.SetUsername(ms.uName)
  120. }
  121. if ms.password != "" {
  122. opts = opts.SetPassword(ms.password)
  123. }
  124. }
  125. opts.SetAutoReconnect(true)
  126. var reconn = false
  127. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  128. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  129. ms.conn = client
  130. reconn = true
  131. })
  132. opts.SetOnConnectHandler(func(client MQTT.Client) {
  133. if reconn {
  134. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  135. }
  136. })
  137. c := MQTT.NewClient(opts)
  138. if token := c.Connect(); token.Wait() && token.Error() != nil {
  139. return fmt.Errorf("Found error: %s", token.Error())
  140. }
  141. log.Infof("The connection to server %s was established successfully", ms.srv)
  142. ms.conn = c
  143. return nil
  144. }
  145. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  146. logger := ctx.GetLogger()
  147. c := ms.conn
  148. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  149. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  150. return fmt.Errorf("publish error: %s", token.Error())
  151. }
  152. return nil
  153. }
  154. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  155. logger := ctx.GetLogger()
  156. logger.Infof("Closing mqtt sink")
  157. if ms.conn != nil && ms.conn.IsConnected() {
  158. ms.conn.Disconnect(5000)
  159. }
  160. return nil
  161. }