瀏覽代碼

feat(func): add decompressor func

1. Add decompressor func
2. Optimize cast to string implementation to support byte

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父節點
當前提交
49513897be

+ 8 - 13
internal/binder/function/funcs_misc.go

@@ -81,20 +81,11 @@ func registerMiscFunc() {
 						return fmt.Errorf("Not supported type conversion."), false
 					}
 				case "string":
-					if v1, ok1 := args[0].(int); ok1 {
-						return fmt.Sprintf("%d", v1), true
-					} else if v1, ok1 := args[0].(float64); ok1 {
-						return fmt.Sprintf("%g", v1), true
-					} else if v1, ok1 := args[0].(string); ok1 {
-						return v1, true
-					} else if v1, ok1 := args[0].(bool); ok1 {
-						if v1 {
-							return "true", true
-						} else {
-							return "false", true
-						}
+					r, e := cast.ToString(args[0], cast.CONVERT_ALL)
+					if e != nil {
+						return fmt.Errorf("Not supported type conversion, got error %v.", e), false
 					} else {
-						return fmt.Errorf("Not supported type conversion."), false
+						return r, true
 					}
 				case "boolean":
 					if v1, ok1 := args[0].(int); ok1 {
@@ -364,6 +355,10 @@ func registerMiscFunc() {
 		conf.Log.Infof("initializing compress function")
 		return &compressFunc{}
 	}
+	builtinStatfulFuncs["decompress"] = func() api.Function {
+		conf.Log.Infof("initializing decompress function")
+		return &decompressFunc{}
+	}
 	builtins["isnull"] = builtinFunc{
 		fType: ast.FuncTypeScalar,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 47 - 4
internal/binder/function/funcs_stateful.go

@@ -73,9 +73,52 @@ func (c *compressFunc) IsAggregate() bool {
 	return false
 }
 
-func (c *compressFunc) Close(ctx api.StreamContext) error {
-	if c.compressor != nil {
-		return c.compressor.Close(ctx)
+type decompressFunc struct {
+	compressType string
+	decompressor message.Decompressor
+}
+
+func (d *decompressFunc) Validate(args []interface{}) error {
+	var eargs []ast.Expr
+	for _, arg := range args {
+		if t, ok := arg.(ast.Expr); ok {
+			eargs = append(eargs, t)
+		} else {
+			// should never happen
+			return fmt.Errorf("receive invalid arg %v", arg)
+		}
+	}
+	return ValidateTwoStrArg(nil, eargs)
+}
+
+func (d *decompressFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
+	if args[0] == nil {
+		return nil, true
 	}
-	return nil
+	arg0, err := cast.ToBytes(args[0], cast.CONVERT_SAMEKIND)
+	if err != nil {
+		return fmt.Errorf("require string or bytea parameter, but got %v", args[0]), false
+	}
+	arg1 := cast.ToStringAlways(args[1])
+	if d.decompressor != nil {
+		if d.compressType != arg1 {
+			return fmt.Errorf("decompress type must be consistent, previous %s, now %s", d.compressType, arg1), false
+		}
+	} else {
+		ctx.GetLogger().Infof("creating decompressor %s", arg1)
+		d.decompressor, err = compressor.GetDecompressor(arg1)
+		if err != nil {
+			return err, false
+		}
+		d.compressType = arg1
+	}
+	r, e := d.decompressor.Decompress(arg0)
+	if e != nil {
+		return e, false
+	}
+	return r, true
+}
+
+func (d *decompressFunc) IsAggregate() bool {
+	return false
 }

+ 54 - 0
internal/binder/function/funcs_stateful_test.go

@@ -77,3 +77,57 @@ func TestCompressExec(t *testing.T) {
 		}
 	}
 }
+
+func TestDecompressExec(t *testing.T) {
+	ff, ok := builtinStatfulFuncs["decompress"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	f := ff()
+	contextLogger := conf.Log.WithField("rule", "testDecompressExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+			},
+			result: fmt.Errorf("unsupported compressor: bar"),
+		}, { // 1
+			args: []interface{}{
+				[]byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},
+				"zlib",
+			},
+			result: []byte("hello world"),
+		}, { // 2
+			args: []interface{}{
+				[]byte{120, 156, 170, 86, 202, 75, 204, 77, 85, 178, 82, 242, 202, 207, 200, 83, 112, 201, 79, 85, 210, 81, 74, 76, 79, 85, 178, 50, 54, 208, 81, 74, 205, 77, 204, 204, 81, 178, 82, 202, 202, 207, 200, 211, 75, 201, 79, 117, 72, 173, 72, 204, 45, 200, 73, 213, 75, 206, 207, 85, 170, 5, 4, 0, 0, 255, 255, 32, 223, 19, 1},
+				"zlib",
+			},
+			result: []byte(`{"name":"John Doe","age":30,"email":"john.doe@example.com"}`),
+		}, { // 3
+			args: []interface{}{
+				`{"name":"John Doe","age":30,"email":"john.doe@example.com","address":{"street":"123 Main St","city":"Anytown","state":"CA","zip":"12345"},"phoneNumbers":[{"type":"home","number":"555-555-1234"},{"type":"work","number":"555-555-5678"}],"isActive":true}`,
+				"gzip",
+			},
+			result: fmt.Errorf("decompress type must be consistent, previous zlib, now gzip"),
+		}, { // 4
+			args: []interface{}{
+				[]byte{120, 156, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 4, 0, 0, 255, 255, 26, 11, 4, 93},
+				"zlib",
+			},
+			result: []byte("hello world"),
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.Exec(tt.args, fctx)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}

+ 59 - 0
internal/compressor/decompressor.go

@@ -0,0 +1,59 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compressor
+
+import (
+	"bytes"
+	"compress/zlib"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/message"
+	"io"
+)
+
+func GetDecompressor(name string) (message.Decompressor, error) {
+	switch name {
+	case "zlib":
+		return &zlibDecompressor{}, nil
+	default:
+		return nil, fmt.Errorf("unsupported compressor: %s", name)
+	}
+}
+
+type zlibDecompressor struct {
+	reader io.ReadCloser
+}
+
+func (z *zlibDecompressor) Decompress(data []byte) ([]byte, error) {
+	if z.reader == nil {
+		r, err := zlib.NewReader(bytes.NewReader(data))
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+		z.reader = r
+	} else {
+		err := z.reader.(zlib.Resetter).Reset(bytes.NewReader(data), nil)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress: %v", err)
+		}
+	}
+	defer func() {
+		err := z.reader.Close()
+		if err != nil {
+			conf.Log.Warnf("failed to close zlib decompressor: %v", err)
+		}
+	}()
+	return io.ReadAll(z.reader)
+}

+ 0 - 3
pkg/message/artifacts.go

@@ -14,8 +14,6 @@
 
 package message
 
-import "github.com/lf-edge/ekuiper/pkg/api"
-
 const (
 	FormatBinary    = "binary"
 	FormatJson      = "json"
@@ -53,7 +51,6 @@ type SchemaProvider interface {
 // Compressor compresses and decompresses bytes
 type Compressor interface {
 	Compress([]byte) ([]byte, error)
-	api.Closable
 }
 
 type Decompressor interface {