|
@@ -159,22 +159,22 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
|
var sink api.Sink
|
|
|
var err error
|
|
|
if !m.isMock {
|
|
|
- logger.Debugf(fmt.Sprintf("Trying to get sink for rule %s with options %v\n", ctx.GetRuleId(), m.options))
|
|
|
+ logger.Debugf("Trying to get sink for rule %s with options %v\n", ctx.GetRuleId(), m.options)
|
|
|
sink, err = getSink(m.sinkType, m.options)
|
|
|
if err != nil {
|
|
|
m.drainError(result, err, ctx, logger)
|
|
|
return
|
|
|
}
|
|
|
- logger.Debugf(fmt.Sprintf("Successfully get the sink %s", m.sinkType))
|
|
|
+ logger.Debugf("Successfully get the sink %s", m.sinkType)
|
|
|
m.mutex.Lock()
|
|
|
m.sinks = append(m.sinks, sink)
|
|
|
m.mutex.Unlock()
|
|
|
- logger.Debugf(fmt.Sprintf("Now is to open sink for rule %s.\n", ctx.GetRuleId()))
|
|
|
+ logger.Debugf("Now is to open sink for rule %s.\n", ctx.GetRuleId())
|
|
|
if err := sink.Open(ctx); err != nil {
|
|
|
m.drainError(result, err, ctx, logger)
|
|
|
return
|
|
|
}
|
|
|
- logger.Debugf(fmt.Sprintf("Successfully open sink for rule %s.\n", ctx.GetRuleId()))
|
|
|
+ logger.Debugf("Successfully open sink for rule %s.\n", ctx.GetRuleId())
|
|
|
} else {
|
|
|
sink = m.sinks[instance]
|
|
|
}
|