Browse Source

feat<stream>: allow to set concurrency for a rule

ngjaying 5 years atrás
parent
commit
17c5bd48f3
1 changed files with 21 additions and 6 deletions
  1. 21 6
      xsql/processors/xsql_processor.go

+ 21 - 6
xsql/processors/xsql_processor.go

@@ -312,8 +312,12 @@ func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.
 func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error) {
 	name := rule.Id
 	sql := rule.Sql
-	var isEventTime bool
-	var lateTol int64
+	var (
+		isEventTime bool
+		lateTol int64
+		concurrency = 1
+	)
+
 	if iet, ok := rule.Options["isEventTime"]; ok {
 		isEventTime, ok = iet.(bool)
 		if !ok {
@@ -329,6 +333,13 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 		}
 	}
+	if l, ok := rule.Options["concurrency"]; ok {
+		if fl, ok := l.(float64); ok {
+			concurrency = int(fl)
+		} else {
+			return nil, nil, fmt.Errorf("Invalid rule option concurrency %v, int type is required.", l)
+		}
+	}
 	shouldCreateSource := sources == nil
 	parser := xsql.NewParser(strings.NewReader(sql))
 	if stmt, err := xsql.Language.Parse(parser); err != nil {
@@ -363,11 +374,13 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 					node := nodes.NewSourceNode(s, streamStmt.Options)
 					tp.AddSrc(node)
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
+					preprocessorOp.SetConcurrency(concurrency)
 					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
 				} else {
 					tp.AddSrc(sources[i])
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
+					preprocessorOp.SetConcurrency(concurrency)
 					tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
 				}
@@ -388,16 +401,14 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 
 			if w != nil && selectStmt.Joins != nil {
 				joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join")
-				//TODO concurrency setting by command
-				//joinOp.SetConcurrency(3)
+				joinOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, joinOp)
 				inputs = []api.Emitter{joinOp}
 			}
 
 			if selectStmt.Condition != nil {
 				filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter")
-				//TODO concurrency setting by command
-				// filterOp.SetConcurrency(3)
+				filterOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, filterOp)
 				inputs = []api.Emitter{filterOp}
 			}
@@ -407,6 +418,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 				ds = dimensions.GetGroups()
 				if ds != nil && len(ds) > 0 {
 					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate")
+					aggregateOp.SetConcurrency(concurrency)
 					tp.AddOperator(inputs, aggregateOp)
 					inputs = []api.Emitter{aggregateOp}
 				}
@@ -414,18 +426,21 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 
 			if selectStmt.Having != nil {
 				havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
+				havingOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, havingOp)
 				inputs = []api.Emitter{havingOp}
 			}
 
 			if selectStmt.SortFields != nil {
 				orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order")
+				orderOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, orderOp)
 				inputs = []api.Emitter{orderOp}
 			}
 
 			if selectStmt.Fields != nil {
 				projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project")
+				projectOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, projectOp)
 				inputs = []api.Emitter{projectOp}
 			}