zmq.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. zmq "github.com/pebbe/zmq4"
  8. )
  9. type zmqSource struct {
  10. subscriber *zmq.Socket
  11. srv string
  12. topic string
  13. messageFormat string
  14. cancel context.CancelFunc
  15. }
  16. func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
  17. s.topic = topic
  18. srv, ok := props["server"]
  19. if !ok {
  20. return fmt.Errorf("zmq source is missing property server")
  21. }
  22. s.srv = srv.(string)
  23. f, ok := props["format"]
  24. if !ok {
  25. s.messageFormat = common.FORMAT_JSON
  26. } else {
  27. s.messageFormat = f.(string)
  28. }
  29. return nil
  30. }
  31. func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  32. logger := ctx.GetLogger()
  33. var err error
  34. s.subscriber, err = zmq.NewSocket(zmq.SUB)
  35. if err != nil {
  36. errCh <- fmt.Errorf("zmq source fails to create socket: %v", err)
  37. }
  38. err = s.subscriber.Connect(s.srv)
  39. if err != nil {
  40. errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
  41. }
  42. s.subscriber.SetSubscribe(s.topic)
  43. logger.Debugf("zmq source subscribe to topic %s", s.topic)
  44. exeCtx, cancel := ctx.WithCancel()
  45. s.cancel = cancel
  46. logger.Debugf("start to listen")
  47. for {
  48. msgs, err := s.subscriber.RecvMessageBytes(0)
  49. if err != nil {
  50. id, err := s.subscriber.GetIdentity()
  51. errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
  52. } else {
  53. logger.Debugf("zmq source receive %v", msgs)
  54. var m []byte
  55. for i, msg := range msgs {
  56. if i == 0 && s.topic != "" {
  57. continue
  58. }
  59. m = append(m, msg...)
  60. }
  61. meta := make(map[string]interface{})
  62. if s.topic != "" {
  63. meta["topic"] = string(msgs[0])
  64. }
  65. result, e := common.MessageDecode(m, s.messageFormat)
  66. if e != nil {
  67. logger.Errorf("Invalid data format, cannot decode %v to %s format with error %s", m, s.messageFormat, e)
  68. } else {
  69. consumer <- api.NewDefaultSourceTuple(result, meta)
  70. }
  71. }
  72. select {
  73. case <-exeCtx.Done():
  74. logger.Infof("zmq source done")
  75. if s.subscriber != nil {
  76. s.subscriber.Close()
  77. }
  78. return
  79. default:
  80. //do nothing
  81. }
  82. }
  83. }
  84. func (s *zmqSource) Close(ctx api.StreamContext) error {
  85. if s.cancel != nil {
  86. s.cancel()
  87. }
  88. return nil
  89. }
  90. func Zmq() api.Source {
  91. return &zmqSource{}
  92. }