mqtt_source.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/message"
  24. "path"
  25. "strconv"
  26. "strings"
  27. )
  28. type MQTTSource struct {
  29. srv string
  30. format string
  31. tpc string
  32. clientid string
  33. pVersion uint
  34. uName string
  35. password string
  36. certPath string
  37. pkeyPath string
  38. model modelVersion
  39. schema map[string]interface{}
  40. conn MQTT.Client
  41. }
  42. type MQTTConfig struct {
  43. Format string `json:"format"`
  44. Qos int `json:"qos"`
  45. Servers []string `json:"servers"`
  46. Clientid string `json:"clientid"`
  47. PVersion string `json:"protocolVersion"`
  48. Uname string `json:"username"`
  49. Password string `json:"password"`
  50. Certification string `json:"certificationPath"`
  51. PrivateKPath string `json:"privateKeyPath"`
  52. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  53. KubeedgeVersion string `json:"kubeedgeVersion"`
  54. }
  55. func (ms *MQTTSource) WithSchema(_ string) *MQTTSource {
  56. return ms
  57. }
  58. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  59. cfg := &MQTTConfig{}
  60. err := cast.MapToStruct(props, cfg)
  61. if err != nil {
  62. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  63. }
  64. ms.tpc = topic
  65. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  66. ms.srv = srvs[0]
  67. } else {
  68. return fmt.Errorf("missing server property")
  69. }
  70. ms.format = cfg.Format
  71. ms.clientid = cfg.Clientid
  72. ms.pVersion = 3
  73. if cfg.PVersion == "3.1.1" {
  74. ms.pVersion = 4
  75. }
  76. ms.uName = cfg.Uname
  77. ms.password = strings.Trim(cfg.Password, " ")
  78. ms.certPath = cfg.Certification
  79. ms.pkeyPath = cfg.PrivateKPath
  80. if 0 != len(cfg.KubeedgeModelFile) {
  81. p := path.Join("sources", cfg.KubeedgeModelFile)
  82. ms.model = modelFactory(cfg.KubeedgeVersion)
  83. err = conf.LoadConfigFromPath(p, ms.model)
  84. if err != nil {
  85. return err
  86. }
  87. }
  88. return nil
  89. }
  90. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  91. log := ctx.GetLogger()
  92. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  93. if ms.clientid == "" {
  94. if newUUID, err := uuid.NewUUID(); err != nil {
  95. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  96. return
  97. } else {
  98. ms.clientid = newUUID.String()
  99. opts.SetClientID(newUUID.String())
  100. }
  101. } else {
  102. opts.SetClientID(ms.clientid)
  103. }
  104. if ms.certPath != "" || ms.pkeyPath != "" {
  105. log.Infof("Connect MQTT broker with certification and keys.")
  106. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  107. log.Infof("The certification file is %s.", cp)
  108. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  109. log.Infof("The private key file is %s.", kp)
  110. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  111. errCh <- err2
  112. return
  113. } else {
  114. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  115. }
  116. } else {
  117. errCh <- err1
  118. return
  119. }
  120. } else {
  121. errCh <- err
  122. return
  123. }
  124. } else {
  125. log.Infof("Connect MQTT broker with username and password.")
  126. if ms.uName != "" {
  127. opts = opts.SetUsername(ms.uName)
  128. } else {
  129. log.Infof("The username is empty.")
  130. }
  131. if ms.password != "" {
  132. opts = opts.SetPassword(ms.password)
  133. } else {
  134. log.Infof("The password is empty.")
  135. }
  136. }
  137. opts.SetAutoReconnect(true)
  138. var reconn = false
  139. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  140. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  141. reconn = true
  142. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  143. })
  144. opts.SetOnConnectHandler(func(client MQTT.Client) {
  145. if reconn {
  146. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  147. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  148. }
  149. })
  150. c := MQTT.NewClient(opts)
  151. if token := c.Connect(); token.Wait() && token.Error() != nil {
  152. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  153. return
  154. }
  155. log.Infof("The connection to server %s was established successfully", ms.srv)
  156. ms.conn = c
  157. subscribe(ms.tpc, c, ctx, consumer, ms.model, ms.format)
  158. log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
  159. }
  160. func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion, format string) {
  161. log := ctx.GetLogger()
  162. h := func(client MQTT.Client, msg MQTT.Message) {
  163. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  164. result, e := message.Decode(msg.Payload(), format)
  165. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  166. if e != nil {
  167. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), format, e)
  168. return
  169. }
  170. meta := make(map[string]interface{})
  171. meta["topic"] = msg.Topic()
  172. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  173. if nil != model {
  174. sliErr := model.checkType(result, msg.Topic())
  175. for _, v := range sliErr {
  176. log.Errorf(v)
  177. }
  178. }
  179. select {
  180. case consumer <- api.NewDefaultSourceTuple(result, meta):
  181. log.Debugf("send data to source node")
  182. case <-ctx.Done():
  183. return
  184. }
  185. }
  186. if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
  187. log.Errorf("Found error: %s", token.Error())
  188. } else {
  189. log.Infof("Successfully subscribe to topic %s", topic)
  190. }
  191. }
  192. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  193. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  194. if ms.conn != nil && ms.conn.IsConnected() {
  195. ms.conn.Disconnect(5000)
  196. }
  197. return nil
  198. }