mqtt_source.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package source
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/emqx/kuiper/internal/conf"
  8. "github.com/emqx/kuiper/pkg/api"
  9. "github.com/emqx/kuiper/pkg/cast"
  10. "github.com/emqx/kuiper/pkg/message"
  11. "github.com/google/uuid"
  12. "path"
  13. "strconv"
  14. "strings"
  15. )
  16. type MQTTSource struct {
  17. srv string
  18. format string
  19. tpc string
  20. clientid string
  21. pVersion uint
  22. uName string
  23. password string
  24. certPath string
  25. pkeyPath string
  26. model modelVersion
  27. schema map[string]interface{}
  28. conn MQTT.Client
  29. }
  30. type MQTTConfig struct {
  31. Format string `json:"format"`
  32. Qos int `json:"qos"`
  33. Servers []string `json:"servers"`
  34. Clientid string `json:"clientid"`
  35. PVersion string `json:"protocolVersion"`
  36. Uname string `json:"username"`
  37. Password string `json:"password"`
  38. Certification string `json:"certificationPath"`
  39. PrivateKPath string `json:"privateKeyPath"`
  40. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  41. KubeedgeVersion string `json:"kubeedgeVersion"`
  42. }
  43. func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
  44. return ms
  45. }
  46. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  47. cfg := &MQTTConfig{}
  48. err := cast.MapToStruct(props, cfg)
  49. if err != nil {
  50. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  51. }
  52. ms.tpc = topic
  53. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  54. ms.srv = srvs[0]
  55. } else {
  56. return fmt.Errorf("missing server property")
  57. }
  58. ms.format = cfg.Format
  59. ms.clientid = cfg.Clientid
  60. ms.pVersion = 3
  61. if cfg.PVersion == "3.1.1" {
  62. ms.pVersion = 4
  63. }
  64. ms.uName = cfg.Uname
  65. ms.password = strings.Trim(cfg.Password, " ")
  66. ms.certPath = cfg.Certification
  67. ms.pkeyPath = cfg.PrivateKPath
  68. if 0 != len(cfg.KubeedgeModelFile) {
  69. conf, err := conf.LoadConf(path.Join("sources", cfg.KubeedgeModelFile))
  70. if nil != err {
  71. return err
  72. }
  73. ms.model = modelFactory(cfg.KubeedgeVersion)
  74. if err = json.Unmarshal(conf, ms.model); err != nil {
  75. ms.model = nil
  76. return err
  77. }
  78. }
  79. return nil
  80. }
  81. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  82. log := ctx.GetLogger()
  83. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  84. if ms.clientid == "" {
  85. if uuid, err := uuid.NewUUID(); err != nil {
  86. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  87. } else {
  88. ms.clientid = uuid.String()
  89. opts.SetClientID(uuid.String())
  90. }
  91. } else {
  92. opts.SetClientID(ms.clientid)
  93. }
  94. if ms.certPath != "" || ms.pkeyPath != "" {
  95. log.Infof("Connect MQTT broker with certification and keys.")
  96. if cp, err := conf.ProcessPath(ms.certPath); err == nil {
  97. log.Infof("The certification file is %s.", cp)
  98. if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
  99. log.Infof("The private key file is %s.", kp)
  100. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  101. errCh <- err2
  102. } else {
  103. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  104. }
  105. } else {
  106. errCh <- err1
  107. }
  108. } else {
  109. errCh <- err
  110. }
  111. } else {
  112. log.Infof("Connect MQTT broker with username and password.")
  113. if ms.uName != "" {
  114. opts = opts.SetUsername(ms.uName)
  115. } else {
  116. log.Infof("The username is empty.")
  117. }
  118. if ms.password != "" {
  119. opts = opts.SetPassword(ms.password)
  120. } else {
  121. log.Infof("The password is empty.")
  122. }
  123. }
  124. opts.SetAutoReconnect(true)
  125. var reconn = false
  126. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  127. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  128. reconn = true
  129. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  130. })
  131. opts.SetOnConnectHandler(func(client MQTT.Client) {
  132. if reconn {
  133. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  134. }
  135. })
  136. c := MQTT.NewClient(opts)
  137. if token := c.Connect(); token.Wait() && token.Error() != nil {
  138. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  139. }
  140. log.Infof("The connection to server %s was established successfully", ms.srv)
  141. ms.conn = c
  142. subscribe(ms.tpc, c, ctx, consumer, ms.model, ms.format)
  143. log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
  144. }
  145. func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion, format string) {
  146. log := ctx.GetLogger()
  147. h := func(client MQTT.Client, msg MQTT.Message) {
  148. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  149. result, e := message.Decode(msg.Payload(), format)
  150. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  151. if e != nil {
  152. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), format, e)
  153. return
  154. }
  155. meta := make(map[string]interface{})
  156. meta["topic"] = msg.Topic()
  157. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  158. if nil != model {
  159. sliErr := model.checkType(result, msg.Topic())
  160. for _, v := range sliErr {
  161. log.Errorf(v)
  162. }
  163. }
  164. select {
  165. case consumer <- api.NewDefaultSourceTuple(result, meta):
  166. log.Debugf("send data to source node")
  167. case <-ctx.Done():
  168. return
  169. }
  170. }
  171. if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
  172. log.Errorf("Found error: %s", token.Error())
  173. } else {
  174. log.Infof("Successfully subscribe to topic %s", topic)
  175. }
  176. }
  177. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  178. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  179. if ms.conn != nil && ms.conn.IsConnected() {
  180. ms.conn.Disconnect(5000)
  181. }
  182. return nil
  183. }