浏览代码

MQTT sink dynamic configuration field "retained" & "qos"

726518972@qq.com 4 年之前
父节点
当前提交
9b0af6d6e3
共有 1 个文件被更改,包括 25 次插入1 次删除
  1. 25 1
      xstream/sinks/mqtt_sink.go

+ 25 - 1
xstream/sinks/mqtt_sink.go

@@ -15,12 +15,14 @@ type MQTTSink struct {
 	tpc      string
 	clientid string
 	pVersion uint
+	qos      byte
 	uName    string
 	password string
 	certPath string
 	pkeyPath string
 
 	insecureSkipVerify bool
+	retained           bool
 
 	conn MQTT.Client
 }
@@ -56,6 +58,19 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	var qos byte = 0
+	if qos, ok := ps["qos"]; ok {
+		if v, ok := qos.(byte); ok {
+			qos = v
+		} else if v, ok := qos.(string); ok {
+			if v == "1" {
+				qos = 1
+			} else if v == "2" {
+				qos = 2
+			}
+		}
+	}
+
 	uName := ""
 	un, ok := ps["username"]
 	if ok {
@@ -95,15 +110,24 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	retained := false
+	if pk, ok := ps["retained"]; ok {
+		if v, ok := pk.(bool); ok {
+			retained = v
+		}
+	}
+
 	ms.srv = srv.(string)
 	ms.tpc = tpc.(string)
 	ms.clientid = clientid.(string)
 	ms.pVersion = pVersion
+	ms.qos = qos
 	ms.uName = uName
 	ms.password = password
 	ms.certPath = certPath
 	ms.pkeyPath = pKeyPath
 	ms.insecureSkipVerify = insecureSkipVerify
+	ms.retained = retained
 
 	return nil
 }
@@ -167,7 +191,7 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	c := ms.conn
 	logger.Debugf("%s publish %s", ctx.GetOpId(), item)
-	if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
+	if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("publish error: %s", token.Error())
 	}
 	return nil