client_mqtt.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package connection
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/google/uuid"
  7. "github.com/lf-edge/ekuiper/internal/conf"
  8. "github.com/lf-edge/ekuiper/pkg/cast"
  9. "strings"
  10. )
  11. func init() {
  12. registerClientFactory("mqtt", func(s *ConSelector) Client {
  13. return &MQTTClient{selector: s}
  14. })
  15. }
  16. type MQTTConnectionConfig struct {
  17. Servers []string `json:"servers"`
  18. PVersion string `json:"protocolVersion"`
  19. ClientId string `json:"clientid"`
  20. Uname string `json:"username"`
  21. Password string `json:"password"`
  22. Certification string `json:"certificationPath"`
  23. PrivateKPath string `json:"privateKeyPath"`
  24. InsecureSkipVerify bool `json:"insecureSkipVerify"`
  25. }
  26. type MQTTClient struct {
  27. srv string
  28. clientid string
  29. pVersion uint
  30. uName string
  31. password string
  32. certPath string
  33. pkeyPath string
  34. Insecure bool
  35. selector *ConSelector
  36. conn MQTT.Client
  37. }
  38. func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
  39. cfg := MQTTConnectionConfig{}
  40. err := cast.MapToStructStrict(props, &cfg)
  41. if err != nil {
  42. return fmt.Errorf("failed to get config for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  43. }
  44. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  45. ms.srv = srvs[0]
  46. } else {
  47. return fmt.Errorf("missing server property for %s", ms.selector.ConnSelectorCfg)
  48. }
  49. if cfg.ClientId == "" {
  50. if newUUID, err := uuid.NewUUID(); err != nil {
  51. return fmt.Errorf("failed to get uuid for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  52. } else {
  53. ms.clientid = newUUID.String()
  54. }
  55. } else {
  56. ms.clientid = cfg.ClientId
  57. }
  58. ms.pVersion = 3
  59. if cfg.PVersion == "3.1.1" {
  60. ms.pVersion = 4
  61. }
  62. if cfg.Certification != "" || cfg.PrivateKPath != "" {
  63. ms.certPath, err = conf.ProcessPath(cfg.Certification)
  64. if err != nil {
  65. return fmt.Errorf("failed to get certPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  66. }
  67. ms.pkeyPath, err = conf.ProcessPath(cfg.PrivateKPath)
  68. if err != nil {
  69. return fmt.Errorf("failed to get keyPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
  70. }
  71. }
  72. ms.uName = cfg.Uname
  73. ms.password = strings.Trim(cfg.Password, " ")
  74. ms.Insecure = cfg.InsecureSkipVerify
  75. return nil
  76. }
  77. func (ms *MQTTClient) GetClient() (interface{}, error) {
  78. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion).SetCleanSession(false)
  79. if ms.certPath != "" && ms.pkeyPath != "" {
  80. if cer, err := tls.LoadX509KeyPair(ms.certPath, ms.pkeyPath); err != nil {
  81. return nil, fmt.Errorf("error when load cert/key for %s, the error is: %s", ms.selector.ConnSelectorCfg, err)
  82. } else {
  83. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.Insecure})
  84. }
  85. } else {
  86. if ms.uName != "" {
  87. opts = opts.SetUsername(ms.uName)
  88. }
  89. if ms.password != "" {
  90. opts = opts.SetPassword(ms.password)
  91. }
  92. }
  93. opts = opts.SetClientID(ms.clientid)
  94. opts = opts.SetAutoReconnect(true)
  95. c := MQTT.NewClient(opts)
  96. if token := c.Connect(); token.Wait() && token.Error() != nil {
  97. conf.Log.Errorf("The connection to mqtt broker failed for connection selector: %s ", ms.selector.ConnSelectorCfg)
  98. return nil, fmt.Errorf("found error when connecting for connection selector %s: %s", ms.selector.ConnSelectorCfg, token.Error())
  99. }
  100. conf.Log.Infof("The connection to mqtt broker is established successfully for connection selector: %s.", ms.selector.ConnSelectorCfg)
  101. ms.conn = c
  102. return c, nil
  103. }
  104. func (ms *MQTTClient) CloseClient() error {
  105. conf.Log.Infof("Closing the connection to mqtt broker for connection selector: %s", ms.selector.ConnSelectorCfg)
  106. if ms.conn != nil && ms.conn.IsConnected() {
  107. ms.conn.Disconnect(5000)
  108. }
  109. return nil
  110. }