mqtt_sink.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/errorx"
  24. "strings"
  25. )
  26. type MQTTSink struct {
  27. srv string
  28. tpc string
  29. clientid string
  30. pVersion uint
  31. qos byte
  32. uName string
  33. password string
  34. certPath string
  35. pkeyPath string
  36. conSel string
  37. insecureSkipVerify bool
  38. retained bool
  39. conn MQTT.Client
  40. }
  41. func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
  42. for _, v := range str {
  43. if _, ok := ps[v]; ok {
  44. return true
  45. }
  46. }
  47. return false
  48. }
  49. func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
  50. conSelector := ""
  51. if pk, ok := ps["connectionSelector"]; ok {
  52. if v, ok := pk.(string); ok {
  53. conSelector = v
  54. }
  55. keys := []string{"server", "clientId", "protocolVersion", "username", "password", "certificationPath", "privateKeyPath", "insecureSkipVerify"}
  56. if ms.hasKeys(keys, ps) {
  57. return fmt.Errorf("already have connection selector: %s, remove connection related config", conSelector)
  58. }
  59. ms.conSel = conSelector
  60. } else {
  61. srv := ""
  62. if pk, ok := ps["server"]; ok {
  63. if v, ok := pk.(string); ok {
  64. srv = v
  65. }
  66. } else {
  67. return fmt.Errorf("mqtt sink is missing property server")
  68. }
  69. clientid, ok := ps["clientId"]
  70. if !ok {
  71. if uuid, err := uuid.NewUUID(); err != nil {
  72. return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  73. } else {
  74. clientid = uuid.String()
  75. }
  76. }
  77. var pVersion uint = 3
  78. pVersionStr, ok := ps["protocolVersion"]
  79. if ok {
  80. v, _ := pVersionStr.(string)
  81. if v == "3.1" {
  82. pVersion = 3
  83. } else if v == "3.1.1" {
  84. pVersion = 4
  85. } else {
  86. return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
  87. }
  88. }
  89. uName := ""
  90. un, ok := ps["username"]
  91. if ok {
  92. v, _ := un.(string)
  93. if strings.Trim(v, " ") != "" {
  94. uName = v
  95. }
  96. }
  97. password := ""
  98. pwd, ok := ps["password"]
  99. if ok {
  100. v, _ := pwd.(string)
  101. if strings.Trim(v, " ") != "" {
  102. password = v
  103. }
  104. }
  105. certPath := ""
  106. if cp, ok := ps["certificationPath"]; ok {
  107. if v, ok := cp.(string); ok {
  108. certPath = v
  109. }
  110. }
  111. pKeyPath := ""
  112. if pk, ok := ps["privateKeyPath"]; ok {
  113. if v, ok := pk.(string); ok {
  114. pKeyPath = v
  115. }
  116. }
  117. insecureSkipVerify := false
  118. if pk, ok := ps["insecureSkipVerify"]; ok {
  119. if v, ok := pk.(bool); ok {
  120. insecureSkipVerify = v
  121. }
  122. }
  123. ms.srv = srv
  124. ms.clientid = clientid.(string)
  125. ms.pVersion = pVersion
  126. ms.uName = uName
  127. ms.password = password
  128. ms.certPath = certPath
  129. ms.pkeyPath = pKeyPath
  130. ms.insecureSkipVerify = insecureSkipVerify
  131. }
  132. tpc, ok := ps["topic"]
  133. if !ok {
  134. return fmt.Errorf("mqtt sink is missing property topic")
  135. }
  136. var qos byte = 0
  137. if qosRec, ok := ps["qos"]; ok {
  138. if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
  139. qos = byte(v)
  140. }
  141. if qos != 0 && qos != 1 && qos != 2 {
  142. return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
  143. }
  144. }
  145. retained := false
  146. if pk, ok := ps["retained"]; ok {
  147. if v, ok := pk.(bool); ok {
  148. retained = v
  149. }
  150. }
  151. ms.qos = qos
  152. ms.tpc = tpc.(string)
  153. ms.retained = retained
  154. return nil
  155. }
  156. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  157. var client MQTT.Client
  158. log := ctx.GetLogger()
  159. if ms.conSel != "" {
  160. con, err := ctx.GetConnection(ms.conSel)
  161. if err != nil {
  162. log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
  163. return err
  164. }
  165. client = con.(MQTT.Client)
  166. log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
  167. } else {
  168. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  169. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  170. if ms.certPath != "" || ms.pkeyPath != "" {
  171. log.Infof("Connect MQTT broker with certification and keys.")
  172. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  173. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  174. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  175. return err2
  176. } else {
  177. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
  178. }
  179. } else {
  180. return err1
  181. }
  182. } else {
  183. return err
  184. }
  185. } else {
  186. log.Infof("Connect MQTT broker with username and password.")
  187. if ms.uName != "" {
  188. opts = opts.SetUsername(ms.uName)
  189. }
  190. if ms.password != "" {
  191. opts = opts.SetPassword(ms.password)
  192. }
  193. }
  194. opts.SetAutoReconnect(true)
  195. var reconn = false
  196. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  197. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  198. ms.conn = client
  199. reconn = true
  200. })
  201. opts.SetOnConnectHandler(func(client MQTT.Client) {
  202. if reconn {
  203. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  204. }
  205. })
  206. client = MQTT.NewClient(opts)
  207. if token := client.Connect(); token.Wait() && token.Error() != nil {
  208. return fmt.Errorf("Found error: %s", token.Error())
  209. }
  210. log.Infof("The connection to server %s:%d was established successfully", ms.srv, ms.clientid)
  211. }
  212. ms.conn = client
  213. return nil
  214. }
  215. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  216. logger := ctx.GetLogger()
  217. jsonBytes, _, err := ctx.TransformOutput(item)
  218. if err != nil {
  219. return err
  220. }
  221. c := ms.conn
  222. logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
  223. tpc, err := ctx.ParseDynamicProp(ms.tpc, item)
  224. if err != nil {
  225. return err
  226. }
  227. if tpc, ok := tpc.(string); !ok {
  228. return fmt.Errorf("the value %v of dynamic prop %s for topic is not a string", ms.tpc, tpc)
  229. }
  230. if token := c.Publish(tpc.(string), ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
  231. return fmt.Errorf("%s: %s", errorx.IOErr, token.Error())
  232. }
  233. return nil
  234. }
  235. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  236. logger := ctx.GetLogger()
  237. logger.Infof("Closing mqtt sink")
  238. if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
  239. ms.conn.Disconnect(5000)
  240. }
  241. if ms.conSel != "" {
  242. ctx.ReleaseConnection(ms.conSel)
  243. }
  244. return nil
  245. }