Explorar o código

refactor(valuer):separate row and collection

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang %!s(int64=2) %!d(string=hai) anos
pai
achega
a0d31f056a

+ 6 - 6
internal/topo/operator/aggregate_operator.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -35,17 +35,17 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 	grouped := data
 	var wr *xsql.WindowRange
 	if p.Dimensions != nil {
-		var ms []xsql.DataValuer
+		var ms []xsql.Row
 		switch input := data.(type) {
 		case error:
 			return input
-		case xsql.DataValuer:
+		case xsql.Row:
 			ms = append(ms, input)
 		case xsql.WindowTuplesSet:
 			if len(input.Content) != 1 {
 				return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
 			}
-			ms = make([]xsql.DataValuer, len(input.Content[0].Tuples))
+			ms = make([]xsql.Row, len(input.Content[0].Tuples))
 			for i, m := range input.Content[0].Tuples {
 				//this is needed or it will always point to the last
 				t := m
@@ -53,7 +53,7 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 			}
 			wr = input.WindowRange
 		case *xsql.JoinTupleSets:
-			ms = make([]xsql.DataValuer, len(input.Content))
+			ms = make([]xsql.Row, len(input.Content))
 			for i, m := range input.Content {
 				t := m
 				ms[i] = &t
@@ -76,7 +76,7 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 				}
 			}
 			if ts, ok := result[name]; !ok {
-				result[name] = &xsql.GroupedTuples{Content: []xsql.DataValuer{m}, WindowRange: wr}
+				result[name] = &xsql.GroupedTuples{Content: []xsql.Row{m}, WindowRange: wr}
 			} else {
 				ts.Content = append(ts.Content, m)
 			}

+ 19 - 19
internal/topo/operator/aggregate_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{&xsql.Tuple{
+					Content: []xsql.Row{&xsql.Tuple{
 						Emitter: "tbl",
 						Message: xsql.Message{
 							"abc": int64(6),
@@ -81,7 +81,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -97,7 +97,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -132,7 +132,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -140,7 +140,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -148,7 +148,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 3, "f1": "v1"},
@@ -182,7 +182,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter:  "src1",
 							Message:  xsql.Message{"id1": 1, "f1": "v1"},
@@ -196,7 +196,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter:  "src1",
 							Message:  xsql.Message{"id1": 2, "f1": "v2"},
@@ -235,7 +235,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -249,7 +249,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -263,7 +263,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
@@ -302,7 +302,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -317,7 +317,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -353,7 +353,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
@@ -368,7 +368,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854573431)}},
@@ -401,7 +401,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -409,7 +409,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -441,7 +441,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},

+ 15 - 15
internal/topo/operator/having_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -178,7 +178,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -194,7 +194,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -208,7 +208,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -224,7 +224,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having a > 100",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -244,7 +244,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
@@ -266,7 +266,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -416,7 +416,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having c > 1",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
@@ -428,7 +428,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
@@ -438,7 +438,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
@@ -454,7 +454,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having c > 1",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
@@ -470,7 +470,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 1}},
@@ -482,7 +482,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
@@ -555,7 +555,7 @@ func TestHavingPlanError(t *testing.T) {
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": 3},
@@ -567,7 +567,7 @@ func TestHavingPlanError(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},

+ 17 - 17
internal/topo/operator/order_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -357,7 +357,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			sql: "SELECT abc FROM tbl group by abc ORDER BY def",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "tbl",
 							Message: xsql.Message{
@@ -370,7 +370,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "tbl",
 							Message: xsql.Message{
@@ -386,7 +386,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY id1 desc",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -398,7 +398,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -408,7 +408,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -416,7 +416,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -433,7 +433,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY c",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
@@ -445,7 +445,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
@@ -455,7 +455,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
@@ -463,7 +463,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
@@ -480,7 +480,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -494,7 +494,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -508,7 +508,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
@@ -523,7 +523,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 			result: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -537,7 +537,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -551,7 +551,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},

+ 1 - 1
internal/topo/operator/project_operator.go

@@ -95,7 +95,7 @@ func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 	return results
 }
 
-func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
+func (pp *ProjectOp) getVE(tuple xsql.Row, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
 	afv.SetData(agg)
 	if pp.IsAggregate {
 		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}

+ 38 - 38
internal/topo/operator/project_test.go

@@ -820,7 +820,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT abc FROM tbl group by abc",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "tbl",
 							Message: xsql.Message{
@@ -840,7 +840,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT abc FROM tbl group by abc",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "tbl",
 							Message: xsql.Message{
@@ -857,7 +857,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -869,7 +869,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -888,7 +888,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -900,7 +900,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id2": 2, "f1": "v2"},
@@ -917,7 +917,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -927,7 +927,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -937,7 +937,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
@@ -1031,7 +1031,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 1, "f1": "v1"},
@@ -1043,7 +1043,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.Tuple{
 							Emitter: "src1",
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
@@ -1064,7 +1064,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -1074,7 +1074,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -1084,7 +1084,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
@@ -1111,7 +1111,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
@@ -1121,7 +1121,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
@@ -1131,7 +1131,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
@@ -1391,7 +1391,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT count(*) as c, round(a) as r, window_start() as ws, window_end() as we FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
@@ -1411,7 +1411,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
@@ -1448,7 +1448,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
@@ -1464,7 +1464,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
@@ -1499,7 +1499,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -1527,7 +1527,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
@@ -1554,7 +1554,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -1576,7 +1576,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
@@ -1785,7 +1785,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -1801,7 +1801,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
@@ -1830,7 +1830,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
@@ -1846,7 +1846,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
@@ -1875,7 +1875,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
@@ -1895,7 +1895,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
@@ -1934,7 +1934,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
@@ -1950,7 +1950,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
@@ -2057,7 +2057,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
@@ -2073,7 +2073,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
@@ -2177,7 +2177,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			sql: "SELECT A.module, A.topic , max(A.value), B.topic as var2, max(B.value) as max2, C.topic as var3, max(C.value) as max3 FROM A FULL JOIN B on A.module=B.module FULL JOIN C on A.module=C.module GROUP BY A.module, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},
@@ -2277,7 +2277,7 @@ func TestProjectPlanError(t *testing.T) {
 			sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
@@ -2305,7 +2305,7 @@ func TestProjectPlanError(t *testing.T) {
 					},
 				},
 				{
-					Content: []xsql.DataValuer{
+					Content: []xsql.Row{
 						&xsql.JoinTuple{
 							Tuples: []xsql.Tuple{
 								{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},

+ 180 - 0
internal/xsql/collection.go

@@ -0,0 +1,180 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xsql
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"sort"
+)
+
+/**********************************
+**	Various Data Types for SQL transformation
+ */
+
+type AggregateData interface {
+	AggregateEval(expr ast.Expr, v CallValuer) []interface{}
+}
+
+type Event interface {
+	GetTimestamp() int64
+	IsWatermark() bool
+}
+
+type WindowTuples struct {
+	Emitter string
+	Tuples  []Tuple
+}
+
+type WindowRangeValuer struct {
+	*WindowRange
+}
+
+func (r *WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
+	return nil, false
+}
+
+func (r *WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
+	return nil, false
+}
+
+func (r *WindowRangeValuer) AppendAlias(_ string, _ interface{}) bool {
+	return false
+}
+
+func (r *WindowRangeValuer) AliasValue(_ string) (interface{}, bool) {
+	return nil, false
+}
+
+type WindowRange struct {
+	WindowStart int64
+	WindowEnd   int64
+}
+
+func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
+	switch key {
+	case "window_start":
+		return r.WindowStart, true
+	case "window_end":
+		return r.WindowEnd, true
+	default:
+		return nil, false
+	}
+}
+
+type WindowTuplesSet struct {
+	Content []WindowTuples
+	*WindowRange
+}
+
+func (w WindowTuplesSet) GetBySrc(src string) []Tuple {
+	for _, me := range w.Content {
+		if me.Emitter == src {
+			return me.Tuples
+		}
+	}
+	return nil
+}
+
+func (w WindowTuplesSet) Len() int {
+	if len(w.Content) > 0 {
+		return len(w.Content[0].Tuples)
+	}
+	return 0
+}
+func (w WindowTuplesSet) Swap(i, j int) {
+	if len(w.Content) > 0 {
+		s := w.Content[0].Tuples
+		s[i], s[j] = s[j], s[i]
+	}
+}
+func (w WindowTuplesSet) Index(i int) Valuer {
+	if len(w.Content) > 0 {
+		s := w.Content[0].Tuples
+		return &(s[i])
+	}
+	return nil
+}
+
+func (w WindowTuplesSet) AddTuple(tuple *Tuple) WindowTuplesSet {
+	found := false
+	for i, t := range w.Content {
+		if t.Emitter == tuple.Emitter {
+			t.Tuples = append(t.Tuples, *tuple)
+			found = true
+			w.Content[i] = t
+			break
+		}
+	}
+
+	if !found {
+		ets := &WindowTuples{Emitter: tuple.Emitter}
+		ets.Tuples = append(ets.Tuples, *tuple)
+		w.Content = append(w.Content, *ets)
+	}
+	return w
+}
+
+//Sort by tuple timestamp
+func (w WindowTuplesSet) Sort() {
+	for _, t := range w.Content {
+		tuples := t.Tuples
+		sort.SliceStable(tuples, func(i, j int) bool {
+			return tuples[i].Timestamp < tuples[j].Timestamp
+		})
+		t.Tuples = tuples
+	}
+}
+
+func (w WindowTuplesSet) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
+	var result []interface{}
+	if len(w.Content) != 1 { //should never happen
+		return nil
+	}
+	for _, t := range w.Content[0].Tuples {
+		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{&t})))
+	}
+	return result
+}
+
+func getTupleValue(tuple Tuple, key string, isVal bool) (interface{}, bool) {
+	if isVal {
+		return tuple.Value(key, "")
+	} else {
+		return tuple.Meta(key, "")
+	}
+}
+
+type JoinTupleSets struct {
+	Content []JoinTuple
+	*WindowRange
+}
+
+func (s *JoinTupleSets) Len() int           { return len(s.Content) }
+func (s *JoinTupleSets) Swap(i, j int)      { s.Content[i], s.Content[j] = s.Content[j], s.Content[i] }
+func (s *JoinTupleSets) Index(i int) Valuer { return &(s.Content[i]) }
+
+func (s *JoinTupleSets) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
+	var result []interface{}
+	for _, t := range s.Content {
+		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{&t})))
+	}
+	return result
+}
+
+type GroupedTuplesSet []GroupedTuples
+
+func (s GroupedTuplesSet) Len() int           { return len(s) }
+func (s GroupedTuplesSet) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s GroupedTuplesSet) Index(i int) Valuer { return s[i].Content[0] }

+ 124 - 191
internal/xsql/collections.go

@@ -17,21 +17,71 @@ package xsql
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"sort"
 	"strings"
 )
 
-/**********************************
-**	Various Data Types for SQL transformation
- */
+type Row interface {
+	Valuer
+	AliasValuer
+	Wildcarder
+	// Set Only for some ops like functionOp
+	Set(col string, value interface{})
 
-type AggregateData interface {
-	AggregateEval(expr ast.Expr, v CallValuer) []interface{}
+	// Clone when broadcast to make sure each row are dealt single threaded
+	Clone() Row
+	// ToMap converts the row to a map to export to other systems
+	ToMap() map[string]interface{}
 }
 
-// Message is a valuer that substitutes values for the mapped interface.
+// Collection A collection of rows as a table. It is used for window, join, group by, etc.
+type Collection interface {
+	Index(index int) Row
+	Len() int
+}
+
+// Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
 type Message map[string]interface{}
 
+var _ Valuer = Message{}
+
+type Metadata Message
+
+// Alias will not need to convert cases
+type Alias struct {
+	AliasMap map[string]interface{}
+}
+
+// All rows definitions, watermark, barrier
+
+// Tuple The input row, produced by the source
+type Tuple struct {
+	Emitter   string
+	Message   Message // immutable
+	Timestamp int64
+	Metadata  Metadata // immutable
+	Alias
+}
+
+var _ Row = &Tuple{}
+
+// JoinTuple is a row produced by a join operation
+type JoinTuple struct {
+	Tuples []Tuple
+	Alias
+}
+
+var _ Row = &JoinTuple{}
+
+// GroupedTuples is a collection of tuples grouped by a key
+type GroupedTuples struct {
+	Content []Row
+	*WindowRange
+}
+
+var _ Row = &GroupedTuples{}
+
+// Message implementation
+
 func ToMessage(input interface{}) (Message, bool) {
 	var result Message
 	switch m := input.(type) {
@@ -47,7 +97,6 @@ func ToMessage(input interface{}) (Message, bool) {
 	return result, true
 }
 
-// Value returns the value for a key in the Message.
 func (m Message) Value(key, _ string) (interface{}, bool) {
 	if v, ok := m[key]; ok {
 		return v, ok
@@ -78,21 +127,7 @@ func (m Message) Meta(key, table string) (interface{}, bool) {
 	return m.Value(key, table)
 }
 
-func (m Message) AppendAlias(k string, v interface{}) bool {
-	conf.Log.Debugf("append alias %s:%v\n", k, v)
-	return false
-}
-
-func (m Message) AliasValue(_ string) (interface{}, bool) {
-	return nil, false
-}
-
-type Event interface {
-	GetTimestamp() int64
-	IsWatermark() bool
-}
-
-type Metadata Message
+// MetaData implementation
 
 func (m Metadata) Value(key, table string) (interface{}, bool) {
 	msg := Message(m)
@@ -107,10 +142,7 @@ func (m Metadata) Meta(key, table string) (interface{}, bool) {
 	return msg.Meta(key, table)
 }
 
-// Alias alias will not need to convert cases
-type Alias struct {
-	AliasMap map[string]interface{}
-}
+// Alias implementation
 
 func (a *Alias) AppendAlias(key string, value interface{}) bool {
 	if a.AliasMap == nil {
@@ -128,13 +160,7 @@ func (a *Alias) AliasValue(key string) (interface{}, bool) {
 	return v, ok
 }
 
-type Tuple struct {
-	Emitter   string
-	Message   Message // immutable
-	Timestamp int64
-	Metadata  Metadata // immutable
-	Alias
-}
+// Tuple implementation
 
 func (t *Tuple) Value(key, table string) (interface{}, bool) {
 	r, ok := t.AliasValue(key)
@@ -151,27 +177,12 @@ func (t *Tuple) Meta(key, table string) (interface{}, bool) {
 	return t.Metadata.Value(key, table)
 }
 
-func (t *Tuple) All(string) (Message, bool) {
-	return t.Message, true
+func (t *Tuple) Set(col string, value interface{}) {
+	//TODO implement me
+	panic("implement me")
 }
 
-func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
-	return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
-}
-
-func (t *Tuple) GetTimestamp() int64 {
-	return t.Timestamp
-}
-
-func (t *Tuple) GetMetadata() Metadata {
-	return t.Metadata
-}
-
-func (t *Tuple) IsWatermark() bool {
-	return false
-}
-
-func (t *Tuple) Clone() DataValuer {
+func (t *Tuple) Clone() Row {
 	c := &Tuple{
 		Emitter:   t.Emitter,
 		Timestamp: t.Timestamp,
@@ -193,126 +204,32 @@ func (t *Tuple) Clone() DataValuer {
 	return c
 }
 
-type WindowTuples struct {
-	Emitter string
-	Tuples  []Tuple
-}
-
-type WindowRangeValuer struct {
-	*WindowRange
-}
-
-func (r *WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
-	return nil, false
-}
-
-func (r *WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
-	return nil, false
-}
-
-func (r *WindowRangeValuer) AppendAlias(_ string, _ interface{}) bool {
-	return false
-}
-
-func (r *WindowRangeValuer) AliasValue(_ string) (interface{}, bool) {
-	return nil, false
-}
-
-type WindowRange struct {
-	WindowStart int64
-	WindowEnd   int64
+func (t *Tuple) ToMap() map[string]interface{} {
+	//TODO implement me
+	panic("implement me")
 }
 
-func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
-	switch key {
-	case "window_start":
-		return r.WindowStart, true
-	case "window_end":
-		return r.WindowEnd, true
-	default:
-		return nil, false
-	}
-}
-
-type WindowTuplesSet struct {
-	Content []WindowTuples
-	*WindowRange
-}
-
-func (w WindowTuplesSet) GetBySrc(src string) []Tuple {
-	for _, me := range w.Content {
-		if me.Emitter == src {
-			return me.Tuples
-		}
-	}
-	return nil
+func (t *Tuple) All(string) (Message, bool) {
+	return t.Message, true
 }
 
-func (w WindowTuplesSet) Len() int {
-	if len(w.Content) > 0 {
-		return len(w.Content[0].Tuples)
-	}
-	return 0
-}
-func (w WindowTuplesSet) Swap(i, j int) {
-	if len(w.Content) > 0 {
-		s := w.Content[0].Tuples
-		s[i], s[j] = s[j], s[i]
-	}
-}
-func (w WindowTuplesSet) Index(i int) Valuer {
-	if len(w.Content) > 0 {
-		s := w.Content[0].Tuples
-		return &(s[i])
-	}
-	return nil
+func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
+	return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
 }
 
-func (w WindowTuplesSet) AddTuple(tuple *Tuple) WindowTuplesSet {
-	found := false
-	for i, t := range w.Content {
-		if t.Emitter == tuple.Emitter {
-			t.Tuples = append(t.Tuples, *tuple)
-			found = true
-			w.Content[i] = t
-			break
-		}
-	}
-
-	if !found {
-		ets := &WindowTuples{Emitter: tuple.Emitter}
-		ets.Tuples = append(ets.Tuples, *tuple)
-		w.Content = append(w.Content, *ets)
-	}
-	return w
+func (t *Tuple) GetTimestamp() int64 {
+	return t.Timestamp
 }
 
-//Sort by tuple timestamp
-func (w WindowTuplesSet) Sort() {
-	for _, t := range w.Content {
-		tuples := t.Tuples
-		sort.SliceStable(tuples, func(i, j int) bool {
-			return tuples[i].Timestamp < tuples[j].Timestamp
-		})
-		t.Tuples = tuples
-	}
+func (t *Tuple) GetMetadata() Metadata {
+	return t.Metadata
 }
 
-func (w WindowTuplesSet) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
-	var result []interface{}
-	if len(w.Content) != 1 { //should never happen
-		return nil
-	}
-	for _, t := range w.Content[0].Tuples {
-		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{&t})))
-	}
-	return result
+func (t *Tuple) IsWatermark() bool {
+	return false
 }
 
-type JoinTuple struct {
-	Tuples []Tuple
-	Alias
-}
+// JoinTuple implementation
 
 func (jt *JoinTuple) AddTuple(tuple Tuple) {
 	jt.Tuples = append(jt.Tuples, tuple)
@@ -324,14 +241,6 @@ func (jt *JoinTuple) AddTuples(tuples []Tuple) {
 	}
 }
 
-func getTupleValue(tuple Tuple, key string, isVal bool) (interface{}, bool) {
-	if isVal {
-		return tuple.Value(key, "")
-	} else {
-		return tuple.Meta(key, "")
-	}
-}
-
 func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
 	tuples := jt.Tuples
 	if table == "" {
@@ -391,7 +300,7 @@ func (jt *JoinTuple) All(stream string) (Message, bool) {
 	return nil, false
 }
 
-func (jt *JoinTuple) Clone() DataValuer {
+func (jt *JoinTuple) Clone() Row {
 	ts := make([]Tuple, len(jt.Tuples))
 	for i, t := range jt.Tuples {
 		ts[i] = *(t.Clone().(*Tuple))
@@ -399,27 +308,17 @@ func (jt *JoinTuple) Clone() DataValuer {
 	return &JoinTuple{Tuples: ts}
 }
 
-type JoinTupleSets struct {
-	Content []JoinTuple
-	*WindowRange
+func (jt *JoinTuple) Set(col string, value interface{}) {
+	//TODO implement me
+	panic("implement me")
 }
 
-func (s *JoinTupleSets) Len() int           { return len(s.Content) }
-func (s *JoinTupleSets) Swap(i, j int)      { s.Content[i], s.Content[j] = s.Content[j], s.Content[i] }
-func (s *JoinTupleSets) Index(i int) Valuer { return &(s.Content[i]) }
-
-func (s *JoinTupleSets) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
-	var result []interface{}
-	for _, t := range s.Content {
-		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{&t})))
-	}
-	return result
+func (jt *JoinTuple) ToMap() map[string]interface{} {
+	//TODO implement me
+	panic("implement me")
 }
 
-type GroupedTuples struct {
-	Content []DataValuer
-	*WindowRange
-}
+// GroupedTuple implementation
 
 func (s GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	var result []interface{}
@@ -429,8 +328,42 @@ func (s GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
 	return result
 }
 
-type GroupedTuplesSet []GroupedTuples
+func (s GroupedTuples) Value(key, table string) (interface{}, bool) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) Meta(key, table string) (interface{}, bool) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) AliasValue(name string) (interface{}, bool) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) Set(col string, value interface{}) {
+	//TODO implement me
+	panic("implement me")
+}
 
-func (s GroupedTuplesSet) Len() int           { return len(s) }
-func (s GroupedTuplesSet) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-func (s GroupedTuplesSet) Index(i int) Valuer { return s[i].Content[0] }
+func (s GroupedTuples) AppendAlias(key string, value interface{}) bool {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) Clone() Row {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) ToMap() map[string]interface{} {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (s GroupedTuples) All(stream string) (Message, bool) {
+	//TODO implement me
+	panic("implement me")
+}

+ 25 - 46
internal/xsql/valuer.go

@@ -35,8 +35,14 @@ type Valuer interface {
 	// Value returns the value and existence flag for a given key.
 	Value(key, table string) (interface{}, bool)
 	Meta(key, table string) (interface{}, bool)
-	AppendAlias(key string, value interface{}) bool
+}
+
+// AliasValuer is used to calculate and cache the alias value
+type AliasValuer interface {
+	// AliasValue Get the value of alias
 	AliasValue(name string) (interface{}, bool)
+	// AppendAlias set the alias result
+	AppendAlias(key string, value interface{}) bool
 }
 
 // CallValuer implements the Call method for evaluating function calls.
@@ -63,12 +69,6 @@ type Wildcarder interface {
 	All(stream string) (Message, bool)
 }
 
-type DataValuer interface {
-	Valuer
-	Wildcarder
-	Clone() DataValuer
-}
-
 type WildcardValuer struct {
 	Data Wildcarder
 }
@@ -262,14 +262,6 @@ func validate(t string, v interface{}) error {
 	}
 }
 
-type EvalResultMessage struct {
-	Emitter string
-	Result  interface{}
-	Message Message
-}
-
-type ResultsAndMessages []EvalResultMessage
-
 // Eval evaluates expr against a map.
 func Eval(expr ast.Expr, m Valuer) interface{} {
 	eval := ValuerEval{Valuer: m}
@@ -313,8 +305,10 @@ func (a multiValuer) Meta(key, table string) (interface{}, bool) {
 
 func (a multiValuer) AppendAlias(key string, value interface{}) bool {
 	for _, valuer := range a {
-		if ok := valuer.AppendAlias(key, value); ok {
-			return true
+		if vv, ok := valuer.(AliasValuer); ok {
+			if ok := vv.AppendAlias(key, value); ok {
+				return true
+			}
 		}
 	}
 	return false
@@ -322,8 +316,8 @@ func (a multiValuer) AppendAlias(key string, value interface{}) bool {
 
 func (a multiValuer) AliasValue(key string) (interface{}, bool) {
 	for _, valuer := range a {
-		if v, ok := valuer.AliasValue(key); ok {
-			return v, true
+		if vv, ok := valuer.(AliasValuer); ok {
+			return vv.AliasValue(key)
 		}
 	}
 	return nil, false
@@ -403,6 +397,7 @@ func (ber *BracketEvalResult) isIndex() bool {
 }
 
 // Eval evaluates an expression and returns a value.
+// map the expression to the correct valuer
 func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 	if expr == nil {
 		return nil
@@ -518,10 +513,18 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 			t, n string
 		)
 		if expr.IsAlias() {
-			val, ok := v.Valuer.AliasValue(expr.Name)
-			if ok {
-				return val
+			if valuer, ok := v.Valuer.(AliasValuer); ok {
+				val, ok := valuer.AliasValue(expr.Name)
+				if ok {
+					return val
+				} else {
+					r := v.Eval(expr.Expression)
+					// TODO possible performance elevation to eliminate this cal
+					valuer.AppendAlias(expr.Name, r)
+					return r
+				}
 			}
+
 		} else if expr.StreamName == ast.DefaultStream {
 			n = expr.Name
 		} else {
@@ -534,12 +537,6 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 				return val
 			}
 		}
-		if expr.IsAlias() {
-			r := v.Eval(expr.Expression)
-			// TODO possible performance elevation to eliminate this cal
-			v.Valuer.AppendAlias(expr.Name, r)
-			return r
-		}
 		return nil
 	case *ast.MetaRef:
 		if expr.StreamName == "" || expr.StreamName == ast.DefaultStream {
@@ -652,24 +649,6 @@ func (v *ValuerEval) evalValueSet(expr *ast.ValueSetExpr) interface{} {
 	return nil
 }
 
-func isBlank(value reflect.Value) bool {
-	switch value.Kind() {
-	case reflect.String:
-		return value.Len() == 0
-	case reflect.Bool:
-		return !value.Bool()
-	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-		return value.Int() == 0
-	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
-		return value.Uint() == 0
-	case reflect.Float32, reflect.Float64:
-		return value.Float() == 0
-	case reflect.Interface, reflect.Ptr:
-		return value.IsNil()
-	}
-	return reflect.DeepEqual(value.Interface(), reflect.Zero(value.Type()).Interface())
-}
-
 func (v *ValuerEval) evalSetsExpr(lhs interface{}, op ast.Token, rhsSet interface{}) interface{} {
 	switch op {
 	/*Semantic rules