Explorar o código

refactor(valuer): unit test

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang %!s(int64=2) %!d(string=hai) anos
pai
achega
d0c5285226

+ 5 - 4
internal/topo/operator/aggregate_operator.go

@@ -40,9 +40,10 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 		case xsql.SingleCollection:
 			wr := input.GetWindowRange()
 			result := make(map[string]*xsql.GroupedTuples)
-			err := input.Range(func(i int, r xsql.TupleRow) (bool, error) {
+			err := input.Range(func(i int, ir xsql.Row) (bool, error) {
 				var name string
-				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(r, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
+				tr := ir.(xsql.TupleRow)
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tr, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
 				for _, d := range p.Dimensions {
 					r := ve.Eval(d.Expr)
 					if _, ok := r.(error); ok {
@@ -52,9 +53,9 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 					}
 				}
 				if ts, ok := result[name]; !ok {
-					result[name] = &xsql.GroupedTuples{Content: []xsql.TupleRow{r}, WindowRange: wr}
+					result[name] = &xsql.GroupedTuples{Content: []xsql.TupleRow{tr}, WindowRange: wr}
 				} else {
-					ts.Content = append(ts.Content, r)
+					ts.Content = append(ts.Content, tr)
 				}
 				return true, nil
 			})

+ 4 - 0
internal/topo/operator/cols_func_test.go

@@ -138,6 +138,10 @@ func TestChangedColsFunc_Apply1(t *testing.T) {
 		r := make([][]map[string]interface{}, 0, len(tt.data))
 		for _, d := range tt.data {
 			result := pp.Apply(ctx, d, fv, afv)
+			if e, ok := result.(error); ok {
+				t.Errorf("apply sql %s error %v", tt.sql, e)
+				continue
+			}
 			r = append(r, result.([]map[string]interface{}))
 		}
 		if !reflect.DeepEqual(tt.result, r) {

+ 1 - 1
internal/topo/operator/filter_operator.go

@@ -52,7 +52,7 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 		}
 	case xsql.SingleCollection:
 		var sel []int
-		err := input.Range(func(i int, r xsql.TupleRow) (bool, error) {
+		err := input.Range(func(i int, r xsql.Row) (bool, error) {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(r, fv)}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {

+ 1 - 1
internal/topo/operator/project_operator.go

@@ -62,7 +62,7 @@ func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 				return true, nil
 			})
 		} else {
-			err = input.Range(func(_ int, row xsql.TupleRow) (bool, error) {
+			err = input.Range(func(_ int, row xsql.Row) (bool, error) {
 				aggData, ok := input.(xsql.AggregateData)
 				if !ok {
 					return false, fmt.Errorf("unexpected type, cannot find aggregate data")

+ 4 - 0
internal/topo/operator/project_test.go

@@ -1061,10 +1061,12 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id2": 2,
 				"id1": 1,
 				"f1":  "v1",
+				"f2":  "w2",
 			}, {
 				"id2": 4,
 				"id1": 2,
 				"f1":  "v2",
+				"f2":  "w3",
 			}, {
 				"id1": 3,
 				"f1":  "v1",
@@ -1110,10 +1112,12 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id2": 2,
 				"id1": 1,
 				"f1":  "v1",
+				"f2":  "w2",
 			}, {
 				"id2": 4,
 				"id1": 2,
 				"f1":  "v2",
+				"f2":  "w3",
 			}, {
 				"id1": 3,
 				"f1":  "v1",

+ 2 - 2
internal/topo/operator/table_processor.go

@@ -75,11 +75,11 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 			tuple.Message = result
 		}
 		var newTuples []xsql.TupleRow
-		_ = p.output.Range(func(i int, r xsql.TupleRow) (bool, error) {
+		_ = p.output.Range(func(i int, r xsql.Row) (bool, error) {
 			if p.retainSize > 0 && p.output.Len() == p.retainSize && i == 0 {
 				return true, nil
 			}
-			newTuples = append(newTuples, r)
+			newTuples = append(newTuples, r.(xsql.TupleRow))
 			return true, nil
 		})
 		newTuples = append(newTuples, tuple)

+ 149 - 13
internal/xsql/collection.go

@@ -39,14 +39,20 @@ type Collection interface {
 	SortingData
 	// GroupRange through each group. For non-grouped collection, the whole data is a single group
 	GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
+	// Range through each row. For grouped collection, each row is an aggregation of groups
+	Range(func(i int, r Row) (bool, error)) error
 	Filter(indexes []int) Collection
 	GetWindowRange() *WindowRange
+	Clone() Collection
+	// ToAggMaps returns the aggregated data as a map
+	ToAggMaps() []map[string]interface{}
+	// ToRowMaps returns all the data in the collection
+	ToRowMaps() []map[string]interface{}
 }
 
 type SingleCollection interface {
 	Collection
-	// Range through each row. For grouped collection, each row is an aggregation of groups
-	Range(func(i int, r TupleRow) (bool, error)) error
+	CollectionRow
 }
 
 type GroupedCollection interface {
@@ -66,11 +72,14 @@ type MergedCollection interface {
 type WindowTuples struct {
 	Content []TupleRow // immutable
 	*WindowRange
-	Alias
-	contentBySrc map[string][]TupleRow // volatile, temporary cache
+	contentBySrc map[string][]TupleRow // volatile, temporary cache]
+
+	AffiliateRow
+	cachedMap map[string]interface{}
 }
 
 var _ MergedCollection = &WindowTuples{}
+var _ SingleCollection = &WindowTuples{}
 
 // Window Tuples is also an aggregate row
 var _ CollectionRow = &WindowTuples{}
@@ -78,10 +87,12 @@ var _ CollectionRow = &WindowTuples{}
 type JoinTuples struct {
 	Content []*JoinTuple
 	*WindowRange
-	Alias
+
+	AffiliateRow
+	cachedMap map[string]interface{}
 }
 
-var _ Collection = &JoinTuples{}
+var _ SingleCollection = &JoinTuples{}
 var _ CollectionRow = &JoinTuples{}
 
 type GroupedTuplesSet struct {
@@ -89,7 +100,7 @@ type GroupedTuplesSet struct {
 	*WindowRange
 }
 
-var _ Collection = &GroupedTuplesSet{}
+var _ GroupedCollection = &GroupedTuplesSet{}
 
 /*
  *   Collection implementations
@@ -104,6 +115,7 @@ func (w *WindowTuples) Len() int {
 }
 
 func (w *WindowTuples) Swap(i, j int) {
+	w.cachedMap = nil
 	w.Content[i], w.Content[j] = w.Content[j], w.Content[i]
 }
 
@@ -125,7 +137,7 @@ func (w *WindowTuples) GetWindowRange() *WindowRange {
 	return w.WindowRange
 }
 
-func (w *WindowTuples) Range(f func(i int, r TupleRow) (bool, error)) error {
+func (w *WindowTuples) Range(f func(i int, r Row) (bool, error)) error {
 	for i, r := range w.Content {
 		b, e := f(i, r)
 		if e != nil {
@@ -150,6 +162,7 @@ func (w *WindowTuples) AddTuple(tuple *Tuple) *WindowTuples {
 
 //Sort by tuple timestamp
 func (w *WindowTuples) Sort() {
+	w.cachedMap = nil
 	sort.SliceStable(w.Content, func(i, j int) bool {
 		return w.Content[i].(Event).GetTimestamp() < w.Content[j].(Event).GetTimestamp()
 	})
@@ -165,6 +178,7 @@ func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
 
 // Filter the tuples by the given predicate
 func (w *WindowTuples) Filter(indexes []int) Collection {
+	w.cachedMap = nil
 	newC := make([]TupleRow, 0, len(indexes))
 	for _, i := range indexes {
 		newC = append(newC, w.Content[i])
@@ -174,6 +188,10 @@ func (w *WindowTuples) Filter(indexes []int) Collection {
 }
 
 func (w *WindowTuples) Value(key, table string) (interface{}, bool) {
+	r, ok := w.AffiliateRow.Value(key, table)
+	if ok {
+		return r, ok
+	}
 	return w.Content[0].Value(key, table)
 }
 
@@ -182,11 +200,51 @@ func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
 }
 
 func (w *WindowTuples) All(stream string) (Message, bool) {
-	return w.Content[0].All(stream)
+	return w.ToMap(), true
+}
+
+func (w *WindowTuples) ToMap() map[string]interface{} {
+	if w.cachedMap == nil {
+		m := make(map[string]interface{})
+		for k, v := range w.Content[0].ToMap() {
+			m[k] = v
+		}
+		w.cachedMap = m
+	}
+	w.AffiliateRow.MergeMap(w.cachedMap)
+	return w.cachedMap
 }
 
-func (s *JoinTuples) Len() int        { return len(s.Content) }
-func (s *JoinTuples) Swap(i, j int)   { s.Content[i], s.Content[j] = s.Content[j], s.Content[i] }
+func (w *WindowTuples) Clone() Collection {
+	ts := make([]TupleRow, len(w.Content))
+	for i, t := range w.Content {
+		ts[i] = t.Clone()
+	}
+	c := &WindowTuples{
+		Content:      ts,
+		WindowRange:  w.WindowRange,
+		AffiliateRow: w.AffiliateRow.Clone(),
+	}
+	return c
+}
+
+func (w *WindowTuples) ToAggMaps() []map[string]interface{} {
+	return []map[string]interface{}{w.ToMap()}
+}
+
+func (w *WindowTuples) ToRowMaps() []map[string]interface{} {
+	r := make([]map[string]interface{}, len(w.Content))
+	for i, t := range w.Content {
+		r[i] = t.ToMap()
+	}
+	return r
+}
+
+func (s *JoinTuples) Len() int { return len(s.Content) }
+func (s *JoinTuples) Swap(i, j int) {
+	s.cachedMap = nil
+	s.Content[i], s.Content[j] = s.Content[j], s.Content[i]
+}
 func (s *JoinTuples) Index(i int) Row { return s.Content[i] }
 
 func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
@@ -201,7 +259,7 @@ func (s *JoinTuples) GetWindowRange() *WindowRange {
 	return s.WindowRange
 }
 
-func (s *JoinTuples) Range(f func(i int, r TupleRow) (bool, error)) error {
+func (s *JoinTuples) Range(f func(i int, r Row) (bool, error)) error {
 	for i, r := range s.Content {
 		b, e := f(i, r)
 		if e != nil {
@@ -226,10 +284,15 @@ func (s *JoinTuples) Filter(indexes []int) Collection {
 		newC = append(newC, s.Content[i])
 	}
 	s.Content = newC
+	s.cachedMap = nil
 	return s
 }
 
 func (s *JoinTuples) Value(key, table string) (interface{}, bool) {
+	r, ok := s.AffiliateRow.Value(key, table)
+	if ok {
+		return r, ok
+	}
 	return s.Content[0].Value(key, table)
 }
 
@@ -238,7 +301,44 @@ func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
 }
 
 func (s *JoinTuples) All(stream string) (Message, bool) {
-	return s.Content[0].All(stream)
+	return s.ToMap(), true
+}
+
+func (s *JoinTuples) ToMap() map[string]interface{} {
+	if s.cachedMap == nil {
+		m := make(map[string]interface{})
+		for k, v := range s.Content[0].ToMap() {
+			m[k] = v
+		}
+		s.cachedMap = m
+	}
+	s.AffiliateRow.MergeMap(s.cachedMap)
+	return s.cachedMap
+}
+
+func (s *JoinTuples) Clone() Collection {
+	ts := make([]*JoinTuple, len(s.Content))
+	for i, t := range s.Content {
+		ts[i] = t.Clone().(*JoinTuple)
+	}
+	c := &JoinTuples{
+		Content:      ts,
+		WindowRange:  s.WindowRange,
+		AffiliateRow: s.AffiliateRow.Clone(),
+	}
+	return c
+}
+
+func (s *JoinTuples) ToAggMaps() []map[string]interface{} {
+	return []map[string]interface{}{s.ToMap()}
+}
+
+func (s *JoinTuples) ToRowMaps() []map[string]interface{} {
+	r := make([]map[string]interface{}, len(s.Content))
+	for i, t := range s.Content {
+		r[i] = t.ToMap()
+	}
+	return r
 }
 
 func (s *GroupedTuplesSet) Len() int        { return len(s.Groups) }
@@ -249,6 +349,19 @@ func (s *GroupedTuplesSet) GetWindowRange() *WindowRange {
 	return s.WindowRange
 }
 
+func (s *GroupedTuplesSet) Range(f func(i int, r Row) (bool, error)) error {
+	for i, r := range s.Groups {
+		b, e := f(i, r)
+		if e != nil {
+			return e
+		}
+		if !b {
+			break
+		}
+	}
+	return nil
+}
+
 func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
 	for i, r := range s.Groups {
 		b, e := f(i, r)
@@ -272,6 +385,29 @@ func (s *GroupedTuplesSet) Filter(groups []int) Collection {
 	return s
 }
 
+func (s *GroupedTuplesSet) Clone() Collection {
+	ng := make([]*GroupedTuples, len(s.Groups))
+	for i, g := range s.Groups {
+		ng[i] = g.Clone().(*GroupedTuples)
+	}
+	return &GroupedTuplesSet{
+		Groups:      ng,
+		WindowRange: s.WindowRange,
+	}
+}
+
+func (s *GroupedTuplesSet) ToAggMaps() []map[string]interface{} {
+	return s.ToRowMaps()
+}
+
+func (s *GroupedTuplesSet) ToRowMaps() []map[string]interface{} {
+	r := make([]map[string]interface{}, len(s.Groups))
+	for i, t := range s.Groups {
+		r[i] = t.ToMap()
+	}
+	return r
+}
+
 /*
  *  WindowRange definitions. It should be immutable
  */

+ 241 - 0
internal/xsql/collection_test.go

@@ -0,0 +1,241 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xsql
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"reflect"
+	"strings"
+	"sync"
+	"testing"
+)
+
+func TestCollectionAgg(t *testing.T) {
+	// broadcast -> range func -> broadcast -> group aggregate -> map
+	var tests = []struct {
+		collO     Collection
+		set       [][]map[string]interface{}
+		interMaps [][]map[string]interface{}
+		result    [][][]map[string]interface{}
+	}{
+		{
+			collO: &WindowTuples{Content: []TupleRow{
+				&Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
+				&Tuple{Emitter: "a", Message: map[string]interface{}{"a": 2, "b": "4"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
+				&Tuple{Emitter: "a", Message: map[string]interface{}{"a": 3, "b": "6"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
+			}},
+			set: [][]map[string]interface{}{
+				{
+					{"a": 4, "c": "3", "@d": 4},
+					{"sum": 12},
+					{"avg": 4},
+				},
+				{
+					{"c": "4"},
+					{"sum": 6},
+					{"avg": 2},
+				},
+			},
+			interMaps: [][]map[string]interface{}{
+				{
+					{"a": 4, "b": "2", "c": "3", "d": 4},
+					{"a": 4, "b": "4", "c": "3", "d": 4},
+					{"a": 4, "b": "6", "c": "3", "d": 4},
+				}, {
+					{"a": 1, "b": "2", "c": "4"},
+					{"a": 2, "b": "4", "c": "4"},
+					{"a": 3, "b": "6", "c": "4"},
+				},
+			},
+			result: [][][]map[string]interface{}{
+				{
+					{
+						{"a": 4, "b": "2", "c": "3", "d": 4, "sum": 12},
+					},
+					{
+						{"a": 4, "b": "2", "c": "3", "d": 4, "avg": 4},
+					},
+				}, {
+					{
+						{"a": 1, "b": "2", "c": "4", "sum": 6},
+					},
+					{
+						{"a": 1, "b": "2", "c": "4", "avg": 2},
+					},
+				},
+			},
+		}, {
+			collO: &JoinTuples{Content: []*JoinTuple{
+				{
+					Tuples: []TupleRow{
+						&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+						&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
+					},
+				}, {
+					Tuples: []TupleRow{
+						&Tuple{Emitter: "src1", Message: Message{"a": 3, "b": "v2"}},
+						&Tuple{Emitter: "src2", Message: Message{"a": 4, "c": "w1"}},
+					},
+				},
+			}},
+			set: [][]map[string]interface{}{
+				{
+					{"a": 4, "c": "3", "@d": 4},
+					{"sum": 12},
+					{"avg": 4},
+				},
+				{
+					{"c": "4"},
+					{"sum": 6},
+					{"avg": 2},
+				},
+			},
+			interMaps: [][]map[string]interface{}{
+				{
+					{"a": 4, "b": "v1", "c": "3", "d": 4},
+					{"a": 4, "b": "v2", "c": "3", "d": 4},
+				}, {
+					{"a": 1, "b": "v1", "c": "4"},
+					{"a": 3, "b": "v2", "c": "4"},
+				},
+			},
+			result: [][][]map[string]interface{}{
+				{
+					{
+						{"a": 4, "b": "v1", "c": "3", "d": 4, "sum": 12},
+					},
+					{
+						{"a": 4, "b": "v1", "c": "3", "d": 4, "avg": 4},
+					},
+				}, {
+					{
+						{"a": 1, "b": "v1", "c": "4", "sum": 6},
+					},
+					{
+						{"a": 1, "b": "v1", "c": "4", "avg": 2},
+					},
+				},
+			},
+		}, {
+			collO: &GroupedTuplesSet{Groups: []*GroupedTuples{
+				{
+					Content: []TupleRow{
+						&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+						&Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "w2"}},
+					},
+				}, {
+					Content: []TupleRow{
+						&Tuple{Emitter: "src1", Message: Message{"a": 3, "b": "v2"}},
+						&Tuple{Emitter: "src1", Message: Message{"a": 4, "b": "w1"}},
+					},
+				},
+			}},
+			set: [][]map[string]interface{}{
+				{
+					{"a": 4, "c": "3", "@d": 4},
+					{"sum": 12},
+					{"avg": 4},
+				},
+				{
+					{"c": "4"},
+					{"sum": 6},
+					{"avg": 2},
+				},
+			},
+			interMaps: [][]map[string]interface{}{
+				{
+					{"a": 4, "b": "v1", "c": "3", "d": 4},
+					{"a": 4, "b": "v2", "c": "3", "d": 4},
+				}, {
+					{"a": 1, "b": "v1", "c": "4"},
+					{"a": 3, "b": "v2", "c": "4"},
+				},
+			},
+			result: [][][]map[string]interface{}{
+				{
+					{
+						{"a": 4, "b": "v1", "c": "3", "d": 4, "sum": 12},
+						{"a": 4, "b": "v2", "c": "3", "d": 4, "sum": 12},
+					},
+					{
+						{"a": 4, "b": "v1", "c": "3", "d": 4, "avg": 4},
+						{"a": 4, "b": "v2", "c": "3", "d": 4, "avg": 4},
+					},
+				}, {
+					{
+						{"a": 1, "b": "v1", "c": "4", "sum": 6},
+						{"a": 3, "b": "v2", "c": "4", "sum": 6},
+					},
+					{
+						{"a": 1, "b": "v1", "c": "4", "avg": 2},
+						{"a": 3, "b": "v2", "c": "4", "avg": 2},
+					},
+				},
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		var (
+			wg        sync.WaitGroup
+			intermaps = make([][]map[string]interface{}, len(tt.set))
+			result    = make([][][]map[string]interface{}, len(tt.set))
+		)
+		for si, set := range tt.set {
+			wg.Add(1)
+			go func(si int, set []map[string]interface{}) {
+				nr := tt.collO.Clone()
+				nr.Range(func(_ int, row Row) (bool, error) {
+					for k, v := range set[0] {
+						if strings.HasPrefix(k, "@") {
+							row.AppendAlias(k[1:], v)
+						} else {
+							row.Set(k, v)
+						}
+					}
+					return true, nil
+				})
+				intermaps[si] = nr.ToRowMaps()
+				var wg2 sync.WaitGroup
+				result[si] = make([][]map[string]interface{}, len(set)-1)
+				for j := 1; j < len(set); j++ {
+					wg2.Add(1)
+					go func(j int) {
+						nnr := nr.Clone()
+						nnr.GroupRange(func(_ int, aggRow CollectionRow) (bool, error) {
+							for k, v := range set[j] {
+								if strings.HasPrefix(k, "@") {
+									aggRow.AppendAlias(k[1:], v)
+								} else {
+									aggRow.Set(k, v)
+								}
+							}
+							return true, nil
+						})
+						result[si][j-1] = nnr.ToAggMaps()
+						wg2.Done()
+					}(j)
+				}
+				wg2.Wait()
+				wg.Done()
+			}(si, set)
+		}
+		wg.Wait()
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, result)
+		}
+	}
+}

+ 155 - 64
internal/xsql/row.go

@@ -24,7 +24,6 @@ import (
  *  Interfaces definition
  */
 
-// TODO how to be more efficient?
 type Wildcarder interface {
 	// All Value returns the value and existence flag for a given key.
 	All(stream string) (Message, bool)
@@ -39,18 +38,19 @@ type Row interface {
 	Valuer
 	AliasValuer
 	Wildcarder
+	// Set Only for some ops like functionOp *
+	Set(col string, value interface{})
+	// ToMap converts the row to a map to export to other systems *
+	ToMap() map[string]interface{}
 }
 
+// TupleRow is a mutable row. Function with * could modify the row.
 type TupleRow interface {
 	Row
-	// Set Only for some ops like functionOp
-	Set(col string, value interface{})
 	// GetEmitter returns the emitter of the row
 	GetEmitter() string
 	// Clone when broadcast to make sure each row are dealt single threaded
 	Clone() TupleRow
-	// ToMap converts the row to a map to export to other systems
-	ToMap() map[string]interface{}
 }
 
 // CollectionRow is the aggregation row of a non-grouped collection. Thinks of it as a single group.
@@ -58,6 +58,65 @@ type TupleRow interface {
 type CollectionRow interface {
 	Row
 	AggregateData
+	// Clone when broadcast to make sure each row are dealt single threaded
+	//Clone() CollectionRow
+}
+
+// AffiliateRow part of other row types do help calculation of newly added cols
+type AffiliateRow struct {
+	CalCols map[string]interface{} // mutable and must be cloned when broadcast
+	Alias
+}
+
+func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
+	if table == "" {
+		r, ok := d.AliasValue(key)
+		if ok {
+			return r, ok
+		}
+		r, ok = d.CalCols[key]
+		if ok {
+			return r, ok
+		}
+	}
+	return nil, false
+}
+
+func (d *AffiliateRow) Set(col string, value interface{}) {
+	if d.CalCols == nil {
+		d.CalCols = make(map[string]interface{})
+	}
+	d.CalCols[col] = value
+}
+
+func (d *AffiliateRow) Clone() AffiliateRow {
+	nd := &AffiliateRow{}
+	if d.CalCols != nil && len(d.CalCols) > 0 {
+		nd.CalCols = make(map[string]interface{}, len(d.CalCols))
+		for k, v := range d.CalCols {
+			nd.CalCols[k] = v
+		}
+	}
+	if d.AliasMap != nil && len(d.AliasMap) > 0 {
+		nd.AliasMap = make(map[string]interface{}, len(d.AliasMap))
+		for k, v := range d.AliasMap {
+			nd.AliasMap[k] = v
+		}
+	}
+	return *nd
+}
+
+func (d *AffiliateRow) IsEmpty() bool {
+	return len(d.CalCols) == 0 && len(d.AliasMap) == 0
+}
+
+func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
+	for k, v := range d.CalCols {
+		cachedMap[k] = v
+	}
+	for k, v := range d.AliasMap {
+		cachedMap[k] = v
+	}
 }
 
 /*
@@ -83,18 +142,26 @@ type Alias struct {
 // Tuple The input row, produced by the source
 type Tuple struct {
 	Emitter   string
-	Message   Message // immutable
+	Message   Message // the original pointer is immutable & big; may be cloned
 	Timestamp int64
 	Metadata  Metadata // immutable
-	Alias
+
+	AffiliateRow
+
+	cachedMap map[string]interface{} // clone of the row and cached for performance
 }
 
 var _ TupleRow = &Tuple{}
 
 // JoinTuple is a row produced by a join operation
 type JoinTuple struct {
-	Tuples []TupleRow
-	Alias
+	Tuples []TupleRow // The content is immutable, but the slice may be add or removed
+	AffiliateRow
+	cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
+}
+
+func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
+	return []interface{}{Eval(expr, MultiValuer(jt, v, &WildcardValuer{jt}))}
 }
 
 var _ TupleRow = &JoinTuple{}
@@ -103,7 +170,8 @@ var _ TupleRow = &JoinTuple{}
 type GroupedTuples struct {
 	Content []TupleRow
 	*WindowRange
-	Alias
+	AffiliateRow
+	cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
 }
 
 var _ CollectionRow = &GroupedTuples{}
@@ -193,59 +261,55 @@ func (a *Alias) AliasValue(key string) (interface{}, bool) {
 // Tuple implementation
 
 func (t *Tuple) Value(key, table string) (interface{}, bool) {
-	r, ok := t.AliasValue(key)
+	r, ok := t.AffiliateRow.Value(key, table)
 	if ok {
 		return r, ok
 	}
 	return t.Message.Value(key, table)
 }
 
-func (t *Tuple) Meta(key, table string) (interface{}, bool) {
-	if key == "*" {
-		return map[string]interface{}(t.Metadata), true
-	}
-	return t.Metadata.Value(key, table)
+func (t *Tuple) All(string) (Message, bool) {
+	return t.ToMap(), true
 }
 
-func (t *Tuple) Set(col string, value interface{}) {
-	//TODO implement me
-	panic("implement me")
+func (t *Tuple) Clone() TupleRow {
+	return &Tuple{
+		Emitter:      t.Emitter,
+		Timestamp:    t.Timestamp,
+		Message:      t.Message,
+		Metadata:     t.Metadata,
+		AffiliateRow: t.AffiliateRow.Clone(),
+	}
 }
 
-func (t *Tuple) Clone() TupleRow {
-	c := &Tuple{
-		Emitter:   t.Emitter,
-		Timestamp: t.Timestamp,
+// ToMap should only use in sink.
+func (t *Tuple) ToMap() map[string]interface{} {
+	if t.AffiliateRow.IsEmpty() {
+		return t.Message
 	}
-	if t.Message != nil {
-		m := Message{}
+	if t.cachedMap == nil { // clone the message
+		m := make(map[string]interface{})
 		for k, v := range t.Message {
 			m[k] = v
 		}
-		c.Message = m
+		t.cachedMap = m
+		t.Message = t.cachedMap
 	}
-	if t.Metadata != nil {
-		md := Metadata{}
-		for k, v := range t.Metadata {
-			md[k] = v
-		}
-		c.Metadata = md
-	}
-	return c
+	t.AffiliateRow.MergeMap(t.cachedMap)
+	return t.cachedMap
 }
 
-func (t *Tuple) ToMap() map[string]interface{} {
-	return t.Message
+func (t *Tuple) Meta(key, table string) (interface{}, bool) {
+	if key == "*" {
+		return map[string]interface{}(t.Metadata), true
+	}
+	return t.Metadata.Value(key, table)
 }
 
 func (t *Tuple) GetEmitter() string {
 	return t.Emitter
 }
 
-func (t *Tuple) All(string) (Message, bool) {
-	return t.Message, true
-}
-
 func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
 }
@@ -254,10 +318,6 @@ func (t *Tuple) GetTimestamp() int64 {
 	return t.Timestamp
 }
 
-func (t *Tuple) GetMetadata() Metadata {
-	return t.Metadata
-}
-
 func (t *Tuple) IsWatermark() bool {
 	return false
 }
@@ -308,8 +368,12 @@ func getTupleValue(tuple Row, key string, isVal bool) (interface{}, bool) {
 	}
 }
 
+func (jt *JoinTuple) GetEmitter() string {
+	return "$$JOIN"
+}
+
 func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
-	r, ok := jt.AliasValue(key)
+	r, ok := jt.AffiliateRow.Value(key, table)
 	if ok {
 		return r, ok
 	}
@@ -333,32 +397,30 @@ func (jt *JoinTuple) All(stream string) (Message, bool) {
 	return nil, false
 }
 
-// TODO deal with cascade
 func (jt *JoinTuple) Clone() TupleRow {
 	ts := make([]TupleRow, len(jt.Tuples))
 	for i, t := range jt.Tuples {
-		ts[i] = t.Clone().(TupleRow)
+		ts[i] = t
 	}
-	return &JoinTuple{Tuples: ts}
-}
-
-func (jt *JoinTuple) Set(col string, value interface{}) {
-	//TODO implement me
-	panic("implement me")
+	c := &JoinTuple{
+		Tuples:       ts,
+		AffiliateRow: jt.AffiliateRow.Clone(),
+	}
+	return c
 }
 
 func (jt *JoinTuple) ToMap() map[string]interface{} {
-	m := make(map[string]interface{})
-	for i := len(jt.Tuples) - 1; i >= 0; i-- {
-		for k, v := range jt.Tuples[i].ToMap() {
-			m[k] = v
+	if jt.cachedMap == nil { // clone the message
+		m := make(map[string]interface{})
+		for i := len(jt.Tuples) - 1; i >= 0; i-- {
+			for k, v := range jt.Tuples[i].ToMap() {
+				m[k] = v
+			}
 		}
+		jt.cachedMap = m
 	}
-	return m
-}
-
-func (jt *JoinTuple) GetEmitter() string {
-	return "$$JOIN"
+	jt.AffiliateRow.MergeMap(jt.cachedMap)
+	return jt.cachedMap
 }
 
 // GroupedTuple implementation
@@ -372,6 +434,10 @@ func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
 }
 
 func (s *GroupedTuples) Value(key, table string) (interface{}, bool) {
+	r, ok := s.AffiliateRow.Value(key, table)
+	if ok {
+		return r, ok
+	}
 	return s.Content[0].Value(key, table)
 }
 
@@ -379,6 +445,31 @@ func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
 	return s.Content[0].Meta(key, table)
 }
 
-func (s *GroupedTuples) All(stream string) (Message, bool) {
-	return s.Content[0].All(stream)
+func (s *GroupedTuples) All(_ string) (Message, bool) {
+	return s.ToMap(), true
+}
+
+func (s *GroupedTuples) ToMap() map[string]interface{} {
+	if s.cachedMap == nil {
+		m := make(map[string]interface{})
+		for k, v := range s.Content[0].ToMap() {
+			m[k] = v
+		}
+		s.cachedMap = m
+	}
+	s.AffiliateRow.MergeMap(s.cachedMap)
+	return s.cachedMap
+}
+
+func (s *GroupedTuples) Clone() CollectionRow {
+	ts := make([]TupleRow, len(s.Content))
+	for i, t := range s.Content {
+		ts[i] = t
+	}
+	c := &GroupedTuples{
+		Content:      ts,
+		WindowRange:  s.WindowRange,
+		AffiliateRow: s.AffiliateRow.Clone(),
+	}
+	return c
 }

+ 229 - 0
internal/xsql/row_test.go

@@ -0,0 +1,229 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xsql
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"reflect"
+	"strings"
+	"sync"
+	"testing"
+)
+
+// Row valuer, wildcarder test
+// WindowTuples, JoinTuples, GroupTuples are collectionRow
+func TestCollectionRow(t *testing.T) {
+	var tests = []struct {
+		rowC     CollectionRow
+		value    []string
+		wildcard []string
+		result   []interface{} // result of valuers and wildcards
+	}{
+		{
+			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
+			value:    []string{"a", "b"},
+			wildcard: []string{""},
+			result:   []interface{}{1, "2", Message{"a": 1, "b": "2"}},
+		}, {
+			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, Alias: Alias{AliasMap: map[string]interface{}{"b": "b1"}}}},
+			value:    []string{"a", "b", "c"},
+			wildcard: []string{""},
+			result:   []interface{}{4, "b1", 3, Message{"a": 4, "b": "b1", "c": 3}},
+		}, {
+			rowC: &JoinTuple{Tuples: []TupleRow{
+				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
+			}},
+			value:    []string{"a", "src2.a", "b", "c"},
+			wildcard: []string{"", "src1"},
+			result:   []interface{}{1, 2, "v1", "w2", Message{"a": 1, "b": "v1", "c": "w2"}, Message{"a": 1, "b": "v1"}},
+		}, {
+			rowC: &JoinTuple{Tuples: []TupleRow{
+				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
+			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			value:    []string{"a", "src2.a", "b", "c", "d"},
+			wildcard: []string{"", "src1"},
+			result:   []interface{}{4, 2, "v1", "w2", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}, Message{"a": 1, "b": "v1"}},
+		}, {
+			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}},
+			value:    []string{"a", "b"},
+			wildcard: []string{""},
+			result:   []interface{}{1, "v1", Message{"a": 1, "b": "v1"}},
+		}, {
+			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			value:    []string{"a", "b", "d"},
+			wildcard: []string{""},
+			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
+		}, {
+			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			value:    []string{"a", "b", "d"},
+			wildcard: []string{""},
+			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
+		}, {
+			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
+				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
+			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			value:    []string{"a", "b", "d"},
+			wildcard: []string{""},
+			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	var ok bool
+	for i, tt := range tests {
+		result := make([]interface{}, len(tt.value)+len(tt.wildcard))
+		for j, v := range tt.value {
+			var key, table string
+			strs := strings.Split(v, ".")
+			if len(strs) > 1 {
+				key = strs[1]
+				table = strs[0]
+			} else {
+				key = strs[0]
+				table = ""
+			}
+			result[j], ok = tt.rowC.Value(key, table)
+			if !ok {
+				t.Errorf("%d.%d.%d: %s", i, j, 0, "Value() failed.")
+				continue
+			}
+		}
+		for j, v := range tt.wildcard {
+			result[len(tt.value)+j], ok = tt.rowC.All(v)
+			if !ok {
+				t.Errorf("%d.%d.%d: %s", i, j, 1, "Wildcard() failed.")
+				continue
+			}
+		}
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, result)
+		}
+	}
+}
+
+func TestTupleRow(t *testing.T) {
+	// boradcast(clone) -> set -> broadcast -> set -> compare
+	var tests = []struct {
+		rowO TupleRow
+		// The multiple values to set or alias; The first value is set in the first broadcast. the next values are set in the second broadcast.
+		set    [][]map[string]interface{}
+		result [][]map[string]interface{}
+	}{
+		{
+			rowO: &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
+			set: [][]map[string]interface{}{
+				{
+					{"a": 2, "c": "3", "@d": 4},
+					{"a": 3},
+					{"a": 4, "b": "5"},
+					{"@d": 5, "e": 5},
+				},
+				{
+					{"c": "4"},
+					{"d": "4"},
+					{"a": 5, "b": "6"},
+					{"a": 6, "@b": "7"},
+				},
+			},
+			result: [][]map[string]interface{}{
+				{
+					{"a": 3, "b": "2", "c": "3", "d": 4},
+					{"a": 4, "b": "5", "c": "3", "d": 4},
+					{"a": 2, "b": "2", "c": "3", "d": 5, "e": 5},
+				}, {
+					{"a": 1, "b": "2", "c": "4", "d": "4"},
+					{"a": 5, "b": "6", "c": "4"},
+					{"a": 6, "b": "7", "c": "4"},
+				},
+			},
+		}, {
+			rowO: &JoinTuple{Tuples: []TupleRow{
+				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
+			}},
+			set: [][]map[string]interface{}{
+				{
+					{"a": 2, "c": "3", "@d": 4},
+					{"a": 3},
+					{"a": 4, "b": "5"},
+					{"@d": 5, "e": 5},
+				},
+				{
+					{"e": "4"},
+					{"d": "4"},
+					{"a": 5, "b": "6"},
+					{"a": 6, "@b": "7"},
+				},
+			},
+			result: [][]map[string]interface{}{
+				{
+					{"a": 3, "b": "v1", "c": "3", "d": 4},
+					{"a": 4, "b": "5", "c": "3", "d": 4},
+					{"a": 2, "b": "v1", "c": "3", "d": 5, "e": 5},
+				}, {
+					{"a": 1, "b": "v1", "c": "w2", "d": "4", "e": "4"},
+					{"a": 5, "b": "6", "c": "w2", "e": "4"},
+					{"a": 6, "b": "7", "c": "w2", "e": "4"},
+				},
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		var (
+			wg     sync.WaitGroup
+			result = make([][]map[string]interface{}, len(tt.set))
+		)
+		for si, set := range tt.set {
+			wg.Add(1)
+			go func(si int, set []map[string]interface{}) {
+				nr := tt.rowO.Clone()
+				for k, v := range set[0] {
+					if strings.HasPrefix(k, "@") {
+						nr.AppendAlias(k[1:], v)
+					} else {
+						nr.Set(k, v)
+					}
+				}
+				var wg2 sync.WaitGroup
+				result[si] = make([]map[string]interface{}, len(set)-1)
+				for j := 1; j < len(set); j++ {
+					wg2.Add(1)
+					go func(j int) {
+						nnr := nr.Clone()
+						for k, v := range set[j] {
+							if strings.HasPrefix(k, "@") {
+								nnr.AppendAlias(k[1:], v)
+							} else {
+								nnr.Set(k, v)
+							}
+						}
+						result[si][j-1] = nnr.ToMap()
+						wg2.Done()
+					}(j)
+				}
+				wg2.Wait()
+				wg.Done()
+			}(si, set)
+		}
+		wg.Wait()
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, result)
+		}
+	}
+}