瀏覽代碼

support for having clause

RockyJin 5 年之前
父節點
當前提交
73b1d91bdd
共有 4 個文件被更改,包括 238 次插入1 次删除
  1. 1 1
      docs/sqls/query_language_elements.md
  2. 75 0
      xsql/plans/having_operator.go
  3. 156 0
      xsql/plans/having_test.go
  4. 6 0
      xsql/processors/xsql_processor.go

+ 1 - 1
docs/sqls/query_language_elements.md

@@ -254,7 +254,7 @@ GROUP BY column_name
 
 ### HAVING
 
-Specifies a search condition for a group or an aggregate. HAVING can be used only with the SELECT expression. HAVING is typically used in a GROUP BY clause. When GROUP BY is not used, HAVING behaves like a WHERE clause.
+The HAVING clause was added to SQL because the WHERE keyword could not be used with aggregate functions. Specifies a search condition for a group or an aggregate. HAVING can be used only with the SELECT expression. HAVING is typically used in a GROUP BY clause. 
 
 #### Syntax
 

+ 75 - 0
xsql/plans/having_operator.go

@@ -0,0 +1,75 @@
+package plans
+
+import (
+	"context"
+	"engine/common"
+	"engine/xsql"
+)
+
+type HavingPlan struct {
+	Condition xsql.Expr
+}
+
+func (p *HavingPlan) Apply(ctx context.Context, data interface{}) interface{} {
+	log := common.GetLogger(ctx)
+	log.Debugf("having plan receive %s", data)
+	switch input := data.(type) {
+	case xsql.Valuer:
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, &xsql.FunctionValuer{})}
+		result, ok := ve.Eval(p.Condition).(bool)
+		if ok {
+			if result {
+				return input
+			}
+		} else {
+			log.Errorf("invalid condition that returns non-bool value")
+		}
+	case xsql.WindowTuplesSet:
+		if len(input) != 1 {
+			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
+			return nil
+		}
+		ms := input[0].Tuples
+		r := ms[:0]
+		for _, v := range ms {
+			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
+			result, ok := ve.Eval(p.Condition).(bool)
+			if ok {
+				if result {
+					r = append(r, v)
+				}
+			} else {
+				log.Errorf("invalid condition that returns non-bool value")
+				return nil
+			}
+		}
+		if len(r) > 0 {
+			input[0].Tuples = r
+			return input
+		}
+	case xsql.JoinTupleSets:
+		ms := input
+		r := ms[:0]
+		for _, v := range ms {
+			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
+			result, ok := ve.Eval(p.Condition).(bool)
+			if ok {
+				if result {
+					r = append(r, v)
+				}
+			} else {
+				log.Errorf("invalid condition that returns non-bool value")
+				return nil
+			}
+		}
+		if len(r) > 0{
+			return r
+		}
+	default:
+		log.Errorf("Expect xsql.Valuer or its array type.")
+		return nil
+	}
+	return nil
+}

+ 156 - 0
xsql/plans/having_test.go

@@ -0,0 +1,156 @@
+package plans
+
+import (
+	"engine/xsql"
+	"fmt"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestHavingPlan_Apply(t *testing.T) {
+	var tests = []struct {
+		sql  string
+		data interface{}
+		result interface{}
+	}{
+		{
+			sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 2, "f1" : "v2"},
+						},{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 5, "f1" : "v1"},
+						},
+
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 2, "f1" : "v2"},
+						},{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 5, "f1" : "v1"},
+						},
+					},
+				},
+			},
+		},
+
+		{
+			sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+			result: nil,
+		},
+
+		{
+			sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+		},
+
+
+		{
+			sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+			result: nil,
+		},
+
+		{
+			sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1" : 1, "f1" : "v1"},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+
+		pp := &HavingPlan{Condition:stmt.Having}
+		result := pp.Apply(nil, tt.data)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}

+ 6 - 0
xsql/processors/xsql_processor.go

@@ -416,6 +416,12 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 				}
 			}
 
+			if selectStmt.Having != nil {
+				havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
+				tp.AddOperator(inputs, havingOp)
+				inputs = []xstream.Emitter{havingOp}
+			}
+
 			if selectStmt.SortFields != nil {
 				orderOp := xstream.Transform(&plans.OrderPlan{SortFields:selectStmt.SortFields}, "order")
 				tp.AddOperator(inputs, orderOp)