mqtt_source.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "fmt"
  17. MQTT "github.com/eclipse/paho.mqtt.golang"
  18. "github.com/google/uuid"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/pkg/cert"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/message"
  24. "path"
  25. "strconv"
  26. "strings"
  27. )
  28. type MQTTSource struct {
  29. srv string
  30. qos int
  31. format string
  32. tpc string
  33. clientid string
  34. pVersion uint
  35. uName string
  36. password string
  37. certPath string
  38. pkeyPath string
  39. rootCapath string
  40. conSel string
  41. InSecure bool
  42. model modelVersion
  43. schema map[string]interface{}
  44. conn MQTT.Client
  45. }
  46. type MQTTConfig struct {
  47. Format string `json:"format"`
  48. Qos int `json:"qos"`
  49. Servers []string `json:"servers"`
  50. Clientid string `json:"clientid"`
  51. PVersion string `json:"protocolVersion"`
  52. Uname string `json:"username"`
  53. Password string `json:"password"`
  54. Certification string `json:"certificationPath"`
  55. PrivateKPath string `json:"privateKeyPath"`
  56. RootCaPath string `json:"rootCaPath"`
  57. InsecureSkipVerify bool `json:"insecureSkipVerify"`
  58. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  59. KubeedgeVersion string `json:"kubeedgeVersion"`
  60. ConnectionSelector string `json:"connectionSelector"`
  61. }
  62. func (ms *MQTTSource) WithSchema(_ string) *MQTTSource {
  63. return ms
  64. }
  65. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  66. cfg := &MQTTConfig{}
  67. err := cast.MapToStruct(props, cfg)
  68. if err != nil {
  69. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  70. }
  71. ms.tpc = topic
  72. if cfg.ConnectionSelector == "" {
  73. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  74. ms.srv = srvs[0]
  75. } else {
  76. return fmt.Errorf("missing server property")
  77. }
  78. }
  79. ms.conSel = cfg.ConnectionSelector
  80. ms.format = cfg.Format
  81. ms.clientid = cfg.Clientid
  82. ms.qos = cfg.Qos
  83. ms.pVersion = 3
  84. if cfg.PVersion == "3.1.1" {
  85. ms.pVersion = 4
  86. }
  87. ms.uName = cfg.Uname
  88. ms.password = strings.Trim(cfg.Password, " ")
  89. ms.certPath = cfg.Certification
  90. ms.pkeyPath = cfg.PrivateKPath
  91. ms.rootCapath = cfg.RootCaPath
  92. if 0 != len(cfg.KubeedgeModelFile) {
  93. p := path.Join("sources", cfg.KubeedgeModelFile)
  94. ms.model = modelFactory(cfg.KubeedgeVersion)
  95. err = conf.LoadConfigFromPath(p, ms.model)
  96. if err != nil {
  97. return err
  98. }
  99. }
  100. return nil
  101. }
  102. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  103. var client MQTT.Client
  104. log := ctx.GetLogger()
  105. if ms.conSel != "" {
  106. con, err := ctx.GetConnection(ms.conSel)
  107. if err != nil {
  108. log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
  109. errCh <- err
  110. return
  111. }
  112. client = con.(MQTT.Client)
  113. log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
  114. } else {
  115. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  116. if ms.clientid == "" {
  117. if newUUID, err := uuid.NewUUID(); err != nil {
  118. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  119. return
  120. } else {
  121. ms.clientid = newUUID.String()
  122. opts = opts.SetClientID(newUUID.String())
  123. }
  124. } else {
  125. opts = opts.SetClientID(ms.clientid)
  126. }
  127. tlsOpts := cert.TlsConfigurationOptions{
  128. SkipCertVerify: ms.InSecure,
  129. CertFile: ms.certPath,
  130. KeyFile: ms.pkeyPath,
  131. CaFile: ms.rootCapath,
  132. }
  133. log.Infof("Connect MQTT broker with TLS configs. %v", tlsOpts)
  134. tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
  135. if err != nil {
  136. errCh <- err
  137. return
  138. }
  139. opts = opts.SetTLSConfig(tlscfg)
  140. log.Infof("Connect MQTT broker with username and password.")
  141. if ms.uName != "" {
  142. opts = opts.SetUsername(ms.uName)
  143. } else {
  144. log.Infof("The username is empty.")
  145. }
  146. if ms.password != "" {
  147. opts = opts.SetPassword(ms.password)
  148. } else {
  149. log.Infof("The password is empty.")
  150. }
  151. opts.SetAutoReconnect(true)
  152. var reconn = false
  153. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  154. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  155. reconn = true
  156. subscribe(ms, client, ctx, consumer)
  157. })
  158. opts.SetOnConnectHandler(func(client MQTT.Client) {
  159. if reconn {
  160. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  161. subscribe(ms, client, ctx, consumer)
  162. }
  163. })
  164. client = MQTT.NewClient(opts)
  165. if token := client.Connect(); token.Wait() && token.Error() != nil {
  166. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  167. return
  168. }
  169. log.Infof("The connection to server %s:%s was established successfully", ms.srv, ms.clientid)
  170. }
  171. ms.conn = client
  172. subscribe(ms, client, ctx, consumer)
  173. }
  174. func subscribe(ms *MQTTSource, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple) {
  175. log := ctx.GetLogger()
  176. h := func(client MQTT.Client, msg MQTT.Message) {
  177. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  178. result, e := message.Decode(msg.Payload(), ms.format)
  179. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  180. if e != nil {
  181. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), ms.format, e)
  182. return
  183. }
  184. meta := make(map[string]interface{})
  185. meta["topic"] = msg.Topic()
  186. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  187. if nil != ms.model {
  188. sliErr := ms.model.checkType(result, msg.Topic())
  189. for _, v := range sliErr {
  190. log.Errorf(v)
  191. }
  192. }
  193. select {
  194. case consumer <- api.NewDefaultSourceTuple(result, meta):
  195. log.Debugf("send data to source node")
  196. case <-ctx.Done():
  197. return
  198. }
  199. }
  200. if token := client.Subscribe(ms.tpc, byte(ms.qos), h); token.Wait() && token.Error() != nil {
  201. log.Errorf("Found error: %s", token.Error())
  202. } else {
  203. log.Infof("Successfully subscribe to topic %s", ms.tpc)
  204. }
  205. }
  206. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  207. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  208. if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
  209. ms.conn.Disconnect(5000)
  210. }
  211. if ms.conSel != "" {
  212. ctx.ReleaseConnection(ms.conSel)
  213. }
  214. return nil
  215. }