mock_source.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package test
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "time"
  7. )
  8. type MockSource struct {
  9. data []*xsql.Tuple
  10. done chan<- struct{}
  11. isEventTime bool
  12. }
  13. // New creates a new CsvSource
  14. func NewMockSource(data []*xsql.Tuple, done chan<- struct{}, isEventTime bool) *MockSource {
  15. mock := &MockSource{
  16. data: data,
  17. done: done,
  18. isEventTime: isEventTime,
  19. }
  20. return mock
  21. }
  22. func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  23. log := ctx.GetLogger()
  24. log.Debugln("mock source starts")
  25. go func(){
  26. for _, d := range m.data{
  27. log.Infof("mock source is sending data %s", d)
  28. if !m.isEventTime{
  29. common.SetMockNow(d.Timestamp)
  30. ticker := common.GetMockTicker()
  31. timer := common.GetMockTimer()
  32. if ticker != nil {
  33. ticker.DoTick(d.Timestamp)
  34. }
  35. if timer != nil {
  36. timer.DoTick(d.Timestamp)
  37. }
  38. }
  39. consume(d.Message, nil)
  40. if m.isEventTime{
  41. time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
  42. }else{
  43. time.Sleep(50 * time.Millisecond) //Let window run to make sure timers are set
  44. }
  45. }
  46. if !m.isEventTime {
  47. //reset now for the next test
  48. common.SetMockNow(0)
  49. }
  50. m.done <- struct{}{}
  51. }()
  52. return nil
  53. }
  54. func (m *MockSource) Close(ctx api.StreamContext) error{
  55. return nil
  56. }
  57. func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
  58. return nil
  59. }