Browse Source

feat: support projectset operation for tupleRow (#1814)

* add projectset node

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 2 years ago
parent
commit
77983285b4

+ 8 - 1
internal/topo/node/operations.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -138,6 +138,13 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 				o.Broadcast(val)
 				stats.IncTotalExceptions(val.Error())
 				continue
+			case []interface{}:
+				stats.ProcessTimeEnd()
+				for _, v := range val {
+					o.Broadcast(v)
+					stats.IncTotalRecordsOut()
+				}
+				stats.SetBufferLength(int64(len(o.input)))
 			default:
 				stats.ProcessTimeEnd()
 				o.Broadcast(val)

+ 65 - 0
internal/topo/operator/projectset_operator.go

@@ -0,0 +1,65 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package operator
+
+import (
+	"fmt"
+
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+// ProjectSetOperator turns the array result of a set-returning function (SRF)
+// column into multiple rows; the expansion itself is implemented by Apply.
+type ProjectSetOperator struct {
+	// SrfMapping is a set whose keys are the names of the SRF result columns.
+	// Apply currently reads a single key from it.
+	SrfMapping map[string]struct{}
+}
+
+// Apply expands the array held in the SRF (set-returning function) column of
+// a row into one output row per array element.
+//
+// Only data is inspected; the context and valuer arguments are ignored.
+//   - An error input is returned unchanged.
+//   - An xsql.TupleRow input yields a []interface{} holding one cloned row per
+//     element of the SRF column. On every clone the SRF column is deleted
+//     first; a map element then contributes each of its key/value pairs as
+//     columns, while any other element is stored back under the SRF column
+//     name.
+//   - Any other input type yields an error.
+//
+// An error is also returned when the SRF column cannot be found on the row or
+// its value is not a []interface{}.
+func (ps *ProjectSetOperator) Apply(_ api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+	// for now we only support 1 srf function in the field
+	srfName := ""
+	for k := range ps.SrfMapping {
+		srfName = k
+		break
+	}
+	switch input := data.(type) {
+	case error:
+		// Pass the error through as an error instead of wrapping it in a
+		// slice: a caller that switches on the result type would otherwise
+		// treat it as a batch of result rows, and every other failure path in
+		// this method already returns a bare error.
+		return input
+	case xsql.TupleRow:
+		srfValue, found := input.Value(srfName, "")
+		if !found {
+			return fmt.Errorf("can't find the result from the %v function", srfName)
+		}
+		elements, isArray := srfValue.([]interface{})
+		if !isArray {
+			return fmt.Errorf("the result from the %v function should be array", srfName)
+		}
+		// One output row per element, so the final length is known up front.
+		rows := make([]interface{}, 0, len(elements))
+		for _, elem := range elements {
+			row := input.Clone()
+			// clear original column value
+			row.Del(srfName)
+			if fields, isMap := elem.(map[string]interface{}); isMap {
+				for name, value := range fields {
+					row.Set(name, value)
+				}
+			} else {
+				row.Set(srfName, elem)
+			}
+			rows = append(rows, row)
+		}
+		return rows
+	default:
+		return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", input)
+	}
+}

+ 2 - 0
internal/topo/planner/planner.go

@@ -195,6 +195,8 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
 		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
+	case *ProjectSetPlan:
+		op = Transform(&operator.ProjectSetOperator{SrfMapping: t.SrfMapping}, fmt.Sprintf("%d_projectset", newIndex), options)
 	default:
 		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
 	}

+ 12 - 0
internal/topo/topotest/mocknode/mock_data.go

@@ -1009,6 +1009,18 @@ var TestData = map[string][]*xsql.Tuple{
 				"arr": []int{1, 2, 3, 4, 5},
 				"x":   1,
 				"y":   2,
+				"arr2": []interface{}{
+					map[string]interface{}{
+						"a": 1,
+						"b": 2,
+					},
+					map[string]interface{}{
+						"a": 3,
+						"b": 4,
+					},
+				},
+				"a":    6,
+				"arr3": []interface{}{1, 2, 3},
 			},
 			Timestamp: 1541152487501,
 		},

+ 78 - 0
internal/topo/topotest/rule_test.go

@@ -620,6 +620,84 @@ func TestSingleSQL(t *testing.T) {
 			},
 		},
 		{
+			Name: `TestSingleSQLRule18`,
+			Sql:  `SELECT unnest(arr2) FROM demoArr`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"a": float64(1),
+						"b": float64(2),
+					},
+				},
+				{
+					{
+						"a": float64(3),
+						"b": float64(4),
+					},
+				},
+			},
+		},
+		// The columns produced by the unnest function override an original column that has the same name
+		{
+			Name: `TestSingleSQLRule19`,
+			Sql:  `SELECT unnest(arr2),a FROM demoArr`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"a": float64(1),
+						"b": float64(2),
+					},
+				},
+				{
+					{
+						"a": float64(3),
+						"b": float64(4),
+					},
+				},
+			},
+		},
+		{
+			Name: `TestSingleSQLRule20`,
+			Sql:  `SELECT unnest(arr3) as col FROM demoArr`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"col": float64(1),
+					},
+				},
+				{
+					{
+						"col": float64(2),
+					},
+				},
+				{
+					{
+						"col": float64(3),
+					},
+				},
+			},
+		},
+		{
+			Name: `TestSingleSQLRule21`,
+			Sql:  `SELECT unnest(arr2),x FROM demoArr`,
+			R: [][]map[string]interface{}{
+				{
+					{
+						"a": float64(1),
+						"b": float64(2),
+						"x": float64(1),
+					},
+				},
+				{
+					{
+						"a": float64(3),
+						"b": float64(4),
+						"x": float64(1),
+					},
+				},
+			},
+		},
+		{
 			Name: `TestLagAlias`,
 			Sql:  "SELECT lag(size) as lastSize, lag(had_changed(true,size)), size, lastSize/size as changeRate FROM demo WHERE size > 2",
 			R: [][]map[string]interface{}{

+ 14 - 1
internal/xsql/row.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -46,6 +46,8 @@ type ReadonlyRow interface {
 
 type Row interface {
 	ReadonlyRow
+	// Del Only for some ops like functionOp * and Alias
+	Del(col string)
 	// Set Only for some ops like functionOp *
 	Set(col string, value interface{})
 	// ToMap converts the row to a map to export to other systems *
@@ -125,6 +127,17 @@ func (d *AffiliateRow) Set(col string, value interface{}) {
 	d.CalCols[col] = value
 }
 
+// Del removes col from both CalCols and AliasMap while holding the row's lock.
+func (d *AffiliateRow) Del(col string) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	// delete is a no-op on a nil map, so neither map needs a nil guard.
+	delete(d.AliasMap, col)
+	delete(d.CalCols, col)
+}
+
 func (d *AffiliateRow) Clone() AffiliateRow {
 	d.lock.RLock()
 	defer d.lock.RUnlock()

+ 2 - 2
internal/xsql/valuer.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -306,7 +306,7 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 								}
 							}
 						}
-					case ast.FuncTypeScalar:
+					case ast.FuncTypeScalar, ast.FuncTypeSrf:
 						args = make([]interface{}, len(expr.Args))
 						for i, arg := range expr.Args {
 							args[i] = v.Eval(arg)