Просмотр исходного кода

feat(func): change compress implementation to `github.com/klauspost/compress` , add zstd compressor/decompressor

Signed-off-by: carlclone <906561974@qq.com>
carlclone 2 года назад
Родитель
Сommit
1a6d9f4c36

+ 1 - 1
docs/en_US/guide/sinks/builtin/mqtt.md

@@ -16,7 +16,7 @@ The action is used for publish output message into an MQTT server.
 | rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
-| compression        | true     | Compress the payload with the specified compression method. Support `zlib`, `gzip`, `flate` method now.                                                                                                                                                                                                                                                   |
+| compression        | true     | Compress the payload with the specified compression method. Support `zlib`, `gzip`, `flate`, `zstd` method now.                                                                                                                                                                                                                                           |
 | connectionSelector | true     | reuse the connection to mqtt broker. [more info](../../sources/builtin/mqtt.md#connectionselector)                                                                                                                                                                                                                                                        | 
 
 Other common sink properties are supported. Please refer to the [sink common properties](../overview.md#common-properties) for more information.

+ 1 - 1
docs/en_US/sqls/built-in_functions.md

@@ -177,7 +177,7 @@ When casting to datetime type, the supported column type and casting rule are:
 | compress   | compress(input, "zlib")   | Compress the input string or binary value with a compression method   |
 | decompress | decompress(input, "zlib") | Decompress the input string or binary value with a compression method |
 
-Currently, 'zlib', 'gzip' and 'flate' method are supported.
+Currently, the 'zlib', 'gzip', 'flate' and 'zstd' methods are supported.
 
 ## Analytic Functions
 

+ 1 - 1
docs/zh_CN/guide/sinks/builtin/mqtt.md

@@ -16,7 +16,7 @@
 | rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
 | insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
 | retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
-| compression        | true | 使用指定的压缩方法压缩 Payload。当前支持 zlib, gzip, flate  算法。                                                                                                                                           |
+| compression        | 是    | 使用指定的压缩方法压缩 Payload。当前支持 zlib, gzip, flate, zstd 算法。                                                                                                                                      |
 | connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../../sources/builtin/mqtt.md#connectionselector)                                                                                                          |
 
 其他通用的 sink 属性也支持,请参阅[公共属性](../overview.md#公共属性)。

+ 1 - 1
docs/zh_CN/sqls/built-in_functions.md

@@ -177,7 +177,7 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 | compress   | compress(input, "zlib")   | 压缩输入的字符串或二进制值。  |
 | decompress | decompress(input, "zlib") | 解压缩输入的字符串或二进制值。 |
 
-目前支持 'zlib', 'gzip' 和 'flate' 压缩算法。
+目前支持 'zlib', 'gzip', 'flate' 和 'zstd' 压缩算法。
 
 ## 分析函数
 

+ 2 - 2
etc/functions/internal.json

@@ -2953,7 +2953,7 @@
 				"optional": false,
 				"control": "select",
 				"type": "string",
-				"values": ["zlib","gzip","flate"],
+				"values": ["zlib","gzip","flate","zstd"],
 				"hint": {
 					"en_US": "The type of compression",
 					"zh_CN": "压缩方法"
@@ -3006,7 +3006,7 @@
 				"optional": false,
 				"control": "select",
 				"type": "string",
-				"values": ["zlib","gzip","flate"],
+				"values": ["zlib","gzip","flate","zstd"],
 				"hint": {
 					"en_US": "The type of decompression",
 					"zh_CN": "解压缩方法"

+ 2 - 1
etc/mqtt_source.json

@@ -235,7 +235,8 @@
 			"values": [
 				"zlib",
 				"gzip",
-				"flate"
+				"flate",
+				"zstd"
 			],
 			"hint": {
 				"en_US": "Decompress the MQTT payload with the specified compression method.",

+ 2 - 1
etc/sinks/mqtt.json

@@ -224,7 +224,8 @@
       "values": [
         "zlib",
         "gzip",
-        "flate"
+        "flate",
+        "zstd"
       ],
       "hint": {
         "en_US": "Compress the payload with the specified compression method.",

+ 1 - 0
go.mod

@@ -20,6 +20,7 @@ require (
 	github.com/gorilla/mux v1.8.0
 	github.com/jhump/protoreflect v1.15.0
 	github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1
+	github.com/klauspost/compress v1.15.11
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
 	github.com/mattn/go-sqlite3 v1.14.16
 	github.com/mitchellh/mapstructure v1.5.0

+ 1 - 0
go.sum

@@ -116,6 +116,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
 github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1 h1:JL2rWnBX8jnbHHlLcLde3BBWs+jzqZvOmF+M3sXoNOE=
 github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1/go.mod h1:nNLjpEi4xVFB7358xLPpPscdvXP+pbhiHgSmjIur8z0=
 github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
+github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=

+ 3 - 3
internal/binder/function/funcs_stateful_test.go

@@ -49,13 +49,13 @@ func TestCompressExec(t *testing.T) {
 				"hello world",
 				"zlib",
 			},
-			result: []byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},
+			result: []byte{120, 156, 0, 11, 0, 244, 255, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 3, 0, 26, 11, 4, 93},
 		}, { // 2
 			args: []interface{}{
 				`{"name":"John Doe","age":30,"email":"john.doe@example.com"}`,
 				"zlib",
 			},
-			result: []byte{120, 156, 170, 86, 202, 75, 204, 77, 85, 178, 82, 242, 202, 207, 200, 83, 112, 201, 79, 85, 210, 81, 74, 76, 79, 85, 178, 50, 54, 208, 81, 74, 205, 77, 204, 204, 81, 178, 82, 202, 202, 207, 200, 211, 75, 201, 79, 117, 72, 173, 72, 204, 45, 200, 73, 213, 75, 206, 207, 85, 170, 5, 4, 0, 0, 255, 255, 32, 223, 19, 1},
+			result: []byte{120, 156, 0, 59, 0, 196, 255, 123, 34, 110, 97, 109, 101, 34, 58, 34, 74, 111, 104, 110, 32, 68, 111, 101, 34, 44, 34, 97, 103, 101, 34, 58, 51, 48, 44, 34, 101, 109, 97, 105, 108, 34, 58, 34, 106, 111, 104, 110, 46, 100, 111, 101, 64, 101, 120, 97, 109, 112, 108, 101, 46, 99, 111, 109, 34, 125, 3, 0, 32, 223, 19, 1},
 		}, { // 3
 			args: []interface{}{
 				`{"name":"John Doe","age":30,"email":"john.doe@example.com","address":{"street":"123 Main St","city":"Anytown","state":"CA","zip":"12345"},"phoneNumbers":[{"type":"home","number":"555-555-1234"},{"type":"work","number":"555-555-5678"}],"isActive":true}`,
@@ -67,7 +67,7 @@ func TestCompressExec(t *testing.T) {
 				`hello world`,
 				"zlib",
 			},
-			result: []byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},
+			result: []byte{120, 156, 0, 11, 0, 244, 255, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 3, 0, 26, 11, 4, 93},
 		},
 	}
 	for i, tt := range tests {

+ 3 - 0
internal/compressor/compressor.go

@@ -23,6 +23,7 @@ const (
 	ZLIB  = "zlib"
 	GZIP  = "gzip"
 	FLATE = "flate"
+	ZSTD  = "zstd"
 )
 
 func GetCompressor(name string) (message.Compressor, error) {
@@ -33,6 +34,8 @@ func GetCompressor(name string) (message.Compressor, error) {
 		return newGzipCompressor()
 	case FLATE:
 		return newFlateCompressor()
+	case ZSTD:
+		return newZstdCompressor()
 	default:
 		return nil, fmt.Errorf("unsupported compressor: %s", name)
 	}

+ 113 - 1
internal/compressor/compressor_test.go

@@ -1,10 +1,117 @@
+// Copyright 2023 carlclone@gmail.com.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package compressor
 
 import (
 	"bytes"
+	"fmt"
+	"io/ioutil"
 	"testing"
 )
 
+// BenchmarkCompressor measures Compress for every supported compression
+// method against the contents of test.json, one sub-benchmark per method.
+// Every iteration's output is compared with a snapshot of the first result,
+// so a compressor that is not deterministic for identical input is reported.
+func BenchmarkCompressor(b *testing.B) {
+	compressors := []string{ZLIB, GZIP, FLATE, ZSTD}
+
+	data, err := ioutil.ReadFile("test.json")
+	if err != nil {
+		b.Fatalf("failed to read test file: %v", err)
+	}
+
+	for _, c := range compressors {
+		b.Run(c, func(b *testing.B) {
+			wc, err := GetCompressor(c)
+			if err != nil {
+				// Without this check a failed lookup would surface as a nil
+				// dereference on the Compress call below.
+				b.Fatal(err)
+			}
+			firstCompressedData, err := wc.Compress(data)
+			if err != nil {
+				b.Fatal(err)
+			}
+			// Snapshot the reference output. A compressor may return a slice
+			// of an internal buffer that it overwrites on the next call
+			// (zstdCompressor returns buffer.Bytes()), which would make the
+			// comparison in the loop trivially true.
+			firstCompressedData = append([]byte(nil), firstCompressedData...)
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				compressedData, err := wc.Compress(data)
+				if err != nil {
+					b.Fatal(err)
+				}
+				if !bytes.Equal(firstCompressedData, compressedData) {
+					b.Errorf("compressed data should be equal to the first compressed result: %s", c)
+				}
+			}
+		})
+	}
+}
+
+// BenchmarkDecompressor measures Decompress for every supported compression
+// method. The contents of test.json are compressed once per method outside
+// the timed region; each sub-benchmark then repeatedly decompresses that
+// payload and checks that the result matches the original file contents.
+func BenchmarkDecompressor(b *testing.B) {
+	methods := []string{ZLIB, GZIP, FLATE, ZSTD}
+
+	payload, readErr := ioutil.ReadFile("test.json")
+	if readErr != nil {
+		b.Fatalf("failed to read test file: %v", readErr)
+	}
+
+	for _, c := range methods {
+		// Prepare the compressed input for this method up front.
+		enc, err := GetCompressor(c)
+		if err != nil {
+			b.Fatal(err)
+		}
+		packed, err := enc.Compress(payload)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		b.Run(c, func(b *testing.B) {
+			dec, err := GetDecompressor(c)
+			if err != nil {
+				b.Fatal(err)
+			}
+			// Exclude decompressor construction from the measurement.
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
+				unpacked, err := dec.Decompress(packed)
+				if err != nil {
+					b.Fatal(err)
+				}
+				if bytes.Equal(payload, unpacked) {
+					continue
+				}
+				b.Errorf("decompressed data should be equal to input data: %s", c)
+			}
+		})
+	}
+}
+
+// TestCompressionRatio compresses the contents of test.json with every
+// supported method and prints the ratio of original size to compressed size.
+// It fails only when the file cannot be read or a compressor returns an
+// error; the ratios themselves are not asserted.
+func TestCompressionRatio(t *testing.T) {
+	// Read the sample document used as compression input.
+	raw, readErr := ioutil.ReadFile("test.json")
+	if readErr != nil {
+		t.Fatalf("failed to read test file: %v", readErr)
+	}
+
+	for _, method := range []string{ZLIB, GZIP, FLATE, ZSTD} {
+		enc, err := GetCompressor(method)
+		if err != nil {
+			t.Fatal(err)
+		}
+		packed, err := enc.Compress(raw)
+		if err != nil {
+			t.Fatal(err)
+		}
+		ratio := float64(len(raw)) / float64(len(packed))
+		fmt.Printf("%s Compression ratio: %f\n", method, ratio)
+	}
+}
+
 func TestGetCompressor(t *testing.T) {
 	testCases := []struct {
 		name          string
@@ -27,6 +134,11 @@ func TestGetCompressor(t *testing.T) {
 			expectedError: false,
 		},
 		{
+			name:          "valid compressor zsstd",
+			compressor:    "zstd",
+			expectedError: false,
+		},
+		{
 			name:          "unsupported compressor",
 			compressor:    "invalid",
 			expectedError: true,
@@ -66,7 +178,7 @@ func TestCompressAndDecompress(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			for _, name := range []string{ZLIB, GZIP, FLATE} {
+			for _, name := range []string{ZLIB, GZIP, FLATE, ZSTD} {
 				compr, err := GetCompressor(name)
 				if err != nil {
 					t.Fatalf("get compressor failed: %v", err)

+ 2 - 0
internal/compressor/decompressor.go

@@ -27,6 +27,8 @@ func GetDecompressor(name string) (message.Decompressor, error) {
 		return newGzipDecompressor()
 	case FLATE:
 		return newFlateDecompressor()
+	case ZSTD:
+		return newzstdDecompressor()
 	default:
 		return nil, fmt.Errorf("unsupported decompressor: %s", name)
 	}

+ 15 - 1
internal/compressor/flate.go

@@ -1,9 +1,23 @@
+// Copyright 2023 carlclone@gmail.com.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package compressor
 
 import (
 	"bytes"
-	"compress/flate"
 	"fmt"
+	"github.com/klauspost/compress/flate"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"io"
 )

+ 15 - 1
internal/compressor/gzip.go

@@ -1,9 +1,23 @@
+// Copyright 2023 carlclone@gmail.com.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package compressor
 
 import (
 	"bytes"
-	"compress/gzip"
 	"fmt"
+	"github.com/klauspost/compress/gzip"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"io"
 )

Разница между файлами не показана из-за своего большого размера
+ 1 - 0
internal/compressor/test.json


+ 15 - 1
internal/compressor/zlib.go

@@ -1,9 +1,23 @@
+// Copyright 2023 carlclone@gmail.com.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package compressor
 
 import (
 	"bytes"
-	"compress/zlib"
 	"fmt"
+	"github.com/klauspost/compress/zlib"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"io"
 )

+ 65 - 0
internal/compressor/zstd.go

@@ -0,0 +1,65 @@
+// Copyright 2023 carlclone@gmail.com.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compressor
+
+import (
+	"bytes"
+	"github.com/klauspost/compress/zstd"
+)
+
+// newZstdCompressor builds a zstdCompressor around a new zstd encoder. The
+// encoder is created without an output destination (NewWriter is given nil);
+// Compress points it at the compressor's buffer before each use. Any error
+// from creating the encoder is returned unchanged.
+func newZstdCompressor() (*zstdCompressor, error) {
+	enc, err := zstd.NewWriter(nil)
+	if err != nil {
+		return nil, err
+	}
+	c := &zstdCompressor{writer: enc}
+	return c, nil
+}
+
+// zstdCompressor compresses payloads with zstd, reusing one encoder and one
+// output buffer across Compress calls.
+type zstdCompressor struct {
+	// writer is the zstd encoder; Compress resets it onto buffer on each call.
+	writer *zstd.Encoder
+	// buffer receives the encoder output and is reset at the start of each
+	// Compress call.
+	buffer bytes.Buffer
+}
+
+// Compress runs data through the zstd encoder and returns the compressed
+// bytes. The encoder is closed after the write so that all output is flushed
+// into the buffer before it is read.
+//
+// The returned slice is backed by the compressor's internal buffer, which is
+// reset at the start of the next Compress call; callers that need to keep the
+// result across calls must copy it. No locking is done here, so the shared
+// encoder and buffer are mutated on every call.
+func (z *zstdCompressor) Compress(data []byte) ([]byte, error) {
+	out := &z.buffer
+	out.Reset()
+	// Re-target the encoder at the now-empty buffer for this payload.
+	z.writer.Reset(out)
+	if _, err := z.writer.Write(data); err != nil {
+		return nil, err
+	}
+	if err := z.writer.Close(); err != nil {
+		return nil, err
+	}
+	return out.Bytes(), nil
+}
+
+// newzstdDecompressor builds a zstdDecompressor around a new zstd decoder.
+// The decoder is created without an input stream (NewReader is given nil);
+// in this file it is only used through DecodeAll. Any error from creating the
+// decoder is returned unchanged.
+//
+// NOTE(review): per the zstd package documentation, WithDecoderConcurrency(0)
+// is expected to size the decoder pool from GOMAXPROCS; confirm against the
+// pinned library version.
+func newzstdDecompressor() (*zstdDecompressor, error) {
+	dec, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
+	if err != nil {
+		return nil, err
+	}
+	d := &zstdDecompressor{decoder: dec}
+	return d, nil
+}
+
+// zstdDecompressor decompresses zstd payloads using a single decoder that is
+// reused for every Decompress call.
+type zstdDecompressor struct {
+	// decoder is the zstd decoder created in newzstdDecompressor.
+	decoder *zstd.Decoder
+}
+
+// Decompress decodes the zstd payload in data with DecodeAll and returns the
+// decoded bytes together with any error the decoder reports. DecodeAll is
+// given a nil destination, so the input slice is not used as output storage.
+func (z *zstdDecompressor) Decompress(data []byte) ([]byte, error) {
+	decoded, err := z.decoder.DecodeAll(data, nil)
+	return decoded, err
+}