RockyJin 5 роки тому
батько
коміт
05b1db4c32
2 змінених файлів з 94 додано та 24 видалено
  1. 37 17
      docs/en_US/rules/sinks/mqtt.md
  2. 57 7
      xstream/sinks/mqtt_sink.go

+ 37 - 17
docs/en_US/rules/sinks/mqtt.md

@@ -2,24 +2,44 @@
 
 
 The action is used for publish output message into a MQTT server. 
 The action is used for publish output message into a MQTT server. 
 
 
-| Property name    | Optional | Description                                                  |
-| ---------------- | -------- | ------------------------------------------------------------ |
-| server           | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
-| topic            | false    | The mqtt topic, such as ``analysis/result``                  |
-| clientId         | true     | The client id for mqtt connection. If not specified, an uuid will be used |
-| protocol_version | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
-| username         | true     | The user name for the connection.                        |
-| password         | true     | The password for the connection.                             |
+| Property name      | Optional | Description                                                  |
+| ------------------ | -------- | ------------------------------------------------------------ |
+| server             | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
+| topic              | false    | The mqtt topic, such as ``analysis/result``                  |
+| clientId           | true     | The client id for mqtt connection. If not specified, an uuid will be used |
+| protocol_version   | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
+| username           | true     | The user name for the connection.                            |
+| password           | true     | The password for the connection.                             |
+| certification_path | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the ``server`` command. For example, if you run ``cli/server`` from ``/var/kuiper``, then the base path is ``/var/kuiper``; If you run ``./server`` from ``/var/kuiper/bin``, then the base path is ````/var/kuiper/bin``. |
+| private_key_path   | true     | The private key path. It can be either absolute path, or relative path. For more detailed information, please refer to ``certification_path``. |
 
 
-Below is one of the sample configuration.
+Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 ```json
 ```json
-{
-  "mqtt": {
-  	"server": "tcp://sink_server:1883",
-  	"topic": "demoSink",
-  	"clientId": "client_id_1",
-    "protocol_version": "3.1.1"
-  }
-}
+    {
+      "mqtt": {
+        "server": "ssl://xyz.azure-devices.net:8883",
+        "topic": "devices/demo_001/messages/events/",
+        "protocol_version": "3.1.1",
+        "qos": 1,
+        "clientId": "demo_001",
+        "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
+        "password": "SharedAccessSignature sr=*******************"
+      }
+    }
+```
+
+Below is another sample configuration for connecting to AWS IoT by using certification and privte key auth.
+
+```json
+    {
+      "mqtt": {
+        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
+        "topic": "devices/result",
+        "qos": 1,
+        "clientId": "demo_001",
+        "certification_path": "keys/d3807d9fa5-certificate.pem",
+        "private_key_path": "keys/d3807d9fa5-private.pem.key"
+      }
+    }
 ```
 ```
 
 

+ 57 - 7
xstream/sinks/mqtt_sink.go

@@ -1,10 +1,13 @@
 package sinks
 package sinks
 
 
 import (
 import (
+	"crypto/tls"
 	"engine/xstream/api"
 	"engine/xstream/api"
 	"fmt"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
 	"github.com/google/uuid"
+	"os"
+	"path/filepath"
 	"strings"
 	"strings"
 )
 )
 
 
@@ -15,6 +18,9 @@ type MQTTSink struct {
 	pVersion uint
 	pVersion uint
 	uName 	string
 	uName 	string
 	password string
 	password string
+	certPath string
+	pkeyPath string
+
 	conn MQTT.Client
 	conn MQTT.Client
 }
 }
 
 
@@ -70,20 +76,64 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 		}
 		}
 	}
 	}
 
 
-	ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password}
+	certPath := ""
+	if cp, ok := ps["certification_path"]; ok {
+		if v, ok := cp.(string); ok {
+			certPath = v
+		}
+	}
+
+	pKeyPath := ""
+	if pk, ok := ps["private_key_path"]; ok {
+		if v, ok := pk.(string); ok {
+			pKeyPath = v
+		}
+	}
+
+	ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password, certPath:certPath, pkeyPath:pKeyPath}
 	return ms, nil
 	return ms, nil
 }
 }
 
 
+func processPath(p string) (string, error) {
+	if abs, err := filepath.Abs(p); err != nil {
+		return "", nil
+	} else {
+		if _, err := os.Stat(abs); os.IsNotExist(err) {
+			return "", err;
+		}
+		return abs, nil
+	}
+}
+
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
-	log.Printf("Opening mqtt sink for rule %s", ctx.GetRuleId())
+	log.Printf("Opening mqtt sink for rule %s.", ctx.GetRuleId())
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
-	if ms.uName != "" {
-		opts = opts.SetUsername(ms.uName)
-	}
 
 
-	if ms.password != "" {
-		opts = opts.SetPassword(ms.password)
+	if ms.certPath != "" || ms.pkeyPath != "" {
+		log.Printf("Connect MQTT broker with certification and keys.")
+		if cp, err := processPath(ms.certPath); err == nil {
+			if kp, err1 := processPath(ms.pkeyPath); err1 == nil {
+				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
+					return err2
+				} else {
+					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
+				}
+			} else {
+				return err1
+			}
+		} else {
+			return err
+		}
+	} else {
+		log.Printf("Connect MQTT broker with username and password.")
+		if ms.uName != "" {
+			opts = opts.SetUsername(ms.uName)
+		}
+
+		if ms.password != "" {
+			opts = opts.SetPassword(ms.password)
+		}
 	}
 	}
 
 
 	c := MQTT.NewClient(opts)
 	c := MQTT.NewClient(opts)