mock_source.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package test
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "time"
  8. )
  9. type MockSource struct {
  10. data []*xsql.Tuple
  11. done <-chan int
  12. isEventTime bool
  13. offset int
  14. }
  15. func NewMockSource(data []*xsql.Tuple, done <-chan int, isEventTime bool) *MockSource {
  16. mock := &MockSource{
  17. data: data,
  18. done: done,
  19. isEventTime: isEventTime,
  20. }
  21. return mock
  22. }
  23. func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  24. log := ctx.GetLogger()
  25. mockClock := GetMockClock()
  26. log.Debugf("mock source starts with offset %d", m.offset)
  27. for i, d := range m.data {
  28. if i < m.offset {
  29. log.Debugf("mock source is skipping %d", i)
  30. continue
  31. }
  32. log.Debugf("mock source is waiting", i)
  33. select {
  34. case j, ok := <-m.done:
  35. if ok {
  36. log.Debugf("mock source receives data %d", j)
  37. } else {
  38. log.Debugf("sync channel done at %d", i)
  39. }
  40. case <-ctx.Done():
  41. log.Debugf("mock source open DONE")
  42. return
  43. }
  44. if !m.isEventTime {
  45. mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
  46. log.Debugf("set time at %d", d.Timestamp)
  47. } else {
  48. mockClock.Add(1000 * time.Millisecond)
  49. }
  50. select {
  51. case <-ctx.Done():
  52. log.Debugf("mock source open DONE")
  53. return
  54. default:
  55. }
  56. consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
  57. log.Debugf("mock source is sending data %s", d)
  58. m.offset = i + 1
  59. time.Sleep(1)
  60. }
  61. log.Debugf("mock source sends out all data")
  62. }
  63. func (m *MockSource) GetOffset() (interface{}, error) {
  64. return m.offset, nil
  65. }
  66. func (m *MockSource) Rewind(offset interface{}) error {
  67. oi, err := common.ToInt(offset)
  68. if err != nil {
  69. return fmt.Errorf("mock source fails to rewind: %s", err)
  70. } else {
  71. m.offset = oi
  72. }
  73. return nil
  74. }
  75. func (m *MockSource) Close(ctx api.StreamContext) error {
  76. m.offset = 0
  77. return nil
  78. }
  79. func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
  80. return nil
  81. }