mock_sink.go 853 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package test
  2. import (
  3. "github.com/emqx/kuiper/xstream/api"
  4. )
  5. type MockSink struct {
  6. results [][]byte
  7. }
  8. func NewMockSink() *MockSink {
  9. m := &MockSink{}
  10. return m
  11. }
  12. func (m *MockSink) Open(ctx api.StreamContext) error {
  13. log := ctx.GetLogger()
  14. log.Debugln("Opening mock sink")
  15. m.results = make([][]byte, 0)
  16. return nil
  17. }
  18. func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
  19. logger := ctx.GetLogger()
  20. if v, ok := item.([]byte); ok {
  21. logger.Debugf("mock sink receive %s", item)
  22. m.results = append(m.results, v)
  23. } else {
  24. logger.Info("mock sink receive non byte data")
  25. }
  26. return nil
  27. }
  28. func (m *MockSink) Close(ctx api.StreamContext) error {
  29. //do nothing
  30. return nil
  31. }
  32. func (m *MockSink) Configure(props map[string]interface{}) error {
  33. return nil
  34. }
  35. func (m *MockSink) GetResults() [][]byte {
  36. return m.results
  37. }