memory.go 763 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. package main
  2. import "github.com/emqx/kuiper/xstream/api"
  3. type memory struct {
  4. results [][]byte
  5. }
  6. func (m *memory) Open(ctx api.StreamContext) error {
  7. log := ctx.GetLogger()
  8. log.Debug("Opening memory sink")
  9. m.results = make([][]byte, 0)
  10. return nil
  11. }
  12. func (m *memory) Collect(ctx api.StreamContext, item interface{}) error {
  13. logger := ctx.GetLogger()
  14. if v, ok := item.([]byte); ok {
  15. logger.Debugf("memory sink receive %s", item)
  16. m.results = append(m.results, v)
  17. } else {
  18. logger.Debug("memory sink receive non byte data")
  19. }
  20. return nil
  21. }
  22. func (m *memory) Close(ctx api.StreamContext) error {
  23. //do nothing
  24. return nil
  25. }
  26. func (m *memory) Configure(props map[string]interface{}) error {
  27. return nil
  28. }
  29. func Memory() api.Sink {
  30. return &memory{}
  31. }