mock_source.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package test
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "time"
  7. )
  8. type MockSource struct {
  9. data []*xsql.Tuple
  10. done <-chan int
  11. isEventTime bool
  12. }
  13. func NewMockSource(data []*xsql.Tuple, done <-chan int, isEventTime bool) *MockSource {
  14. mock := &MockSource{
  15. data: data,
  16. done: done,
  17. isEventTime: isEventTime,
  18. }
  19. return mock
  20. }
  21. func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  22. log := ctx.GetLogger()
  23. mockClock := GetMockClock()
  24. log.Debugln("mock source starts")
  25. for _, d := range m.data {
  26. <-m.done
  27. log.Debugf("mock source is sending data %s", d)
  28. if !m.isEventTime {
  29. mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
  30. } else {
  31. mockClock.Add(1000 * time.Millisecond)
  32. }
  33. consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
  34. time.Sleep(1)
  35. }
  36. }
  37. func (m *MockSource) Close(ctx api.StreamContext) error {
  38. return nil
  39. }
  40. func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
  41. return nil
  42. }