Browse Source

feat: support for more low-priority array funcs (#1947)

* support for more low-priority array funcs and test

Signed-off-by: wangxye <1031989637@qq.com>

* update duplicate function names

Signed-off-by: wangxye <1031989637@qq.com>

* update low-priority array test

Signed-off-by: wangxye <1031989637@qq.com>

* update low-priority array funcs

Signed-off-by: wangxye <1031989637@qq.com>

---------

Signed-off-by: wangxye <1031989637@qq.com>
Abyss 1 year ago
parent
commit
8353d8b5ea
2 changed files with 468 additions and 1 deletions
  1. 181 0
      internal/binder/function/funcs_array.go
  2. 287 1
      internal/binder/function/funcs_array_test.go

+ 181 - 0
internal/binder/function/funcs_array.go

@@ -17,6 +17,8 @@ package function
 import (
 	"fmt"
 	"math"
+	"math/rand"
+	"strings"
 
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -28,9 +30,14 @@ var (
 	errorArrayIndex                        = fmt.Errorf("index out of range")
 	errorArraySecondArgumentNotArrayError  = fmt.Errorf("second argument should be array of interface{}")
 	errorArrayFirstArgumentNotIntError     = fmt.Errorf("first argument should be int")
+	errorArrayFirstArgumentNotStringError  = fmt.Errorf("first argument should be string")
 	errorArraySecondArgumentNotIntError    = fmt.Errorf("second argument should be int")
+	errorArraySecondArgumentNotStringError = fmt.Errorf("second argument should be string")
 	errorArrayThirdArgumentNotIntError     = fmt.Errorf("third argument should be int")
+	errorArrayThirdArgumentNotStringError  = fmt.Errorf("third argument should be string")
 	errorArrayContainsNonNumOrBoolValError = fmt.Errorf("array contain elements that are not numeric or Boolean")
+	errorArrayNotArrayElementError         = fmt.Errorf("array elements should be array")
+	errorArrayNotStringElementError        = fmt.Errorf("array elements should be string")
 )
 
 func registerArrayFunc() {
@@ -420,4 +427,178 @@ func registerArrayFunc() {
 			return nil
 		},
 	}
+	builtins["array_cardinality"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			array, ok := args[0].([]interface{})
+			if !ok {
+				return errorArrayFirstArgumentNotArrayError, false
+			}
+			return len(array), true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			return ValidateLen(1, len(args))
+		},
+	}
+	builtins["array_flatten"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			array, ok := args[0].([]interface{})
+			if !ok {
+				return errorArrayFirstArgumentNotArrayError, false
+			}
+
+			var output []interface{}
+
+			for _, val := range array {
+				innerArr, ok := val.([]interface{})
+				if !ok {
+					return errorArrayNotArrayElementError, false
+				}
+				output = append(output, innerArr...)
+			}
+
+			return output, true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			return ValidateLen(1, len(args))
+		},
+	}
+	builtins["array_distinct"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			array, ok := args[0].([]interface{})
+			if !ok {
+				return errorArrayFirstArgumentNotArrayError, false
+			}
+
+			output := make([]interface{}, 0, len(array))
+			set := make(map[interface{}]bool)
+
+			for _, val := range array {
+				if !set[val] {
+					output = append(output, val)
+					set[val] = true
+				}
+			}
+
+			return output, true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			return ValidateLen(1, len(args))
+		},
+	}
+	builtins["array_map"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			funcName, ok := args[0].(string)
+			if !ok {
+				return errorArrayFirstArgumentNotStringError, false
+			}
+
+			array, ok := args[1].([]interface{})
+			if !ok {
+				return errorArraySecondArgumentNotArrayError, false
+			}
+
+			mapped := make([]interface{}, 0, len(array))
+			var result interface{}
+			for _, v := range array {
+				params := []interface{}{v}
+				fs, ok := builtins[funcName]
+				if !ok {
+					return fmt.Errorf("unknown built-in function: %s.", funcName), false
+				}
+
+				if fs.fType != ast.FuncTypeScalar {
+					return fmt.Errorf("first argument should be a scalar function."), false
+				}
+				eargs := make([]ast.Expr, len(params))
+				if err := fs.val(nil, eargs); err != nil {
+					return fmt.Errorf("validate function arguments failed."), false
+				}
+
+				result, ok = fs.exec(ctx, params)
+				if !ok {
+					return result, false
+				}
+				mapped = append(mapped, result)
+			}
+
+			return mapped, true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			return ValidateLen(2, len(args))
+		},
+	}
+	builtins["array_join"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			array, ok := args[0].([]interface{})
+			if !ok {
+				return errorArrayFirstArgumentNotArrayError, false
+			}
+
+			delimiter, ok := args[1].(string)
+			if !ok {
+				return errorArraySecondArgumentNotStringError, false
+			}
+
+			var nullReplacement string
+			if len(args) == 3 {
+				nullReplacement, ok = args[2].(string)
+				if !ok {
+					return errorArrayThirdArgumentNotStringError, false
+				}
+			}
+
+			var index int
+			for _, v := range array {
+				if v == nil {
+					if len(nullReplacement) != 0 {
+						array[index] = nullReplacement
+						index++
+					}
+				} else {
+					array[index], ok = v.(string)
+					index++
+					if !ok {
+						return errorArrayNotStringElementError, false
+					}
+				}
+			}
+
+			strs, err := cast.ToStringSlice(array[:index], cast.CONVERT_ALL)
+			if err != nil {
+				return err, false
+			}
+			return strings.Join(strs, delimiter), true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			if err := ValidateLen(2, len(args)); err != nil {
+				if err := ValidateLen(3, len(args)); err != nil {
+					return fmt.Errorf("Expect two or three arguments but found %d.", len(args))
+				}
+			}
+			return nil
+		},
+	}
+	builtins["array_shuffle"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			array, ok := args[0].([]interface{})
+			if !ok {
+				return errorArrayFirstArgumentNotArrayError, false
+			}
+
+			rand.Shuffle(len(array), func(i, j int) {
+				array[i], array[j] = array[j], array[i]
+			})
+
+			return array, true
+		},
+		val: func(ctx api.FunctionContext, args []ast.Expr) error {
+			return ValidateLen(1, len(args))
+		},
+	}
 }

+ 287 - 1
internal/binder/function/funcs_array_test.go

@@ -25,7 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-func TestArrayFunctions(t *testing.T) {
+func TestArrayCommonFunctions(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testExec")
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
@@ -452,6 +452,242 @@ func TestArrayFunctions(t *testing.T) {
 			},
 			result: []interface{}{10, 7, 4, 1},
 		},
+		{
+			name: "array_cardinality",
+			args: []interface{}{
+				[]interface{}{1, 2, 3},
+			},
+			result: 3,
+		},
+		{
+			name: "array_cardinality",
+			args: []interface{}{
+				1, 2, 3,
+			},
+			result: errorArrayFirstArgumentNotArrayError,
+		},
+		{
+			name: "array_flatten",
+			args: []interface{}{
+				[]interface{}{
+					[]interface{}{1, 2, 3},
+				},
+			},
+			result: []interface{}{1, 2, 3},
+		},
+		{
+			name: "array_flatten",
+			args: []interface{}{
+				1, 2,
+			},
+			result: errorArrayFirstArgumentNotArrayError,
+		},
+		{
+			name: "array_flatten",
+			args: []interface{}{
+				[]interface{}{1, 2, 3}, 4,
+			},
+			result: errorArrayNotArrayElementError,
+		},
+		{
+			name: "array_flatten",
+			args: []interface{}{
+				[]interface{}{
+					[]interface{}{1, 2, 3},
+					[]interface{}{4, 5, 6},
+				},
+			},
+			result: []interface{}{1, 2, 3, 4, 5, 6},
+		},
+		{
+			name: "array_distinct",
+			args: []interface{}{
+				[]interface{}{1, 2, 3},
+			},
+			result: []interface{}{1, 2, 3},
+		},
+		{
+			name: "array_distinct",
+			args: []interface{}{
+				1, 1,
+			},
+			result: errorArrayFirstArgumentNotArrayError,
+		},
+		{
+			name: "array_distinct",
+			args: []interface{}{
+				[]interface{}{1, 1, 1},
+			},
+			result: []interface{}{1},
+		},
+		{
+			name: "array_distinct",
+			args: []interface{}{
+				[]interface{}{1, 2, 2, 1},
+			},
+			result: []interface{}{1, 2},
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"round", []interface{}{0, 0.4, 1.2},
+			},
+			result: []interface{}{0.0, 0.0, 1.0},
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				123, []interface{}{1, 2, 3},
+			},
+			result: errorArrayFirstArgumentNotStringError,
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"round", 1,
+			},
+			result: errorArraySecondArgumentNotArrayError,
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"abs", []interface{}{0, -0.4, 1.2},
+			},
+			result: []interface{}{0, 0.4, 1.2},
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"pow", []interface{}{0, -0.4, 1.2},
+			},
+			result: fmt.Errorf("unknown built-in function: pow."),
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"avg", []interface{}{0, -0.4, 1.2},
+			},
+			result: fmt.Errorf("first argument should be a scalar function."),
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"ceil", []interface{}{0, -1, 1.2},
+			},
+			result: []interface{}{0.0, -1.0, 2.0},
+		},
+		{
+			name: "array_map",
+			args: []interface{}{
+				"power", []interface{}{1, 2, 3},
+			},
+			result: fmt.Errorf("validate function arguments failed."),
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				"a", "",
+			},
+			result: errorArrayFirstArgumentNotArrayError,
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, 123, "a",
+			},
+			result: errorArraySecondArgumentNotStringError,
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, ":", 123,
+			},
+			result: errorArrayThirdArgumentNotStringError,
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{123, "b", "c"}, ":", "a",
+			},
+			result: errorArrayNotStringElementError,
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, "",
+			},
+			result: "abc",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", nil, "b"}, ":",
+			},
+			result: "a:b",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, ":",
+			},
+			result: "a:b:c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, ":,%",
+			},
+			result: "a:,%b:,%c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", nil, "c"}, ":", "nullReplacementStr",
+			},
+			result: "a:nullReplacementStr:c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", nil, "c"}, ":", "nullReplacementStr",
+			},
+			result: "a:nullReplacementStr:c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, ":", "a",
+			},
+			result: "a:b:c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", "c"}, ":",
+			},
+			result: "a:b:c",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{nil, nil, nil}, ",", "nullReplacementStr",
+			},
+			result: "nullReplacementStr,nullReplacementStr,nullReplacementStr",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{nil, nil, nil}, ",",
+			},
+			result: "",
+		},
+		{
+			name: "array_join",
+			args: []interface{}{
+				[]interface{}{"a", "b", nil}, ",",
+			},
+			result: "a,b",
+		},
 	}
 	for i, tt := range tests {
 		f, ok := builtins[tt.name]
@@ -464,3 +700,53 @@ func TestArrayFunctions(t *testing.T) {
 		}
 	}
 }
+
+func TestArrayShuffle(t *testing.T) {
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	tests := []struct {
+		name   string
+		args   []interface{}
+		result []interface{}
+	}{
+		{
+			name: "array_shuffle",
+			args: []interface{}{
+				[]interface{}{1, 2, 3},
+			},
+			result: []interface{}{
+				[]interface{}{1, 2, 3}, []interface{}{1, 3, 2}, []interface{}{2, 1, 3}, []interface{}{2, 3, 1}, []interface{}{3, 1, 2}, []interface{}{3, 2, 1},
+			},
+		},
+		{
+			name: "array_shuffle",
+			args: []interface{}{
+				1,
+			},
+			result: []interface{}{
+				errorArrayFirstArgumentNotArrayError,
+			},
+		},
+	}
+
+	for i, tt := range tests {
+		f, ok := builtins[tt.name]
+		if !ok {
+			t.Fatal(fmt.Sprintf("builtin %v not found", tt.name))
+		}
+		result, _ := f.exec(fctx, tt.args)
+		flag := false
+		for _, actual := range tt.result {
+			if reflect.DeepEqual(result, actual) {
+				flag = true
+				break
+			}
+		}
+
+		if !flag {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant in:\t%v", i, result, tt.result)
+		}
+	}
+}