mqtt_source.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/message"
  24. "path"
  25. "strconv"
  26. "strings"
  27. )
  28. type MQTTSource struct {
  29. srv string
  30. qos int
  31. format string
  32. tpc string
  33. clientid string
  34. pVersion uint
  35. uName string
  36. password string
  37. certPath string
  38. pkeyPath string
  39. conSel string
  40. InSecure bool
  41. model modelVersion
  42. schema map[string]interface{}
  43. conn MQTT.Client
  44. }
  45. type MQTTConfig struct {
  46. Format string `json:"format"`
  47. Qos int `json:"qos"`
  48. Servers []string `json:"servers"`
  49. Clientid string `json:"clientid"`
  50. PVersion string `json:"protocolVersion"`
  51. Uname string `json:"username"`
  52. Password string `json:"password"`
  53. Certification string `json:"certificationPath"`
  54. PrivateKPath string `json:"privateKeyPath"`
  55. InsecureSkipVerify bool `json:"insecureSkipVerify"`
  56. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  57. KubeedgeVersion string `json:"kubeedgeVersion"`
  58. ConnectionSelector string `json:"connectionSelector"`
  59. }
  60. func (ms *MQTTSource) WithSchema(_ string) *MQTTSource {
  61. return ms
  62. }
  63. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  64. cfg := &MQTTConfig{}
  65. err := cast.MapToStruct(props, cfg)
  66. if err != nil {
  67. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  68. }
  69. ms.tpc = topic
  70. if cfg.ConnectionSelector == "" {
  71. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  72. ms.srv = srvs[0]
  73. } else {
  74. return fmt.Errorf("missing server property")
  75. }
  76. }
  77. ms.conSel = cfg.ConnectionSelector
  78. ms.format = cfg.Format
  79. ms.clientid = cfg.Clientid
  80. ms.qos = cfg.Qos
  81. ms.pVersion = 3
  82. if cfg.PVersion == "3.1.1" {
  83. ms.pVersion = 4
  84. }
  85. ms.uName = cfg.Uname
  86. ms.password = strings.Trim(cfg.Password, " ")
  87. ms.certPath = cfg.Certification
  88. ms.pkeyPath = cfg.PrivateKPath
  89. if 0 != len(cfg.KubeedgeModelFile) {
  90. p := path.Join("sources", cfg.KubeedgeModelFile)
  91. ms.model = modelFactory(cfg.KubeedgeVersion)
  92. err = conf.LoadConfigFromPath(p, ms.model)
  93. if err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }
  99. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  100. var client MQTT.Client
  101. log := ctx.GetLogger()
  102. if ms.conSel != "" {
  103. con, err := ctx.GetConnection(ms.conSel)
  104. if err != nil {
  105. log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
  106. errCh <- err
  107. return
  108. }
  109. client = con.(MQTT.Client)
  110. log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
  111. } else {
  112. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  113. if ms.clientid == "" {
  114. if newUUID, err := uuid.NewUUID(); err != nil {
  115. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  116. return
  117. } else {
  118. ms.clientid = newUUID.String()
  119. opts = opts.SetClientID(newUUID.String())
  120. }
  121. } else {
  122. opts = opts.SetClientID(ms.clientid)
  123. }
  124. if ms.certPath != "" || ms.pkeyPath != "" {
  125. log.Infof("Connect MQTT broker with certification and keys.")
  126. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  127. log.Infof("The certification file is %s.", cp)
  128. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  129. log.Infof("The private key file is %s.", kp)
  130. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  131. errCh <- err2
  132. return
  133. } else {
  134. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.InSecure})
  135. }
  136. } else {
  137. errCh <- err1
  138. return
  139. }
  140. } else {
  141. errCh <- err
  142. return
  143. }
  144. } else {
  145. log.Infof("Connect MQTT broker with username and password.")
  146. if ms.uName != "" {
  147. opts = opts.SetUsername(ms.uName)
  148. } else {
  149. log.Infof("The username is empty.")
  150. }
  151. if ms.password != "" {
  152. opts = opts.SetPassword(ms.password)
  153. } else {
  154. log.Infof("The password is empty.")
  155. }
  156. }
  157. opts.SetAutoReconnect(true)
  158. var reconn = false
  159. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  160. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  161. reconn = true
  162. subscribe(ms, client, ctx, consumer)
  163. })
  164. opts.SetOnConnectHandler(func(client MQTT.Client) {
  165. if reconn {
  166. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  167. subscribe(ms, client, ctx, consumer)
  168. }
  169. })
  170. client = MQTT.NewClient(opts)
  171. if token := client.Connect(); token.Wait() && token.Error() != nil {
  172. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  173. return
  174. }
  175. log.Infof("The connection to server %s:%s was established successfully", ms.srv, ms.clientid)
  176. }
  177. ms.conn = client
  178. subscribe(ms, client, ctx, consumer)
  179. }
  180. func subscribe(ms *MQTTSource, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple) {
  181. log := ctx.GetLogger()
  182. h := func(client MQTT.Client, msg MQTT.Message) {
  183. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  184. result, e := message.Decode(msg.Payload(), ms.format)
  185. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  186. if e != nil {
  187. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), ms.format, e)
  188. return
  189. }
  190. meta := make(map[string]interface{})
  191. meta["topic"] = msg.Topic()
  192. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  193. if nil != ms.model {
  194. sliErr := ms.model.checkType(result, msg.Topic())
  195. for _, v := range sliErr {
  196. log.Errorf(v)
  197. }
  198. }
  199. select {
  200. case consumer <- api.NewDefaultSourceTuple(result, meta):
  201. log.Debugf("send data to source node")
  202. case <-ctx.Done():
  203. return
  204. }
  205. }
  206. if token := client.Subscribe(ms.tpc, byte(ms.qos), h); token.Wait() && token.Error() != nil {
  207. log.Errorf("Found error: %s", token.Error())
  208. } else {
  209. log.Infof("Successfully subscribe to topic %s", ms.tpc)
  210. }
  211. }
  212. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  213. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  214. if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
  215. ms.conn.Disconnect(5000)
  216. }
  217. if ms.conSel != "" {
  218. ctx.ReleaseConnection(ms.conSel)
  219. }
  220. return nil
  221. }