浏览代码

Merge pull request #348 from smart33690/develop

MQTT sink dynamic configuration field "retained" & "qos"
jinfahua 4 年之前
父节点
当前提交
2b531a444f
共有 3 个文件被更改,包括 34 次插入6 次删除
  1. 6 3
      docs/en_US/rules/sinks/mqtt.md
  2. 6 2
      docs/zh_CN/rules/sinks/mqtt.md
  3. 22 1
      xstream/sinks/mqtt_sink.go

+ 6 - 3
docs/en_US/rules/sinks/mqtt.md

@@ -8,12 +8,13 @@ The action is used for publish output message into a MQTT server.
 | topic              | false    | The mqtt topic, such as ``analysis/result``                  |
 | clientId           | true     | The client id for mqtt connection. If not specified, an uuid will be used |
 | protocolVersion    | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
-| qos                | true     | The QoS for message delivery.                                |
+| qos                | true     | The QoS for message delivery. Only int type value 0 or 1 or 2.                                |
 | username           | true     | The user name for the connection.                            |
 | password           | true     | The password for the connection.                             |
 | certificationPath  | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the ``server`` command. For example, if you run ``bin/server`` from ``/var/kuiper``, then the base path is ``/var/kuiper``; If you run ``./server`` from ``/var/kuiper/bin``, then the base path is ``/var/kuiper/bin``. |
 | privateKeyPath     | true     | The private key path. It can be either absolute path, or relative path. For more detailed information, please refer to ``certificationPath``. |
 | insecureSkipVerify | true     | If InsecureSkipVerify is ``true``, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is ``false``. The configuration item can only be used with TLS connections. |
+| retained           | true     | If retained is ``true``,The broker stores the last retained message and the corresponding QoS for that topic.The default value is ``false``.
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 ```json
@@ -25,7 +26,8 @@ Below is sample configuration for connecting to Azure IoT Hub by using SAS authe
         "qos": 1,
         "clientId": "demo_001",
         "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
-        "password": "SharedAccessSignature sr=*******************"
+        "password": "SharedAccessSignature sr=*******************",
+        "retained": false
       }
     }
 ```
@@ -41,7 +43,8 @@ Below is another sample configuration for connecting to AWS IoT by using certifi
         "clientId": "demo_001",
         "certificationPath": "keys/d3807d9fa5-certificate.pem",
         "privateKeyPath": "keys/d3807d9fa5-private.pem.key", 
-        "insecureSkipVerify": false
+        "insecureSkipVerify": false,
+        "retained": false
       }
     }
 ```

+ 6 - 2
docs/zh_CN/rules/sinks/mqtt.md

@@ -13,6 +13,8 @@
 | password          | 是    | 连接密码                             |
 | certificationPath | 是    | 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `server` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/server` ,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行`./server`,那么父目录为 `/var/kuiper/bin`。 |
 | privateKeyPath    | 是    | 私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`. |
+| insecureSkipVerify | true     | 如果 InsecureSkipVerify 设置为 ``true``, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为``false``。配置项只能用于TLS连接。|
+| retained           | true     | 如果 retained 设置为 ``true``,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是``false``   
 
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。
 ```json
@@ -24,7 +26,8 @@
         "qos": 1,
         "clientId": "demo_001",
         "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
-        "password": "SharedAccessSignature sr=*******************"
+        "password": "SharedAccessSignature sr=*******************",
+        "retained": false
       }
     }
 ```
@@ -39,7 +42,8 @@
         "qos": 1,
         "clientId": "demo_001",
         "certificationPath": "keys/d3807d9fa5-certificate.pem",
-        "privateKeyPath": "keys/d3807d9fa5-private.pem.key"
+        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
+        "retained": false
       }
     }
 ```

+ 22 - 1
xstream/sinks/mqtt_sink.go

@@ -15,12 +15,14 @@ type MQTTSink struct {
 	tpc      string
 	clientid string
 	pVersion uint
+	qos      byte
 	uName    string
 	password string
 	certPath string
 	pkeyPath string
 
 	insecureSkipVerify bool
+	retained           bool
 
 	conn MQTT.Client
 }
@@ -56,6 +58,16 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	var qos byte = 0
+	if qosRec, ok := ps["qos"]; ok {
+		if v, ok := qosRec.(int); ok {
+			qos = byte(v)
+		}
+		if qos != 0 && qos != 1 && qos != 2 {
+			return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
+		}
+	}
+
 	uName := ""
 	un, ok := ps["username"]
 	if ok {
@@ -95,15 +107,24 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	retained := false
+	if pk, ok := ps["retained"]; ok {
+		if v, ok := pk.(bool); ok {
+			retained = v
+		}
+	}
+
 	ms.srv = srv.(string)
 	ms.tpc = tpc.(string)
 	ms.clientid = clientid.(string)
 	ms.pVersion = pVersion
+	ms.qos = qos
 	ms.uName = uName
 	ms.password = password
 	ms.certPath = certPath
 	ms.pkeyPath = pKeyPath
 	ms.insecureSkipVerify = insecureSkipVerify
+	ms.retained = retained
 
 	return nil
 }
@@ -167,7 +188,7 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	c := ms.conn
 	logger.Debugf("%s publish %s", ctx.GetOpId(), item)
-	if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
+	if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("publish error: %s", token.Error())
 	}
 	return nil