mqtt_source.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. MQTT "github.com/eclipse/paho.mqtt.golang"
  20. "github.com/google/uuid"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "github.com/lf-edge/ekuiper/pkg/message"
  25. "path"
  26. "strconv"
  27. "strings"
  28. )
  29. type MQTTSource struct {
  30. srv string
  31. format string
  32. tpc string
  33. clientid string
  34. pVersion uint
  35. uName string
  36. password string
  37. certPath string
  38. pkeyPath string
  39. model modelVersion
  40. schema map[string]interface{}
  41. conn MQTT.Client
  42. }
  43. type MQTTConfig struct {
  44. Format string `json:"format"`
  45. Qos int `json:"qos"`
  46. Servers []string `json:"servers"`
  47. Clientid string `json:"clientid"`
  48. PVersion string `json:"protocolVersion"`
  49. Uname string `json:"username"`
  50. Password string `json:"password"`
  51. Certification string `json:"certificationPath"`
  52. PrivateKPath string `json:"privateKeyPath"`
  53. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  54. KubeedgeVersion string `json:"kubeedgeVersion"`
  55. }
  56. func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
  57. return ms
  58. }
  59. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  60. cfg := &MQTTConfig{}
  61. err := cast.MapToStruct(props, cfg)
  62. if err != nil {
  63. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  64. }
  65. ms.tpc = topic
  66. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  67. ms.srv = srvs[0]
  68. } else {
  69. return fmt.Errorf("missing server property")
  70. }
  71. ms.format = cfg.Format
  72. ms.clientid = cfg.Clientid
  73. ms.pVersion = 3
  74. if cfg.PVersion == "3.1.1" {
  75. ms.pVersion = 4
  76. }
  77. ms.uName = cfg.Uname
  78. ms.password = strings.Trim(cfg.Password, " ")
  79. ms.certPath = cfg.Certification
  80. ms.pkeyPath = cfg.PrivateKPath
  81. if 0 != len(cfg.KubeedgeModelFile) {
  82. conf, err := conf.LoadConf(path.Join("sources", cfg.KubeedgeModelFile))
  83. if nil != err {
  84. return err
  85. }
  86. ms.model = modelFactory(cfg.KubeedgeVersion)
  87. if err = json.Unmarshal(conf, ms.model); err != nil {
  88. ms.model = nil
  89. return err
  90. }
  91. }
  92. return nil
  93. }
  94. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  95. log := ctx.GetLogger()
  96. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  97. if ms.clientid == "" {
  98. if uuid, err := uuid.NewUUID(); err != nil {
  99. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  100. } else {
  101. ms.clientid = uuid.String()
  102. opts.SetClientID(uuid.String())
  103. }
  104. } else {
  105. opts.SetClientID(ms.clientid)
  106. }
  107. if ms.certPath != "" || ms.pkeyPath != "" {
  108. log.Infof("Connect MQTT broker with certification and keys.")
  109. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  110. log.Infof("The certification file is %s.", cp)
  111. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  112. log.Infof("The private key file is %s.", kp)
  113. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  114. errCh <- err2
  115. } else {
  116. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  117. }
  118. } else {
  119. errCh <- err1
  120. }
  121. } else {
  122. errCh <- err
  123. }
  124. } else {
  125. log.Infof("Connect MQTT broker with username and password.")
  126. if ms.uName != "" {
  127. opts = opts.SetUsername(ms.uName)
  128. } else {
  129. log.Infof("The username is empty.")
  130. }
  131. if ms.password != "" {
  132. opts = opts.SetPassword(ms.password)
  133. } else {
  134. log.Infof("The password is empty.")
  135. }
  136. }
  137. opts.SetAutoReconnect(true)
  138. var reconn = false
  139. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  140. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  141. reconn = true
  142. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  143. })
  144. opts.SetOnConnectHandler(func(client MQTT.Client) {
  145. if reconn {
  146. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  147. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  148. }
  149. })
  150. c := MQTT.NewClient(opts)
  151. if token := c.Connect(); token.Wait() && token.Error() != nil {
  152. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  153. }
  154. log.Infof("The connection to server %s was established successfully", ms.srv)
  155. ms.conn = c
  156. subscribe(ms.tpc, c, ctx, consumer, ms.model, ms.format)
  157. log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
  158. }
  159. func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion, format string) {
  160. log := ctx.GetLogger()
  161. h := func(client MQTT.Client, msg MQTT.Message) {
  162. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  163. result, e := message.Decode(msg.Payload(), format)
  164. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  165. if e != nil {
  166. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), format, e)
  167. return
  168. }
  169. meta := make(map[string]interface{})
  170. meta["topic"] = msg.Topic()
  171. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  172. if nil != model {
  173. sliErr := model.checkType(result, msg.Topic())
  174. for _, v := range sliErr {
  175. log.Errorf(v)
  176. }
  177. }
  178. select {
  179. case consumer <- api.NewDefaultSourceTuple(result, meta):
  180. log.Debugf("send data to source node")
  181. case <-ctx.Done():
  182. return
  183. }
  184. }
  185. if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
  186. log.Errorf("Found error: %s", token.Error())
  187. } else {
  188. log.Infof("Successfully subscribe to topic %s", topic)
  189. }
  190. }
  191. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  192. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  193. if ms.conn != nil && ms.conn.IsConnected() {
  194. ms.conn.Disconnect(5000)
  195. }
  196. return nil
  197. }