소스 검색

add source with cert & key

RockyJin 5 년 전
부모
커밋
711f97ad92
7개의 변경된 파일123개의 추가작업 그리고 49개의 파일을 삭제
  1. 10 0
      common/util.go
  2. 14 13
      docs/en_US/rules/sinks/mqtt.md
  3. 26 6
      docs/en_US/rules/sources/mqtt.md
  4. 5 2
      etc/mqtt_source.yaml
  5. 54 6
      xstream/extensions/mqtt_source.go
  6. 6 4
      xstream/extensions/mqtt_source.yaml
  7. 8 18
      xstream/sinks/mqtt_sink.go

+ 10 - 0
common/util.go

@@ -272,6 +272,16 @@ func GetTimer(duration int) Timer {
 	}
 }
 
+func ProcessPath(p string) (string, error) {
+	if abs, err := filepath.Abs(p); err != nil {
+		return "", nil
+	} else {
+		if _, err := os.Stat(abs); os.IsNotExist(err) {
+			return "", err;
+		}
+		return abs, nil
+	}
+}
 
 /****** For Test Only ********/
 func GetMockTicker() *MockTicker{

+ 14 - 13
docs/en_US/rules/sinks/mqtt.md

@@ -2,16 +2,17 @@
 
 The action is used for publish output message into a MQTT server. 
 
-| Property name      | Optional | Description                                                  |
-| ------------------ | -------- | ------------------------------------------------------------ |
-| server             | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
-| topic              | false    | The mqtt topic, such as ``analysis/result``                  |
-| clientId           | true     | The client id for mqtt connection. If not specified, an uuid will be used |
-| protocol_version   | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
-| username           | true     | The user name for the connection.                            |
-| password           | true     | The password for the connection.                             |
-| certification_path | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the ``server`` command. For example, if you run ``cli/server`` from ``/var/kuiper``, then the base path is ``/var/kuiper``; If you run ``./server`` from ``/var/kuiper/bin``, then the base path is ````/var/kuiper/bin``. |
-| private_key_path   | true     | The private key path. It can be either absolute path, or relative path. For more detailed information, please refer to ``certification_path``. |
+| Property name     | Optional | Description                                                  |
+| ----------------- | -------- | ------------------------------------------------------------ |
+| server            | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
+| topic             | false    | The mqtt topic, such as ``analysis/result``                  |
+| clientId          | true     | The client id for mqtt connection. If not specified, an uuid will be used |
+| protocolVersion   | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
+| qos               | true     | The QoS for message delivery.                                |
+| username          | true     | The user name for the connection.                            |
+| password          | true     | The password for the connection.                             |
+| certificationPath | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the ``server`` command. For example, if you run ``bin/server`` from ``/var/kuiper``, then the base path is ``/var/kuiper``; If you run ``./server`` from ``/var/kuiper/bin``, then the base path is ``/var/kuiper/bin``. |
+| privateKeyPath    | true     | The private key path. It can be either absolute path, or relative path. For more detailed information, please refer to ``certificationPath``. |
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 ```json
@@ -19,7 +20,7 @@ Below is sample configuration for connecting to Azure IoT Hub by using SAS authe
       "mqtt": {
         "server": "ssl://xyz.azure-devices.net:8883",
         "topic": "devices/demo_001/messages/events/",
-        "protocol_version": "3.1.1",
+        "protocolVersion": "3.1.1",
         "qos": 1,
         "clientId": "demo_001",
         "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
@@ -37,8 +38,8 @@ Below is another sample configuration for connecting to AWS IoT by using certifi
         "topic": "devices/result",
         "qos": 1,
         "clientId": "demo_001",
-        "certification_path": "keys/d3807d9fa5-certificate.pem",
-        "private_key_path": "keys/d3807d9fa5-private.pem.key"
+        "certificationPath": "keys/d3807d9fa5-certificate.pem",
+        "privateKeyPath": "keys/d3807d9fa5-private.pem.key"
       }
     }
 ```

+ 26 - 6
docs/en_US/rules/sources/mqtt.md

@@ -6,15 +6,19 @@ Kuiper provides built-in support for MQTT source stream, which can subscribe the
 #Global MQTT configurations
 default:
   qos: 1
-  sharedsubscription: true
+  sharedSubscription: true
   servers: [tcp://127.0.0.1:1883]
-  #TODO: Other global configurations
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
 
 
 #Override the global configurations
-demo: #Conf_key
+demo_conf: #Conf_key
   qos: 0
-  servers: [tcp://10.211.55.6:1883]
+  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
+
 ```
 
 ## Global MQTT configurations
@@ -25,14 +29,30 @@ Use can specify the global MQTT settings here. The configuration items specified
 
 The default subscription QoS level.
 
-### sharedsubscription
+### sharedSubscription
 
-Whether use the shared subscription mode or not. If using the shared subscription mode, then if there are multiple Kuiper process can be load balanced.
+Whether use the shared subscription mode or not. If using the shared subscription mode, then there are multiple Kuiper process can be load balanced.
 
 ### servers
 
 The server list for MQTT message broker. Currently, only ``ONE`` server can be specified.
 
+### username
+
+The username for MQTT connection. The configuration will not be used if ``certificationPath`` or ``privateKeyPath`` is specified.
+
+### password
+
+The password for MQTT connection. The configuration will not be used if ``certificationPath`` or ``privateKeyPath`` is specified.
+
+### certificationPath
+
+The location of certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the ``server`` command. For example, if you run ``bin/server`` from ``/var/kuiper``, then the base path is ``/var/kuiper``; If you run ``./server`` from ``/var/kuiper/bin``, then the base path is ``/var/kuiper/bin``.  Such as  ``d3807d9fa5-certificate.pem``.
+
+### privateKeyPath
+
+The location of private key path. It can be an absolute path, or a relative path.  For more detailed information, please refer to ``certificationPath``. Such as ``d3807d9fa5-private.pem.key``.
+
 ## Override the default settings
 
 If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with ``demo``.  Then you can specify the configuration with option ``CONF_KEY`` when creating the stream definition (see [stream specs](../../sqls/streams.md) for more info).

+ 5 - 2
etc/mqtt_source.yaml

@@ -1,9 +1,12 @@
 #Global MQTT configurations
 default:
   qos: 1
-  sharedsubscription: true
+  sharedSubscription: true
   servers: [tcp://127.0.0.1:1883]
-  #TODO: Other global configurations
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
 
 
 #Override the global configurations

+ 54 - 6
xstream/extensions/mqtt_source.go

@@ -1,6 +1,7 @@
 package extensions
 
 import (
+	"crypto/tls"
 	"encoding/json"
 	"engine/common"
 	"engine/xsql"
@@ -21,6 +22,8 @@ type MQTTSource struct {
 	pVersion uint
 	uName 	 string
 	password string
+	certPath string
+	pkeyPath string
 
 	schema   map[string]interface{}
 	conn MQTT.Client
@@ -29,12 +32,14 @@ type MQTTSource struct {
 
 type MQTTConfig struct {
 	Qos string `yaml:"qos"`
-	Sharedsubscription string `yaml:"sharedsubscription"`
+	Sharedsubscription string `yaml:"sharedSubscription"`
 	Servers []string `yaml:"servers"`
 	Clientid string `yaml:"clientid"`
 	PVersion string `yaml:"protocolVersion"`
 	Uname string `yaml:"username"`
 	Password string `yaml:"password"`
+	Certification string `yaml:"certificationPath"`
+	PrivateKPath string `yaml:"privateKeyPath"`
 }
 
 const confName string = "mqtt_source.yaml"
@@ -71,15 +76,36 @@ func NewMQTTSource(topic string, confKey string) (*MQTTSource, error) {
 		if pv == "3.1.1" {
 			pversion = 4
 		}
+	} else {
+		pv = cfg["default"].PVersion
+		if pv == "3.1.1" {
+			pversion = 4
+		}
 	}
 	ms.pVersion = pversion
 
 	if uname := cfg[confKey].Uname; uname != "" {
 		ms.uName = strings.Trim(uname, " ")
+	} else {
+		ms.uName = cfg["default"].Uname
 	}
 
 	if password := cfg[confKey].Password; password != "" {
 		ms.password = strings.Trim(password, " ")
+	} else {
+		ms.password = cfg["default"].Password
+	}
+
+	if cpath := cfg[confKey].Certification; cpath != "" {
+		ms.certPath = cpath
+	} else {
+		ms.certPath = cfg["default"].Certification
+	}
+
+	if pkpath := cfg[confKey].PrivateKPath; pkpath != "" {
+		ms.pkeyPath = pkpath
+	} else {
+		ms.pkeyPath = cfg["default"].PrivateKPath
 	}
 
 	return ms, nil
@@ -89,6 +115,7 @@ func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
 	return ms
 }
 
+
 func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error {
 	log := ctx.GetLogger()
 
@@ -102,12 +129,33 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 	} else {
 		opts.SetClientID(ms.clientid)
 	}
-	if ms.uName != "" {
-		opts.SetUsername(ms.uName)
-	}
 
-	if ms.password != "" {
-		opts.SetPassword(ms.password)
+	if ms.certPath != "" || ms.pkeyPath != "" {
+		log.Printf("Connect MQTT broker with certification and keys.")
+		if cp, err := common.ProcessPath(ms.certPath); err == nil {
+			log.Printf("The certification file is %s.", cp)
+			if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
+				log.Printf("The private key file is %s.", kp)
+				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
+					return err2
+				} else {
+					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
+				}
+			} else {
+				return err1
+			}
+		} else {
+			return err
+		}
+	} else {
+		log.Printf("Connect MQTT broker with username and password.")
+		if ms.uName != "" {
+			opts = opts.SetUsername(ms.uName)
+		}
+
+		if ms.password != "" {
+			opts = opts.SetPassword(ms.password)
+		}
 	}
 
 	h := func(client MQTT.Client, msg MQTT.Message) {

+ 6 - 4
xstream/extensions/mqtt_source.yaml

@@ -1,13 +1,15 @@
 #Global MQTT configurations
 default:
   qos: 1
-  sharedsubscription: true
+  sharedSubscription: true
   servers: [tcp://127.0.0.1:1883]
-  clientid: xstream_client
-  #TODO: Other global configurations
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
 
 
 #Override the global configurations
 demo_conf: #Conf_key
   qos: 0
-  servers: [tls://10.211.55.6:1883, tcp://127.0.0.1]
+  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

+ 8 - 18
xstream/sinks/mqtt_sink.go

@@ -2,12 +2,12 @@ package sinks
 
 import (
 	"crypto/tls"
+	"engine/common"
 	"engine/xstream/api"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
-	"os"
-	"path/filepath"
+	"log"
 	"strings"
 )
 
@@ -46,7 +46,7 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 		}
 	}
 	var pVersion uint = 3
-	pVersionStr, ok := ps["protocol_version"];
+	pVersionStr, ok := ps["protocolVersion"];
 	if ok {
 		v, _ := pVersionStr.(string)
 		if v == "3.1" {
@@ -77,14 +77,14 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 	}
 
 	certPath := ""
-	if cp, ok := ps["certification_path"]; ok {
+	if cp, ok := ps["certificationPath"]; ok {
 		if v, ok := cp.(string); ok {
 			certPath = v
 		}
 	}
 
 	pKeyPath := ""
-	if pk, ok := ps["private_key_path"]; ok {
+	if pk, ok := ps["privateKeyPath"]; ok {
 		if v, ok := pk.(string); ok {
 			pKeyPath = v
 		}
@@ -94,26 +94,16 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 	return ms, nil
 }
 
-func processPath(p string) (string, error) {
-	if abs, err := filepath.Abs(p); err != nil {
-		return "", nil
-	} else {
-		if _, err := os.Stat(abs); os.IsNotExist(err) {
-			return "", err;
-		}
-		return abs, nil
-	}
-}
+
 
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
-	log := ctx.GetLogger()
 	log.Printf("Opening mqtt sink for rule %s.", ctx.GetRuleId())
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
 
 	if ms.certPath != "" || ms.pkeyPath != "" {
 		log.Printf("Connect MQTT broker with certification and keys.")
-		if cp, err := processPath(ms.certPath); err == nil {
-			if kp, err1 := processPath(ms.pkeyPath); err1 == nil {
+		if cp, err := common.ProcessPath(ms.certPath); err == nil {
+			if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
 					return err2
 				} else {