mqtt_sink.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/google/uuid"
  9. "strings"
  10. )
  11. type MQTTSink struct {
  12. srv string
  13. tpc string
  14. clientid string
  15. pVersion uint
  16. uName string
  17. password string
  18. certPath string
  19. pkeyPath string
  20. conn MQTT.Client
  21. }
  22. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  23. srv, ok := ps["server"]
  24. if !ok {
  25. return fmt.Errorf("mqtt sink is missing property server")
  26. }
  27. tpc, ok := ps["topic"]
  28. if !ok {
  29. return fmt.Errorf("mqtt sink is missing property topic")
  30. }
  31. clientid, ok := ps["clientId"]
  32. if !ok {
  33. if uuid, err := uuid.NewUUID(); err != nil {
  34. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  35. } else {
  36. clientid = uuid.String()
  37. }
  38. }
  39. var pVersion uint = 3
  40. pVersionStr, ok := ps["protocolVersion"]
  41. if ok {
  42. v, _ := pVersionStr.(string)
  43. if v == "3.1" {
  44. pVersion = 3
  45. } else if v == "3.1.1" {
  46. pVersion = 4
  47. } else {
  48. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  49. }
  50. }
  51. uName := ""
  52. un, ok := ps["username"]
  53. if ok {
  54. v, _ := un.(string)
  55. if strings.Trim(v, " ") != "" {
  56. uName = v
  57. }
  58. }
  59. password := ""
  60. pwd, ok := ps["password"]
  61. if ok {
  62. v, _ := pwd.(string)
  63. if strings.Trim(v, " ") != "" {
  64. password = v
  65. }
  66. }
  67. certPath := ""
  68. if cp, ok := ps["certificationPath"]; ok {
  69. if v, ok := cp.(string); ok {
  70. certPath = v
  71. }
  72. }
  73. pKeyPath := ""
  74. if pk, ok := ps["privateKeyPath"]; ok {
  75. if v, ok := pk.(string); ok {
  76. pKeyPath = v
  77. }
  78. }
  79. ms.srv = srv.(string)
  80. ms.tpc = tpc.(string)
  81. ms.clientid = clientid.(string)
  82. ms.pVersion = pVersion
  83. ms.uName = uName
  84. ms.password = password
  85. ms.certPath = certPath
  86. ms.pkeyPath = pKeyPath
  87. return nil
  88. }
  89. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  90. log := ctx.GetLogger()
  91. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  92. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  93. if ms.certPath != "" || ms.pkeyPath != "" {
  94. log.Infof("Connect MQTT broker with certification and keys.")
  95. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  96. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  97. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  98. return err2
  99. } else {
  100. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  101. }
  102. } else {
  103. return err1
  104. }
  105. } else {
  106. return err
  107. }
  108. } else {
  109. log.Infof("Connect MQTT broker with username and password.")
  110. if ms.uName != "" {
  111. opts = opts.SetUsername(ms.uName)
  112. }
  113. if ms.password != "" {
  114. opts = opts.SetPassword(ms.password)
  115. }
  116. }
  117. opts.SetAutoReconnect(true)
  118. var reconn = false
  119. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  120. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  121. ms.conn = client
  122. reconn = true
  123. })
  124. opts.SetOnConnectHandler(func(client MQTT.Client) {
  125. if reconn {
  126. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  127. }
  128. })
  129. c := MQTT.NewClient(opts)
  130. if token := c.Connect(); token.Wait() && token.Error() != nil {
  131. return fmt.Errorf("Found error: %s", token.Error())
  132. }
  133. log.Infof("The connection to server %s was established successfully", ms.srv)
  134. ms.conn = c
  135. return nil
  136. }
  137. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  138. logger := ctx.GetLogger()
  139. c := ms.conn
  140. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  141. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  142. return fmt.Errorf("publish error: %s", token.Error())
  143. }
  144. return nil
  145. }
  146. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  147. logger := ctx.GetLogger()
  148. logger.Infof("Closing mqtt sink")
  149. if ms.conn != nil && ms.conn.IsConnected() {
  150. ms.conn.Disconnect(5000)
  151. }
  152. return nil
  153. }