func.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package collectors
  2. import (
  3. "engine/xstream/api"
  4. "errors"
  5. )
  6. // CollectorFunc is a function used to colllect
  7. // incoming stream data. It can be used as a
  8. // stream sink.
  9. type CollectorFunc func(api.StreamContext, interface{}) error
  10. // FuncCollector is a colletor that uses a function
  11. // to collect data. The specified function must be
  12. // of type:
  13. // CollectorFunc
  14. type FuncCollector struct {
  15. f CollectorFunc
  16. }
  17. // Func creates a new value *FuncCollector that
  18. // will use the specified function parameter to
  19. // collect streaming data.
  20. func Func(f CollectorFunc) *FuncCollector {
  21. return &FuncCollector{f: f}
  22. }
  23. // Open is the starting point that starts the collector
  24. func (c *FuncCollector) Open(ctx api.StreamContext) error {
  25. log := ctx.GetLogger()
  26. log.Println("Opening func collector")
  27. if c.f == nil {
  28. return errors.New("func collector missing function")
  29. }
  30. return nil
  31. }
  32. func (c *FuncCollector) Collect(ctx api.StreamContext, item interface{}) error {
  33. return c.f(ctx, item)
  34. }
  35. func (c *FuncCollector) Close(api.StreamContext) error {
  36. return nil
  37. }