|
@@ -65,27 +65,41 @@ func (m *zmqSink) Open(ctx api.StreamContext) (err error) {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) (err error) {
|
|
|
|
|
|
+func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) error {
|
|
logger := ctx.GetLogger()
|
|
logger := ctx.GetLogger()
|
|
- if v, _, err := ctx.TransformOutput(item); err == nil {
|
|
|
|
- logger.Debugf("zmq sink receive %s", item)
|
|
|
|
- if m.topic == "" {
|
|
|
|
- _, err = m.publisher.Send(string(v), 0)
|
|
|
|
- } else {
|
|
|
|
- msgs := []string{
|
|
|
|
- m.topic,
|
|
|
|
- string(v),
|
|
|
|
- }
|
|
|
|
- _, err = m.publisher.SendMessage(msgs)
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
|
|
+ var v []byte
|
|
|
|
+ var err error
|
|
|
|
+ v, _, err = ctx.TransformOutput(item)
|
|
|
|
+ if err != nil {
|
|
logger.Debug("zmq sink receive non byte data %v", item)
|
|
logger.Debug("zmq sink receive non byte data %v", item)
|
|
|
|
+ return err
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ logger.Debugf("zmq sink receive %s", item)
|
|
|
|
+ err = m.sendToZmq(ctx, v)
|
|
if err != nil {
|
|
if err != nil {
|
|
- logger.Errorf("send to zmq error %v", err)
|
|
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *zmqSink) sendToZmq(ctx api.StreamContext, v []byte) error {
|
|
|
|
+ var err error
|
|
|
|
+ if m.topic == "" {
|
|
|
|
+ _, err = m.publisher.Send(string(v), 0)
|
|
|
|
+ } else {
|
|
|
|
+ msgs := []string{
|
|
|
|
+ m.topic,
|
|
|
|
+ string(v),
|
|
|
|
+ }
|
|
|
|
+ _, err = m.publisher.SendMessage(msgs)
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ ctx.GetLogger().Errorf("send to zmq error %v", err)
|
|
return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
|
|
return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
|
|
}
|
|
}
|
|
- return
|
|
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
func (m *zmqSink) Close(ctx api.StreamContext) error {
|
|
func (m *zmqSink) Close(ctx api.StreamContext) error {
|