12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/emqx/kuiper/xstream/api"
- zmq "github.com/pebbe/zmq4"
- )
- type zmqSource struct {
- subscriber *zmq.Socket
- srv string
- topic string
- cancel context.CancelFunc
- }
- func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
- s.topic = topic
- srv, ok := props["server"]
- if !ok {
- return fmt.Errorf("zmq source is missing property server")
- }
- s.srv = srv.(string)
- return nil
- }
- func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
- logger := ctx.GetLogger()
- var err error
- s.subscriber, err = zmq.NewSocket(zmq.SUB)
- if err != nil {
- errCh <- fmt.Errorf("zmq source fails to create socket: %v", err)
- }
- err = s.subscriber.Connect(s.srv)
- if err != nil {
- errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
- }
- s.subscriber.SetSubscribe(s.topic)
- logger.Debugf("zmq source subscribe to topic %s", s.topic)
- exeCtx, cancel := ctx.WithCancel()
- s.cancel = cancel
- logger.Debugf("start to listen")
- for {
- msgs, err := s.subscriber.RecvMessage(0)
- if err != nil {
- id, err := s.subscriber.GetIdentity()
- errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
- } else {
- logger.Debugf("zmq source receive %v", msgs)
- var m string
- for i, msg := range msgs {
- if i == 0 && s.topic != "" {
- continue
- }
- m += msg
- }
- meta := make(map[string]interface{})
- if s.topic != "" {
- meta["topic"] = msgs[0]
- }
- result := make(map[string]interface{})
- if e := json.Unmarshal([]byte(m), &result); e != nil {
- logger.Warnf("zmq source message %s is not json", m)
- } else {
- consumer <- api.NewDefaultSourceTuple(result, meta)
- }
- }
- select {
- case <-exeCtx.Done():
- logger.Infof("zmq source done")
- if s.subscriber != nil {
- s.subscriber.Close()
- }
- return
- default:
- //do nothing
- }
- }
- }
- func (s *zmqSource) Close(ctx api.StreamContext) error {
- if s.cancel != nil {
- s.cancel()
- }
- return nil
- }
- func Zmq() api.Source {
- return &zmqSource{}
- }
|