Browse Source

Merge pull request #270 from emqx/alias

Aggregate alias
jinfahua 4 years atrás
parent
commit
531abf2a7c

+ 102 - 44
xsql/plans/aggregate_operator.go

@@ -8,66 +8,124 @@ import (
 
 type AggregatePlan struct {
 	Dimensions xsql.Dimensions
+	Alias      xsql.Fields
 }
 
 /**
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: xsql.GroupedTuplesSet
  */
-func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("aggregate plan receive %s", data)
-	var ms []xsql.DataValuer
-	switch input := data.(type) {
-	case error:
-		return input
-	case xsql.DataValuer:
-		ms = append(ms, input)
-	case xsql.WindowTuplesSet:
-		if len(input) != 1 {
-			return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
-		}
-		ms = make([]xsql.DataValuer, len(input[0].Tuples))
-		for i, m := range input[0].Tuples {
-			//this is needed or it will always point to the last
-			t := m
-			ms[i] = &t
-		}
-	case xsql.JoinTupleSets:
-		ms = make([]xsql.DataValuer, len(input))
-		for i, m := range input {
-			t := m
-			ms[i] = &t
+	grouped := data
+	if p.Dimensions != nil {
+		var ms []xsql.DataValuer
+		switch input := data.(type) {
+		case error:
+			return input
+		case xsql.DataValuer:
+			ms = append(ms, input)
+		case xsql.WindowTuplesSet:
+			if len(input) != 1 {
+				return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
+			}
+			ms = make([]xsql.DataValuer, len(input[0].Tuples))
+			for i, m := range input[0].Tuples {
+				//this is needed or it will always point to the last
+				t := m
+				ms[i] = &t
+			}
+		case xsql.JoinTupleSets:
+			ms = make([]xsql.DataValuer, len(input))
+			for i, m := range input {
+				t := m
+				ms[i] = &t
+			}
+		default:
+			return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
 		}
-	default:
-		return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
-	}
 
-	result := make(map[string]xsql.GroupedTuples)
-	for _, m := range ms {
-		var name string
-		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
-		for _, d := range p.Dimensions {
-			r := ve.Eval(d.Expr)
-			if _, ok := r.(error); ok {
-				return fmt.Errorf("run Group By error: %s", r)
+		result := make(map[string]xsql.GroupedTuples)
+		for _, m := range ms {
+			var name string
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
+			for _, d := range p.Dimensions {
+				r := ve.Eval(d.Expr)
+				if _, ok := r.(error); ok {
+					return fmt.Errorf("run Group By error: %s", r)
+				} else {
+					name += fmt.Sprintf("%v,", r)
+				}
+			}
+			if ts, ok := result[name]; !ok {
+				result[name] = xsql.GroupedTuples{m}
 			} else {
-				name += fmt.Sprintf("%v,", r)
+				result[name] = append(ts, m)
 			}
 		}
-		if ts, ok := result[name]; !ok {
-			result[name] = xsql.GroupedTuples{m}
+		if len(result) > 0 {
+			g := make([]xsql.GroupedTuples, 0, len(result))
+			for _, v := range result {
+				g = append(g, v)
+			}
+			grouped = xsql.GroupedTuplesSet(g)
 		} else {
-			result[name] = append(ts, m)
+			grouped = nil
 		}
 	}
-	if len(result) > 0 {
-		g := make([]xsql.GroupedTuples, 0, len(result))
-		for _, v := range result {
-			g = append(g, v)
+	if len(p.Alias) != 0 {
+		switch input := grouped.(type) {
+		case *xsql.Tuple:
+			if err := p.calculateAlias(input, input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		case xsql.GroupedTuplesSet:
+			for _, v := range input {
+				if err := p.calculateAlias(v[0], v, fv, afv); err != nil {
+					return fmt.Errorf("run Aggregate function alias error: %s", err)
+				}
+			}
+		case xsql.WindowTuplesSet:
+			if len(input) != 1 {
+				return fmt.Errorf("run Aggregate function alias error: the input WindowTuplesSet with multiple tuples cannot be evaluated)")
+			}
+			if err := p.calculateAlias(&input[0].Tuples[0], input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		case xsql.JoinTupleSets:
+			if err := p.calculateAlias(&input[0], input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		default:
+			return fmt.Errorf("run Aggregate function alias error: invalid input %[1]T(%[1]v)", input)
 		}
-		return xsql.GroupedTuplesSet(g)
-	} else {
-		return nil
 	}
+
+	return grouped
+}
+
+func (p *AggregatePlan) calculateAlias(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) error {
+	afv.SetData(agg)
+	ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
+	for _, f := range p.Alias {
+		v := ve.Eval(f.Expr)
+		err := setTuple(tuple, f.AName, v)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func setTuple(tuple xsql.DataValuer, name string, value interface{}) error {
+	switch t := tuple.(type) {
+	case *xsql.Tuple:
+		t.Message[name] = value
+	case *xsql.JoinTuple:
+		t.Tuples[0].Message[name] = value
+	default:
+		return fmt.Errorf("invalid tuple to set alias")
+	}
+	return nil
 }

+ 373 - 0
xsql/plans/aggregate_test.go

@@ -334,6 +334,379 @@ func TestAggregatePlan_Apply(t *testing.T) {
 	}
 }
 
+func TestAggregatePlanGroupAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result xsql.GroupedTuplesSet
+	}{
+		{
+			sql: "SELECT count(*) as c FROM tbl group by abc",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "tbl",
+						Message: xsql.Message{
+							"abc": int64(6),
+							"def": "hello",
+							"c":   1,
+						},
+					},
+				},
+			},
+		},
+
+		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT abc, count(*) as c FROM src1 GROUP BY id1, TUMBLINGWINDOW(ss, 10), f1",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1", "c": 1},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY meta(topic), TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 1, "f1": "v1"},
+							Metadata: xsql.Metadata{"topic": "topic1"},
+						}, {
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 2, "f1": "v2"},
+							Metadata: xsql.Metadata{"topic": "topic2"},
+						}, {
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 3, "f1": "v1"},
+							Metadata: xsql.Metadata{"topic": "topic1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+						Metadata: xsql.Metadata{"topic": "topic1"},
+					},
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 3, "f1": "v1"},
+						Metadata: xsql.Metadata{"topic": "topic1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+						Metadata: xsql.Metadata{"topic": "topic2"},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT count(*) as c FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1", "c": 1}},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		var aggregateAlias xsql.Fields
+		for _, f := range stmt.Fields {
+			if f.AName != "" {
+				if xsql.HasAggFuncs(f.Expr) {
+					aggregateAlias = append(aggregateAlias, f)
+				}
+			}
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		gr, ok := result.(xsql.GroupedTuplesSet)
+		if !ok {
+			t.Errorf("result is not GroupedTuplesSet")
+		}
+		if len(tt.result) != len(gr) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, gr)
+		}
+
+		for _, r := range tt.result {
+			matched := false
+			for _, gre := range gr {
+				if reflect.DeepEqual(r, gre) {
+					matched = true
+				}
+			}
+			if !matched {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
+			}
+		}
+	}
+}
+
+func TestAggregatePlanAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: "SELECT count(*) as c FROM demo",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+					"c":   1,
+				},
+			},
+		},
+		{
+			sql: `SELECT count(*) as c FROM src1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT count(*) as c FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 3}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		var aggregateAlias xsql.Fields
+		for _, f := range stmt.Fields {
+			if f.AName != "" {
+				if xsql.HasAggFuncs(f.Expr) {
+					aggregateAlias = append(aggregateAlias, f)
+				}
+			}
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}
+
 func TestAggregatePlanError(t *testing.T) {
 	tests := []struct {
 		sql    string

+ 11 - 17
xsql/plans/having_operator.go

@@ -41,25 +41,19 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 			return fmt.Errorf("run Having error: input WindowTuplesSet with multiple tuples cannot be evaluated")
 		}
 		ms := input[0].Tuples
-		r := ms[:0]
+		v := ms[0]
 		afv.SetData(input)
-		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
-			result := ve.Eval(p.Condition)
-			switch val := result.(type) {
-			case error:
-				return fmt.Errorf("run Having error: %s", val)
-			case bool:
-				if val {
-					r = append(r, v)
-				}
-			default:
-				return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
+		result := ve.Eval(p.Condition)
+		switch val := result.(type) {
+		case error:
+			return fmt.Errorf("run Having error: %s", val)
+		case bool:
+			if val {
+				return input
 			}
-		}
-		if len(r) > 0 {
-			input[0].Tuples = r
-			return input
+		default:
+			return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
 		}
 	case xsql.JoinTupleSets:
 		ms := input

+ 153 - 0
xsql/plans/having_test.go

@@ -270,6 +270,159 @@ func TestHavingPlan_Apply(t *testing.T) {
 	}
 }
 
+func TestHavingPlanAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: `SELECT avg(id1) as a FROM src1 HAVING a > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+		},
+
+		{
+			sql: `SELECT sum(id1) as s FROM src1 HAVING s > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "s": 1},
+						},
+					},
+				},
+			},
+			result: nil,
+		}, {
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having c > 1",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having c > 1",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &HavingPlan{Condition: stmt.Having}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}
+
 func TestHavingPlanError(t *testing.T) {
 	var tests = []struct {
 		sql    string

+ 39 - 0
xsql/plans/order_test.go

@@ -369,6 +369,45 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 		},
 		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY c",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+			},
+		},
+		{
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
 			data: xsql.GroupedTuplesSet{
 				{

+ 9 - 11
xsql/plans/preprocessor.go

@@ -15,14 +15,14 @@ import (
 
 type Preprocessor struct {
 	streamStmt      *xsql.StreamStmt
-	fields          xsql.Fields
+	aliasFields     xsql.Fields
 	isEventTime     bool
 	timestampField  string
 	timestampFormat string
 }
 
 func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
-	p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
+	p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet}
 	if iet {
 		if tf, ok := s.Options["TIMESTAMP"]; ok {
 			p.timestampField = tf
@@ -64,15 +64,13 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F
 
 	//If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
 	//Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
-	for _, f := range p.fields {
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
-			v := ve.Eval(f.Expr)
-			if _, ok := v.(error); ok {
-				return v
-			} else {
-				result[strings.ToLower(f.AName)] = v
-			}
+	for _, f := range p.aliasFields {
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
+		v := ve.Eval(f.Expr)
+		if _, ok := v.(error); ok {
+			return v
+		} else {
+			result[strings.ToLower(f.AName)] = v
 		}
 	}
 

+ 1 - 1
xsql/plans/project_operator.go

@@ -96,7 +96,7 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]inter
 	result := make(map[string]interface{})
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) && !isTest {
+		if f.AName != "" && !isTest {
 			fr := &xsql.FieldRef{StreamName: "", Name: f.AName}
 			v := ve.Eval(fr)
 			if e, ok := v.(error); ok {

+ 96 - 9
xsql/plans/project_test.go

@@ -1174,7 +1174,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				{
 					&xsql.JoinTuple{
 						Tuples: []xsql.Tuple{
-							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
 							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
 						},
 					},
@@ -1188,7 +1188,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				{
 					&xsql.JoinTuple{
 						Tuples: []xsql.Tuple{
-							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
 							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
 						},
 					},
@@ -1209,7 +1209,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 		},
 		{
-			sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
 					&xsql.JoinTuple{
@@ -1259,7 +1259,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 		},
 		{
-			sql: "SELECT max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
 					&xsql.JoinTuple{
@@ -1303,7 +1303,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 		},
 		{
-			sql: "SELECT min(a) as min FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.JoinTupleSets{
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
@@ -1329,7 +1329,33 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"min": 68.55,
 			}},
 		}, {
-			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT min(a) as m FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "m": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+
+			result: []map[string]interface{}{{
+				"m": 68.55,
+			}},
+		}, {
+			sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
 					Emitter: "test",
@@ -1351,7 +1377,29 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"sum": float64(123203),
 			}},
 		}, {
-			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"a": 53, "s": 123203},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 27},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"s": float64(123203),
+			}},
+		}, {
+			sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
 					Emitter: "test",
@@ -1374,7 +1422,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 		},
 		{
-			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
 					&xsql.JoinTuple{
@@ -1406,6 +1454,46 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 			},
 			result: []map[string]interface{}{{
+				"count": float64(2),
+				"meta":  "devicea",
+			}, {
+				"count": float64(2),
+				"meta":  "devicec",
+			}},
+		},
+		{
+			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
+							{Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
+							{Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
+							{Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
 				"c": float64(2),
 				"d": "devicea",
 			}, {
@@ -1424,7 +1512,6 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			t.Error(err)
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
-		pp.isTest = true
 		fv, afv := xsql.NewAggregateFunctionValuers()
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}

+ 14 - 4
xsql/processors/xsql_processor.go

@@ -406,12 +406,22 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 			defer store.Close()
 
+			var alias, aggregateAlias xsql.Fields
+			for _, f := range selectStmt.Fields {
+				if f.AName != "" {
+					if !xsql.HasAggFuncs(f.Expr) {
+						alias = append(alias, f)
+					} else {
+						aggregateAlias = append(aggregateAlias, f)
+					}
+				}
+			}
 			for i, s := range streamsFromStmt {
 				streamStmt, err := GetStream(store, s)
 				if err != nil {
 					return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 				}
-				pp, err := plans.NewPreprocessor(streamStmt, selectStmt.Fields, isEventTime)
+				pp, err := plans.NewPreprocessor(streamStmt, alias, isEventTime)
 				if err != nil {
 					return nil, nil, err
 				}
@@ -459,10 +469,10 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 
 			var ds xsql.Dimensions
-			if dimensions != nil {
+			if dimensions != nil || len(aggregateAlias) > 0 {
 				ds = dimensions.GetGroups()
-				if ds != nil && len(ds) > 0 {
-					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate", bufferLength)
+				if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
+					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", bufferLength)
 					aggregateOp.SetConcurrency(concurrency)
 					tp.AddOperator(inputs, aggregateOp)
 					inputs = []api.Emitter{aggregateOp}

+ 50 - 0
xsql/processors/xsql_processor_test.go

@@ -1895,6 +1895,56 @@ func TestWindow(t *testing.T) {
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(3),
 			},
+		}, {
+			name: `rule8`,
+			sql:  `SELECT color, ts, count(*) as c FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"color": "red",
+					"ts":    float64(1541152486013),
+					"c":     float64(2),
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_preprocessor_demo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(0),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(1),
+				"op_project_0_records_out_total":  int64(1),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(1),
+				"sink_mockSink_0_records_out_total": int64(1),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(3),
+
+				"op_filter_0_exceptions_total":   int64(0),
+				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_records_in_total":   int64(3),
+				"op_filter_0_records_out_total":  int64(2),
+
+				"op_aggregate_0_exceptions_total":   int64(0),
+				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_records_in_total":   int64(2),
+				"op_aggregate_0_records_out_total":  int64(2),
+
+				"op_having_0_exceptions_total":   int64(0),
+				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_records_in_total":   int64(2),
+				"op_having_0_records_out_total":  int64(1),
+			},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))