Browse Source

feat(func): add gzip, flate compressor/decompressor

Signed-off-by: carlclone <906561974@qq.com>
carlclone 2 years atrás
parent
commit
097dab97a7

+ 1 - 1
docs/en_US/guide/sinks/builtin/mqtt.md

@@ -16,7 +16,7 @@ The action is used for publish output message into an MQTT server.
 | rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
 | rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
-| compression        | true     | Compress the payload with the specified compression method. Only Support `zlib` method now.                                                                                                                                                                                                                                                               |
+| compression        | true     | Compress the payload with the specified compression method. Support `zlib`, `gzip`, `flate` method now.                                                                                                                                                                                                                                                               |
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../../sources/builtin/mqtt.md#connectionselector)                                                                                                                                                                                                                                                        | 
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../../sources/builtin/mqtt.md#connectionselector)                                                                                                                                                                                                                                                        | 
 
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.

+ 1 - 1
docs/zh_CN/guide/sinks/builtin/mqtt.md

@@ -16,7 +16,7 @@
 | rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
 | rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
 | insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
 | insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
 | retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
 | retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
-| compression        | true | 使用指定的压缩方法压缩 Payload。当前支持 zlib 算法。                                                                                                                                                        |
+| compression        | true | 使用指定的压缩方法压缩 Payload。当前支持 zlib, gzip, flate  算法。                                                                                                                                                        |
 | connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../../sources/builtin/mqtt.md#connectionselector)                                                                                                          |
 | connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../../sources/builtin/mqtt.md#connectionselector)                                                                                                          |
 
 
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。

+ 1 - 1
internal/binder/function/funcs_stateful_test.go

@@ -97,7 +97,7 @@ func TestDecompressExec(t *testing.T) {
 				"foo",
 				"foo",
 				"bar",
 				"bar",
 			},
 			},
-			result: fmt.Errorf("unsupported compressor: bar"),
+			result: fmt.Errorf("unsupported decompressor: bar"),
 		}, { // 1
 		}, { // 1
 			args: []interface{}{
 			args: []interface{}{
 				[]byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},
 				[]byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},

+ 12 - 25
internal/compressor/compressor.go

@@ -15,38 +15,25 @@
 package compressor
 package compressor
 
 
 import (
 import (
-	"bytes"
-	"compress/zlib"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 )
 
 
+const (
+	ZLIB  = "zlib"
+	GZIP  = "gzip"
+	FLATE = "flate"
+)
+
 func GetCompressor(name string) (message.Compressor, error) {
 func GetCompressor(name string) (message.Compressor, error) {
 	switch name {
 	switch name {
-	case "zlib":
-		return &zlibCompressor{
-			writer: zlib.NewWriter(nil),
-		}, nil
+	case ZLIB:
+		return newZlibCompressor()
+	case GZIP:
+		return newGzipCompressor()
+	case FLATE:
+		return newFlateCompressor()
 	default:
 	default:
 		return nil, fmt.Errorf("unsupported compressor: %s", name)
 		return nil, fmt.Errorf("unsupported compressor: %s", name)
 	}
 	}
 }
 }
-
-type zlibCompressor struct {
-	writer *zlib.Writer
-	buffer bytes.Buffer
-}
-
-func (z *zlibCompressor) Compress(data []byte) ([]byte, error) {
-	z.buffer.Reset()
-	z.writer.Reset(&z.buffer)
-	_, err := z.writer.Write(data)
-	if err != nil {
-		return nil, err
-	}
-	err = z.writer.Close()
-	if err != nil {
-		return nil, err
-	}
-	return z.buffer.Bytes(), nil
-}

+ 96 - 0
internal/compressor/compressor_test.go

@@ -0,0 +1,96 @@
+package compressor
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestGetCompressor(t *testing.T) {
+	testCases := []struct {
+		name          string
+		compressor    string
+		expectedError bool
+	}{
+		{
+			name:          "valid compressor zlib",
+			compressor:    "zlib",
+			expectedError: false,
+		},
+		{
+			name:          "valid compressor gzip",
+			compressor:    "gzip",
+			expectedError: false,
+		},
+		{
+			name:          "valid compressor flate",
+			compressor:    "flate",
+			expectedError: false,
+		},
+		{
+			name:          "unsupported compressor",
+			compressor:    "invalid",
+			expectedError: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			compr, err := GetCompressor(tc.compressor)
+			if tc.expectedError && err == nil {
+				t.Errorf("expected error but got nil")
+			}
+			if !tc.expectedError && err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			if !tc.expectedError && compr == nil {
+				t.Errorf("expected non-nil compressor but got nil")
+			}
+		})
+	}
+}
+
+func TestCompressAndDecompress(t *testing.T) {
+	testCases := []struct {
+		name      string
+		inputData []byte
+	}{
+		{
+			name:      "compress/decompress a simple string",
+			inputData: []byte("Hello, world!"),
+		},
+		{
+			name:      "compress/decompress a larger data set",
+			inputData: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 10000),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			for _, name := range []string{ZLIB, GZIP, FLATE} {
+				compr, err := GetCompressor(name)
+				if err != nil {
+					t.Fatalf("get compressor failed: %v", err)
+				}
+				compressedData, err := compr.Compress(tc.inputData)
+				if err != nil {
+					t.Fatalf("unexpected error while compressing data: %v", err)
+				}
+				if len(compressedData) == 0 {
+					t.Error("compressed data should not be empty")
+				}
+
+				decompr, err := GetDecompressor(name)
+				if err != nil {
+					t.Fatalf("get decompressor failed: %v", err)
+				}
+				decompressedData, err := decompr.Decompress(compressedData)
+				if err != nil {
+					t.Fatalf("unexpected error while decompressing data: %v", err)
+				}
+				if !bytes.Equal(tc.inputData, decompressedData) {
+					t.Errorf("decompressed data should be equal to input data: %s", name)
+				}
+			}
+		})
+	}
+}

+ 7 - 33
internal/compressor/decompressor.go

@@ -15,45 +15,19 @@
 package compressor
 package compressor
 
 
 import (
 import (
-	"bytes"
-	"compress/zlib"
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"io"
 )
 )
 
 
 func GetDecompressor(name string) (message.Decompressor, error) {
 func GetDecompressor(name string) (message.Decompressor, error) {
 	switch name {
 	switch name {
-	case "zlib":
-		return &zlibDecompressor{}, nil
+	case ZLIB:
+		return newZlibDecompressor()
+	case GZIP:
+		return newGzipDecompressor()
+	case FLATE:
+		return newFlateDecompressor()
 	default:
 	default:
-		return nil, fmt.Errorf("unsupported compressor: %s", name)
+		return nil, fmt.Errorf("unsupported decompressor: %s", name)
 	}
 	}
 }
 }
-
-type zlibDecompressor struct {
-	reader io.ReadCloser
-}
-
-func (z *zlibDecompressor) Decompress(data []byte) ([]byte, error) {
-	if z.reader == nil {
-		r, err := zlib.NewReader(bytes.NewReader(data))
-		if err != nil {
-			return nil, fmt.Errorf("failed to decompress: %v", err)
-		}
-		z.reader = r
-	} else {
-		err := z.reader.(zlib.Resetter).Reset(bytes.NewReader(data), nil)
-		if err != nil {
-			return nil, fmt.Errorf("failed to decompress: %v", err)
-		}
-	}
-	defer func() {
-		err := z.reader.Close()
-		if err != nil {
-			conf.Log.Warnf("failed to close zlib decompressor: %v", err)
-		}
-	}()
-	return io.ReadAll(z.reader)
-}

+ 61 - 0
internal/compressor/flate.go

@@ -0,0 +1,61 @@
+package compressor
+
+import (
+	"bytes"
+	"compress/flate"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io"
+)
+
+func newFlateCompressor() (*flateCompressor, error) {
+	flateWriter, err := flate.NewWriter(nil, flate.DefaultCompression)
+	if err != nil {
+		return nil, err
+	}
+	return &flateCompressor{
+		writer: flateWriter,
+	}, nil
+}
+
+type flateCompressor struct {
+	writer *flate.Writer
+	buffer bytes.Buffer
+}
+
+func (g *flateCompressor) Compress(data []byte) ([]byte, error) {
+	g.buffer.Reset()
+	g.writer.Reset(&g.buffer)
+	_, err := g.writer.Write(data)
+	if err != nil {
+		return nil, err
+	}
+	err = g.writer.Close()
+	if err != nil {
+		return nil, err
+	}
+	return g.buffer.Bytes(), nil
+}
+
+func newFlateDecompressor() (*flateDecompressor, error) {
+	return &flateDecompressor{reader: flate.NewReader(bytes.NewReader(nil))}, nil
+}
+
+type flateDecompressor struct {
+	reader io.ReadCloser
+}
+
+func (z *flateDecompressor) Decompress(data []byte) ([]byte, error) {
+	err := z.reader.(flate.Resetter).Reset(bytes.NewReader(data), nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decompress: %v", err)
+	}
+
+	defer func() {
+		err := z.reader.Close()
+		if err != nil {
+			conf.Log.Warnf("failed to close flate decompressor: %v", err)
+		}
+	}()
+	return io.ReadAll(z.reader)
+}

+ 64 - 0
internal/compressor/gzip.go

@@ -0,0 +1,64 @@
+package compressor
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io"
+)
+
+func newGzipCompressor() (*gzipCompressor, error) {
+	return &gzipCompressor{
+		writer: gzip.NewWriter(nil),
+	}, nil
+}
+
+type gzipCompressor struct {
+	writer *gzip.Writer
+	buffer bytes.Buffer
+}
+
+func (g *gzipCompressor) Compress(data []byte) ([]byte, error) {
+	g.buffer.Reset()
+	g.writer.Reset(&g.buffer)
+	_, err := g.writer.Write(data)
+	if err != nil {
+		return nil, err
+	}
+	err = g.writer.Close()
+	if err != nil {
+		return nil, err
+	}
+	return g.buffer.Bytes(), nil
+}
+
+func newGzipDecompressor() (*gzipDecompressor, error) {
+	return &gzipDecompressor{}, nil
+}
+
+type gzipDecompressor struct {
+	reader *gzip.Reader
+}
+
+func (z *gzipDecompressor) Decompress(data []byte) ([]byte, error) {
+	if z.reader == nil {
+		r, err := gzip.NewReader(bytes.NewReader(data))
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+		z.reader = r
+	} else {
+		err := z.reader.Reset(bytes.NewReader(data))
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+	}
+	defer func() {
+		err := z.reader.Close()
+		if err != nil {
+			conf.Log.Warnf("failed to close gzip decompressor: %v", err)
+		}
+	}()
+	return io.ReadAll(z.reader)
+}

+ 64 - 0
internal/compressor/zlib.go

@@ -0,0 +1,64 @@
+package compressor
+
+import (
+	"bytes"
+	"compress/zlib"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io"
+)
+
+func newZlibCompressor() (*zlibCompressor, error) {
+	return &zlibCompressor{
+		writer: zlib.NewWriter(nil),
+	}, nil
+}
+
+type zlibCompressor struct {
+	writer *zlib.Writer
+	buffer bytes.Buffer
+}
+
+func (z *zlibCompressor) Compress(data []byte) ([]byte, error) {
+	z.buffer.Reset()
+	z.writer.Reset(&z.buffer)
+	_, err := z.writer.Write(data)
+	if err != nil {
+		return nil, err
+	}
+	err = z.writer.Close()
+	if err != nil {
+		return nil, err
+	}
+	return z.buffer.Bytes(), nil
+}
+
+func newZlibDecompressor() (*zlibDecompressor, error) {
+	return &zlibDecompressor{}, nil
+}
+
+type zlibDecompressor struct {
+	reader io.ReadCloser
+}
+
+func (z *zlibDecompressor) Decompress(data []byte) ([]byte, error) {
+	if z.reader == nil {
+		r, err := zlib.NewReader(bytes.NewReader(data))
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+		z.reader = r
+	} else {
+		err := z.reader.(zlib.Resetter).Reset(bytes.NewReader(data), nil)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+	}
+	defer func() {
+		err := z.reader.Close()
+		if err != nil {
+			conf.Log.Warnf("failed to close zlib decompressor: %v", err)
+		}
+	}()
+	return io.ReadAll(z.reader)
+}