Browse Source

fix(auth): fix cert auth bug in mqtt and http

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 3 years atrás
parent
commit
75888e1a66

+ 1 - 0
docs/en_US/rules/sinks/mqtt.md

@@ -13,6 +13,7 @@ The action is used for publish output message into an MQTT server.
 | password           | true     | The password for the connection.                             |
 | certificationPath  | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the `kuiperd` command. For example, if you run `bin/kuiperd` from `/var/kuiper`, then the base path is `/var/kuiper`; If you run `./kuiperd` from `/var/kuiper/bin`, then the base path is `/var/kuiper/bin`. |
 | privateKeyPath     | true     | The private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath. |
+| rootCaPath     | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath. |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections. |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../sources/mqtt.md#connectionselector)

+ 5 - 0
docs/en_US/rules/sources/http_pull.md

@@ -21,6 +21,7 @@ default:
   # Body type, none|text|json|html|xml|javascript|form
   bodyType: json
   # HTTP headers required for the request
+  insecureSkipVerify: true
   headers:
     Accept: application/json
 
@@ -61,6 +62,10 @@ The body of request, such as `'{"data": "data", "method": 1}'`
 
 Body type, it could be none|text|json|html|xml|javascript|format.
 
+### insecureSkipVerify
+
+Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification
+
 ### headers
 
 The HTTP request headers that you want to send along with the HTTP request.

+ 12 - 2
docs/en_US/rules/sources/mqtt.md

@@ -11,6 +11,8 @@ default:
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  #rootCaPath: /var/kuiper/xyz-rootca.pem
+  #insecureSkipVerify: true
   #connectionSelector: mqtt.mqtt_conf1
 
 
@@ -35,11 +37,11 @@ The server list for MQTT message broker. Currently, only ``ONE`` server can be s
 
 ### username
 
-The username for MQTT connection. The configuration will not be used if ``certificationPath`` or ``privateKeyPath`` is specified.
+The username for MQTT connection. 
 
 ### password
 
-The password for MQTT connection. The configuration will not be used if ``certificationPath`` or ``privateKeyPath`` is specified.
+The password for MQTT connection. 
 
 ### certificationPath
 
@@ -49,6 +51,14 @@ The location of certification path. It can be an absolute path, or a relative pa
 
 The location of private key path. It can be an absolute path, or a relative path.  For more detailed information, please refer to ``certificationPath``. Such as ``d3807d9fa5-private.pem.key``.
 
+### rootCaPath
+
+The location of root ca path. It can be an absolute path, or a relative path.
+
+### insecureSkipVerify
+
+Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification
+
 ### connectionSelector
 
 specify the stream to reuse the connection to mqtt broker. The connection profile located in ``connections/connection.yaml``.

+ 1 - 0
docs/zh_CN/rules/sinks/mqtt.md

@@ -13,6 +13,7 @@
 | password          | 是    | 连接密码                             |
 | certificationPath | 是    | 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 |
 | privateKeyPath    | 是    | 私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。 |
+| rootCaPath    | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。 |
 | insecureSkipVerify | 是     | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。|
 | retained           | 是     | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`   
 | connectionSelector | 是     | 复用到 MQTT Broker 的连接,详细信息,[请参考](../sources/mqtt.md#connectionselector)

+ 3 - 0
docs/zh_CN/rules/sources/http_pull.md

@@ -61,6 +61,9 @@ http 请求的超时时间,单位为 ms。
 
 正文类型,可以是 none|text|json|html|xml|javascript|form。
 
+### insecureSkipVerify 
+控制是否跳过证书认证。如果被设置为 `true`,那么跳过证书认证;否则进行证书验证。缺省为 `true`
+
 ### headers
 
 需要与 HTTP 请求一起发送的 HTTP 请求标头。

+ 10 - 2
docs/zh_CN/rules/sources/mqtt.md

@@ -35,11 +35,11 @@ MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。
 
 ### username
 
-MQTT 连接用户名。如果指定了 `certificationPath`  或者 `privateKeyPath`,那么该项配置不会被使用。
+MQTT 连接用户名。
 
 ### password
 
-MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath`,那么该项配置不会被使用。
+MQTT 连接密码。
 
 ### certificationPath
 
@@ -49,6 +49,14 @@ MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath`
 
 私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。
 
+### rootCaPath
+
+根证书路径。可以为绝对路径,也可以为相对路径。.
+
+### insecureSkipVerify
+
+如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于TLS连接
+
 ### connectionSelector
 
 复用 MQTT 源连接。连接配置信息位于 ``connections/connection.yaml``.

+ 3 - 1
etc/connections/connection.yaml

@@ -4,7 +4,8 @@ mqtt:
     username: ekuiper
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
-    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
+    #privateKeyPath: /var/kuiper/xyz-private.pem.key
+    #rootCaPath: /var/kuiper/xyz-rootca.pem
     #insecureSkipVerify: false
     #protocolVersion: 3
     clientid: ekuiper
@@ -14,6 +15,7 @@ mqtt:
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
     #privateKeyPath: /var/kuiper/xyz-private.pem.ke
+    #rootCaPath: /var/kuiper/xyz-rootca.pem
     #insecureSkipVerify: false
     #protocolVersion: 3
 

+ 28 - 0
etc/mqtt_source.json

@@ -104,6 +104,34 @@
 				"zh_CN": "私钥路径"
 			}
 		}, {
+			"name": "rootCaPath",
+			"default": "",
+			"optional": true,
+			"control": "text",
+			"type": "string",
+			"hint": {
+				"en_US": "The location of root ca path. It can be an absolute path, or a relative path. ",
+				"zh_CN": "根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。"
+			},
+			"label": {
+				"en_US": "Root Ca path",
+				"zh_CN": "根证书路径"
+			}
+		}, {
+			"name": "insecureSkipVerify",
+			"default": false,
+			"optional": true,
+			"control": "radio",
+			"type": "bool",
+			"hint": {
+				"en_US": "Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification.",
+				"zh_CN": "控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证。"
+			},
+			"label": {
+				"en_US": "Skip Certification verification",
+				"zh_CN": "跳过证书验证"
+			}
+		}, {
 			"name": "kubeedgeVersion",
 			"default": "",
 			"optional": true,

+ 2 - 0
etc/mqtt_source.yaml

@@ -6,6 +6,8 @@ default:
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  #rootCaPath: /var/kuiper/xyz-rootca.pem
+  #insecureSkipVerify: false
   #connectionSelector: mqtt.mqtt_conf1
   #kubeedgeVersion: 
   #kubeedgeModelFile: ""

+ 16 - 1
etc/sinks/mqtt.json

@@ -162,8 +162,23 @@
       }
     },
     {
+      "name": "rootCaPath",
+      "default": "",
+      "optional": true,
+      "control": "text",
+      "type": "string",
+      "hint": {
+        "en_US": "The location of root ca path. It can be an absolute path, or a relative path. ",
+        "zh_CN": "根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。"
+      },
+      "label": {
+        "en_US": "Root Ca path",
+        "zh_CN": "根证书路径"
+      }
+    },
+    {
       "name": "insecureSkipVerify",
-      "default": true,
+      "default": false,
       "optional": true,
       "control": "radio",
       "type": "bool",

+ 14 - 0
etc/sources/httppull.json

@@ -117,6 +117,20 @@
 				"en_US": "Body type",
 				"zh_CN": "正文类型"
 			}
+		},  {
+			"name": "insecureSkipVerify",
+			"default": true,
+			"optional": true,
+			"control": "radio",
+			"type": "bool",
+			"hint": {
+				"en_US": "Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification.",
+				"zh_CN": "控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证。"
+			},
+			"label": {
+				"en_US": "Skip Certification verification",
+				"zh_CN": "跳过证书验证"
+			}
 		}, {
 			"name": "headers",
 			"default": [{                                                 

+ 2 - 0
etc/sources/httppull.yaml

@@ -15,6 +15,8 @@ default:
   body: '{}'
   # Body type, none|text|json|html|xml|javascript|form
   bodyType: json
+  # Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification
+  insecureSkipVerify: true
   # HTTP headers required for the request
   headers:
     Accept: application/json

+ 73 - 0
internal/pkg/cert/cert.go

@@ -0,0 +1,73 @@
+package cert
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io/ioutil"
+)
+
+type TlsConfigurationOptions struct {
+	SkipCertVerify bool
+	CertFile       string
+	KeyFile        string
+	CaFile         string
+}
+
+func GenerateTLSForClient(
+	Opts TlsConfigurationOptions) (*tls.Config, error) {
+
+	tlsConfig := &tls.Config{
+		InsecureSkipVerify: Opts.SkipCertVerify,
+	}
+
+	if len(Opts.CertFile) <= 0 && len(Opts.KeyFile) <= 0 {
+		tlsConfig.Certificates = nil
+	} else {
+		if cert, err := certLoader(Opts.CertFile, Opts.KeyFile); err != nil {
+			return nil, err
+		} else {
+			tlsConfig.Certificates = []tls.Certificate{cert}
+		}
+	}
+
+	if len(Opts.CaFile) > 0 {
+		root, err := caLoader(Opts.CaFile)
+		if err != nil {
+			return nil, err
+		}
+		tlsConfig.RootCAs = root
+	}
+
+	return tlsConfig, nil
+}
+
+func certLoader(certFilePath, keyFilePath string) (tls.Certificate, error) {
+	if cp, err := conf.ProcessPath(certFilePath); err == nil {
+		if kp, err1 := conf.ProcessPath(keyFilePath); err1 == nil {
+			if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
+				return tls.Certificate{}, err2
+			} else {
+				return cer, nil
+			}
+		} else {
+			return tls.Certificate{}, err1
+		}
+	} else {
+		return tls.Certificate{}, err
+	}
+}
+
+func caLoader(caFilePath string) (*x509.CertPool, error) {
+	if cp, err := conf.ProcessPath(caFilePath); err == nil {
+		pool := x509.NewCertPool()
+		caCrt, err1 := ioutil.ReadFile(cp)
+		if err1 != nil {
+			return nil, err1
+		}
+		pool.AppendCertsFromPEM(caCrt)
+		return pool, err1
+	} else {
+		return nil, err
+	}
+}

+ 70 - 0
internal/pkg/cert/cert_test.go

@@ -0,0 +1,70 @@
+package cert
+
+import (
+	"crypto/tls"
+	"reflect"
+	"testing"
+)
+
+func TestGenerateTLSForClient(t *testing.T) {
+	type args struct {
+		Opts TlsConfigurationOptions
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    *tls.Config
+		wantErr bool
+	}{
+		{
+			name: "do not set tls",
+			args: args{
+				Opts: TlsConfigurationOptions{
+					SkipCertVerify: true,
+					CertFile:       "",
+					KeyFile:        "",
+					CaFile:         "",
+				}},
+			want: &tls.Config{
+				InsecureSkipVerify: true,
+			},
+			wantErr: false,
+		},
+		{
+			name: "no cert/key",
+			args: args{
+				Opts: TlsConfigurationOptions{
+					SkipCertVerify: true,
+					CertFile:       "not_exist.crt",
+					KeyFile:        "not_exist.key",
+					CaFile:         "",
+				}},
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name: "no cert/key",
+			args: args{
+				Opts: TlsConfigurationOptions{
+					SkipCertVerify: true,
+					CertFile:       "",
+					KeyFile:        "",
+					CaFile:         "not_exist.crt",
+				}},
+			want:    nil,
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := GenerateTLSForClient(tt.args.Opts)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("GenerateTLSForClient() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("GenerateTLSForClient() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 22 - 27
internal/topo/connection/client_mqtt.go

@@ -6,6 +6,7 @@ import (
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/cert"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"strings"
 )
@@ -24,6 +25,7 @@ type MQTTConnectionConfig struct {
 	Password           string   `json:"password"`
 	Certification      string   `json:"certificationPath"`
 	PrivateKPath       string   `json:"privateKeyPath"`
+	RootCaPath         string   `json:"rootCaPath"`
 	InsecureSkipVerify bool     `json:"insecureSkipVerify"`
 }
 
@@ -33,9 +35,7 @@ type MQTTClient struct {
 	pVersion uint
 	uName    string
 	password string
-	certPath string
-	pkeyPath string
-	Insecure bool
+	tls      *tls.Config
 
 	selector *ConSelector
 	conn     MQTT.Client
@@ -71,21 +71,22 @@ func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
 		ms.pVersion = 4
 	}
 
-	if cfg.Certification != "" || cfg.PrivateKPath != "" {
-		ms.certPath, err = conf.ProcessPath(cfg.Certification)
-		if err != nil {
-			return fmt.Errorf("failed to get certPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
-		}
-
-		ms.pkeyPath, err = conf.ProcessPath(cfg.PrivateKPath)
-		if err != nil {
-			return fmt.Errorf("failed to get keyPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
-		}
+	tlsOpts := cert.TlsConfigurationOptions{
+		SkipCertVerify: cfg.InsecureSkipVerify,
+		CertFile:       cfg.Certification,
+		KeyFile:        cfg.PrivateKPath,
+		CaFile:         cfg.RootCaPath,
 	}
+	conf.Log.Infof("Connect MQTT broker with TLS configs: %v for connection selector: %s.", tlsOpts, ms.selector.ConnSelectorCfg)
+	tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
+	if err != nil {
+		return err
+	}
+
+	ms.tls = tlscfg
 
 	ms.uName = cfg.Uname
 	ms.password = strings.Trim(cfg.Password, " ")
-	ms.Insecure = cfg.InsecureSkipVerify
 
 	return nil
 }
@@ -94,19 +95,13 @@ func (ms *MQTTClient) GetClient() (interface{}, error) {
 
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion).SetCleanSession(false)
 
-	if ms.certPath != "" && ms.pkeyPath != "" {
-		if cer, err := tls.LoadX509KeyPair(ms.certPath, ms.pkeyPath); err != nil {
-			return nil, fmt.Errorf("error when load cert/key for %s, the error is: %s", ms.selector.ConnSelectorCfg, err)
-		} else {
-			opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.Insecure})
-		}
-	} else {
-		if ms.uName != "" {
-			opts = opts.SetUsername(ms.uName)
-		}
-		if ms.password != "" {
-			opts = opts.SetPassword(ms.password)
-		}
+	opts = opts.SetTLSConfig(ms.tls)
+
+	if ms.uName != "" {
+		opts = opts.SetUsername(ms.uName)
+	}
+	if ms.password != "" {
+		opts = opts.SetPassword(ms.password)
 	}
 	opts = opts.SetClientID(ms.clientid)
 	opts = opts.SetAutoReconnect(true)

+ 1 - 1
internal/topo/node/node.go

@@ -206,7 +206,7 @@ func getSourceConf(ctx api.StreamContext, sourceType string, options *ast.Option
 			if def1, ok1 := def.(map[string]interface{}); ok1 {
 				props = def1
 			}
-			if c, ok := cfg[confkey]; ok {
+			if c, ok := cfg[strings.ToLower(confkey)]; ok {
 				if c1, ok := c.(map[string]interface{}); ok {
 					c2 := c1
 					for k, v := range c2 {

+ 25 - 10
internal/topo/source/httppull_source.go

@@ -16,6 +16,7 @@ package source
 
 import (
 	"crypto/md5"
+	"crypto/tls"
 	"encoding/hex"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -34,15 +35,16 @@ const DEFAULT_INTERVAL = 10000
 const DEFAULT_TIMEOUT = 5000
 
 type HTTPPullSource struct {
-	url           string
-	method        string
-	interval      int
-	timeout       int
-	incremental   bool
-	body          string
-	bodyType      string
-	headers       map[string]string
-	messageFormat string
+	url                string
+	method             string
+	interval           int
+	timeout            int
+	incremental        bool
+	body               string
+	bodyType           string
+	headers            map[string]string
+	messageFormat      string
+	insecureSkipVerify bool
 
 	client *http.Client
 }
@@ -126,6 +128,14 @@ func (hps *HTTPPullSource) Configure(device string, props map[string]interface{}
 		}
 	}
 
+	hps.insecureSkipVerify = true
+	if i, ok := props["incremental"]; ok {
+		if i1, ok1 := i.(bool); ok1 {
+			hps.insecureSkipVerify = i1
+		} else {
+			return fmt.Errorf("Not valid insecureSkipVerify value %v.", i1)
+		}
+	}
 	hps.headers = make(map[string]string)
 	if h, ok := props["headers"]; ok {
 		if h1, ok1 := h.(map[string]interface{}); ok1 {
@@ -149,8 +159,13 @@ func (hps *HTTPPullSource) Open(ctx api.StreamContext, consumer chan<- api.Sourc
 		errCh <- e
 		return
 	}
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: hps.insecureSkipVerify},
+	}
 
-	hps.client = &http.Client{Timeout: time.Duration(hps.timeout) * time.Millisecond}
+	hps.client = &http.Client{
+		Transport: tr,
+		Timeout:   time.Duration(hps.timeout) * time.Millisecond}
 	hps.initTimerPull(ctx, consumer, errCh)
 }
 

+ 40 - 44
internal/topo/source/mqtt_source.go

@@ -15,11 +15,11 @@
 package source
 
 import (
-	"crypto/tls"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/cert"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/message"
@@ -29,18 +29,19 @@ import (
 )
 
 type MQTTSource struct {
-	srv      string
-	qos      int
-	format   string
-	tpc      string
-	clientid string
-	pVersion uint
-	uName    string
-	password string
-	certPath string
-	pkeyPath string
-	conSel   string
-	InSecure bool
+	srv        string
+	qos        int
+	format     string
+	tpc        string
+	clientid   string
+	pVersion   uint
+	uName      string
+	password   string
+	certPath   string
+	pkeyPath   string
+	rootCapath string
+	conSel     string
+	InSecure   bool
 
 	model  modelVersion
 	schema map[string]interface{}
@@ -57,6 +58,7 @@ type MQTTConfig struct {
 	Password           string   `json:"password"`
 	Certification      string   `json:"certificationPath"`
 	PrivateKPath       string   `json:"privateKeyPath"`
+	RootCaPath         string   `json:"rootCaPath"`
 	InsecureSkipVerify bool     `json:"insecureSkipVerify"`
 	KubeedgeModelFile  string   `json:"kubeedgeModelFile"`
 	KubeedgeVersion    string   `json:"kubeedgeVersion"`
@@ -95,6 +97,7 @@ func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) erro
 	ms.password = strings.Trim(cfg.Password, " ")
 	ms.certPath = cfg.Certification
 	ms.pkeyPath = cfg.PrivateKPath
+	ms.rootCapath = cfg.RootCaPath
 
 	if 0 != len(cfg.KubeedgeModelFile) {
 		p := path.Join("sources", cfg.KubeedgeModelFile)
@@ -134,39 +137,32 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 			opts = opts.SetClientID(ms.clientid)
 		}
 
-		if ms.certPath != "" || ms.pkeyPath != "" {
-			log.Infof("Connect MQTT broker with certification and keys.")
-			if cp, err := conf.ProcessPath(ms.certPath); err == nil {
-				log.Infof("The certification file is %s.", cp)
-				if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
-					log.Infof("The private key file is %s.", kp)
-					if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
-						errCh <- err2
-						return
-					} else {
-						opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.InSecure})
-					}
-				} else {
-					errCh <- err1
-					return
-				}
-			} else {
-				errCh <- err
-				return
-			}
+		tlsOpts := cert.TlsConfigurationOptions{
+			SkipCertVerify: ms.InSecure,
+			CertFile:       ms.certPath,
+			KeyFile:        ms.pkeyPath,
+			CaFile:         ms.rootCapath,
+		}
+		log.Infof("Connect MQTT broker with TLS configs. %v", tlsOpts)
+		tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
+		if err != nil {
+			errCh <- err
+			return
+		}
+
+		opts = opts.SetTLSConfig(tlscfg)
+
+		log.Infof("Connect MQTT broker with username and password.")
+		if ms.uName != "" {
+			opts = opts.SetUsername(ms.uName)
 		} else {
-			log.Infof("Connect MQTT broker with username and password.")
-			if ms.uName != "" {
-				opts = opts.SetUsername(ms.uName)
-			} else {
-				log.Infof("The username is empty.")
-			}
+			log.Infof("The username is empty.")
+		}
 
-			if ms.password != "" {
-				opts = opts.SetPassword(ms.password)
-			} else {
-				log.Infof("The password is empty.")
-			}
+		if ms.password != "" {
+			opts = opts.SetPassword(ms.password)
+		} else {
+			log.Infof("The password is empty.")
 		}
 		opts.SetAutoReconnect(true)
 		var reconn = false