mock_source.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package test
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. "time"
  7. )
  8. type MockSource struct {
  9. outs map[string]chan<- interface{}
  10. data []*xsql.Tuple
  11. name string
  12. done chan<- struct{}
  13. isEventTime bool
  14. }
  15. // New creates a new CsvSource
  16. func NewMockSource(data []*xsql.Tuple, name string, done chan<- struct{}, isEventTime bool) *MockSource {
  17. mock := &MockSource{
  18. data: data,
  19. name: name,
  20. outs: make(map[string]chan<- interface{}),
  21. done: done,
  22. isEventTime: isEventTime,
  23. }
  24. return mock
  25. }
  26. func (m *MockSource) Open(ctx context.Context) (err error) {
  27. log := common.GetLogger(ctx)
  28. log.Trace("Mocksource starts")
  29. go func(){
  30. for _, d := range m.data{
  31. log.Infof("mock source is sending data %s", d)
  32. if !m.isEventTime{
  33. common.SetMockNow(d.Timestamp)
  34. ticker := common.GetMockTicker()
  35. timer := common.GetMockTimer()
  36. if ticker != nil {
  37. ticker.DoTick(d.Timestamp)
  38. }
  39. if timer != nil {
  40. timer.DoTick(d.Timestamp)
  41. }
  42. }
  43. for _, out := range m.outs{
  44. select {
  45. case out <- d:
  46. case <-ctx.Done():
  47. log.Trace("Mocksource stop")
  48. return
  49. // default: TODO non blocking must have buffer?
  50. }
  51. time.Sleep(50 * time.Millisecond)
  52. }
  53. if m.isEventTime{
  54. time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
  55. }else{
  56. time.Sleep(50 * time.Millisecond) //Let window run to make sure timers are set
  57. }
  58. }
  59. if !m.isEventTime {
  60. //reset now for the next test
  61. common.SetMockNow(0)
  62. }
  63. m.done <- struct{}{}
  64. }()
  65. return nil
  66. }
  67. func (m *MockSource) AddOutput(output chan<- interface{}, name string) {
  68. if _, ok := m.outs[name]; !ok{
  69. m.outs[name] = output
  70. }else{
  71. common.Log.Warnf("fail to add output %s, operator %s already has an output of the same name", name, m.name)
  72. }
  73. }