Explorar o código

feat(func): add merge_agg function (#2016)

* refactor(valuer): wildcard should not return internal type

Return map instead of xsql.Message

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(func): add merge_agg func

An aggregate function to merge objects into one.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

---------

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying hai 1 ano
pai
achega
da6c741fba

+ 62 - 1
docs/en_US/sqls/functions/aggregate_functions.md

@@ -51,7 +51,7 @@ The sum of all the values in a group. The null values will be ignored.
 
 ```text
 collect(*)
-collect(col1, col2 ...)
+collect(col)
 ```
 
 Returns an array containing the values of the given column, or the whole records when the parameter is `*`, from the group.
@@ -78,6 +78,67 @@ Examples:
     SELECT collect(*)[1]->a as r1 FROM test GROUP BY TumblingWindow(ss, 10)
     ```
 
+## MERGE_AGG
+
+```text
+merge_agg(*)
+merge_agg(col)
+```
+
+Merge the values from the group into a single object.
+It merges multiple objects by generating an object containing the union of their keys,
+taking the later object's value when there are duplicate keys.
+It does not operate recursively; only the top-level object structure is merged.
+
+If the parameter is a column,
+the result will be an object containing the union of the keys of all the objects in the column.
+If the column contains only non-object values, the result will be an empty object.
+
+## Examples
+
+Given the following values in the group:
+
+```json lines
+{
+  "a": {
+    "a": 2
+  },
+  "b": 2,
+  "c": 3
+}
+{
+  "a": {
+    "b": 2
+  },
+  "b": 5,
+  "d": 6
+}
+{
+  "a": {
+    "a": 3
+  },
+  "b": 8
+}
+```
+
+* Merge the whole records with the wildcard; the result will be: `{"a": {"a": 3}, "b": 8, "c": 3, "d": 6}`
+
+    ```sql
+    SELECT merge_agg(*) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
+* Merge a specified object column; the result will be: `{"a": 3, "b": 2}`
+
+    ```sql
+    SELECT merge_agg(a) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
+* Merge a specified non-object column; the result will be: `{}`
+
+    ```sql
+    SELECT merge_agg(b) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
 ## DEDUPLICATE
 
 ```text

+ 39 - 0
docs/zh_CN/sqls/functions/aggregate_functions.md

@@ -75,6 +75,45 @@ collect(col)
     SELECT collect(*)[1]->a as r1 FROM test GROUP BY TumblingWindow(ss, 10)
     ```
 
+## MERGE_AGG
+
+```text
+merge_agg(*)
+merge_agg(col)
+```
+
+将组中的值合并为单个值。若存在重复键时取较后对象的值。它不进行递归操作;只合并顶级键值。
+
+如果参数是列,结果将是一个包含列中所有对象的键的联合的对象。如果列只包含非对象值,则结果将是一个空对象。
+
+## 范例
+
+假设组中的值如下所示:
+
+```json lines
+{"a": {"a": 2}, "b": 2, "c": 3}
+{"a": {"b": 2}, "b": 5, "d": 6}
+{"a": {"a": 3}, "b": 8}
+```
+
+* 合并 * 结果为: `{"a": {"a": 3}, "b": 8, "c": 3, "d": 6}`
+
+    ```sql
+    SELECT merge_agg(*) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
+* 合并对象列,结果为: `{"a": 3, "b": 2}`
+
+    ```sql
+    SELECT merge_agg(a) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
+* 合并非对象列,结果为: `{}`
+
+    ```sql
+    SELECT merge_agg(b) as r1 FROM test GROUP BY TumblingWindow(ss, 10)
+    ```
+
 ## DEDUPLICATE
 
 ```text

+ 21 - 1
internal/binder/function/funcs_agg.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -189,6 +189,26 @@ func registerAggFunc() {
 		},
 		val: ValidateOneArg,
 	}
+	builtins["merge_agg"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			data, ok := args[0].([]interface{})
+			if ok {
+				result := make(map[string]interface{})
+				for _, ele := range data {
+					if m, ok := ele.(map[string]interface{}); ok {
+						for k, v := range m {
+							result[k] = v
+						}
+					}
+				}
+				return result, true
+			}
+			return nil, true
+		},
+		val:   ValidateOneArg,
+		check: returnNilIfHasAnyNil,
+	}
 	builtins["deduplicate"] = builtinFunc{
 		fType: ast.FuncTypeAgg,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 70 - 1
internal/binder/function/funcs_agg_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -264,6 +265,70 @@ func TestPercentileExec(t *testing.T) {
 	}
 }
 
+func TestConcatExec(t *testing.T) {
+	fcon, ok := builtins["merge_agg"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	tests := []struct {
+		name   string
+		args   []interface{}
+		result map[string]interface{}
+	}{
+		{ // 0
+			name: "concat wildcard",
+			args: []interface{}{
+				[]interface{}{
+					map[string]interface{}{
+						"foo": "bar",
+						"a":   123,
+					},
+					map[string]interface{}{
+						"foo1": "bar",
+						"a":    243,
+					},
+					map[string]interface{}{
+						"foo": "bar1",
+						"a":   342,
+					},
+				},
+			},
+			result: map[string]interface{}{
+				"foo":  "bar1",
+				"a":    342,
+				"foo1": "bar",
+			},
+		}, { // 1
+			name: "concat int column",
+			args: []interface{}{
+				[]interface{}{
+					int64(100),
+					int64(150),
+					int64(200),
+				},
+			},
+			result: map[string]interface{}{},
+		}, { // 2
+			name: "concat empty",
+			args: []interface{}{
+				[]interface{}{},
+			},
+			result: map[string]interface{}{},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			r, ok := fcon.exec(fctx, tt.args)
+			assert.True(t, ok, "failed to execute concat")
+			assert.Equal(t, tt.result, r)
+		})
+	}
+}
+
 func TestAggFuncNil(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testExec")
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
@@ -331,6 +396,10 @@ func TestAggFuncNil(t *testing.T) {
 			r, b := function.exec(fctx, []interface{}{nil})
 			require.True(t, b, fmt.Sprintf("%v failed", name))
 			require.Nil(t, r, fmt.Sprintf("%v failed", name))
+		case "merge_agg":
+			r, b := function.exec(fctx, []interface{}{nil})
+			require.True(t, b, fmt.Sprintf("%v failed", name))
+			require.Nil(t, r, fmt.Sprintf("%v failed", name))
 		default:
 			r, b := function.check([]interface{}{nil})
 			require.True(t, b, fmt.Sprintf("%v failed", name))

internal/binder/function/funcs_stateful.go → internal/binder/function/funcs_trans.go


internal/binder/function/funcs_stateful_test.go → internal/binder/function/funcs_trans_test.go


+ 1 - 1
internal/topo/operator/cols_func_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.

+ 4 - 4
internal/topo/operator/misc_func_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -938,17 +938,17 @@ func TestChangedFuncs_Apply1(t *testing.T) {
 				},
 			},
 			result: [][]map[string]interface{}{{{
-				"changed_col": xsql.Message{
+				"changed_col": map[string]interface{}{
 					"a": "a1",
 					"b": "b1",
 				},
 			}}, {{
-				"changed_col": xsql.Message{
+				"changed_col": map[string]interface{}{
 					"a": "a1",
 					"c": "c1",
 				},
 			}}, {{}}, {{
-				"changed_col": xsql.Message{
+				"changed_col": map[string]interface{}{
 					"a": "a1",
 					"b": "b2",
 					"c": "c2",

+ 5 - 5
internal/topo/operator/project_test.go

@@ -2036,7 +2036,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				WindowRange: xsql.NewWindowRange(1541152486013, 1541152487013),
 			},
 			result: []map[string]interface{}{{
-				"c1": xsql.Message{
+				"c1": map[string]interface{}{
 					"a": 27,
 				},
 			}},
@@ -2127,13 +2127,13 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{
 				{
 					"r1": []interface{}{
-						xsql.Message{"a": 122.33, "c": 2, "color": "w2", "id": 1, "r": 122},
-						xsql.Message{"a": 177.51, "color": "w2", "id": 5},
+						map[string]interface{}{"a": 122.33, "c": 2, "color": "w2", "id": 1, "r": 122},
+						map[string]interface{}{"a": 177.51, "color": "w2", "id": 5},
 					},
 				}, {
 					"r1": []interface{}{
-						xsql.Message{"a": 89.03, "c": 2, "color": "w1", "id": 2, "r": 89},
-						xsql.Message{"a": 14.6, "color": "w1", "id": 4},
+						map[string]interface{}{"a": 89.03, "c": 2, "color": "w1", "id": 2, "r": 89},
+						map[string]interface{}{"a": 14.6, "color": "w1", "id": 4},
 					},
 				},
 			},

+ 2 - 2
internal/xsql/collection.go

@@ -222,7 +222,7 @@ func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
 	return nil, false
 }
 
-func (w *WindowTuples) All(_ string) (Message, bool) {
+func (w *WindowTuples) All(_ string) (map[string]interface{}, bool) {
 	return w.ToMap(), true
 }
 
@@ -362,7 +362,7 @@ func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
 	return s.Content[0].Meta(key, table)
 }
 
-func (s *JoinTuples) All(_ string) (Message, bool) {
+func (s *JoinTuples) All(_ string) (map[string]interface{}, bool) {
 	return s.ToMap(), true
 }
 

+ 4 - 4
internal/xsql/row.go

@@ -31,7 +31,7 @@ import (
 
 type Wildcarder interface {
 	// All returns all values as a map together with an existence flag; some implementations use stream to select a single emitter.
-	All(stream string) (Message, bool)
+	All(stream string) (map[string]interface{}, bool)
 }
 
 type Event interface {
@@ -357,7 +357,7 @@ func (t *Tuple) Value(key, table string) (interface{}, bool) {
 	return t.Message.Value(key, table)
 }
 
-func (t *Tuple) All(string) (Message, bool) {
+func (t *Tuple) All(string) (map[string]interface{}, bool) {
 	return t.ToMap(), true
 }
 
@@ -498,7 +498,7 @@ func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
 	return jt.doGetValue(key, table, false)
 }
 
-func (jt *JoinTuple) All(stream string) (Message, bool) {
+func (jt *JoinTuple) All(stream string) (map[string]interface{}, bool) {
 	if stream != "" {
 		for _, t := range jt.Tuples {
 			if t.GetEmitter() == stream {
@@ -580,7 +580,7 @@ func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
 	return s.Content[0].Meta(key, table)
 }
 
-func (s *GroupedTuples) All(_ string) (Message, bool) {
+func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool) {
 	return s.ToMap(), true
 }
 

+ 9 - 9
internal/xsql/row_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -37,12 +37,12 @@ func TestCollectionRow(t *testing.T) {
 			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
 			value:    []string{"a", "b"},
 			wildcard: []string{""},
-			result:   []interface{}{1, "2", Message{"a": 1, "b": "2"}},
+			result:   []interface{}{1, "2", map[string]interface{}{"a": 1, "b": "2"}},
 		}, {
 			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, AliasMap: map[string]interface{}{"b": "b1"}}},
 			value:    []string{"a", "b", "c"},
 			wildcard: []string{""},
-			result:   []interface{}{4, "b1", 3, Message{"a": 4, "b": "b1", "c": 3}},
+			result:   []interface{}{4, "b1", 3, map[string]interface{}{"a": 4, "b": "b1", "c": 3}},
 		}, {
 			rowC: &JoinTuple{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
@@ -50,7 +50,7 @@ func TestCollectionRow(t *testing.T) {
 			}},
 			value:    []string{"a", "src2.a", "b", "c"},
 			wildcard: []string{"", "src1"},
-			result:   []interface{}{1, 2, "v1", "w2", Message{"a": 1, "b": "v1", "c": "w2"}, Message{"a": 1, "b": "v1"}},
+			result:   []interface{}{1, 2, "v1", "w2", map[string]interface{}{"a": 1, "b": "v1", "c": "w2"}, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 			rowC: &JoinTuple{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
@@ -58,22 +58,22 @@ func TestCollectionRow(t *testing.T) {
 			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "src2.a", "b", "c", "d"},
 			wildcard: []string{"", "src1"},
-			result:   []interface{}{4, 2, "v1", "w2", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}, Message{"a": 1, "b": "v1"}},
+			result:   []interface{}{4, 2, "v1", "w2", 4, map[string]interface{}{"a": 4, "b": "v1", "c": "w2", "d": 4}, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}},
 			value:    []string{"a", "b"},
 			wildcard: []string{""},
-			result:   []interface{}{1, "v1", Message{"a": 1, "b": "v1"}},
+			result:   []interface{}{1, "v1", map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
+			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 4, "b": "v1", "d": 4}},
 		}, {
 			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
+			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 4, "b": "v1", "d": 4}},
 		}, {
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"b": "v2", "$$lag_a": 1}}},
@@ -81,7 +81,7 @@ func TestCollectionRow(t *testing.T) {
 			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v2", 4, Message{"a": 4, "b": "v2", "c": "w2", "d": 4}},
+			result:   []interface{}{4, "v2", 4, map[string]interface{}{"a": 4, "b": "v2", "c": "w2", "d": 4}},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 2 - 2
internal/xsql/valuer.go

@@ -329,9 +329,9 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 							}
 							switch cf.Expr.(type) {
 							case *ast.Wildcard:
-								m, ok := temp.(Message)
+								m, ok := temp.(map[string]interface{})
 								if !ok {
-									return fmt.Errorf("wildcarder return non message result")
+									return fmt.Errorf("wildcarder return non map result")
 								}
 								for kk, vv := range m {
 									args = append(args, vv)