|
@@ -16,7 +16,6 @@ type zmqSource struct {
|
|
|
}
|
|
|
|
|
|
func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
|
|
|
- fmt.Printf("Configuring zmq once, is it the previous one %s and subscriber %v", s.topic, s.subscriber)
|
|
|
s.topic = topic
|
|
|
srv, ok := props["server"]
|
|
|
if !ok {
|
|
@@ -38,17 +37,17 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
|
|
|
errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
|
|
|
}
|
|
|
s.subscriber.SetSubscribe(s.topic)
|
|
|
- logger.Infof("zmq source subscribe to topic %s", s.topic)
|
|
|
+ logger.Debugf("zmq source subscribe to topic %s", s.topic)
|
|
|
exeCtx, cancel := ctx.WithCancel()
|
|
|
s.cancel = cancel
|
|
|
- logger.Infof("start to listen")
|
|
|
+ logger.Debugf("start to listen")
|
|
|
for {
|
|
|
msgs, err := s.subscriber.RecvMessage(0)
|
|
|
if err != nil {
|
|
|
id, err := s.subscriber.GetIdentity()
|
|
|
errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
|
|
|
} else {
|
|
|
- logger.Infof("zmq source receive %v", msgs)
|
|
|
+ logger.Debugf("zmq source receive %v", msgs)
|
|
|
var m string
|
|
|
for i, msg := range msgs {
|
|
|
if i == 0 && s.topic != "" {
|