mock_sink.go 958 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package test
  2. import (
  3. "context"
  4. "engine/common"
  5. )
  // MockSink is a test sink that records the []byte items delivered on its
  // input channel so they can be inspected afterwards via GetResults.
  type MockSink struct {
  	// ruleId and name are the identifiers passed to NewMockSink.
  	ruleId, name string
  	// results holds every []byte item received since the last call to Open.
  	results [][]byte
  	// input is the channel the goroutine started by Open receives from.
  	input chan interface{}
  }
  12. func NewMockSink(name, ruleId string) *MockSink{
  13. m := &MockSink{
  14. ruleId: ruleId,
  15. name: name,
  16. input: make(chan interface{}),
  17. }
  18. return m
  19. }
  20. func (m *MockSink) Open(ctx context.Context, result chan<- error) {
  21. log := common.GetLogger(ctx)
  22. log.Trace("Opening mock sink")
  23. m.results = make([][]byte, 0)
  24. go func() {
  25. for {
  26. select {
  27. case item := <-m.input:
  28. if v, ok := item.([]byte); ok {
  29. log.Infof("mock sink receive %s", item)
  30. m.results = append(m.results, v)
  31. }else{
  32. log.Info("mock sink receive non byte data")
  33. }
  34. case <-ctx.Done():
  35. log.Infof("mock sink %s done", m.name)
  36. return
  37. }
  38. }
  39. }()
  40. }
  41. func (m *MockSink) GetInput() (chan<- interface{}, string) {
  42. return m.input, m.name
  43. }
  44. func (m *MockSink) GetResults() [][]byte {
  45. return m.results
  46. }