|
@@ -16,6 +16,7 @@ type zmqSource struct {
|
|
|
}
|
|
|
|
|
|
func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
|
|
|
+ fmt.Printf("Configuring zmq once, is it the previous one %s and subscriber %v", s.topic, s.subscriber)
|
|
|
s.topic = topic
|
|
|
srv, ok := props["server"]
|
|
|
if !ok {
|
|
@@ -37,17 +38,17 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
|
|
|
errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
|
|
|
}
|
|
|
s.subscriber.SetSubscribe(s.topic)
|
|
|
- logger.Debugf("zmq source subscribe to topic %s", s.topic)
|
|
|
+ logger.Infof("zmq source subscribe to topic %s", s.topic)
|
|
|
exeCtx, cancel := ctx.WithCancel()
|
|
|
s.cancel = cancel
|
|
|
- logger.Debugf("start to listen")
|
|
|
+ logger.Infof("start to listen")
|
|
|
for {
|
|
|
msgs, err := s.subscriber.RecvMessage(0)
|
|
|
if err != nil {
|
|
|
id, err := s.subscriber.GetIdentity()
|
|
|
errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
|
|
|
} else {
|
|
|
- logger.Debugf("zmq source receive %v", msgs)
|
|
|
+ logger.Infof("zmq source receive %v", msgs)
|
|
|
var m string
|
|
|
for i, msg := range msgs {
|
|
|
if i == 0 && s.topic != "" {
|
|
@@ -86,4 +87,6 @@ func (s *zmqSource) Close(ctx api.StreamContext) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-var Zmq zmqSource
|
|
|
+func Zmq() api.Source {
|
|
|
+ return &zmqSource{}
|
|
|
+}
|