zmq.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/xstream/api"
  7. zmq "github.com/pebbe/zmq4"
  8. )
  9. type zmqSource struct {
  10. subscriber *zmq.Socket
  11. srv string
  12. topic string
  13. cancel context.CancelFunc
  14. }
  15. func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
  16. s.topic = topic
  17. srv, ok := props["server"]
  18. if !ok {
  19. return fmt.Errorf("zmq source is missing property server")
  20. }
  21. s.srv = srv.(string)
  22. return nil
  23. }
  24. func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  25. logger := ctx.GetLogger()
  26. var err error
  27. s.subscriber, err = zmq.NewSocket(zmq.SUB)
  28. if err != nil {
  29. errCh <- fmt.Errorf("zmq source fails to create socket: %v", err)
  30. }
  31. err = s.subscriber.Connect(s.srv)
  32. if err != nil {
  33. errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
  34. }
  35. s.subscriber.SetSubscribe(s.topic)
  36. logger.Debugf("zmq source subscribe to topic %s", s.topic)
  37. exeCtx, cancel := ctx.WithCancel()
  38. s.cancel = cancel
  39. logger.Debugf("start to listen")
  40. for {
  41. msgs, err := s.subscriber.RecvMessage(0)
  42. if err != nil {
  43. id, err := s.subscriber.GetIdentity()
  44. errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
  45. } else {
  46. logger.Debugf("zmq source receive %v", msgs)
  47. var m string
  48. for i, msg := range msgs {
  49. if i == 0 && s.topic != "" {
  50. continue
  51. }
  52. m += msg
  53. }
  54. meta := make(map[string]interface{})
  55. if s.topic != "" {
  56. meta["topic"] = msgs[0]
  57. }
  58. result := make(map[string]interface{})
  59. if e := json.Unmarshal([]byte(m), &result); e != nil {
  60. logger.Warnf("zmq source message %s is not json", m)
  61. } else {
  62. consumer <- api.NewDefaultSourceTuple(result, meta)
  63. }
  64. }
  65. select {
  66. case <-exeCtx.Done():
  67. logger.Infof("zmq source done")
  68. if s.subscriber != nil {
  69. s.subscriber.Close()
  70. }
  71. return
  72. default:
  73. //do nothing
  74. }
  75. }
  76. }
  77. func (s *zmqSource) Close(ctx api.StreamContext) error {
  78. if s.cancel != nil {
  79. s.cancel()
  80. }
  81. return nil
  82. }
  83. var Zmq zmqSource