
feat(stream): export bufferLength option for source/sink and rule

ngjaying committed 5 years ago
commit f09d3ec447

+ 13 - 1
common/utils/DynamicChannelBuffer.go

@@ -4,6 +4,7 @@ type DynamicChannelBuffer struct {
 	In chan interface{}
 	Out chan interface{}
 	buffer []interface{}
+	limit int
 }
 
 func NewDynamicChannelBuffer() *DynamicChannelBuffer {
@@ -11,14 +12,25 @@ func NewDynamicChannelBuffer() *DynamicChannelBuffer {
 		In: make(chan interface{}),
 		Out: make(chan interface{}),
 		buffer: make([]interface{}, 0),
+		limit: 102400,
 	}
 	go buffer.run()
 	return buffer
 }
 
+func (b *DynamicChannelBuffer) SetLimit(limit int) {
+	if limit > 0 {
+		b.limit = limit
+	}
+}
+
 func (b *DynamicChannelBuffer) run() {
 	for {
-		if len(b.buffer) > 0 {
+		l := len(b.buffer)
+		if l >= b.limit {
+			b.Out <- b.buffer[0]
+			b.buffer = b.buffer[1:]
+		} else if l > 0 {
 			select {
 			case b.Out <- b.buffer[0]:
 				b.buffer = b.buffer[1:]
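
For reference, a minimal usage sketch of the buffer above; the module import path and package name are assumed, not taken from this commit. Writes go to In and reads come from Out. Once the backlog reaches the limit, run() pushes to Out without a select and therefore blocks, which back-pressures producers on In until a consumer drains the queue.

package main

import "github.com/emqx/kuiper/common/utils" // assumed import path

func main() {
	b := utils.NewDynamicChannelBuffer()
	b.SetLimit(100) // cap the backlog at 100 items instead of the 102400 default

	go func() {
		for i := 0; i < 10; i++ {
			b.In <- i // blocks once the backlog is full and nobody reads Out
		}
	}()
	for i := 0; i < 10; i++ {
		<-b.Out // items come out in FIFO order
	}
}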

+ 18 - 9
xsql/processors/xsql_processor.go

@@ -316,6 +316,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 		isEventTime bool
 		lateTol int64
 		concurrency = 1
+		bufferLength = 1024
 	)
 
 	if iet, ok := rule.Options["isEventTime"]; ok {
@@ -340,6 +341,14 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			return nil, nil, fmt.Errorf("Invalid rule option concurrency %v, int type is required.", l)
 		}
 	}
+	if l, ok := rule.Options["bufferLength"]; ok {
+		if fl, ok := l.(float64); ok {
+			bufferLength = int(fl)
+		} else {
+			return nil, nil, fmt.Errorf("Invalid rule option bufferLength %v, int type is required.", l)
+		}
+	}
+	log.Infof("Init rule with options {isEventTime: %v, lateTolerance: %d, concurrency: %d, bufferLength: %d}", isEventTime, lateTol, concurrency, bufferLength)
 	shouldCreateSource := sources == nil
 	parser := xsql.NewParser(strings.NewReader(sql))
 	if stmt, err := xsql.Language.Parse(parser); err != nil {
@@ -373,13 +382,13 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 				if shouldCreateSource {
 					node := nodes.NewSourceNode(s, streamStmt.Options)
 					tp.AddSrc(node)
-					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
+					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s, bufferLength)
 					preprocessorOp.SetConcurrency(concurrency)
 					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
 				} else {
 					tp.AddSrc(sources[i])
-					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
+					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s, bufferLength)
 					preprocessorOp.SetConcurrency(concurrency)
 					tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
@@ -390,7 +399,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			if dimensions != nil {
 				w = dimensions.GetWindow()
 				if w != nil {
-					wop, err := operators.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt)
+					wop, err := operators.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt, bufferLength)
 					if err != nil {
 						return nil, nil, err
 					}
@@ -400,14 +409,14 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 
 			if w != nil && selectStmt.Joins != nil {
-				joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join")
+				joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join", bufferLength)
 				joinOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, joinOp)
 				inputs = []api.Emitter{joinOp}
 			}
 
 			if selectStmt.Condition != nil {
-				filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter")
+				filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter", bufferLength)
 				filterOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, filterOp)
 				inputs = []api.Emitter{filterOp}
@@ -417,7 +426,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			if dimensions != nil {
 				ds = dimensions.GetGroups()
 				if ds != nil && len(ds) > 0 {
-					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate")
+					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate", bufferLength)
 					aggregateOp.SetConcurrency(concurrency)
 					tp.AddOperator(inputs, aggregateOp)
 					inputs = []api.Emitter{aggregateOp}
@@ -425,21 +434,21 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 
 			if selectStmt.Having != nil {
-				havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
+				havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having", bufferLength)
 				havingOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, havingOp)
 				inputs = []api.Emitter{havingOp}
 			}
 
 			if selectStmt.SortFields != nil {
-				orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order")
+				orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order", bufferLength)
 				orderOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, orderOp)
 				inputs = []api.Emitter{orderOp}
 			}
 
 			if selectStmt.Fields != nil {
-				projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project")
+				projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project", bufferLength)
 				projectOp.SetConcurrency(concurrency)
 				tp.AddOperator(inputs, projectOp)
 				inputs = []api.Emitter{projectOp}
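
The new option is read with a float64 type assertion because rule options normally arrive as parsed JSON, and encoding/json decodes every number into float64. A hedged fragment showing how the option reaches createTopoWithSources programmatically, mirroring the updated test below (p is a *RuleProcessor and t a *testing.T, as in that test; the rule id and SQL are illustrative):

rule := &api.Rule{
	Id:  "demoRule",
	Sql: "SELECT * FROM demo",
	Options: map[string]interface{}{
		"bufferLength": float64(100), // channel capacity applied to every operator in the topology
		"concurrency":  float64(2),   // existing option, parsed the same way
	},
}
tp, inputs, err := p.createTopoWithSources(rule, nil) // nil sources: the processor creates them
if err != nil {
	t.Error(err)
}
_, _ = tp, inputs // the test goes on to open the topology with these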

+ 3 - 1
xsql/processors/xsql_processor_test.go

@@ -509,7 +509,9 @@ func TestSingleSQL(t *testing.T) {
 				}
 			}
 		}
-		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options:map[string]interface{}{
+			"bufferLength": float64(100),
+		}}, sources)
 		if err != nil {
 			t.Error(err)
 		}

+ 9 - 1
xstream/nodes/sink_node.go

@@ -23,8 +23,16 @@ type SinkNode struct {
 }
 
 func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode{
+	bufferLength := 1024
+	if c, ok := props["bufferLength"]; ok {
+		if t, err := common.ToInt(c); err != nil || t <= 0 {
+			// invalid bufferLength property: ignore it and keep the 1024 default
+		} else {
+			bufferLength = t
+		}
+	}
 	return &SinkNode{
-		input: make(chan interface{}, 1024),
+		input: make(chan interface{}, bufferLength),
 		name: name,
 		sinkType: sinkType,
 		options: props,
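
A hedged sketch of what the new check means for sink construction; the sink names, types, and property values are illustrative. Any props entry named bufferLength that common.ToInt converts to a positive integer replaces the former fixed 1024-element input channel; anything else silently keeps the default.

okProps := map[string]interface{}{
	"bufferLength": 512, // positive integer: the sink's input channel gets capacity 512
}
sink := nodes.NewSinkNode("mqttSink", "mqtt", okProps)

badProps := map[string]interface{}{
	"bufferLength": "lots", // not convertible: ignored, capacity stays at 1024
}
fallback := nodes.NewSinkNode("logSink", "log", badProps)
_, _ = sink, fallback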

+ 7 - 0
xstream/nodes/source_node.go

@@ -68,6 +68,13 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				m.concurrency = t
 			}
 		}
+		if c, ok := props["bufferLength"]; ok {
+			if t, err := common.ToInt(c); err != nil || t <= 0 {
+				logger.Warnf("invalid type for bufferLength property, should be positive integer but found %v", c)
+			} else {
+				m.buffer.SetLimit(t)
+			}
+		}
 		createSource := len(m.sources) == 0
 		logger.Infof("open source node %d instances", m.concurrency)
 		for i := 0; i < m.concurrency; i++ { // workers
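
For comparison with the sink change above, the same property on the source side does not size a plain channel but caps the node's DynamicChannelBuffer, and, unlike the sink, an invalid value logs a warning before the 102400 default limit is kept. A condensed sketch of the check as it runs inside Open, with the condition inverted for readability:

if c, ok := props["bufferLength"]; ok {
	if t, err := common.ToInt(c); err == nil && t > 0 {
		m.buffer.SetLimit(t) // source backlog capped at t items
	} else {
		logger.Warnf("invalid bufferLength %v, keeping the default limit", c)
	}
}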

+ 2 - 2
xstream/operators/operations.go

@@ -32,12 +32,12 @@ type UnaryOperator struct {
 }
 
 // NewUnary creates *UnaryOperator value
-func New(name string) *UnaryOperator {
+func New(name string, bufferLength int) *UnaryOperator {
 	// extract logger
 	o := new(UnaryOperator)
 
 	o.concurrency = 1
-	o.input = make(chan interface{}, 1024)
+	o.input = make(chan interface{}, bufferLength)
 	o.outputs = make(map[string]chan<- interface{})
 	o.name = name
 	return o

+ 2 - 2
xstream/operators/window_op.go

@@ -29,10 +29,10 @@ type WindowOperator struct {
 	watermarkGenerator *WatermarkGenerator //For event time only
 }
 
-func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string) (*WindowOperator, error) {
+func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
 	o := new(WindowOperator)
 
-	o.input = make(chan interface{}, 1024)
+	o.input = make(chan interface{}, bufferLength)
 	o.outputs = make(map[string]chan<- interface{})
 	o.name = name
 	o.isEventTime = isEventTime

+ 2 - 33
xstream/streams.go

@@ -56,43 +56,12 @@ func (s *TopologyNew) AddOperator(inputs []api.Emitter, operator api.Operator) *
 	return s
 }
 
-func Transform(op operators.UnOperation, name string) *operators.UnaryOperator {
-	operator := operators.New(name)
+func Transform(op operators.UnOperation, name string, bufferLength int) *operators.UnaryOperator {
+	operator := operators.New(name, bufferLength)
 	operator.SetOperation(op)
 	return operator
 }
 
-func (s *TopologyNew) Map(f interface{}) *TopologyNew {
-	log := s.ctx.GetLogger()
-	op, err := MapFunc(f)
-	if err != nil {
-		log.Info(err)
-	}
-	return s.Transform(op)
-}
-
-// Filter takes a predicate user-defined func that filters the stream.
-// The specified function must be of type:
-//   func (T) bool
-// If the func returns true, current item continues downstream.
-func (s *TopologyNew) Filter(f interface{}) *TopologyNew {
-	op, err := FilterFunc(f)
-	if err != nil {
-		s.drainErr(err)
-	}
-	return s.Transform(op)
-}
-
-// Transform is the base method used to apply transfomrmative
-// unary operations to streamed elements (i.e. filter, map, etc)
-// It is exposed here for completeness, use the other more specific methods.
-func (s *TopologyNew) Transform(op operators.UnOperation) *TopologyNew {
-	operator := operators.New("default")
-	operator.SetOperation(op)
-	s.ops = append(s.ops, operator)
-	return s
-}
-
 // prepareContext setups internal context before
 // stream starts execution.
 func (s *TopologyNew) prepareContext() {