|
@@ -1,6 +1,7 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
@@ -11,6 +12,7 @@ type zmqSource struct {
|
|
|
subscriber *zmq.Socket
|
|
|
srv string
|
|
|
topic string
|
|
|
+ cancel context.CancelFunc
|
|
|
}
|
|
|
|
|
|
func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
|
|
@@ -35,7 +37,9 @@ func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err er
|
|
|
}
|
|
|
s.subscriber.SetSubscribe(s.topic)
|
|
|
logger.Debugf("zmq source subscribe to topic %s", s.topic)
|
|
|
- go func() {
|
|
|
+ exeCtx, cancel := ctx.WithCancel()
|
|
|
+ s.cancel = cancel
|
|
|
+ go func(exeCtx api.StreamContext) {
|
|
|
logger.Debugf("start to listen")
|
|
|
for {
|
|
|
msgs, err := s.subscriber.RecvMessage(0)
|
|
@@ -63,20 +67,23 @@ func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err er
|
|
|
}
|
|
|
}
|
|
|
select {
|
|
|
- case <-ctx.Done():
|
|
|
+ case <-exeCtx.Done():
|
|
|
logger.Infof("zmq source done")
|
|
|
+ if s.subscriber != nil {
|
|
|
+ s.subscriber.Close()
|
|
|
+ }
|
|
|
return
|
|
|
default:
|
|
|
//do nothing
|
|
|
}
|
|
|
}
|
|
|
- }()
|
|
|
+ }(exeCtx)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func (s *zmqSource) Close(ctx api.StreamContext) error {
|
|
|
- if s.subscriber != nil {
|
|
|
- return s.subscriber.Close()
|
|
|
+ if s.cancel != nil {
|
|
|
+ s.cancel()
|
|
|
}
|
|
|
return nil
|
|
|
}
|