mqtt_sink.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "strings"
  24. )
  25. type MQTTSink struct {
  26. srv string
  27. tpc string
  28. clientid string
  29. pVersion uint
  30. qos byte
  31. uName string
  32. password string
  33. certPath string
  34. pkeyPath string
  35. conSel string
  36. insecureSkipVerify bool
  37. retained bool
  38. conn MQTT.Client
  39. }
  40. func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
  41. for _, v := range str {
  42. if _, ok := ps[v]; ok {
  43. return true
  44. }
  45. }
  46. return false
  47. }
  48. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  49. conSelector := ""
  50. if pk, ok := ps["connectionSelector"]; ok {
  51. if v, ok := pk.(string); ok {
  52. conSelector = v
  53. }
  54. keys := []string{"server", "clientId", "protocolVersion", "username", "password", "certificationPath", "privateKeyPath", "insecureSkipVerify"}
  55. if ms.hasKeys(keys, ps) {
  56. return fmt.Errorf("already have connection selector: %s, remove connection related config", conSelector)
  57. }
  58. ms.conSel = conSelector
  59. } else {
  60. srv := ""
  61. if pk, ok := ps["server"]; ok {
  62. if v, ok := pk.(string); ok {
  63. srv = v
  64. }
  65. } else {
  66. return fmt.Errorf("mqtt sink is missing property server")
  67. }
  68. clientid, ok := ps["clientId"]
  69. if !ok {
  70. if uuid, err := uuid.NewUUID(); err != nil {
  71. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  72. } else {
  73. clientid = uuid.String()
  74. }
  75. }
  76. var pVersion uint = 3
  77. pVersionStr, ok := ps["protocolVersion"]
  78. if ok {
  79. v, _ := pVersionStr.(string)
  80. if v == "3.1" {
  81. pVersion = 3
  82. } else if v == "3.1.1" {
  83. pVersion = 4
  84. } else {
  85. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  86. }
  87. }
  88. uName := ""
  89. un, ok := ps["username"]
  90. if ok {
  91. v, _ := un.(string)
  92. if strings.Trim(v, " ") != "" {
  93. uName = v
  94. }
  95. }
  96. password := ""
  97. pwd, ok := ps["password"]
  98. if ok {
  99. v, _ := pwd.(string)
  100. if strings.Trim(v, " ") != "" {
  101. password = v
  102. }
  103. }
  104. certPath := ""
  105. if cp, ok := ps["certificationPath"]; ok {
  106. if v, ok := cp.(string); ok {
  107. certPath = v
  108. }
  109. }
  110. pKeyPath := ""
  111. if pk, ok := ps["privateKeyPath"]; ok {
  112. if v, ok := pk.(string); ok {
  113. pKeyPath = v
  114. }
  115. }
  116. insecureSkipVerify := false
  117. if pk, ok := ps["insecureSkipVerify"]; ok {
  118. if v, ok := pk.(bool); ok {
  119. insecureSkipVerify = v
  120. }
  121. }
  122. ms.srv = srv
  123. ms.clientid = clientid.(string)
  124. ms.pVersion = pVersion
  125. ms.uName = uName
  126. ms.password = password
  127. ms.certPath = certPath
  128. ms.pkeyPath = pKeyPath
  129. ms.insecureSkipVerify = insecureSkipVerify
  130. }
  131. tpc, ok := ps["topic"]
  132. if !ok {
  133. return fmt.Errorf("mqtt sink is missing property topic")
  134. }
  135. var qos byte = 0
  136. if qosRec, ok := ps["qos"]; ok {
  137. if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
  138. qos = byte(v)
  139. }
  140. if qos != 0 && qos != 1 && qos != 2 {
  141. return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
  142. }
  143. }
  144. retained := false
  145. if pk, ok := ps["retained"]; ok {
  146. if v, ok := pk.(bool); ok {
  147. retained = v
  148. }
  149. }
  150. ms.qos = qos
  151. ms.tpc = tpc.(string)
  152. ms.retained = retained
  153. return nil
  154. }
  155. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  156. var client MQTT.Client
  157. log := ctx.GetLogger()
  158. if ms.conSel != "" {
  159. con, err := ctx.GetConnection(ms.conSel)
  160. if err != nil {
  161. log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
  162. return err
  163. }
  164. client = con.(MQTT.Client)
  165. log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
  166. } else {
  167. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  168. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  169. if ms.certPath != "" || ms.pkeyPath != "" {
  170. log.Infof("Connect MQTT broker with certification and keys.")
  171. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  172. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  173. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  174. return err2
  175. } else {
  176. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  177. }
  178. } else {
  179. return err1
  180. }
  181. } else {
  182. return err
  183. }
  184. } else {
  185. log.Infof("Connect MQTT broker with username and password.")
  186. if ms.uName != "" {
  187. opts = opts.SetUsername(ms.uName)
  188. }
  189. if ms.password != "" {
  190. opts = opts.SetPassword(ms.password)
  191. }
  192. }
  193. opts.SetAutoReconnect(true)
  194. var reconn = false
  195. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  196. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  197. ms.conn = client
  198. reconn = true
  199. })
  200. opts.SetOnConnectHandler(func(client MQTT.Client) {
  201. if reconn {
  202. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  203. }
  204. })
  205. client = MQTT.NewClient(opts)
  206. if token := client.Connect(); token.Wait() && token.Error() != nil {
  207. return fmt.Errorf("Found error: %s", token.Error())
  208. }
  209. log.Infof("The connection to server %s:%d was established successfully", ms.srv, ms.clientid)
  210. }
  211. ms.conn = client
  212. return nil
  213. }
  214. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  215. logger := ctx.GetLogger()
  216. c := ms.conn
  217. logger.Debugf("%s publish %s", ctx.GetOpId(), item)
  218. if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
  219. return fmt.Errorf("publish error: %s", token.Error())
  220. }
  221. return nil
  222. }
  223. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  224. logger := ctx.GetLogger()
  225. logger.Infof("Closing mqtt sink")
  226. if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
  227. ms.conn.Disconnect(5000)
  228. }
  229. if ms.conSel != "" {
  230. ctx.ReleaseConnection(ms.conSel)
  231. }
  232. return nil
  233. }