|
@@ -205,22 +205,19 @@ func (ss *sourceSingleton) run(name, key string) {
|
|
|
|
|
|
func (ss *sourceSingleton) broadcast(val api.SourceTuple) {
|
|
|
logger := ss.ctx.GetLogger()
|
|
|
- var wg sync.WaitGroup
|
|
|
ss.RLock()
|
|
|
- wg.Add(len(ss.outputs))
|
|
|
for n, out := range ss.outputs {
|
|
|
- go func(name string, output chan<- api.SourceTuple) {
|
|
|
+ go func(name string, dataCh *DynamicChannelBuffer) {
|
|
|
select {
|
|
|
- case output <- val:
|
|
|
+ case dataCh.Out <- val:
|
|
|
logger.Debugf("broadcast from source pool to %s done", name)
|
|
|
case <-ss.ctx.Done():
|
|
|
- // rule stop so stop waiting
|
|
|
+ case <-dataCh.done:
|
|
|
+ // detached
|
|
|
}
|
|
|
- wg.Done()
|
|
|
- }(n, out.dataCh.Out)
|
|
|
+ }(n, out.dataCh)
|
|
|
}
|
|
|
ss.RUnlock()
|
|
|
- wg.Wait()
|
|
|
}
|
|
|
|
|
|
func (ss *sourceSingleton) broadcastError(err error) {
|