浏览代码

bug(stream): source error handling

ngjaying 5 年之前
父节点
当前提交
56e0b391c8
共有 4 个文件被更改,包括 55 次插入60 次删除
  1. 8 10
      plugins/sources/random.go
  2. 33 35
      plugins/sources/zmq.go
  3. 4 3
      xstream/nodes/source_node.go
  4. 10 12
      xstream/test/mock_source.go

+ 8 - 10
plugins/sources/random.go

@@ -39,17 +39,15 @@ func (s *randomSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 	t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
 	exeCtx, cancel := ctx.WithCancel()
 	s.cancel = cancel
-	go func(exeCtx api.StreamContext) {
-		defer t.Stop()
-		for {
-			select {
-			case <-t.C:
-				consumer <- api.NewDefaultSourceTuple(randomize(s.pattern, s.seed), nil)
-			case <-exeCtx.Done():
-				return
-			}
+	defer t.Stop()
+	for {
+		select {
+		case <-t.C:
+			consumer <- api.NewDefaultSourceTuple(randomize(s.pattern, s.seed), nil)
+		case <-exeCtx.Done():
+			return
 		}
-	}(exeCtx)
+	}
 }
 
 func randomize(p map[string]interface{}, seed int) map[string]interface{} {

+ 33 - 35
plugins/sources/zmq.go

@@ -40,45 +40,43 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 	logger.Debugf("zmq source subscribe to topic %s", s.topic)
 	exeCtx, cancel := ctx.WithCancel()
 	s.cancel = cancel
-	go func(exeCtx api.StreamContext) {
-		logger.Debugf("start to listen")
-		for {
-			msgs, err := s.subscriber.RecvMessage(0)
-			if err != nil {
-				id, err := s.subscriber.GetIdentity()
-				errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
-			} else {
-				logger.Debugf("zmq source receive %v", msgs)
-				var m string
-				for i, msg := range msgs {
-					if i == 0 && s.topic != "" {
-						continue
-					}
-					m += msg
-				}
-				meta := make(map[string]interface{})
-				if s.topic != "" {
-					meta["topic"] = msgs[0]
-				}
-				result := make(map[string]interface{})
-				if e := json.Unmarshal([]byte(m), &result); e != nil {
-					logger.Warnf("zmq source message %s is not json", m)
-				} else {
-					consumer <- api.NewDefaultSourceTuple(result, meta)
+	logger.Debugf("start to listen")
+	for {
+		msgs, err := s.subscriber.RecvMessage(0)
+		if err != nil {
+			id, err := s.subscriber.GetIdentity()
+			errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
+		} else {
+			logger.Debugf("zmq source receive %v", msgs)
+			var m string
+			for i, msg := range msgs {
+				if i == 0 && s.topic != "" {
+					continue
 				}
+				m += msg
 			}
-			select {
-			case <-exeCtx.Done():
-				logger.Infof("zmq source done")
-				if s.subscriber != nil {
-					s.subscriber.Close()
-				}
-				return
-			default:
-				//do nothing
+			meta := make(map[string]interface{})
+			if s.topic != "" {
+				meta["topic"] = msgs[0]
 			}
+			result := make(map[string]interface{})
+			if e := json.Unmarshal([]byte(m), &result); e != nil {
+				logger.Warnf("zmq source message %s is not json", m)
+			} else {
+				consumer <- api.NewDefaultSourceTuple(result, meta)
+			}
+		}
+		select {
+		case <-exeCtx.Done():
+			logger.Infof("zmq source done")
+			if s.subscriber != nil {
+				s.subscriber.Close()
+			}
+			return
+		default:
+			//do nothing
 		}
-	}(exeCtx)
+	}
 }
 
 func (s *zmqSource) Close(ctx api.StreamContext) error {

+ 4 - 3
xstream/nodes/source_node.go

@@ -110,8 +110,8 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 
 				buffer := NewDynamicChannelBuffer()
 				buffer.SetLimit(bl)
-				errCh := make(chan error)
-				source.Open(ctx.WithInstance(instance), buffer.In, errCh)
+				sourceErrCh := make(chan error)
+				go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
 				logger.Infof("Start source %s instance %d successfully", m.name, instance)
 				for {
 					select {
@@ -119,8 +119,9 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						logger.Infof("source %s done", m.name)
 						m.close(ctx, logger)
 						return
-					case err := <-errCh:
+					case err := <-sourceErrCh:
 						m.drainError(errCh, err, ctx, logger)
+						return
 					case data := <-buffer.Out:
 						stats.IncTotalRecordsIn()
 						stats.ProcessTimeStart()

+ 10 - 12
xstream/test/mock_source.go

@@ -27,19 +27,17 @@ func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple
 	log := ctx.GetLogger()
 	mockClock := GetMockClock()
 	log.Debugln("mock source starts")
-	go func() {
-		for _, d := range m.data {
-			<-m.done
-			log.Debugf("mock source is sending data %s", d)
-			if !m.isEventTime {
-				mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
-			} else {
-				mockClock.Add(1000 * time.Millisecond)
-			}
-			consumer <- api.NewDefaultSourceTuple(d.Message, nil)
-			time.Sleep(1)
+	for _, d := range m.data {
+		<-m.done
+		log.Debugf("mock source is sending data %s", d)
+		if !m.isEventTime {
+			mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
+		} else {
+			mockClock.Add(1000 * time.Millisecond)
 		}
-	}()
+		consumer <- api.NewDefaultSourceTuple(d.Message, nil)
+		time.Sleep(1)
+	}
 }
 
 func (m *MockSource) Close(ctx api.StreamContext) error {