client_mqtt.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package connection
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/google/uuid"
  7. "github.com/lf-edge/ekuiper/internal/conf"
  8. "github.com/lf-edge/ekuiper/internal/pkg/cert"
  9. "github.com/lf-edge/ekuiper/pkg/cast"
  10. "strings"
  11. )
  12. func init() {
  13. registerClientFactory("mqtt", func(s *ConSelector) Client {
  14. return &MQTTClient{selector: s}
  15. })
  16. }
  // MQTTConnectionConfig is the set of user-supplied properties that
  // MQTTClient.CfgValidate decodes from a property map. The json tags are the
  // property names expected in that map.
  type MQTTConnectionConfig struct {
  	// Servers lists broker addresses; CfgValidate uses only the first entry
  	// and rejects an empty list.
  	Servers []string `json:"servers"`
  	// PVersion is the protocol version string. CfgValidate maps "3.1.1" to
  	// protocol level 4 and every other value to level 3.
  	PVersion string `json:"protocolVersion"`
  	// ClientId is the MQTT client identifier. When empty, CfgValidate
  	// generates a UUID to use instead.
  	ClientId string `json:"clientid"`

  	// Uname and Password are the broker credentials. CfgValidate strips
  	// leading and trailing spaces from Password.
  	Uname    string `json:"username"`
  	Password string `json:"password"`

  	// The remaining fields are handed to cert.TlsConfigurationOptions as
  	// CertFile, KeyFile, CaFile and SkipCertVerify respectively.
  	Certification      string `json:"certificationPath"`
  	PrivateKPath       string `json:"privateKeyPath"`
  	RootCaPath         string `json:"rootCaPath"`
  	InsecureSkipVerify bool   `json:"insecureSkipVerify"`
  }
  28. type MQTTClient struct {
  29. srv string
  30. clientid string
  31. pVersion uint
  32. uName string
  33. password string
  34. tls *tls.Config
  35. selector *ConSelector
  36. conn MQTT.Client
  37. }
  38. func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
  39. cfg := MQTTConnectionConfig{}
  40. err := cast.MapToStructStrict(props, &cfg)
  41. if err != nil {
  42. return fmt.Errorf("failed to get config for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  43. }
  44. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  45. ms.srv = srvs[0]
  46. } else {
  47. return fmt.Errorf("missing server property for %s", ms.selector.ConnSelectorCfg)
  48. }
  49. if cfg.ClientId == "" {
  50. if newUUID, err := uuid.NewUUID(); err != nil {
  51. return fmt.Errorf("failed to get uuid for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  52. } else {
  53. ms.clientid = newUUID.String()
  54. }
  55. } else {
  56. ms.clientid = cfg.ClientId
  57. }
  58. ms.pVersion = 3
  59. if cfg.PVersion == "3.1.1" {
  60. ms.pVersion = 4
  61. }
  62. tlsOpts := cert.TlsConfigurationOptions{
  63. SkipCertVerify: cfg.InsecureSkipVerify,
  64. CertFile: cfg.Certification,
  65. KeyFile: cfg.PrivateKPath,
  66. CaFile: cfg.RootCaPath,
  67. }
  68. conf.Log.Infof("Connect MQTT broker with TLS configs: %v for connection selector: %s.", tlsOpts, ms.selector.ConnSelectorCfg)
  69. tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
  70. if err != nil {
  71. return err
  72. }
  73. ms.tls = tlscfg
  74. ms.uName = cfg.Uname
  75. ms.password = strings.Trim(cfg.Password, " ")
  76. return nil
  77. }
  78. func (ms *MQTTClient) GetClient() (interface{}, error) {
  79. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion).SetCleanSession(false)
  80. opts = opts.SetTLSConfig(ms.tls)
  81. if ms.uName != "" {
  82. opts = opts.SetUsername(ms.uName)
  83. }
  84. if ms.password != "" {
  85. opts = opts.SetPassword(ms.password)
  86. }
  87. opts = opts.SetClientID(ms.clientid)
  88. opts = opts.SetAutoReconnect(true)
  89. c := MQTT.NewClient(opts)
  90. if token := c.Connect(); token.Wait() && token.Error() != nil {
  91. conf.Log.Errorf("The connection to mqtt broker failed for connection selector: %s ", ms.selector.ConnSelectorCfg)
  92. return nil, fmt.Errorf("found error when connecting for connection selector %s: %s", ms.selector.ConnSelectorCfg, token.Error())
  93. }
  94. conf.Log.Infof("The connection to mqtt broker is established successfully for connection selector: %s.", ms.selector.ConnSelectorCfg)
  95. ms.conn = c
  96. return c, nil
  97. }
  98. func (ms *MQTTClient) CloseClient() error {
  99. conf.Log.Infof("Closing the connection to mqtt broker for connection selector: %s", ms.selector.ConnSelectorCfg)
  100. if ms.conn != nil && ms.conn.IsConnected() {
  101. ms.conn.Disconnect(5000)
  102. }
  103. return nil
  104. }