mqtt_sink.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "fmt"
  7. MQTT "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/google/uuid"
  9. "strings"
  10. )
  11. type MQTTSink struct {
  12. srv string
  13. tpc string
  14. clientid string
  15. pVersion uint
  16. uName string
  17. password string
  18. certPath string
  19. pkeyPath string
  20. conn MQTT.Client
  21. }
  22. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  23. srv, ok := ps["server"]
  24. if !ok {
  25. return fmt.Errorf("mqtt sink is missing property server")
  26. }
  27. tpc, ok := ps["topic"]
  28. if !ok {
  29. return fmt.Errorf("mqtt sink is missing property topic")
  30. }
  31. clientid, ok := ps["clientId"]
  32. if !ok{
  33. if uuid, err := uuid.NewUUID(); err != nil {
  34. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  35. }else{
  36. clientid = uuid.String()
  37. }
  38. }
  39. var pVersion uint = 3
  40. pVersionStr, ok := ps["protocolVersion"];
  41. if ok {
  42. v, _ := pVersionStr.(string)
  43. if v == "3.1" {
  44. pVersion = 3
  45. } else if v == "3.1.1" {
  46. pVersion = 4
  47. } else {
  48. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  49. }
  50. }
  51. uName := ""
  52. un, ok := ps["username"];
  53. if ok {
  54. v, _ := un.(string)
  55. if strings.Trim(v, " ") != "" {
  56. uName = v
  57. }
  58. }
  59. password := ""
  60. pwd, ok := ps["password"];
  61. if ok {
  62. v, _ := pwd.(string)
  63. if strings.Trim(v, " ") != "" {
  64. password = v
  65. }
  66. }
  67. certPath := ""
  68. if cp, ok := ps["certificationPath"]; ok {
  69. if v, ok := cp.(string); ok {
  70. certPath = v
  71. }
  72. }
  73. pKeyPath := ""
  74. if pk, ok := ps["privateKeyPath"]; ok {
  75. if v, ok := pk.(string); ok {
  76. pKeyPath = v
  77. }
  78. }
  79. ms.srv = srv.(string)
  80. ms.tpc = tpc.(string)
  81. ms.clientid = clientid.(string)
  82. ms.pVersion = pVersion
  83. ms.uName = uName
  84. ms.password = password
  85. ms.certPath = certPath
  86. ms.pkeyPath = pKeyPath
  87. return nil
  88. }
  89. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  90. log := ctx.GetLogger()
  91. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  92. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  93. if ms.certPath != "" || ms.pkeyPath != "" {
  94. log.Infof("Connect MQTT broker with certification and keys.")
  95. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  96. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  97. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  98. return err2
  99. } else {
  100. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  101. }
  102. } else {
  103. return err1
  104. }
  105. } else {
  106. return err
  107. }
  108. } else {
  109. log.Infof("Connect MQTT broker with username and password.")
  110. if ms.uName != "" {
  111. opts = opts.SetUsername(ms.uName)
  112. }
  113. if ms.password != "" {
  114. opts = opts.SetPassword(ms.password)
  115. }
  116. }
  117. c := MQTT.NewClient(opts)
  118. if token := c.Connect(); token.Wait() && token.Error() != nil {
  119. return fmt.Errorf("Found error: %s", token.Error())
  120. }
  121. log.Infof("The connection to server %s was established successfully", ms.srv)
  122. ms.conn = c
  123. return nil
  124. }
  125. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  126. logger := ctx.GetLogger()
  127. c := ms.conn
  128. logger.Infof("publish %s", item)
  129. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  130. return fmt.Errorf("publish error: %s", token.Error())
  131. }
  132. return nil
  133. }
  134. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  135. logger := ctx.GetLogger()
  136. logger.Infof("Closing mqtt sink")
  137. if ms.conn != nil && ms.conn.IsConnected() {
  138. ms.conn.Disconnect(5000)
  139. }
  140. return nil
  141. }