Jelajahi Sumber

feat(sink): MQTT sink supports zlib compression

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 tahun lalu
induk
melakukan
9c722d71e9

+ 1 - 0
docs/en_US/guide/sinks/builtin/mqtt.md

@@ -16,6 +16,7 @@ The action is used for publish output message into an MQTT server.
 | rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
 | rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
+| compression        | true     | Compress the payload with the specified compression method. Only Support `zlib` method now.                                                                                                                                                                                                                                                               |
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../../sources/builtin/mqtt.md#connectionselector)                                                                                                                                                                                                                                                        | 
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../../sources/builtin/mqtt.md#connectionselector)                                                                                                                                                                                                                                                        | 
 
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.

+ 1 - 0
docs/zh_CN/guide/sinks/builtin/mqtt.md

@@ -16,6 +16,7 @@
 | rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
 | rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
 | insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
 | insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
 | retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
 | retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
+| compression        | true | 使用指定的压缩方法压缩 Payload。当前仅支持 zlib 算法。                                                                                                                                                        |
 | connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../../sources/builtin/mqtt.md#connectionselector)                                                                                                          |
 | connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../../sources/builtin/mqtt.md#connectionselector)                                                                                                          |
 
 
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。

+ 17 - 0
etc/sinks/mqtt.json

@@ -215,6 +215,23 @@
         "en_US": "Skip Certification verification",
         "en_US": "Skip Certification verification",
         "zh_CN": "跳过证书验证"
         "zh_CN": "跳过证书验证"
       }
       }
+    },
+    {
+      "name": "compression",
+      "optional": true,
+      "control": "select",
+      "type": "string",
+      "values": [
+        "zlib"
+      ],
+      "hint": {
+        "en_US": "Compress the payload with the specified compression method.",
+        "zh_CN": "使用指定的压缩方法压缩 Payload。"
+      },
+      "label": {
+        "en_US": "Compression",
+        "zh_CN": "压缩"
+      }
     }
     }
   ],
   ],
   "node": {
   "node": {

+ 36 - 32
internal/io/mqtt/mqtt_sink.go

@@ -16,19 +16,27 @@ package mqtt
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 )
 
 
-type MQTTSink struct {
-	tpc      string
-	qos      byte
-	retained bool
+// AdConf is the advanced configuration for the mqtt sink
+type AdConf struct {
+	Tpc         string `json:"topic"`
+	Qos         byte   `json:"qos"`
+	Retained    bool   `json:"retained"`
+	Compression string `json:"compression"`
+}
 
 
-	config map[string]interface{}
-	cli    api.MessageClient
+type MQTTSink struct {
+	adconf     *AdConf
+	config     map[string]interface{}
+	cli        api.MessageClient
+	compressor message.Compressor
 }
 }
 
 
 func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
 func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
@@ -41,33 +49,24 @@ func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
 }
 }
 
 
 func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
-	tpc, ok := ps["topic"]
-	if !ok {
+	adconf := &AdConf{}
+	cast.MapToStruct(ps, adconf)
+
+	if adconf.Tpc == "" {
 		return fmt.Errorf("mqtt sink is missing property topic")
 		return fmt.Errorf("mqtt sink is missing property topic")
 	}
 	}
-
-	var qos byte = 0
-	if qosRec, ok := ps["qos"]; ok {
-		if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
-			qos = byte(v)
-		}
-		if qos != 0 && qos != 1 && qos != 2 {
-			return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
-		}
+	if adconf.Qos != 0 && adconf.Qos != 1 && adconf.Qos != 2 {
+		return fmt.Errorf("invalid qos value %v, the value could be only int 0 or 1 or 2", adconf.Qos)
 	}
 	}
-
-	retained := false
-	if pk, ok := ps["retained"]; ok {
-		if v, ok := pk.(bool); ok {
-			retained = v
+	var err error
+	if adconf.Compression != "" {
+		ms.compressor, err = compressor.GetCompressor(adconf.Compression)
+		if err != nil {
+			return fmt.Errorf("invalid compression method %s", adconf.Compression)
 		}
 		}
 	}
 	}
-
 	ms.config = ps
 	ms.config = ps
-	ms.qos = qos
-	ms.tpc = tpc.(string)
-	ms.retained = retained
-
+	ms.adconf = adconf
 	return nil
 	return nil
 }
 }
 
 
@@ -78,7 +77,6 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 		log.Errorf("found error when get mqtt client config %v, error %s", ms.config, err.Error())
 		log.Errorf("found error when get mqtt client config %v, error %s", ms.config, err.Error())
 		return err
 		return err
 	}
 	}
-
 	ms.cli = cli
 	ms.cli = cli
 
 
 	return nil
 	return nil
@@ -90,16 +88,22 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-
 	logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
 	logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
-	tpc, err := ctx.ParseTemplate(ms.tpc, item)
+	if ms.compressor != nil {
+		jsonBytes, err = ms.compressor.Compress(jsonBytes)
+		if err != nil {
+			return err
+		}
+	}
+
+	tpc, err := ctx.ParseTemplate(ms.adconf.Tpc, item)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	para := map[string]interface{}{
 	para := map[string]interface{}{
-		"qos":      ms.qos,
-		"retained": ms.retained,
+		"qos":      ms.adconf.Qos,
+		"retained": ms.adconf.Retained,
 	}
 	}
 
 
 	if err := ms.cli.Publish(ctx, tpc, jsonBytes, para); err != nil {
 	if err := ms.cli.Publish(ctx, tpc, jsonBytes, para); err != nil {

+ 97 - 0
internal/io/mqtt/mqtt_sink_test.go

@@ -0,0 +1,97 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mqtt
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func TestSinkConfigure(t *testing.T) {
+	tests := []struct {
+		name           string
+		input          map[string]interface{}
+		expectedErr    error
+		expectedAdConf *AdConf
+	}{
+		{
+			name: "Missing topic",
+			input: map[string]interface{}{
+				"qos":         1,
+				"retained":    false,
+				"compression": "zlib",
+			},
+			expectedErr: fmt.Errorf("mqtt sink is missing property topic"),
+		},
+		{
+			name: "Invalid QoS",
+			input: map[string]interface{}{
+				"topic":       "testTopic",
+				"qos":         3,
+				"retained":    false,
+				"compression": "gzip",
+			},
+			expectedErr: fmt.Errorf("invalid qos value %v, the value could be only int 0 or 1 or 2", 3),
+		},
+		{
+			name: "Valid configuration with QoS 0 and no compression",
+			input: map[string]interface{}{
+				"topic":       "testTopic3",
+				"qos":         0,
+				"retained":    false,
+				"compression": "",
+			},
+			expectedErr: nil,
+			expectedAdConf: &AdConf{
+				Tpc:         "testTopic3",
+				Qos:         0,
+				Retained:    false,
+				Compression: "",
+			},
+		},
+		{
+			name: "Valid configuration with QoS 1 and no retained",
+			input: map[string]interface{}{
+				"topic":       "testTopic4",
+				"qos":         1,
+				"retained":    false,
+				"compression": "zlib",
+			},
+			expectedErr: nil,
+			expectedAdConf: &AdConf{
+				Tpc:         "testTopic4",
+				Qos:         1,
+				Retained:    false,
+				Compression: "zlib",
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ms := &MQTTSink{}
+			err := ms.Configure(tt.input)
+			if !reflect.DeepEqual(err, tt.expectedErr) {
+				t.Errorf("\n Expected error: \t%v\n \t\t\tgot: \t%v", tt.expectedErr, err)
+				return
+			}
+			if !reflect.DeepEqual(ms.adconf, tt.expectedAdConf) {
+				t.Errorf("\n Expected adConf: \t%v\n \t\t\tgot: \t%v", tt.expectedAdConf, ms.adconf)
+				return
+			}
+		})
+	}
+}