common_func.go 508 B

12345678910111213141516171819202122
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/xstream/api"
  4. "sync"
  5. )
  6. //Blocking broadcast
  7. func Broadcast(outputs map[string]chan<- interface{}, val interface{}, ctx api.StreamContext) {
  8. logger := ctx.GetLogger()
  9. var wg sync.WaitGroup
  10. wg.Add(len(outputs))
  11. for n, out := range outputs {
  12. go func(output chan<- interface{}) {
  13. output <- val
  14. wg.Done()
  15. logger.Debugf("broadcast from %s to %s done", ctx.GetOpId(), n)
  16. }(out)
  17. }
  18. logger.Debugf("broadcasting from %s", ctx.GetOpId())
  19. wg.Wait()
  20. }