|
@@ -146,6 +146,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
result <- fmt.Errorf(msg)
|
|
result <- fmt.Errorf(msg)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+ ctx = context.WithValue(ctx.(*context.DefaultContext), context.TransKey, tf)
|
|
|
|
|
|
m.reset()
|
|
m.reset()
|
|
logger.Infof("open sink node %d instances", m.concurrency)
|
|
logger.Infof("open sink node %d instances", m.concurrency)
|
|
@@ -194,9 +195,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
}
|
|
}
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
if sconf.RunAsync {
|
|
if sconf.RunAsync {
|
|
- go doCollect(ctx, sink, data, stats, sconf, tf, nil)
|
|
|
|
|
|
+ go doCollect(ctx, sink, data, stats, sconf, nil)
|
|
} else {
|
|
} else {
|
|
- doCollect(ctx, sink, data, stats, sconf, tf, nil)
|
|
|
|
|
|
+ doCollect(ctx, sink, data, stats, sconf, nil)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
@@ -226,9 +227,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
}
|
|
}
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
if sconf.RunAsync {
|
|
if sconf.RunAsync {
|
|
- go doCollect(ctx, sink, data, stats, sconf, tf, cache.Complete)
|
|
|
|
|
|
+ go doCollect(ctx, sink, data, stats, sconf, cache.Complete)
|
|
} else {
|
|
} else {
|
|
- doCollect(ctx, sink, data, stats, sconf, tf, cache.Complete)
|
|
|
|
|
|
+ doCollect(ctx, sink, data, stats, sconf, cache.Complete)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
@@ -251,7 +252,7 @@ func (m *SinkNode) reset() {
|
|
m.statManagers = nil
|
|
m.statManagers = nil
|
|
}
|
|
}
|
|
|
|
|
|
-func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf, tp transform.TransFunc, signalCh chan<- int) {
|
|
|
|
|
|
+func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf, signalCh chan<- int) {
|
|
stats.IncTotalRecordsIn()
|
|
stats.IncTotalRecordsIn()
|
|
stats.ProcessTimeStart()
|
|
stats.ProcessTimeStart()
|
|
defer stats.ProcessTimeEnd()
|
|
defer stats.ProcessTimeEnd()
|
|
@@ -273,20 +274,16 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats Sta
|
|
return
|
|
return
|
|
}
|
|
}
|
|
if !sconf.SendSingle {
|
|
if !sconf.SendSingle {
|
|
- doCollectData(ctx, sink, outs, stats, sconf, tp, signalCh)
|
|
|
|
|
|
+ doCollectData(ctx, sink, outs, stats, sconf, signalCh)
|
|
} else {
|
|
} else {
|
|
for _, d := range outs {
|
|
for _, d := range outs {
|
|
- doCollectData(ctx, sink, d, stats, sconf, tp, signalCh)
|
|
|
|
|
|
+ doCollectData(ctx, sink, d, stats, sconf, signalCh)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// doCollectData outData must be map or []map
|
|
// doCollectData outData must be map or []map
|
|
-func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf, tf transform.TransFunc, signalCh chan<- int) {
|
|
|
|
- vCtx := context.WithValue(ctx.(*context.DefaultContext), context.TransKey, &context.TransConfig{
|
|
|
|
- Data: outData,
|
|
|
|
- TFunc: tf,
|
|
|
|
- })
|
|
|
|
|
|
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf, signalCh chan<- int) {
|
|
retries := sconf.RetryCount
|
|
retries := sconf.RetryCount
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
@@ -294,7 +291,7 @@ func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, st
|
|
ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
|
|
ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
|
|
return
|
|
return
|
|
default:
|
|
default:
|
|
- if err := sink.Collect(vCtx, outData); err != nil {
|
|
|
|
|
|
+ if err := sink.Collect(ctx, outData); err != nil {
|
|
stats.IncTotalExceptions()
|
|
stats.IncTotalExceptions()
|
|
ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
|
|
ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
|
|
if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), errorx.IOErr) {
|
|
if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), errorx.IOErr) {
|