Selaa lähdekoodia

feat(topo): project op to return row/collection

So that the project op can work as an intermediate node in D&D:
- Row/Collection gain functions to convert themselves to maps
- Row supports picking a subset of fields
- Sink node now needs to convert Row/Collection to maps

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 vuotta sitten
vanhempi
commit
cfe33f5cf0

+ 7 - 2
internal/topo/node/node.go

@@ -88,15 +88,20 @@ func (o *defaultNode) GetMetrics() (result [][]interface{}) {
 }
 
 func (o *defaultNode) Broadcast(val interface{}) error {
+
 	switch d := val.(type) {
 	case error:
 		if !o.sendError {
 			return nil
 		}
 	case xsql.TupleRow:
-		val = d.Clone()
+		if len(o.outputs) > 1 {
+			val = d.Clone()
+		}
 	case xsql.Collection:
-		val = d.Clone()
+		if len(o.outputs) > 1 {
+			val = d.Clone()
+		}
 	}
 	if o.qos >= api.AtLeastOnce {
 		boe := &checkpoint.BufferOrEvent{

+ 12 - 2
internal/topo/node/sink_node.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
 	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -224,7 +225,6 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							}
 
 						}
-						return nil
 					})
 					if panicOrError != nil {
 						infra.DrainError(ctx, panicOrError, result)
@@ -294,8 +294,18 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats met
 		outs = []map[string]interface{}{
 			{"error": val.Error()},
 		}
-	case []map[string]interface{}:
+		break
+	case xsql.Collection: // The order is important here, because some element is both a collection and a row, such as WindowTuples, JoinTuples, etc.
+		outs = val.ToMaps()
+		break
+	case xsql.Row:
+		outs = []map[string]interface{}{
+			val.ToMap(),
+		}
+		break
+	case []map[string]interface{}: // for test only
 		outs = val
+		break
 	default:
 		outs = []map[string]interface{}{
 			{"error": fmt.Sprintf("result is not a map slice but found %#v", val)},

+ 1 - 1
internal/topo/operator/aggregate_operator.go

@@ -40,7 +40,7 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 		case xsql.SingleCollection:
 			wr := input.GetWindowRange()
 			result := make(map[string]*xsql.GroupedTuples)
-			err := input.Range(func(i int, ir xsql.Row) (bool, error) {
+			err := input.Range(func(i int, ir xsql.ReadonlyRow) (bool, error) {
 				var name string
 				tr := ir.(xsql.TupleRow)
 				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tr, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}

+ 7 - 5
internal/topo/operator/cols_func_test.go

@@ -133,16 +133,18 @@ func TestChangedColsFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		r := make([][]map[string]interface{}, 0, len(tt.data))
 		for _, d := range tt.data {
-			result := pp.Apply(ctx, d, fv, afv)
-			if e, ok := result.(error); ok {
-				t.Errorf("apply sql %s error %v", tt.sql, e)
+			opResult := pp.Apply(ctx, d, fv, afv)
+			result, err := parseResult(opResult, pp.IsAggregate)
+			if err != nil {
+				t.Errorf("apply sql %s error %v", tt.sql, err)
 				continue
 			}
-			r = append(r, result.([]map[string]interface{}))
+			r = append(r, result)
 		}
 		if !reflect.DeepEqual(tt.result, r) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)

+ 1 - 1
internal/topo/operator/filter_operator.go

@@ -52,7 +52,7 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 		}
 	case xsql.SingleCollection:
 		var sel []int
-		err := input.Range(func(i int, r xsql.Row) (bool, error) {
+		err := input.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(r, fv)}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {

+ 8 - 2
internal/topo/operator/math_func_test.go

@@ -477,9 +477,15 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 			t.Errorf("%d: found error %q", i, err)
 			continue
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 45 - 22
internal/topo/operator/misc_func_test.go

@@ -255,9 +255,15 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -297,9 +303,15 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -401,9 +413,15 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -801,11 +819,18 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
-		result := pp.Apply(ctx, tt.data, fv, afv)
-		switch rt := result.(type) {
-		case []map[string]interface{}:
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		if rt, ok := opResult.(error); ok {
+			if tt.err == "" {
+				t.Errorf("%d: got error:\n  exp=%s\n  got=%s\n\n", i, tt.result, rt)
+			} else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
+				t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, rt)
+			}
+		} else {
+			result, _ := parseResult(opResult, pp.IsAggregate)
 			if tt.err == "" {
 				if !reflect.DeepEqual(tt.result, result) {
 					t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
@@ -813,16 +838,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 			} else {
 				t.Errorf("%d: invalid result:\n  exp error %s\n  got=%s\n\n", i, tt.err, result)
 			}
-		case error:
-			if tt.err == "" {
-				t.Errorf("%d: got error:\n  exp=%s\n  got=%s\n\n", i, tt.result, rt)
-			} else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
-				t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, rt)
-			}
-		default:
-			t.Errorf("%d: Invalid returned result found %v", i, result)
 		}
-
 	}
 }
 
@@ -938,13 +954,20 @@ func TestChangedFuncs_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		r := make([][]map[string]interface{}, 0, len(tt.data))
 		for _, d := range tt.data {
-			result := pp.Apply(ctx, d, fv, afv)
-			r = append(r, result.([]map[string]interface{}))
+			opResult := pp.Apply(ctx, d, fv, afv)
+			result, err := parseResult(opResult, pp.IsAggregate)
+			if err != nil {
+				t.Errorf("parse result error: %s", err)
+				continue
+			}
+			r = append(r, result)
 		}
+
 		if !reflect.DeepEqual(tt.result, r) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
 		}

+ 52 - 57
internal/topo/operator/project_operator.go

@@ -24,9 +24,19 @@ import (
 )
 
 type ProjectOp struct {
-	Fields      ast.Fields
-	IsAggregate bool
-	SendMeta    bool
+	ColNames         [][]string // list of [col, table] pairs, handed to Row.Pick
+	AliasNames       []string   // list of alias names
+	ExprNames        []string   // list of expression field names
+	AllWildcard      bool // handed to Row.Pick as allWildcard
+	WildcardEmitters map[string]bool // handed to Row.Pick as wildcardEmitters
+	AliasFields      ast.Fields // evaluated in project and applied with Row.AppendAlias under their AName
+	ExprFields       ast.Fields // evaluated in project and applied with Row.Set under their Name
+	IsAggregate      bool // when true, a SingleCollection is projected once as a whole via GroupRange
+
+	SendMeta bool // when true, a tuple's non-nil Metadata is stored under message.MetaKey
+
+	kvs   []interface{} // scratch buffer of alternating name/value entries pending Row.Set
+	alias []interface{} // scratch buffer of alternating alias/value entries pending Row.AppendAlias
 }
 
 // Apply
@@ -35,43 +45,38 @@ type ProjectOp struct {
 func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("project plan receive %s", data)
-	var results []map[string]interface{}
 	switch input := data.(type) {
 	case error:
 		return input
 	case *xsql.Tuple:
 		ve := pp.getVE(input, input, nil, fv, afv)
-		if r, err := project(pp.Fields, ve); err != nil {
+		if err := pp.project(input, ve); err != nil {
 			return fmt.Errorf("run Select error: %s", err)
 		} else {
 			if pp.SendMeta && input.Metadata != nil {
-				r[message.MetaKey] = input.Metadata
+				input.Set(message.MetaKey, input.Metadata)
 			}
-			results = append(results, r)
 		}
 	case xsql.SingleCollection:
 		var err error
 		if pp.IsAggregate {
+			input.SetIsAgg(true)
 			err = input.GroupRange(func(_ int, aggRow xsql.CollectionRow) (bool, error) {
 				ve := pp.getVE(aggRow, aggRow, input.GetWindowRange(), fv, afv)
-				if r, err := project(pp.Fields, ve); err != nil {
+				if err := pp.project(aggRow, ve); err != nil {
 					return false, fmt.Errorf("run Select error: %s", err)
-				} else {
-					results = append(results, r)
 				}
 				return true, nil
 			})
 		} else {
-			err = input.Range(func(_ int, row xsql.Row) (bool, error) {
+			err = input.RangeSet(func(_ int, row xsql.Row) (bool, error) {
 				aggData, ok := input.(xsql.AggregateData)
 				if !ok {
 					return false, fmt.Errorf("unexpected type, cannot find aggregate data")
 				}
 				ve := pp.getVE(row, aggData, input.GetWindowRange(), fv, afv)
-				if r, err := project(pp.Fields, ve); err != nil {
+				if err := pp.project(row, ve); err != nil {
 					return false, fmt.Errorf("run Select error: %s", err)
-				} else {
-					results = append(results, r)
 				}
 				return true, nil
 			})
@@ -82,10 +87,8 @@ func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 	case xsql.GroupedCollection: // The order is important, because single collection usually is also a groupedCollection
 		err := input.GroupRange(func(_ int, aggRow xsql.CollectionRow) (bool, error) {
 			ve := pp.getVE(aggRow, aggRow, input.GetWindowRange(), fv, afv)
-			if r, err := project(pp.Fields, ve); err != nil {
+			if err := pp.project(aggRow, ve); err != nil {
 				return false, fmt.Errorf("run Select error: %s", err)
-			} else {
-				results = append(results, r)
 			}
 			return true, nil
 		})
@@ -96,7 +99,7 @@ func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 		return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", input)
 	}
 
-	return results
+	return data
 }
 
 func (pp *ProjectOp) getVE(tuple xsql.Row, agg xsql.AggregateData, wr *xsql.WindowRange, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
@@ -111,52 +114,44 @@ func (pp *ProjectOp) getVE(tuple xsql.Row, agg xsql.AggregateData, wr *xsql.Wind
 	}
 }
 
-func project(fs ast.Fields, ve *xsql.ValuerEval) (map[string]interface{}, error) {
-	result := make(map[string]interface{}, len(fs))
-	for _, f := range fs {
+func (pp *ProjectOp) project(row xsql.Row, ve *xsql.ValuerEval) error {
+	// Evaluate every expression and alias field first and only then mutate the row, so that
+	// all calculations run with the same, unmodified context (e.g. alias values).
+	// The scratch buffers are cleared up front: a previous call that returned early with an
+	// error would otherwise leak its pending name/value pairs into this row.
+	pp.kvs, pp.alias = pp.kvs[:0], pp.alias[:0]
+
+	for _, f := range pp.ExprFields {
 		vi := ve.Eval(f.Expr)
 		if e, ok := vi.(error); ok {
-			return nil, e
+			return e
 		}
-		if _, ok := f.Expr.(*ast.Wildcard); ok || f.Name == "*" {
-			switch val := vi.(type) {
-			case map[string]interface{}:
-				for k, v := range val {
-					if _, ok := result[k]; !ok {
-						result[k] = v
-					}
-				}
-			case xsql.Message:
-				for k, v := range val {
-					if _, ok := result[k]; !ok {
-						result[k] = v
-					}
+		if vi != nil {
+			switch vt := vi.(type) {
+			case function.ResultCols:
+				for k, v := range vt {
+					pp.kvs = append(pp.kvs, k, v)
 				}
 			default:
-				return nil, fmt.Errorf("wildcarder does not return map")
-			}
-		} else {
-			if vi != nil {
-				switch vt := vi.(type) {
-				case function.ResultCols:
-					for k, v := range vt {
-						if _, ok := result[k]; !ok {
-							result[k] = v
-						}
-					}
-				default:
-					n := assignName(f.Name, f.AName)
-					result[n] = vt
-				}
+				pp.kvs = append(pp.kvs, f.Name, vi)
 			}
 		}
 	}
-	return result, nil
-}
-
-func assignName(name, alias string) string {
-	if alias != "" {
-		return alias
+	for _, f := range pp.AliasFields {
+		vi := ve.Eval(f.Expr)
+		if e, ok := vi.(error); ok {
+			return e
+		}
+		if vi != nil {
+			pp.alias = append(pp.alias, f.AName, vi)
+		}
+	}
+	row.Pick(pp.AllWildcard, pp.ColNames, pp.WildcardEmitters)
+	for i := 0; i < len(pp.kvs); i += 2 {
+		row.Set(pp.kvs[i].(string), pp.kvs[i+1])
+	}
+	for i := 0; i < len(pp.alias); i += 2 {
+		row.AppendAlias(pp.alias[i].(string), pp.alias[i+1])
 	}
-	return name
+	return nil
 }

+ 75 - 17
internal/topo/operator/project_test.go

@@ -20,12 +20,50 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"reflect"
 	"strings"
 	"testing"
 )
 
+func parseStmt(p *ProjectOp, fields ast.Fields) {
+	// Test helper: sort the parsed select fields into the buckets ProjectOp expects.
+	p.AllWildcard, p.WildcardEmitters = false, make(map[string]bool)
+	for _, fld := range fields {
+		if fld.AName != "" { // aliased fields go to the alias bucket whatever their expression is
+			p.AliasNames = append(p.AliasNames, fld.AName)
+			p.AliasFields = append(p.AliasFields, fld)
+			continue
+		}
+		switch expr := fld.Expr.(type) {
+		case *ast.Wildcard:
+			p.AllWildcard = true
+		case *ast.FieldRef:
+			if expr.Name != "*" {
+				p.ColNames = append(p.ColNames, []string{expr.Name, string(expr.StreamName)})
+			} else { // a "*" field reference marks its stream as a wildcard emitter
+				p.WildcardEmitters[string(expr.StreamName)] = true
+			}
+		default:
+			p.ExprNames = append(p.ExprNames, fld.Name)
+			p.ExprFields = append(p.ExprFields, fld)
+		}
+	}
+}
+
+// parseResult converts what ProjectOp.Apply returned into a slice of maps for comparison:
+// a TupleRow yields one map, a Collection yields its ToMaps; anything else is an error.
+func parseResult(opResult interface{}, aggregate bool) (result []map[string]interface{}, err error) {
+	switch out := opResult.(type) {
+	case xsql.TupleRow:
+		return []map[string]interface{}{out.ToMap()}, nil
+	case xsql.Collection:
+		return out.ToMaps(), nil
+	}
+	return nil, errors.New("unexpected result type")
+}
+
 func TestProjectPlan_Apply1(t *testing.T) {
 	var tests = []struct {
 		sql    string
@@ -555,9 +593,15 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			t.Errorf("parse sql error: %s", err)
 			continue
 		}
-		pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -1061,12 +1105,10 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id2": 2,
 				"id1": 1,
 				"f1":  "v1",
-				"f2":  "w2",
 			}, {
 				"id2": 4,
 				"id1": 2,
 				"f1":  "v2",
-				"f2":  "w3",
 			}, {
 				"id1": 3,
 				"f1":  "v1",
@@ -1112,12 +1154,10 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id2": 2,
 				"id1": 1,
 				"f1":  "v1",
-				"f2":  "w2",
 			}, {
 				"id2": 4,
 				"id1": 2,
 				"f1":  "v2",
-				"f2":  "w3",
 			}, {
 				"id1": 3,
 				"f1":  "v1",
@@ -1131,9 +1171,15 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -1327,9 +1373,15 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -2096,9 +2148,15 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: true}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -2248,12 +2306,12 @@ func TestProjectPlanError(t *testing.T) {
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
-
-		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
+		pp := &ProjectOp{SendMeta: true, IsAggregate: xsql.IsAggStatement(stmt)}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
-		if !reflect.DeepEqual(tt.result, result) {
-			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		if !reflect.DeepEqual(tt.result, opResult) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, opResult)
 		}
 	}
 }

+ 9 - 3
internal/topo/operator/str_func_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -658,9 +658,15 @@ func TestStrFunc_Apply1(t *testing.T) {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
-		pp := &ProjectOp{Fields: stmt.Fields}
+		pp := &ProjectOp{}
+		parseStmt(pp, stmt.Fields)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
-		result := pp.Apply(ctx, tt.data, fv, afv)
+		opResult := pp.Apply(ctx, tt.data, fv, afv)
+		result, err := parseResult(opResult, pp.IsAggregate)
+		if err != nil {
+			t.Errorf("parse result error: %s", err)
+			continue
+		}
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 1 - 1
internal/topo/operator/table_processor.go

@@ -75,7 +75,7 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 			tuple.Message = result
 		}
 		var newTuples []xsql.TupleRow
-		_ = p.output.Range(func(i int, r xsql.Row) (bool, error) {
+		_ = p.output.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
 			if p.retainSize > 0 && p.output.Len() == p.retainSize && i == 0 {
 				return true, nil
 			}

+ 1 - 1
internal/topo/planner/planner.go

@@ -184,7 +184,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *OrderPlan:
 		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
-		op = Transform(&operator.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
+		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
 	default:
 		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
 	}

+ 33 - 4
internal/topo/planner/projectPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,12 +18,41 @@ import "github.com/lf-edge/ekuiper/pkg/ast"
 
 type ProjectPlan struct {
 	baseLogicalPlan
-	fields      ast.Fields
-	isAggregate bool
-	sendMeta    bool
+	isAggregate      bool
+	allWildcard      bool // set by Init when a non-aliased field is an *ast.Wildcard
+	sendMeta         bool
+	fields           ast.Fields // the select fields that Init sorts into the buckets below
+	colNames         [][]string // [name, stream] of each non-aliased field reference other than "*"
+	aliasNames       []string // AName of every aliased field
+	exprNames        []string // Name of every remaining (non-alias, non-column) field
+	wildcardEmitters map[string]bool // stream names of non-aliased "*" field references
+	aliasFields      ast.Fields // every field that has an AName
+	exprFields       ast.Fields // fields matching exprNames, in the same order
 }
 
 func (p ProjectPlan) Init() *ProjectPlan {
+	// Sort the select fields into alias, wildcard, column and expression buckets.
+	p.allWildcard, p.wildcardEmitters = false, make(map[string]bool)
+	for _, fld := range p.fields {
+		if fld.AName != "" { // aliased fields go to the alias bucket whatever their expression is
+			p.aliasNames = append(p.aliasNames, fld.AName)
+			p.aliasFields = append(p.aliasFields, fld)
+			continue
+		}
+		switch expr := fld.Expr.(type) {
+		case *ast.Wildcard:
+			p.allWildcard = true
+		case *ast.FieldRef:
+			if expr.Name != "*" {
+				p.colNames = append(p.colNames, []string{expr.Name, string(expr.StreamName)})
+			} else { // a "*" field reference marks its stream as a wildcard emitter
+				p.wildcardEmitters[string(expr.StreamName)] = true
+			}
+		default:
+			p.exprFields = append(p.exprFields, fld)
+			p.exprNames = append(p.exprNames, fld.Name)
+		}
+	}
 	p.baseLogicalPlan.self = &p
 	return &p
 }

+ 107 - 15
internal/xsql/collection.go

@@ -40,19 +40,24 @@ type Collection interface {
 	// GroupRange through each group. For non-grouped collection, the whole data is a single group
 	GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
 	// Range through each row. For grouped collection, each row is an aggregation of groups
-	Range(func(i int, r Row) (bool, error)) error
+	Range(func(i int, r ReadonlyRow) (bool, error)) error
+	// RangeSet ranges through each row, passing a clone of the row to the callback
+	RangeSet(func(i int, r Row) (bool, error)) error
 	Filter(indexes []int) Collection
 	GetWindowRange() *WindowRange
 	Clone() Collection
-	// ToAggMaps returns the aggregated data as a map
-	ToAggMaps() []map[string]interface{}
-	// ToRowMaps returns all the data in the collection
-	ToRowMaps() []map[string]interface{}
+	// ToMaps returns the data as a map
+	ToMaps() []map[string]interface{}
 }
 
 type SingleCollection interface {
 	Collection
 	CollectionRow
+	SetIsAgg(isAgg bool)
+	// ToAggMaps returns the aggregated data as a map
+	ToAggMaps() []map[string]interface{}
+	// ToRowMaps returns all the data in the collection
+	ToRowMaps() []map[string]interface{}
 }
 
 type GroupedCollection interface {
@@ -76,6 +81,7 @@ type WindowTuples struct {
 
 	AffiliateRow
 	cachedMap map[string]interface{}
+	isAgg     bool
 }
 
 var _ MergedCollection = &WindowTuples{}
@@ -90,6 +96,7 @@ type JoinTuples struct {
 
 	AffiliateRow
 	cachedMap map[string]interface{}
+	isAgg     bool
 }
 
 var _ SingleCollection = &JoinTuples{}
@@ -137,7 +144,7 @@ func (w *WindowTuples) GetWindowRange() *WindowRange {
 	return w.WindowRange
 }
 
-func (w *WindowTuples) Range(f func(i int, r Row) (bool, error)) error {
+func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
 	for i, r := range w.Content {
 		b, e := f(i, r)
 		if e != nil {
@@ -150,6 +157,21 @@ func (w *WindowTuples) Range(f func(i int, r Row) (bool, error)) error {
 	return nil
 }
 
+// RangeSet hands a clone of every row to f and stores the clone back into Content when f
+// asks to continue. It returns f's error, if any, and stops quietly when f returns false.
+func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
+	for idx, orig := range w.Content {
+		clone := orig.Clone()
+		if proceed, err := f(idx, clone); err != nil {
+			return err
+		} else if !proceed {
+			return nil
+		}
+		w.Content[idx] = clone
+	}
+	return nil
+}
+
 func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
 	_, err := f(0, w)
 	return err
@@ -199,7 +221,7 @@ func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
 	return w.Content[0].Meta(key, table)
 }
 
-func (w *WindowTuples) All(stream string) (Message, bool) {
+func (w *WindowTuples) All(_ string) (Message, bool) {
 	return w.ToMap(), true
 }
 
@@ -224,6 +246,7 @@ func (w *WindowTuples) Clone() Collection {
 		Content:      ts,
 		WindowRange:  w.WindowRange,
 		AffiliateRow: w.AffiliateRow.Clone(),
+		isAgg:        w.isAgg,
 	}
 	return c
 }
@@ -240,6 +263,27 @@ func (w *WindowTuples) ToRowMaps() []map[string]interface{} {
 	return r
 }
 
+// ToMaps returns ToAggMaps when the isAgg flag is set and ToRowMaps otherwise.
+func (w *WindowTuples) ToMaps() []map[string]interface{} {
+	if !w.isAgg {
+		return w.ToRowMaps()
+	}
+	return w.ToAggMaps()
+}
+
+func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+	for i, t := range w.Content {
+		w.Content[i] = t.Clone() // pick on a clone so the original row object is left untouched
+		w.Content[i].Pick(allWildcard, cols, wildcardEmitters)
+	}
+	w.cachedMap = nil // rows were replaced; drop the cached map as JoinTuple.Pick does (presumably derived from Content)
+	w.AffiliateRow.Reset()
+}
+
+func (w *WindowTuples) SetIsAgg(isAgg bool) {
+	w.isAgg = isAgg // store the caller's choice (read by ToMaps) so the flag can also be cleared
+}
+
 func (s *JoinTuples) Len() int { return len(s.Content) }
 func (s *JoinTuples) Swap(i, j int) {
 	s.cachedMap = nil
@@ -259,7 +303,7 @@ func (s *JoinTuples) GetWindowRange() *WindowRange {
 	return s.WindowRange
 }
 
-func (s *JoinTuples) Range(f func(i int, r Row) (bool, error)) error {
+func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
 	for i, r := range s.Content {
 		b, e := f(i, r)
 		if e != nil {
@@ -272,6 +316,21 @@ func (s *JoinTuples) Range(f func(i int, r Row) (bool, error)) error {
 	return nil
 }
 
+// RangeSet hands a clone of every join tuple to f and stores the clone back into Content
+// when f asks to continue. It returns f's error, if any, and stops when f returns false.
+func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
+	for idx, orig := range s.Content {
+		clone := orig.Clone()
+		if proceed, err := f(idx, clone); err != nil {
+			return err
+		} else if !proceed {
+			return nil
+		}
+		s.Content[idx] = clone.(*JoinTuple)
+	}
+	return nil
+}
+
 func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
 	_, err := f(0, s)
 	return err
@@ -300,7 +359,7 @@ func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
 	return s.Content[0].Meta(key, table)
 }
 
-func (s *JoinTuples) All(stream string) (Message, bool) {
+func (s *JoinTuples) All(_ string) (Message, bool) {
 	return s.ToMap(), true
 }
 
@@ -325,6 +384,7 @@ func (s *JoinTuples) Clone() Collection {
 		Content:      ts,
 		WindowRange:  s.WindowRange,
 		AffiliateRow: s.AffiliateRow.Clone(),
+		isAgg:        s.isAgg,
 	}
 	return c
 }
@@ -341,6 +401,27 @@ func (s *JoinTuples) ToRowMaps() []map[string]interface{} {
 	return r
 }
 
+// ToMaps returns ToAggMaps when the isAgg flag is set and ToRowMaps otherwise.
+func (s *JoinTuples) ToMaps() []map[string]interface{} {
+	if !s.isAgg {
+		return s.ToRowMaps()
+	}
+	return s.ToAggMaps()
+}
+
+func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+	for i, t := range s.Content {
+		s.Content[i] = t.Clone().(*JoinTuple) // pick on a clone so the original row object is left untouched
+		s.Content[i].Pick(allWildcard, cols, wildcardEmitters)
+	}
+	s.cachedMap = nil // rows were replaced; drop the cached map as Swap and JoinTuple.Pick do
+	s.AffiliateRow.Reset()
+}
+
+func (s *JoinTuples) SetIsAgg(isAgg bool) {
+	s.isAgg = isAgg // store the caller's choice (read by ToMaps) so the flag can also be cleared
+}
+
 func (s *GroupedTuplesSet) Len() int        { return len(s.Groups) }
 func (s *GroupedTuplesSet) Swap(i, j int)   { s.Groups[i], s.Groups[j] = s.Groups[j], s.Groups[i] }
 func (s *GroupedTuplesSet) Index(i int) Row { return s.Groups[i] }
@@ -349,7 +430,7 @@ func (s *GroupedTuplesSet) GetWindowRange() *WindowRange {
 	return s.WindowRange
 }
 
-func (s *GroupedTuplesSet) Range(f func(i int, r Row) (bool, error)) error {
+func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
 	for i, r := range s.Groups {
 		b, e := f(i, r)
 		if e != nil {
@@ -362,6 +443,21 @@ func (s *GroupedTuplesSet) Range(f func(i int, r Row) (bool, error)) error {
 	return nil
 }
 
+// RangeSet hands a clone of every group to f and stores the clone back into Groups when f
+// asks to continue. It returns f's error, if any, and stops quietly when f returns false.
+func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error {
+	for idx, orig := range s.Groups {
+		clone := orig.Clone()
+		if proceed, err := f(idx, clone); err != nil {
+			return err
+		} else if !proceed {
+			return nil
+		}
+		s.Groups[idx] = clone.(*GroupedTuples)
+	}
+	return nil
+}
+
 func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
 	for i, r := range s.Groups {
 		b, e := f(i, r)
@@ -396,11 +492,7 @@ func (s *GroupedTuplesSet) Clone() Collection {
 	}
 }
 
-func (s *GroupedTuplesSet) ToAggMaps() []map[string]interface{} {
-	return s.ToRowMaps()
-}
-
-func (s *GroupedTuplesSet) ToRowMaps() []map[string]interface{} {
+func (s *GroupedTuplesSet) ToMaps() []map[string]interface{} {
 	r := make([]map[string]interface{}, len(s.Groups))
 	for i, t := range s.Groups {
 		r[i] = t.ToMap()

+ 6 - 3
internal/xsql/collection_test.go

@@ -198,7 +198,7 @@ func TestCollectionAgg(t *testing.T) {
 			wg.Add(1)
 			go func(si int, set []map[string]interface{}) {
 				nr := tt.collO.Clone()
-				nr.Range(func(_ int, row Row) (bool, error) {
+				nr.RangeSet(func(_ int, row Row) (bool, error) {
 					for k, v := range set[0] {
 						if strings.HasPrefix(k, "@") {
 							row.AppendAlias(k[1:], v)
@@ -208,13 +208,16 @@ func TestCollectionAgg(t *testing.T) {
 					}
 					return true, nil
 				})
-				intermaps[si] = nr.ToRowMaps()
+				intermaps[si] = nr.ToMaps()
 				var wg2 sync.WaitGroup
 				result[si] = make([][]map[string]interface{}, len(set)-1)
 				for j := 1; j < len(set); j++ {
 					wg2.Add(1)
 					go func(j int) {
 						nnr := nr.Clone()
+						if nnsr, ok := nnr.(SingleCollection); ok {
+							nnsr.SetIsAgg(true)
+						}
 						nnr.GroupRange(func(_ int, aggRow CollectionRow) (bool, error) {
 							for k, v := range set[j] {
 								if strings.HasPrefix(k, "@") {
@@ -225,7 +228,7 @@ func TestCollectionAgg(t *testing.T) {
 							}
 							return true, nil
 						})
-						result[si][j-1] = nnr.ToAggMaps()
+						result[si][j-1] = nnr.ToMaps()
 						wg2.Done()
 					}(j)
 				}

+ 62 - 2
internal/xsql/row.go

@@ -20,6 +20,9 @@ import (
 	"strings"
 )
 
+// The original message map may be big. Treat it as immutable so that it never needs to be copied.
+// Cloning a tuple should therefore stay cheap.
+
 /*
  *  Interfaces definition
  */
@@ -34,14 +37,21 @@ type Event interface {
 	IsWatermark() bool
 }
 
-type Row interface {
+type ReadonlyRow interface {
 	Valuer
 	AliasValuer
 	Wildcarder
+}
+
+type Row interface {
+	ReadonlyRow
 	// Set Only for some ops like functionOp *
 	Set(col string, value interface{})
 	// ToMap converts the row to a map to export to other systems *
 	ToMap() map[string]interface{}
+	// Pick keeps the given columns and discards the others. It replaces the underlying message with a new value. There are 3 kinds of fields to pick: columns, aliases and anonymous expressions.
+	// cols is a list of [column name, table name] pairs
+	Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool)
 }
 
 // TupleRow is a mutable row. Function with * could modify the row.
@@ -119,6 +129,11 @@ func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
 	}
 }
 
+// Reset clears CalCols and AliasMap by setting both to nil.
+func (d *AffiliateRow) Reset() {
+	d.CalCols, d.AliasMap = nil, nil
+}
+
 /*
  *  Message definition
  */
@@ -322,6 +337,29 @@ func (t *Tuple) IsWatermark() bool {
 	return false
 }
 
+// Pick narrows the tuple to the requested columns and then resets the affiliate row.
+// The message is left as it is when allWildcard is set or when the tuple's emitter is
+// flagged in wildcardEmitters. Otherwise it is replaced by a new map that holds only
+// the listed columns found in the old message. cols is a list of [column, stream] pairs.
+func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+	if keepAll := allWildcard || wildcardEmitters[t.Emitter]; !keepAll {
+		picked := make(map[string]interface{})
+		for _, colTab := range cols {
+			stream, name := colTab[1], colTab[0]
+			if stream != "" && stream != string(ast.DefaultStream) && stream != t.Emitter {
+				continue // the column is qualified with another stream
+			}
+			if v, ok := t.Message.Value(name, stream); ok {
+				picked[name] = v
+			}
+		}
+		// The new map becomes the message and doubles as its cached map.
+		t.Message, t.cachedMap = picked, picked
+	}
+	// Calculated values and aliases are dropped whatever was picked.
+	t.AffiliateRow.Reset()
+}
+
 // JoinTuple implementation
 
 func (jt *JoinTuple) AddTuple(tuple TupleRow) {
@@ -400,7 +438,7 @@ func (jt *JoinTuple) All(stream string) (Message, bool) {
 func (jt *JoinTuple) Clone() TupleRow {
 	ts := make([]TupleRow, len(jt.Tuples))
 	for i, t := range jt.Tuples {
-		ts[i] = t
+		ts[i] = t.Clone()
 	}
 	c := &JoinTuple{
 		Tuples:       ts,
@@ -423,6 +461,23 @@ func (jt *JoinTuple) ToMap() map[string]interface{} {
 	return jt.cachedMap
 }
 
+func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+	if !allWildcard {
+		kept := make([]TupleRow, 0, len(jt.Tuples)) // fresh slice: never overwrite a possibly shared backing array
+		for _, tuple := range jt.Tuples {
+			if _, ok := wildcardEmitters[tuple.GetEmitter()]; ok {
+				kept = append(kept, tuple) // listed as a wildcard emitter: keep it whole even when cols is empty
+			} else if len(cols) > 0 {
+				tuple.Pick(allWildcard, cols, wildcardEmitters)
+				kept = append(kept, tuple)
+			} // with no columns requested, tuples of other emitters are dropped
+		}
+		jt.Tuples = kept
+	}
+	jt.cachedMap = nil
+	jt.AffiliateRow.Reset()
+}
+
 // GroupedTuple implementation
 
 func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
@@ -473,3 +528,8 @@ func (s *GroupedTuples) Clone() CollectionRow {
 	}
 	return c
 }
+
+func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+	s.Content[0].Pick(allWildcard, cols, wildcardEmitters) // only the first row is picked; NOTE(review): assumes Content is never empty — confirm
+	s.AffiliateRow.Reset() // clear the group's CalCols and AliasMap
+}