zmq.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xstream/api"
  5. zmq "github.com/pebbe/zmq4"
  6. )
  7. type zmqSink struct {
  8. publisher *zmq.Socket
  9. srv string
  10. topic string
  11. }
  12. func (m *zmqSink) Configure(props map[string]interface{}) error {
  13. srv, ok := props["server"]
  14. if !ok {
  15. return fmt.Errorf("zmq source is missing property server")
  16. }
  17. m.srv, ok = srv.(string)
  18. if !ok {
  19. return fmt.Errorf("zmq source property server %v is not a string", srv)
  20. }
  21. if tpc, ok := props["topic"]; ok {
  22. if t, ok := tpc.(string); !ok {
  23. return fmt.Errorf("zmq source property topic %v is not a string", tpc)
  24. } else {
  25. m.topic = t
  26. }
  27. }
  28. m.srv, ok = srv.(string)
  29. if !ok {
  30. return fmt.Errorf("zmq source ssing property server")
  31. }
  32. return nil
  33. }
  34. func (m *zmqSink) Open(ctx api.StreamContext) (err error) {
  35. logger := ctx.GetLogger()
  36. m.publisher, err = zmq.NewSocket(zmq.PUB)
  37. if err != nil {
  38. return fmt.Errorf("zmq sink fails to create socket: %v", err)
  39. }
  40. err = m.publisher.Bind(m.srv)
  41. if err != nil {
  42. return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
  43. }
  44. logger.Debugf("zmq sink open")
  45. return nil
  46. }
  47. func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) (err error) {
  48. logger := ctx.GetLogger()
  49. if v, ok := item.([]byte); ok {
  50. logger.Debugf("zmq sink receive %s", item)
  51. if m.topic == "" {
  52. _, err = m.publisher.Send(string(v), 0)
  53. } else {
  54. msgs := []string{
  55. m.topic,
  56. string(v),
  57. }
  58. _, err = m.publisher.SendMessage(msgs)
  59. }
  60. } else {
  61. logger.Debug("zmq sink receive non byte data %v", item)
  62. }
  63. if err != nil {
  64. logger.Debugf("send to zmq error %v", err)
  65. }
  66. return
  67. }
  68. func (m *zmqSink) Close(ctx api.StreamContext) error {
  69. if m.publisher != nil {
  70. return m.publisher.Close()
  71. }
  72. return nil
  73. }
  74. func Zmq() api.Sink {
  75. return &zmqSink{}
  76. }