zmq.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/xstream/api"
  7. zmq "github.com/pebbe/zmq4"
  8. )
  9. type zmqSource struct {
  10. subscriber *zmq.Socket
  11. srv string
  12. topic string
  13. cancel context.CancelFunc
  14. }
  15. func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
  16. s.topic = topic
  17. srv, ok := props["server"]
  18. if !ok {
  19. return fmt.Errorf("zmq source is missing property server")
  20. }
  21. s.srv = srv.(string)
  22. return nil
  23. }
  24. func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
  25. logger := ctx.GetLogger()
  26. var err error
  27. s.subscriber, err = zmq.NewSocket(zmq.SUB)
  28. if err != nil {
  29. onError(fmt.Errorf("zmq source fails to create socket: %v", err))
  30. }
  31. err = s.subscriber.Connect(s.srv)
  32. if err != nil {
  33. onError(fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err))
  34. }
  35. s.subscriber.SetSubscribe(s.topic)
  36. logger.Debugf("zmq source subscribe to topic %s", s.topic)
  37. exeCtx, cancel := ctx.WithCancel()
  38. s.cancel = cancel
  39. go func(exeCtx api.StreamContext) {
  40. logger.Debugf("start to listen")
  41. for {
  42. msgs, err := s.subscriber.RecvMessage(0)
  43. if err != nil {
  44. id, err := s.subscriber.GetIdentity()
  45. onError(fmt.Errorf("zmq source getting message %s error: %v", id, err))
  46. } else {
  47. logger.Debugf("zmq source receive %v", msgs)
  48. var m string
  49. for i, msg := range msgs {
  50. if i == 0 && s.topic != "" {
  51. continue
  52. }
  53. m += msg
  54. }
  55. meta := make(map[string]interface{})
  56. if s.topic != "" {
  57. meta["topic"] = msgs[0]
  58. }
  59. result := make(map[string]interface{})
  60. if e := json.Unmarshal([]byte(m), &result); e != nil {
  61. logger.Warnf("zmq source message %s is not json", m)
  62. } else {
  63. consume(result, meta)
  64. }
  65. }
  66. select {
  67. case <-exeCtx.Done():
  68. logger.Infof("zmq source done")
  69. if s.subscriber != nil {
  70. s.subscriber.Close()
  71. }
  72. return
  73. default:
  74. //do nothing
  75. }
  76. }
  77. }(exeCtx)
  78. }
  79. func (s *zmqSource) Close(ctx api.StreamContext) error {
  80. if s.cancel != nil {
  81. s.cancel()
  82. }
  83. return nil
  84. }
  85. var Zmq zmqSource