zmq.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/xstream/api"
  6. zmq "github.com/pebbe/zmq4"
  7. )
  8. type zmqSource struct {
  9. subscriber *zmq.Socket
  10. srv string
  11. topic string
  12. }
  13. func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
  14. s.topic = topic
  15. srv, ok := props["server"]
  16. if !ok {
  17. return fmt.Errorf("zmq source is missing property server")
  18. }
  19. s.srv = srv.(string)
  20. return nil
  21. }
  22. func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  23. logger := ctx.GetLogger()
  24. s.subscriber, err = zmq.NewSocket(zmq.SUB)
  25. if err != nil {
  26. return fmt.Errorf("zmq source fails to create socket: %v", err)
  27. }
  28. err = s.subscriber.Connect(s.srv)
  29. if err != nil {
  30. return fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
  31. }
  32. s.subscriber.SetSubscribe(s.topic)
  33. logger.Debugf("zmq source subscribe to topic %s", s.topic)
  34. go func() {
  35. logger.Debugf("start to listen")
  36. for {
  37. msgs, err := s.subscriber.RecvMessage(0)
  38. if err != nil {
  39. id, err := s.subscriber.GetIdentity()
  40. logger.Warnf("zmq source getting message %s error: %v", id, err)
  41. } else {
  42. logger.Debugf("zmq source receive %v", msgs)
  43. var m string
  44. for i, msg := range msgs {
  45. if i == 0 && s.topic != "" {
  46. continue
  47. }
  48. m += msg
  49. }
  50. meta := make(map[string]interface{})
  51. if s.topic != "" {
  52. meta["topic"] = msgs[0]
  53. }
  54. result := make(map[string]interface{})
  55. if e := json.Unmarshal([]byte(m), &result); e != nil {
  56. logger.Warnf("zmq source message %s is not json", m)
  57. } else {
  58. consume(result, meta)
  59. }
  60. }
  61. select {
  62. case <-ctx.Done():
  63. logger.Infof("zmq source done")
  64. return
  65. default:
  66. //do nothing
  67. }
  68. }
  69. }()
  70. return nil
  71. }
  72. func (s *zmqSource) Close(ctx api.StreamContext) error {
  73. if s.subscriber != nil {
  74. return s.subscriber.Close()
  75. }
  76. return nil
  77. }
  78. var Zmq zmqSource