mqtt_source.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package extensions
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/google/uuid"
  10. "path"
  11. "strconv"
  12. "strings"
  13. )
  14. type MQTTSource struct {
  15. srv string
  16. format string
  17. tpc string
  18. clientid string
  19. pVersion uint
  20. uName string
  21. password string
  22. certPath string
  23. pkeyPath string
  24. model modelVersion
  25. schema map[string]interface{}
  26. conn MQTT.Client
  27. }
  28. type MQTTConfig struct {
  29. Format string `json:"format"`
  30. Qos int `json:"qos"`
  31. Servers []string `json:"servers"`
  32. Clientid string `json:"clientid"`
  33. PVersion string `json:"protocolVersion"`
  34. Uname string `json:"username"`
  35. Password string `json:"password"`
  36. Certification string `json:"certificationPath"`
  37. PrivateKPath string `json:"privateKeyPath"`
  38. KubeedgeModelFile string `json:"kubeedgeModelFile"`
  39. KubeedgeVersion string `json:"kubeedgeVersion"`
  40. }
  41. func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
  42. return ms
  43. }
  44. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  45. cfg := &MQTTConfig{}
  46. err := common.MapToStruct(props, cfg)
  47. if err != nil {
  48. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  49. }
  50. ms.tpc = topic
  51. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  52. ms.srv = srvs[0]
  53. } else {
  54. return fmt.Errorf("missing server property")
  55. }
  56. ms.format = cfg.Format
  57. ms.clientid = cfg.Clientid
  58. ms.pVersion = 3
  59. if cfg.PVersion == "3.1.1" {
  60. ms.pVersion = 4
  61. }
  62. ms.uName = cfg.Uname
  63. ms.password = strings.Trim(cfg.Password, " ")
  64. ms.certPath = cfg.Certification
  65. ms.pkeyPath = cfg.PrivateKPath
  66. if 0 != len(cfg.KubeedgeModelFile) {
  67. conf, err := common.LoadConf(path.Join("sources", cfg.KubeedgeModelFile))
  68. if nil != err {
  69. return err
  70. }
  71. ms.model = modelFactory(cfg.KubeedgeVersion)
  72. if err = json.Unmarshal(conf, ms.model); err != nil {
  73. ms.model = nil
  74. return err
  75. }
  76. }
  77. return nil
  78. }
  79. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  80. log := ctx.GetLogger()
  81. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  82. if ms.clientid == "" {
  83. if uuid, err := uuid.NewUUID(); err != nil {
  84. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  85. } else {
  86. ms.clientid = uuid.String()
  87. opts.SetClientID(uuid.String())
  88. }
  89. } else {
  90. opts.SetClientID(ms.clientid)
  91. }
  92. if ms.certPath != "" || ms.pkeyPath != "" {
  93. log.Infof("Connect MQTT broker with certification and keys.")
  94. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  95. log.Infof("The certification file is %s.", cp)
  96. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  97. log.Infof("The private key file is %s.", kp)
  98. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  99. errCh <- err2
  100. } else {
  101. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  102. }
  103. } else {
  104. errCh <- err1
  105. }
  106. } else {
  107. errCh <- err
  108. }
  109. } else {
  110. log.Infof("Connect MQTT broker with username and password.")
  111. if ms.uName != "" {
  112. opts = opts.SetUsername(ms.uName)
  113. } else {
  114. log.Infof("The username is empty.")
  115. }
  116. if ms.password != "" {
  117. opts = opts.SetPassword(ms.password)
  118. } else {
  119. log.Infof("The password is empty.")
  120. }
  121. }
  122. opts.SetAutoReconnect(true)
  123. var reconn = false
  124. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  125. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  126. reconn = true
  127. subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
  128. })
  129. opts.SetOnConnectHandler(func(client MQTT.Client) {
  130. if reconn {
  131. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  132. }
  133. })
  134. c := MQTT.NewClient(opts)
  135. if token := c.Connect(); token.Wait() && token.Error() != nil {
  136. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  137. }
  138. log.Infof("The connection to server %s was established successfully", ms.srv)
  139. ms.conn = c
  140. subscribe(ms.tpc, c, ctx, consumer, ms.model, ms.format)
  141. log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
  142. }
  143. func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion, format string) {
  144. log := ctx.GetLogger()
  145. h := func(client MQTT.Client, msg MQTT.Message) {
  146. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  147. result, e := common.MessageDecode(msg.Payload(), format)
  148. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  149. if e != nil {
  150. log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), format, e)
  151. return
  152. }
  153. meta := make(map[string]interface{})
  154. meta["topic"] = msg.Topic()
  155. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  156. if nil != model {
  157. sliErr := model.checkType(result, msg.Topic())
  158. for _, v := range sliErr {
  159. log.Errorf(v)
  160. }
  161. }
  162. select {
  163. case consumer <- api.NewDefaultSourceTuple(result, meta):
  164. log.Debugf("send data to source node")
  165. case <-ctx.Done():
  166. return
  167. }
  168. }
  169. if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
  170. log.Errorf("Found error: %s", token.Error())
  171. } else {
  172. log.Infof("Successfully subscribe to topic %s", topic)
  173. }
  174. }
  175. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  176. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  177. if ms.conn != nil && ms.conn.IsConnected() {
  178. ms.conn.Disconnect(5000)
  179. }
  180. return nil
  181. }