mqtt_sink.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "strings"
  24. )
  25. type MQTTSink struct {
  26. srv string
  27. tpc string
  28. clientid string
  29. pVersion uint
  30. qos byte
  31. uName string
  32. password string
  33. certPath string
  34. pkeyPath string
  35. insecureSkipVerify bool
  36. retained bool
  37. conn MQTT.Client
  38. }
  39. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  40. srv, ok := ps["server"]
  41. if !ok {
  42. return fmt.Errorf("mqtt sink is missing property server")
  43. }
  44. tpc, ok := ps["topic"]
  45. if !ok {
  46. return fmt.Errorf("mqtt sink is missing property topic")
  47. }
  48. clientid, ok := ps["clientId"]
  49. if !ok {
  50. if uuid, err := uuid.NewUUID(); err != nil {
  51. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  52. } else {
  53. clientid = uuid.String()
  54. }
  55. }
  56. var pVersion uint = 3
  57. pVersionStr, ok := ps["protocolVersion"]
  58. if ok {
  59. v, _ := pVersionStr.(string)
  60. if v == "3.1" {
  61. pVersion = 3
  62. } else if v == "3.1.1" {
  63. pVersion = 4
  64. } else {
  65. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  66. }
  67. }
  68. var qos byte = 0
  69. if qosRec, ok := ps["qos"]; ok {
  70. if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
  71. qos = byte(v)
  72. }
  73. if qos != 0 && qos != 1 && qos != 2 {
  74. return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
  75. }
  76. }
  77. uName := ""
  78. un, ok := ps["username"]
  79. if ok {
  80. v, _ := un.(string)
  81. if strings.Trim(v, " ") != "" {
  82. uName = v
  83. }
  84. }
  85. password := ""
  86. pwd, ok := ps["password"]
  87. if ok {
  88. v, _ := pwd.(string)
  89. if strings.Trim(v, " ") != "" {
  90. password = v
  91. }
  92. }
  93. certPath := ""
  94. if cp, ok := ps["certificationPath"]; ok {
  95. if v, ok := cp.(string); ok {
  96. certPath = v
  97. }
  98. }
  99. pKeyPath := ""
  100. if pk, ok := ps["privateKeyPath"]; ok {
  101. if v, ok := pk.(string); ok {
  102. pKeyPath = v
  103. }
  104. }
  105. insecureSkipVerify := false
  106. if pk, ok := ps["insecureSkipVerify"]; ok {
  107. if v, ok := pk.(bool); ok {
  108. insecureSkipVerify = v
  109. }
  110. }
  111. retained := false
  112. if pk, ok := ps["retained"]; ok {
  113. if v, ok := pk.(bool); ok {
  114. retained = v
  115. }
  116. }
  117. ms.srv = srv.(string)
  118. ms.tpc = tpc.(string)
  119. ms.clientid = clientid.(string)
  120. ms.pVersion = pVersion
  121. ms.qos = qos
  122. ms.uName = uName
  123. ms.password = password
  124. ms.certPath = certPath
  125. ms.pkeyPath = pKeyPath
  126. ms.insecureSkipVerify = insecureSkipVerify
  127. ms.retained = retained
  128. return nil
  129. }
  130. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  131. log := ctx.GetLogger()
  132. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  133. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  134. if ms.certPath != "" || ms.pkeyPath != "" {
  135. log.Infof("Connect MQTT broker with certification and keys.")
  136. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  137. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  138. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  139. return err2
  140. } else {
  141. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  142. }
  143. } else {
  144. return err1
  145. }
  146. } else {
  147. return err
  148. }
  149. } else {
  150. log.Infof("Connect MQTT broker with username and password.")
  151. if ms.uName != "" {
  152. opts = opts.SetUsername(ms.uName)
  153. }
  154. if ms.password != "" {
  155. opts = opts.SetPassword(ms.password)
  156. }
  157. }
  158. opts.SetAutoReconnect(true)
  159. var reconn = false
  160. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  161. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  162. ms.conn = client
  163. reconn = true
  164. })
  165. opts.SetOnConnectHandler(func(client MQTT.Client) {
  166. if reconn {
  167. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  168. }
  169. })
  170. c := MQTT.NewClient(opts)
  171. if token := c.Connect(); token.Wait() && token.Error() != nil {
  172. return fmt.Errorf("Found error: %s", token.Error())
  173. }
  174. log.Infof("The connection to server %s was established successfully", ms.srv)
  175. ms.conn = c
  176. return nil
  177. }
  178. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  179. logger := ctx.GetLogger()
  180. c := ms.conn
  181. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  182. if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
  183. return fmt.Errorf("publish error: %s", token.Error())
  184. }
  185. return nil
  186. }
  187. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  188. logger := ctx.GetLogger()
  189. logger.Infof("Closing mqtt sink")
  190. if ms.conn != nil && ms.conn.IsConnected() {
  191. ms.conn.Disconnect(5000)
  192. }
  193. return nil
  194. }