|
@@ -1,6 +1,7 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
@@ -9,11 +10,12 @@ import (
|
|
|
|
|
|
type zmqSource struct {
|
|
|
subscriber *zmq.Socket
|
|
|
- srv string
|
|
|
- topic string
|
|
|
+ srv string
|
|
|
+ topic string
|
|
|
+ cancel context.CancelFunc
|
|
|
}
|
|
|
|
|
|
-func (s *zmqSource) Configure(topic string, props map[string]interface{}) error{
|
|
|
+func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
|
|
|
s.topic = topic
|
|
|
srv, ok := props["server"]
|
|
|
if !ok {
|
|
@@ -26,16 +28,18 @@ func (s *zmqSource) Configure(topic string, props map[string]interface{}) error{
|
|
|
func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
|
|
|
logger := ctx.GetLogger()
|
|
|
s.subscriber, err = zmq.NewSocket(zmq.SUB)
|
|
|
- if err != nil{
|
|
|
+ if err != nil {
|
|
|
return fmt.Errorf("zmq source fails to create socket: %v", err)
|
|
|
}
|
|
|
err = s.subscriber.Connect(s.srv)
|
|
|
- if err != nil{
|
|
|
+ if err != nil {
|
|
|
return fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
|
|
|
}
|
|
|
s.subscriber.SetSubscribe(s.topic)
|
|
|
logger.Debugf("zmq source subscribe to topic %s", s.topic)
|
|
|
- go func(){
|
|
|
+ exeCtx, cancel := ctx.WithCancel()
|
|
|
+ s.cancel = cancel
|
|
|
+ go func(exeCtx api.StreamContext) {
|
|
|
logger.Debugf("start to listen")
|
|
|
for {
|
|
|
msgs, err := s.subscriber.RecvMessage(0)
|
|
@@ -45,38 +49,41 @@ func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err er
|
|
|
} else {
|
|
|
logger.Debugf("zmq source receive %v", msgs)
|
|
|
var m string
|
|
|
- for i, msg := range msgs{
|
|
|
- if i == 0 && s.topic != ""{
|
|
|
+ for i, msg := range msgs {
|
|
|
+ if i == 0 && s.topic != "" {
|
|
|
continue
|
|
|
}
|
|
|
m += msg
|
|
|
}
|
|
|
meta := make(map[string]interface{})
|
|
|
- if s.topic != ""{
|
|
|
+ if s.topic != "" {
|
|
|
meta["topic"] = msgs[0]
|
|
|
}
|
|
|
result := make(map[string]interface{})
|
|
|
if e := json.Unmarshal([]byte(m), &result); e != nil {
|
|
|
logger.Warnf("zmq source message %s is not json", m)
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
consume(result, meta)
|
|
|
}
|
|
|
}
|
|
|
- select{
|
|
|
- case <- ctx.Done():
|
|
|
+ select {
|
|
|
+ case <-exeCtx.Done():
|
|
|
logger.Infof("zmq source done")
|
|
|
+ if s.subscriber != nil {
|
|
|
+ s.subscriber.Close()
|
|
|
+ }
|
|
|
return
|
|
|
default:
|
|
|
//do nothing
|
|
|
}
|
|
|
}
|
|
|
- }()
|
|
|
+ }(exeCtx)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (s *zmqSource) Close(ctx api.StreamContext) error{
|
|
|
- if s.subscriber != nil{
|
|
|
- return s.subscriber.Close()
|
|
|
+func (s *zmqSource) Close(ctx api.StreamContext) error {
|
|
|
+ if s.cancel != nil {
|
|
|
+ s.cancel()
|
|
|
}
|
|
|
return nil
|
|
|
}
|