mock_source.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package mocknodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/topotest/mockclock"
  8. "sync"
  9. "time"
  10. )
  11. type MockSource struct {
  12. data []*xsql.Tuple
  13. offset int
  14. sync.Mutex
  15. }
  16. const TIMELEAP = 200
  17. func NewMockSource(data []*xsql.Tuple) *MockSource {
  18. mock := &MockSource{
  19. data: data,
  20. }
  21. return mock
  22. }
  23. func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
  24. log := ctx.GetLogger()
  25. mockClock := mockclock.GetMockClock()
  26. log.Infof("%d: mock source %s starts", common.GetNowInMilli(), ctx.GetOpId())
  27. log.Debugf("mock source %s starts with offset %d", ctx.GetOpId(), m.offset)
  28. for i, d := range m.data {
  29. if i < m.offset {
  30. log.Debugf("mock source is skipping %d", i)
  31. continue
  32. }
  33. log.Debugf("mock source is waiting %d", i)
  34. diff := d.Timestamp - common.GetNowInMilli()
  35. if diff <= 0 {
  36. log.Warnf("Time stamp invalid, current time is %d, but timestamp is %d", common.GetNowInMilli(), d.Timestamp)
  37. diff = TIMELEAP
  38. }
  39. next := mockClock.After(time.Duration(diff) * time.Millisecond)
  40. //Mock timer, only send out the data once the mock time goes to the timestamp.
  41. //Another mechanism must be imposed to move forward the mock time.
  42. select {
  43. case <-next:
  44. m.Lock()
  45. m.offset = i + 1
  46. consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
  47. log.Debugf("%d: mock source %s is sending data %d:%s", common.TimeToUnixMilli(mockClock.Now()), ctx.GetOpId(), i, d)
  48. m.Unlock()
  49. case <-ctx.Done():
  50. log.Debugf("mock source open DONE")
  51. return
  52. }
  53. }
  54. log.Debugf("mock source sends out all data")
  55. }
  56. func (m *MockSource) GetOffset() (interface{}, error) {
  57. m.Lock()
  58. defer m.Unlock()
  59. return m.offset, nil
  60. }
  61. func (m *MockSource) Rewind(offset interface{}) error {
  62. oi, err := common.ToInt(offset, common.STRICT)
  63. if err != nil {
  64. return fmt.Errorf("mock source fails to rewind: %s", err)
  65. } else {
  66. m.offset = oi
  67. }
  68. return nil
  69. }
  70. func (m *MockSource) Close(_ api.StreamContext) error {
  71. m.offset = 0
  72. return nil
  73. }
  74. func (m *MockSource) Configure(_ string, _ map[string]interface{}) error {
  75. return nil
  76. }