mock_source.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package test
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "time"
  7. )
  8. type MockSource struct {
  9. data []*xsql.Tuple
  10. done chan<- struct{}
  11. isEventTime bool
  12. }
  13. // New creates a new CsvSource
  14. func NewMockSource(data []*xsql.Tuple, done chan<- struct{}, isEventTime bool) *MockSource {
  15. mock := &MockSource{
  16. data: data,
  17. done: done,
  18. isEventTime: isEventTime,
  19. }
  20. return mock
  21. }
  22. func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  23. log := ctx.GetLogger()
  24. mockClock := GetMockClock()
  25. log.Debugln("mock source starts")
  26. go func() {
  27. for _, d := range m.data {
  28. log.Debugf("mock source is sending data %s", d)
  29. if !m.isEventTime {
  30. mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
  31. }else {
  32. mockClock.Add(1000 * time.Millisecond)
  33. }
  34. consume(d.Message, nil)
  35. time.Sleep(1)
  36. }
  37. if m.isEventTime{
  38. mockClock.Add(1000 * time.Millisecond)
  39. time.Sleep(1)
  40. }
  41. m.done <- struct{}{}
  42. }()
  43. return nil
  44. }
  45. func (m *MockSource) Close(ctx api.StreamContext) error {
  46. return nil
  47. }
  48. func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
  49. return nil
  50. }