func.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package collectors
  2. import (
  3. "engine/xstream/api"
  4. "errors"
  5. )
  6. // CollectorFunc is a function used to collect
  7. // incoming stream data. It can be used as a
  8. // stream sink.
  9. type CollectorFunc func(api.StreamContext, interface{}) error
  10. // FuncCollector is a collector that uses a function
  11. // to collect data. The specified function must be
  12. // of type:
  13. // CollectorFunc
  14. type FuncCollector struct {
  15. f CollectorFunc // function that Collect forwards each item to
  16. }
  17. // Func creates a new value *FuncCollector that
  18. // will use the specified function parameter to
  19. // collect streaming data.
  20. func Func(f CollectorFunc) *FuncCollector {
  21. return &FuncCollector{f: f}
  22. }
  23. func (c *FuncCollector) Configure(props map[string]interface{}) error{
  24. //do nothing
  25. return nil
  26. }
  27. // Open is the starting point that starts the collector
  28. func (c *FuncCollector) Open(ctx api.StreamContext) error {
  29. log := ctx.GetLogger()
  30. log.Println("Opening func collector")
  31. if c.f == nil {
  32. return errors.New("func collector missing function")
  33. }
  34. return nil
  35. }
  36. func (c *FuncCollector) Collect(ctx api.StreamContext, item interface{}) error {
  37. return c.f(ctx, item)
  38. }
  39. func (c *FuncCollector) Close(api.StreamContext) error {
  40. return nil
  41. }