Browse Source

refactor(stream): node status clear after restart

ngjaying 5 years atrás
parent
commit
189448e90f
2 changed files with 22 additions and 10 deletions
  1. 11 5
      xstream/nodes/sink_node.go
  2. 11 5
      xstream/nodes/source_node.go

+ 11 - 5
xstream/nodes/sink_node.go

@@ -19,6 +19,7 @@ type SinkNode struct {
 	//configs (also static for sinks)
 	concurrency int
 	options     map[string]interface{}
+	isMock      bool
 	//states varies after restart
 	ctx          api.StreamContext
 	statManagers []StatManager
@@ -53,6 +54,7 @@ func NewSinkNodeWithSink(name string, sink api.Sink) *SinkNode {
 		options:     nil,
 		concurrency: 1,
 		ctx:         nil,
+		isMock:      true,
 	}
 }
 
@@ -100,13 +102,13 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				cacheSaveInterval = t
 			}
 		}
-		createSink := len(m.sinks) == 0
+		m.reset()
 		logger.Infof("open sink node %d instances", m.concurrency)
 		for i := 0; i < m.concurrency; i++ { // workers
 			go func(instance int) {
 				var sink api.Sink
 				var err error
-				if createSink {
+				if !m.isMock {
 					sink, err = getSink(m.sinkType, m.options)
 					if err != nil {
 						m.drainError(result, err, ctx, logger)
@@ -155,6 +157,13 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 	}()
 }
 
+func (m *SinkNode) reset() {
+	if !m.isMock {
+		m.sinks = nil
+	}
+	m.statManagers = nil
+}
+
 func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, signalCh chan<- int, ctx api.StreamContext) {
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
@@ -240,7 +249,4 @@ func (m *SinkNode) close(ctx api.StreamContext, logger api.Logger) {
 			logger.Warnf("close sink fails: %v", err)
 		}
 	}
-	//reset the states
-	m.sinks = nil
-	m.statManagers = nil
 }

+ 11 - 5
xstream/nodes/source_node.go

@@ -19,6 +19,7 @@ type SourceNode struct {
 	ctx         api.StreamContext
 	options     map[string]string
 	concurrency int
+	isMock      bool
 
 	mutex        sync.RWMutex
 	sources      []api.Source
@@ -52,6 +53,7 @@ func NewSourceNodeWithSource(name string, source api.Source, options map[string]
 		ctx:         nil,
 		concurrency: 1,
 		buffer:      utils.NewDynamicChannelBuffer(),
+		isMock:      true,
 	}
 }
 
@@ -75,14 +77,14 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				m.buffer.SetLimit(t)
 			}
 		}
-		createSource := len(m.sources) == 0
+		m.reset()
 		logger.Infof("open source node %d instances", m.concurrency)
 		for i := 0; i < m.concurrency; i++ { // workers
 			go func(instance int) {
 				//Do open source instances
 				var source api.Source
 				var err error
-				if createSource {
+				if !m.isMock {
 					source, err = getSource(m.sourceType)
 					if err != nil {
 						m.drainError(errCh, err, ctx, logger)
@@ -139,6 +141,13 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 	}()
 }
 
+func (m *SourceNode) reset() {
+	if !m.isMock {
+		m.sources = nil
+	}
+	m.statManagers = nil
+}
+
 func getSource(t string) (api.Source, error) {
 	var s api.Source
 	var ok bool
@@ -173,9 +182,6 @@ func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
 			logger.Warnf("close source fails: %v", err)
 		}
 	}
-	//Reset the states
-	m.sources = nil
-	m.statManagers = nil
 }
 
 func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {