zmq.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. zmq "github.com/pebbe/zmq4"
  19. )
  20. type zmqSink struct {
  21. publisher *zmq.Socket
  22. srv string
  23. topic string
  24. }
  25. func (m *zmqSink) Configure(props map[string]interface{}) error {
  26. srv, ok := props["server"]
  27. if !ok {
  28. return fmt.Errorf("zmq source is missing property server")
  29. }
  30. m.srv, ok = srv.(string)
  31. if !ok {
  32. return fmt.Errorf("zmq source property server %v is not a string", srv)
  33. }
  34. if tpc, ok := props["topic"]; ok {
  35. if t, ok := tpc.(string); !ok {
  36. return fmt.Errorf("zmq source property topic %v is not a string", tpc)
  37. } else {
  38. m.topic = t
  39. }
  40. }
  41. m.srv, ok = srv.(string)
  42. if !ok {
  43. return fmt.Errorf("zmq source ssing property server")
  44. }
  45. return nil
  46. }
  47. func (m *zmqSink) Open(ctx api.StreamContext) (err error) {
  48. logger := ctx.GetLogger()
  49. m.publisher, err = zmq.NewSocket(zmq.PUB)
  50. if err != nil {
  51. return fmt.Errorf("zmq sink fails to create socket: %v", err)
  52. }
  53. err = m.publisher.Bind(m.srv)
  54. if err != nil {
  55. return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
  56. }
  57. logger.Debugf("zmq sink open")
  58. return nil
  59. }
  60. func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) (err error) {
  61. logger := ctx.GetLogger()
  62. if v, ok := item.([]byte); ok {
  63. logger.Debugf("zmq sink receive %s", item)
  64. if m.topic == "" {
  65. _, err = m.publisher.Send(string(v), 0)
  66. } else {
  67. msgs := []string{
  68. m.topic,
  69. string(v),
  70. }
  71. _, err = m.publisher.SendMessage(msgs)
  72. }
  73. } else {
  74. logger.Debug("zmq sink receive non byte data %v", item)
  75. }
  76. if err != nil {
  77. logger.Debugf("send to zmq error %v", err)
  78. }
  79. return
  80. }
  81. func (m *zmqSink) Close(ctx api.StreamContext) error {
  82. if m.publisher != nil {
  83. return m.publisher.Close()
  84. }
  85. return nil
  86. }
  87. func Zmq() api.Sink {
  88. return &zmqSink{}
  89. }