mqtt_source.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package extensions
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/google/uuid"
  10. "path"
  11. "strconv"
  12. "strings"
  13. )
  14. type MQTTSource struct {
  15. srv string
  16. tpc string
  17. clientid string
  18. pVersion uint
  19. uName string
  20. password string
  21. certPath string
  22. pkeyPath string
  23. model modelVersion
  24. schema map[string]interface{}
  25. conn MQTT.Client
  26. }
  // MQTTConfig mirrors the user-supplied source properties. Configure decodes
  // the raw property map into this struct using the JSON tags below.
  type MQTTConfig struct {
  	// Qos and Sharedsubscription are decoded but not consulted by the code
  	// visible in this file section.
  	Qos                int  `json:"qos"`
  	Sharedsubscription bool `json:"sharedSubscription"`

  	// Servers lists broker addresses; only the first entry is used.
  	Servers []string `json:"servers"`
  	// Clientid is optional; an empty value makes Open generate one.
  	Clientid string `json:"clientid"`
  	// PVersion selects the protocol version; "3.1.1" maps to 4, anything else to 3.
  	PVersion string `json:"protocolVersion"`

  	// Username/password credentials.
  	Uname    string `json:"username"`
  	Password string `json:"password"`

  	// Certificate and private key paths for TLS client authentication.
  	Certification string `json:"certificationPath"`
  	PrivateKPath  string `json:"privateKeyPath"`

  	// KubeEdge model file (looked up under "sources") and its version.
  	KubeedgeModelFile string `json:"kubeedgeModelFile"`
  	KubeedgeVersion   string `json:"kubeedgeVersion"`
  }
  40. func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
  41. return ms
  42. }
  43. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  44. cfg := &MQTTConfig{}
  45. err := common.MapToStruct(props, cfg)
  46. if err != nil {
  47. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  48. }
  49. ms.tpc = topic
  50. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  51. ms.srv = srvs[0]
  52. } else {
  53. return fmt.Errorf("missing server property")
  54. }
  55. ms.clientid = cfg.Clientid
  56. ms.pVersion = 3
  57. if cfg.PVersion == "3.1.1" {
  58. ms.pVersion = 4
  59. }
  60. ms.uName = cfg.Uname
  61. ms.password = strings.Trim(cfg.Password, " ")
  62. ms.certPath = cfg.Certification
  63. ms.pkeyPath = cfg.PrivateKPath
  64. if 0 != len(cfg.KubeedgeModelFile) {
  65. conf, err := common.LoadConf(path.Join("sources", cfg.KubeedgeModelFile))
  66. if nil != err {
  67. return err
  68. }
  69. ms.model = modelFactory(cfg.KubeedgeVersion)
  70. if err = json.Unmarshal(conf, ms.model); err != nil {
  71. ms.model = nil
  72. return err
  73. }
  74. }
  75. return nil
  76. }
  77. func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  78. log := ctx.GetLogger()
  79. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  80. if ms.clientid == "" {
  81. if uuid, err := uuid.NewUUID(); err != nil {
  82. errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
  83. } else {
  84. ms.clientid = uuid.String()
  85. opts.SetClientID(uuid.String())
  86. }
  87. } else {
  88. opts.SetClientID(ms.clientid)
  89. }
  90. if ms.certPath != "" || ms.pkeyPath != "" {
  91. log.Infof("Connect MQTT broker with certification and keys.")
  92. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  93. log.Infof("The certification file is %s.", cp)
  94. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  95. log.Infof("The private key file is %s.", kp)
  96. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  97. errCh <- err2
  98. } else {
  99. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  100. }
  101. } else {
  102. errCh <- err1
  103. }
  104. } else {
  105. errCh <- err
  106. }
  107. } else {
  108. log.Infof("Connect MQTT broker with username and password.")
  109. if ms.uName != "" {
  110. opts = opts.SetUsername(ms.uName)
  111. } else {
  112. log.Infof("The username is empty.")
  113. }
  114. if ms.password != "" {
  115. opts = opts.SetPassword(ms.password)
  116. } else {
  117. log.Infof("The password is empty.")
  118. }
  119. }
  120. opts.SetAutoReconnect(true)
  121. var reconn = false
  122. opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
  123. log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
  124. reconn = true
  125. subscribe(ms.tpc, client, ctx, consumer, ms.model)
  126. })
  127. opts.SetOnConnectHandler(func(client MQTT.Client) {
  128. if reconn {
  129. log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
  130. }
  131. })
  132. c := MQTT.NewClient(opts)
  133. if token := c.Connect(); token.Wait() && token.Error() != nil {
  134. errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  135. }
  136. log.Infof("The connection to server %s was established successfully", ms.srv)
  137. ms.conn = c
  138. subscribe(ms.tpc, c, ctx, consumer, ms.model)
  139. log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
  140. }
  141. func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion) {
  142. log := ctx.GetLogger()
  143. h := func(client MQTT.Client, msg MQTT.Message) {
  144. log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
  145. result := make(map[string]interface{})
  146. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  147. if e := json.Unmarshal(msg.Payload(), &result); e != nil {
  148. log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
  149. return
  150. }
  151. meta := make(map[string]interface{})
  152. meta["topic"] = msg.Topic()
  153. meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
  154. if nil != model {
  155. sliErr := model.checkType(result, msg.Topic())
  156. for _, v := range sliErr {
  157. log.Errorf(v)
  158. }
  159. }
  160. select {
  161. case consumer <- api.NewDefaultSourceTuple(result, meta):
  162. log.Debugf("send data to source node")
  163. case <-ctx.Done():
  164. return
  165. }
  166. }
  167. if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
  168. log.Errorf("Found error: %s", token.Error())
  169. } else {
  170. log.Infof("Successfully subscribe to topic %s", topic)
  171. }
  172. }
  173. func (ms *MQTTSource) Close(ctx api.StreamContext) error {
  174. ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
  175. if ms.conn != nil && ms.conn.IsConnected() {
  176. ms.conn.Disconnect(5000)
  177. }
  178. return nil
  179. }