func.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package collectors
  2. import (
  3. "context"
  4. "engine/common"
  5. "errors"
  6. )
  7. // CollectorFunc is a function used to colllect
  8. // incoming stream data. It can be used as a
  9. // stream sink.
  10. type CollectorFunc func(context.Context, interface{}) error
  11. // FuncCollector is a colletor that uses a function
  12. // to collect data. The specified function must be
  13. // of type:
  14. // CollectorFunc
  15. type FuncCollector struct {
  16. input chan interface{}
  17. //logf api.LogFunc
  18. //errf api.ErrorFunc
  19. f CollectorFunc
  20. name string
  21. }
  22. // Func creates a new value *FuncCollector that
  23. // will use the specified function parameter to
  24. // collect streaming data.
  25. func Func(name string, f CollectorFunc) *FuncCollector {
  26. return &FuncCollector{f: f, name:name, input: make(chan interface{}, 1024)}
  27. }
  28. func (c *FuncCollector) GetName() string {
  29. return c.name
  30. }
  31. func (c *FuncCollector) GetInput() (chan<- interface{}, string) {
  32. return c.input, c.name
  33. }
  34. // Open is the starting point that starts the collector
  35. func (c *FuncCollector) Open(ctx context.Context, result chan<- error) {
  36. //c.logf = autoctx.GetLogFunc(ctx)
  37. //c.errf = autoctx.GetErrFunc(ctx)
  38. log := common.GetLogger(ctx)
  39. log.Println("Opening func collector")
  40. if c.f == nil {
  41. err := errors.New("Func collector missing function")
  42. log.Println(err)
  43. go func() { result <- err }()
  44. }
  45. go func() {
  46. for {
  47. select {
  48. case item := <-c.input:
  49. if err := c.f(ctx, item); err != nil {
  50. log.Println(err)
  51. }
  52. case <-ctx.Done():
  53. log.Infof("Func collector %s done", c.name)
  54. return
  55. }
  56. }
  57. }()
  58. }