Signed-off-by: Jiyong Huang <huangjy@emqx.io>
@@ -224,7 +224,8 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
} else {
select {
case dataCh <- outs:
- case <-ctx.Done():
+ default:
+ ctx.GetLogger().Warnf("sink node %s instance %d buffer is full, drop data %v", m.name, instance, outs)
}
if resendCh != nil {