zmq.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/xstream/api"
  7. zmq "github.com/pebbe/zmq4"
  8. )
  9. type zmqSource struct {
  10. subscriber *zmq.Socket
  11. srv string
  12. topic string
  13. cancel context.CancelFunc
  14. }
  15. func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
  16. fmt.Printf("Configuring zmq once, is it the previous one %s and subscriber %v", s.topic, s.subscriber)
  17. s.topic = topic
  18. srv, ok := props["server"]
  19. if !ok {
  20. return fmt.Errorf("zmq source is missing property server")
  21. }
  22. s.srv = srv.(string)
  23. return nil
  24. }
  25. func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  26. logger := ctx.GetLogger()
  27. var err error
  28. s.subscriber, err = zmq.NewSocket(zmq.SUB)
  29. if err != nil {
  30. errCh <- fmt.Errorf("zmq source fails to create socket: %v", err)
  31. }
  32. err = s.subscriber.Connect(s.srv)
  33. if err != nil {
  34. errCh <- fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
  35. }
  36. s.subscriber.SetSubscribe(s.topic)
  37. logger.Infof("zmq source subscribe to topic %s", s.topic)
  38. exeCtx, cancel := ctx.WithCancel()
  39. s.cancel = cancel
  40. logger.Infof("start to listen")
  41. for {
  42. msgs, err := s.subscriber.RecvMessage(0)
  43. if err != nil {
  44. id, err := s.subscriber.GetIdentity()
  45. errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
  46. } else {
  47. logger.Infof("zmq source receive %v", msgs)
  48. var m string
  49. for i, msg := range msgs {
  50. if i == 0 && s.topic != "" {
  51. continue
  52. }
  53. m += msg
  54. }
  55. meta := make(map[string]interface{})
  56. if s.topic != "" {
  57. meta["topic"] = msgs[0]
  58. }
  59. result := make(map[string]interface{})
  60. if e := json.Unmarshal([]byte(m), &result); e != nil {
  61. logger.Warnf("zmq source message %s is not json", m)
  62. } else {
  63. consumer <- api.NewDefaultSourceTuple(result, meta)
  64. }
  65. }
  66. select {
  67. case <-exeCtx.Done():
  68. logger.Infof("zmq source done")
  69. if s.subscriber != nil {
  70. s.subscriber.Close()
  71. }
  72. return
  73. default:
  74. //do nothing
  75. }
  76. }
  77. }
  78. func (s *zmqSource) Close(ctx api.StreamContext) error {
  79. if s.cancel != nil {
  80. s.cancel()
  81. }
  82. return nil
  83. }
  84. func Zmq() api.Source {
  85. return &zmqSource{}
  86. }