zmq.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. zmq "github.com/pebbe/zmq4"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/errorx"
  20. )
  21. type zmqSink struct {
  22. publisher *zmq.Socket
  23. srv string
  24. topic string
  25. }
  26. func (m *zmqSink) Configure(props map[string]interface{}) error {
  27. srv, ok := props["server"]
  28. if !ok {
  29. return fmt.Errorf("zmq source is missing property server")
  30. }
  31. m.srv, ok = srv.(string)
  32. if !ok {
  33. return fmt.Errorf("zmq source property server %v is not a string", srv)
  34. }
  35. if tpc, ok := props["topic"]; ok {
  36. if t, ok := tpc.(string); !ok {
  37. return fmt.Errorf("zmq source property topic %v is not a string", tpc)
  38. } else {
  39. m.topic = t
  40. }
  41. }
  42. m.srv, ok = srv.(string)
  43. if !ok {
  44. return fmt.Errorf("zmq source ssing property server")
  45. }
  46. return nil
  47. }
  48. func (m *zmqSink) Open(ctx api.StreamContext) (err error) {
  49. logger := ctx.GetLogger()
  50. m.publisher, err = zmq.NewSocket(zmq.PUB)
  51. if err != nil {
  52. return fmt.Errorf("zmq sink fails to create socket: %v", err)
  53. }
  54. err = m.publisher.Bind(m.srv)
  55. if err != nil {
  56. return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
  57. }
  58. logger.Debugf("zmq sink open")
  59. return nil
  60. }
  61. func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) error {
  62. logger := ctx.GetLogger()
  63. var v []byte
  64. var err error
  65. v, _, err = ctx.TransformOutput(item)
  66. if err != nil {
  67. logger.Debug("zmq sink receive non byte data %v", item)
  68. return err
  69. }
  70. logger.Debugf("zmq sink receive %s", item)
  71. err = m.sendToZmq(ctx, v)
  72. if err != nil {
  73. return err
  74. }
  75. return nil
  76. }
  77. func (m *zmqSink) sendToZmq(ctx api.StreamContext, v []byte) error {
  78. var err error
  79. if m.topic == "" {
  80. _, err = m.publisher.Send(string(v), 0)
  81. } else {
  82. msgs := []string{
  83. m.topic,
  84. string(v),
  85. }
  86. _, err = m.publisher.SendMessage(msgs)
  87. }
  88. if err != nil {
  89. ctx.GetLogger().Errorf("send to zmq error %v", err)
  90. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  91. }
  92. return nil
  93. }
  94. func (m *zmqSink) Close(ctx api.StreamContext) error {
  95. if m.publisher != nil {
  96. return m.publisher.Close()
  97. }
  98. return nil
  99. }
  100. func Zmq() api.Sink {
  101. return &zmqSink{}
  102. }