Преглед на файлове

feat(topo): planner for collection nodes

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang преди 2 години
родител
ревизия
4e52c3dca9
променени са 3 файла, в които са добавени 266 реда и са изтрити 17 реда
  1. 26 0
      internal/topo/graph/node.go
  2. 49 8
      internal/topo/operator/func_operator.go
  3. 191 9
      internal/topo/planner/planner_graph.go

+ 26 - 0
internal/topo/graph/node.go

@@ -25,3 +25,29 @@ type Filter struct {
 type Select struct {
 type Select struct {
 	Fields []string `json:"fields"`
 	Fields []string `json:"fields"`
 }
 }
+
+type Window struct {
+	Type     string `json:"type"`
+	Unit     string `json:"unit"`
+	Size     int    `json:"size"`
+	Interval int    `json:"interval"`
+}
+
+type Join struct {
+	From  string `json:"from"`
+	Joins []struct {
+		Type string `json:"type"`
+		On   string `json:"on"`
+	}
+}
+
+type Groupby struct {
+	Dimensions []string `json:"dimensions"`
+}
+
+type Orderby struct {
+	Sorts []struct {
+		Field string `json:"field"`
+		Order string `json:"order"`
+	}
+}

+ 49 - 8
internal/topo/operator/func_operator.go

@@ -22,29 +22,70 @@ import (
 )
 )
 
 
 type FuncOp struct {
 type FuncOp struct {
+	IsAgg    bool
 	CallExpr *ast.Call
 	CallExpr *ast.Call
 	Name     string
 	Name     string
 }
 }
 
 
-func (p *FuncOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+func (p *FuncOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	ctx.GetLogger().Debugf("FuncOp receive: %s", data)
 	ctx.GetLogger().Debugf("FuncOp receive: %s", data)
 	switch input := data.(type) {
 	switch input := data.(type) {
 	case error:
 	case error:
 		return input
 		return input
-	case xsql.Valuer:
+	case xsql.TupleRow:
 		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, fv)}
 		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, fv)}
 		result := ve.Eval(p.CallExpr)
 		result := ve.Eval(p.CallExpr)
 		if e, ok := result.(error); ok {
 		if e, ok := result.(error); ok {
 			return e
 			return e
 		}
 		}
-		switch val := input.(type) {
-		case xsql.Row:
-			val.Set(p.Name, result)
-			return val
-		default:
-			return fmt.Errorf("unknow type")
+		input.Set(p.Name, result)
+	case xsql.SingleCollection:
+		var err error
+		if p.IsAgg {
+			input.SetIsAgg(true)
+			err = input.GroupRange(func(_ int, aggRow xsql.CollectionRow) (bool, error) {
+				afv.SetData(aggRow)
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(aggRow, fv, aggRow, fv, afv, &xsql.WildcardValuer{Data: aggRow})}
+				result := ve.Eval(p.CallExpr)
+				if e, ok := result.(error); ok {
+					return false, e
+				}
+				aggRow.Set(p.Name, result)
+				return true, nil
+			})
+		} else {
+			err = input.RangeSet(func(_ int, row xsql.Row) (bool, error) {
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(row, &xsql.WindowRangeValuer{WindowRange: input.GetWindowRange()}, fv, &xsql.WildcardValuer{Data: row})}
+				result := ve.Eval(p.CallExpr)
+				if e, ok := result.(error); ok {
+					return false, e
+				}
+				row.Set(p.Name, result)
+				return true, nil
+			})
+		}
+		if err != nil {
+			return err
+		}
+	case xsql.GroupedCollection: // Order matters: this case must come after SingleCollection, because a single collection usually also satisfies GroupedCollection
+		if !p.IsAgg {
+			return fmt.Errorf("FuncOp: GroupedCollection is not supported for non-aggregate function")
+		}
+		err := input.GroupRange(func(_ int, aggRow xsql.CollectionRow) (bool, error) {
+			afv.SetData(aggRow)
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(aggRow, fv, aggRow, fv, afv, &xsql.WildcardValuer{Data: aggRow})}
+			result := ve.Eval(p.CallExpr)
+			if e, ok := result.(error); ok {
+				return false, e
+			}
+			aggRow.Set(p.Name, result)
+			return true, nil
+		})
+		if err != nil {
+			return err
 		}
 		}
 	default:
 	default:
 		return fmt.Errorf("run func error: invalid input %[1]T(%[1]v)", input)
 		return fmt.Errorf("run func error: invalid input %[1]T(%[1]v)", input)
 	}
 	}
+	return data
 }
 }

+ 191 - 9
internal/topo/planner/planner_graph.go

@@ -30,10 +30,9 @@ import (
 )
 )
 
 
 // PlanByGraph builds a topo.Topo object from the graph defined in the rule
 // PlanByGraph builds a topo.Topo object from the graph defined in the rule
-// TODO in the future, graph may also be converted to a plan and get optimized
 func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
-	graph := rule.Graph
-	if graph == nil {
+	ruleGraph := rule.Graph
+	if ruleGraph == nil {
 		return nil, errors.New("no graph")
 		return nil, errors.New("no graph")
 	}
 	}
 	tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
 	tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
@@ -45,7 +44,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 		sinks   = make(map[string]bool)
 		sinks   = make(map[string]bool)
 		sources = make(map[string]bool)
 		sources = make(map[string]bool)
 	)
 	)
-	for nodeName, gn := range graph.Nodes {
+	for nodeName, gn := range ruleGraph.Nodes {
 		switch gn.Type {
 		switch gn.Type {
 		case "source":
 		case "source":
 			sourceType, ok := gn.Props["source_type"]
 			sourceType, ok := gn.Props["source_type"]
@@ -80,13 +79,13 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			}
 			}
 			sources[nodeName] = true
 			sources[nodeName] = true
 		case "sink":
 		case "sink":
-			if _, ok := graph.Topo.Edges[nodeName]; ok {
+			if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
 				return nil, fmt.Errorf("sink %s has edge", nodeName)
 				return nil, fmt.Errorf("sink %s has edge", nodeName)
 			}
 			}
 			nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
 			nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
 			sinks[nodeName] = true
 			sinks[nodeName] = true
 		case "operator":
 		case "operator":
-			switch gn.NodeType {
+			switch strings.ToLower(gn.NodeType) {
 			case "function":
 			case "function":
 				fop, err := parseFunc(gn.Props)
 				fop, err := parseFunc(gn.Props)
 				if err != nil {
 				if err != nil {
@@ -94,6 +93,14 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				}
 				}
 				op := Transform(fop, nodeName, rule.Options)
 				op := Transform(fop, nodeName, rule.Options)
 				nodeMap[nodeName] = op
 				nodeMap[nodeName] = op
+			case "aggfunc":
+				fop, err := parseFunc(gn.Props)
+				if err != nil {
+					return nil, err
+				}
+				fop.IsAgg = true
+				op := Transform(fop, nodeName, rule.Options)
+				nodeMap[nodeName] = op
 			case "filter":
 			case "filter":
 				fop, err := parseFilter(gn.Props)
 				fop, err := parseFilter(gn.Props)
 				if err != nil {
 				if err != nil {
@@ -108,6 +115,37 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				}
 				}
 				op := Transform(pop, nodeName, rule.Options)
 				op := Transform(pop, nodeName, rule.Options)
 				nodeMap[nodeName] = op
 				nodeMap[nodeName] = op
+			case "window":
+				wconf, err := parseWindow(gn.Props)
+				if err != nil {
+					return nil, err
+				}
+				op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
+				if err != nil {
+					return nil, err
+				}
+				nodeMap[nodeName] = op
+			case "join":
+				jop, err := parseJoin(gn.Props)
+				if err != nil {
+					return nil, err
+				}
+				op := Transform(jop, nodeName, rule.Options)
+				nodeMap[nodeName] = op
+			case "groupby":
+				gop, err := parseGroupBy(gn.Props)
+				if err != nil {
+					return nil, err
+				}
+				op := Transform(gop, nodeName, rule.Options)
+				nodeMap[nodeName] = op
+			case "orderby":
+				oop, err := parseOrderBy(gn.Props)
+				if err != nil {
+					return nil, err
+				}
+				op := Transform(oop, nodeName, rule.Options)
+				nodeMap[nodeName] = op
 			default: // TODO other node type
 			default: // TODO other node type
 				return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
 				return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
 			}
 			}
@@ -116,14 +154,14 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 		}
 		}
 	}
 	}
 	// validate source node
 	// validate source node
-	for _, nodeName := range graph.Topo.Sources {
+	for _, nodeName := range ruleGraph.Topo.Sources {
 		if _, ok := sources[nodeName]; !ok {
 		if _, ok := sources[nodeName]; !ok {
 			return nil, fmt.Errorf("source %s is not a source type node", nodeName)
 			return nil, fmt.Errorf("source %s is not a source type node", nodeName)
 		}
 		}
 	}
 	}
 	// reverse edges
 	// reverse edges
 	reversedEdges := make(map[string][]string)
 	reversedEdges := make(map[string][]string)
-	for fromNode, toNodes := range graph.Topo.Edges {
+	for fromNode, toNodes := range ruleGraph.Topo.Edges {
 		for _, toNode := range toNodes {
 		for _, toNode := range toNodes {
 			reversedEdges[toNode] = append(reversedEdges[toNode], fromNode)
 			reversedEdges[toNode] = append(reversedEdges[toNode], fromNode)
 		}
 		}
@@ -135,6 +173,9 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			inputs[i] = nodeMap[fromNode].(api.Emitter)
 			inputs[i] = nodeMap[fromNode].(api.Emitter)
 		}
 		}
 		n := nodeMap[nodeName]
 		n := nodeMap[nodeName]
+		if n == nil {
+			return nil, fmt.Errorf("node %s is not defined", nodeName)
+		}
 		if _, ok := sinks[nodeName]; ok {
 		if _, ok := sinks[nodeName]; ok {
 			tp.AddSink(inputs, n.(*node.SinkNode))
 			tp.AddSink(inputs, n.(*node.SinkNode))
 		} else {
 		} else {
@@ -144,9 +185,150 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	return tp, nil
 	return tp, nil
 }
 }
 
 
+func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
+	n := &graph.Orderby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "ORDER BY"
+	for _, s := range n.Sorts {
+		stmt += " " + s.Field + " " + s.Order
+	}
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid order by statement error: %v", err)
+	}
+	if len(p.SortFields) == 0 {
+		return nil, fmt.Errorf("order by statement is empty")
+	}
+	return &operator.OrderOp{
+		SortFields: p.SortFields,
+	}, nil
+}
+
+func parseGroupBy(props map[string]interface{}) (*operator.AggregateOp, error) {
+	n := &graph.Groupby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Dimensions) == 0 {
+		return nil, fmt.Errorf("groupby must have at least one dimension")
+	}
+	stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid join statement error: %v", err)
+	}
+	return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
+}
+
+func parseJoin(props map[string]interface{}) (*operator.JoinOp, error) {
+	n := &graph.Join{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "SELECT * FROM " + n.From
+	for _, join := range n.Joins {
+		stmt += " " + join.Type + " JOIN ON " + join.On
+	}
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid join statement error: %v", err)
+	}
+	return &operator.JoinOp{Joins: p.Joins, From: p.Sources[0].(*ast.Table)}, nil
+}
+
+func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
+	n := &graph.Window{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if n.Size <= 0 {
+		return nil, fmt.Errorf("window size %d is invalid", n.Size)
+	}
+	var (
+		wt       ast.WindowType
+		length   int
+		interval int
+	)
+	switch strings.ToLower(n.Type) {
+	case "tumblingwindow":
+		wt = ast.TUMBLING_WINDOW
+		if n.Interval != 0 && n.Interval != n.Size {
+			return nil, fmt.Errorf("tumbling window interval must equal to size")
+		}
+	case "hoppingwindow":
+		wt = ast.HOPPING_WINDOW
+		if n.Interval <= 0 {
+			return nil, fmt.Errorf("hopping window interval must be greater than 0")
+		}
+		if n.Interval > n.Size {
+			return nil, fmt.Errorf("hopping window interval must be less than size")
+		}
+	case "sessionwindow":
+		wt = ast.SESSION_WINDOW
+		if n.Interval <= 0 {
+			return nil, fmt.Errorf("hopping window interval must be greater than 0")
+		}
+	case "slidingwindow":
+		wt = ast.SLIDING_WINDOW
+		if n.Interval != 0 && n.Interval != n.Size {
+			return nil, fmt.Errorf("tumbling window interval must equal to size")
+		}
+	case "countwindow":
+		wt = ast.COUNT_WINDOW
+		if n.Interval < 0 {
+			return nil, fmt.Errorf("count window interval must be greater or equal to 0")
+		}
+		if n.Interval > n.Size {
+			return nil, fmt.Errorf("count window interval must be less than size")
+		}
+		if n.Interval == 0 {
+			n.Interval = n.Size
+		}
+	default:
+		return nil, fmt.Errorf("unknown window type %s", n.Type)
+	}
+	if wt == ast.COUNT_WINDOW {
+		length = n.Size
+		interval = n.Interval
+	} else {
+		var unit = 1
+		switch strings.ToLower(n.Unit) {
+		case "dd":
+			unit = 24 * 3600 * 1000
+		case "hh":
+			unit = 3600 * 1000
+		case "mi":
+			unit = 60 * 1000
+		case "ss":
+			unit = 1000
+		case "ms":
+			unit = 1
+		default:
+			return nil, fmt.Errorf("Invalid unit %s", n.Unit)
+		}
+		length = n.Size * unit
+		interval = n.Interval * unit
+	}
+
+	return &node.WindowConfig{
+		Type:     wt,
+		Length:   length,
+		Interval: interval,
+	}, nil
+}
+
 func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
 func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
 	n := &graph.Select{}
 	n := &graph.Select{}
-	cast.MapToStruct(props, n)
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
 	stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
 	stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err