Преглед на файлове

refactor(tests): use new mock clock for window testing

Test time can reduce a lot
ngjaying преди 5 години
родител
ревизия
80d8af9846

+ 1 - 156
common/data.go

@@ -1,160 +1,5 @@
 package common
 
-import (
-	"errors"
-	"time"
-)
-
 type Rule struct {
 	Name, Json string
-}
-
-/**** Timer Mock *******/
-
-/** Ticker **/
-type Ticker interface {
-	GetC() <-chan time.Time
-	Stop()
-	Trigger(ti int64)
-}
-
-type DefaultTicker struct {
-	time.Ticker
-}
-
-func NewDefaultTicker(d int) *DefaultTicker {
-	return &DefaultTicker{*(time.NewTicker(time.Duration(d) * time.Millisecond))}
-}
-
-func (t *DefaultTicker) GetC() <-chan time.Time {
-	return t.C
-}
-
-func (t *DefaultTicker) Trigger(ti int64) {
-	Log.Fatal("ticker trigger unsupported")
-}
-
-type MockTicker struct {
-	c        chan time.Time
-	duration int64
-	lastTick int64
-}
-
-func NewMockTicker(d int) *MockTicker {
-	if d <= 0 {
-		panic(errors.New("non-positive interval for MockTicker"))
-	}
-	c := make(chan time.Time, 1)
-	t := &MockTicker{
-		c:        c,
-		duration: int64(d),
-		lastTick: GetMockNow(),
-	}
-	return t
-}
-
-func (t *MockTicker) SetDuration(d int) {
-	t.duration = int64(d)
-	t.lastTick = GetMockNow()
-}
-
-func (t *MockTicker) GetC() <-chan time.Time {
-	return t.c
-}
-
-func (t *MockTicker) Stop() {
-	//do nothing
-}
-
-func (t *MockTicker) Trigger(ti int64) {
-	t.lastTick = ti
-	t.c <- time.Unix(ti/1000, ti%1000*1e6)
-}
-
-func (t *MockTicker) DoTick(c int64) {
-	Log.Debugf("do tick at %d, last tick %d", c, t.lastTick)
-	if t.lastTick == 0 {
-		t.lastTick = c
-	}
-	if c >= (t.lastTick + t.duration) {
-		Log.Debugf("trigger tick")
-		t.Trigger(t.lastTick + t.duration)
-	}
-}
-
-/** Timer **/
-type Timer interface {
-	GetC() <-chan time.Time
-	Stop() bool
-	Reset(d time.Duration) bool
-	Trigger(ti int64)
-}
-
-type DefaultTimer struct {
-	time.Timer
-}
-
-func NewDefaultTimer(d int) *DefaultTimer {
-	return &DefaultTimer{*(time.NewTimer(time.Duration(d) * time.Millisecond))}
-}
-
-func (t *DefaultTimer) GetC() <-chan time.Time {
-	return t.C
-}
-
-func (t *DefaultTimer) Trigger(ti int64) {
-	Log.Fatal("timer trigger unsupported")
-}
-
-type MockTimer struct {
-	c         chan time.Time
-	duration  int64
-	createdAt int64
-}
-
-func NewMockTimer(d int) *MockTimer {
-	if d <= 0 {
-		panic(errors.New("non-positive interval for MockTimer"))
-	}
-	c := make(chan time.Time, 1)
-	t := &MockTimer{
-		c:         c,
-		duration:  int64(d),
-		createdAt: GetMockNow(),
-	}
-	return t
-}
-
-func (t *MockTimer) GetC() <-chan time.Time {
-	return t.c
-}
-
-func (t *MockTimer) Stop() bool {
-	t.createdAt = 0
-	return true
-}
-
-func (t *MockTimer) SetDuration(d int) {
-	t.duration = int64(d)
-	t.createdAt = GetMockNow()
-	Log.Debugf("reset timer created at %v", t.createdAt)
-}
-
-func (t *MockTimer) Reset(d time.Duration) bool {
-	Log.Debugln("reset timer")
-	t.SetDuration(int(d.Nanoseconds() / 1e6))
-	return true
-}
-
-func (t *MockTimer) Trigger(ti int64) {
-	t.c <- time.Unix(ti/1000, ti%1000*1e6)
-	t.createdAt = 0
-}
-
-func (t *MockTimer) DoTick(c int64) {
-	Log.Debugf("do tick at %d, created at %v", c, t.createdAt)
-	if t.createdAt > 0 && c >= (t.createdAt+t.duration) {
-		Log.Info("trigger timer")
-		t.Trigger(t.createdAt + t.duration)
-	}
-}
+}

+ 15 - 1
common/time_util.go

@@ -2,6 +2,7 @@ package common
 
 import (
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"time"
 )
 
@@ -113,7 +114,7 @@ func InterfaceToTime(i interface{}, format string) (time.Time, error) {
 }
 
 func TimeFromUnixMilli(t int64) time.Time {
-	return time.Unix(t/1000, t%1000).UTC()
+	return time.Unix(t/1000, (t%1000)*1e6).UTC()
 }
 
 func ParseTime(t string, f string) (time.Time, error) {
@@ -341,3 +342,16 @@ func convertFormat(f string) (string, error) {
 	}
 	return out, nil
 }
+
+//Time related. For Mock
+func GetTicker(duration int) *clock.Ticker {
+	return Clock.Ticker(time.Duration(duration) * time.Millisecond)
+}
+
+func GetTimer(duration int) *clock.Timer {
+	return Clock.Timer(time.Duration(duration) * time.Millisecond)
+}
+
+func GetNowInMilli() int64 {
+	return TimeToUnixMilli(Clock.Now())
+}

+ 35 - 0
common/time_util_test.go

@@ -0,0 +1,35 @@
+package common
+
+import (
+	"testing"
+	"time"
+)
+
+func TestDateToAndFromMilli(t *testing.T) {
+	var tests = []struct {
+		m int64
+		t time.Time
+	}{
+		{int64(1579140864913), time.Date(2020, time.January, 16, 2, 14, 24, 913000000, time.UTC)},
+		{int64(4913), time.Date(1970, time.January, 1, 0, 0, 4, 913000000, time.UTC)},
+		{int64(2579140864913), time.Date(2051, time.September, 24, 4, 1, 4, 913000000, time.UTC)},
+		{int64(-1579140864913), time.Date(1919, time.December, 17, 21, 45, 35, 87000000, time.UTC)},
+	}
+	for i, tt := range tests{
+		time := TimeFromUnixMilli(tt.m)
+		if !time.Equal(tt.t){
+			t.Errorf("%d time from milli result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.t, time)
+		}
+		milli := TimeToUnixMilli(tt.t)
+		if tt.m != milli{
+			t.Errorf("%d time to milli result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.m, milli)
+		}
+	}
+}
+
+func TestMockClock(t *testing.T) {
+	n := GetNowInMilli()
+	if n != 0{
+		t.Errorf("mock clock now mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 0, n)
+	}
+}

+ 26 - 93
common/util.go

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/go-yaml/yaml"
 	"github.com/patrickmn/go-cache"
 	"github.com/sirupsen/logrus"
@@ -11,45 +12,26 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"time"
+	"strings"
 )
 
 const (
-	logFileName = "stream.log"
-	etc_dir     = "/etc/"
-	data_dir    = "/data/"
-	log_dir     = "/log/"
+	logFileName   = "stream.log"
+	etc_dir       = "/etc/"
+	data_dir      = "/data/"
+	log_dir       = "/log/"
+	StreamConf    = "kuiper.yaml"
+	KuiperBaseKey = "KuiperBaseKey"
 )
 
 var (
-	Log        *logrus.Logger
-	Config     *XStreamConf
-	IsTesting  bool
-	logFile    *os.File
-	mockTicker *MockTicker
-	mockTimer  *MockTimer
-	mockNow    int64
+	Log       *logrus.Logger
+	Config    *XStreamConf
+	IsTesting bool
+	Clock     clock.Clock
+	logFile   *os.File
 )
 
-type logRedirect struct {
-}
-
-func (l *logRedirect) Errorf(f string, v ...interface{}) {
-	Log.Error(fmt.Sprintf(f, v...))
-}
-
-func (l *logRedirect) Infof(f string, v ...interface{}) {
-	Log.Info(fmt.Sprintf(f, v...))
-}
-
-func (l *logRedirect) Warningf(f string, v ...interface{}) {
-	Log.Warning(fmt.Sprintf(f, v...))
-}
-
-func (l *logRedirect) Debugf(f string, v ...interface{}) {
-	Log.Debug(fmt.Sprintf(f, v...))
-}
-
 func LoadConf(confName string) ([]byte, error) {
 	confDir, err := GetConfLoc()
 	if err != nil {
@@ -71,16 +53,25 @@ type XStreamConf struct {
 	PrometheusPort int  `yaml:"prometheusPort"`
 }
 
-var StreamConf = "kuiper.yaml"
-
-const KuiperBaseKey = "KuiperBaseKey"
-
 func init() {
 	Log = logrus.New()
 	Log.SetFormatter(&logrus.TextFormatter{
 		DisableColors: true,
 		FullTimestamp: true,
 	})
+	Log.Debugf("init with args %s", os.Args)
+	for _, arg := range os.Args {
+		if strings.HasPrefix(arg, "-test.") {
+			IsTesting = true
+			break
+		}
+	}
+	if IsTesting {
+		Log.Debugf("running in testing mode")
+		Clock = clock.NewMock()
+	} else {
+		Clock = clock.New()
+	}
 }
 
 func InitConf() {
@@ -288,41 +279,6 @@ func GetAndCreateDataLoc(dir string) (string, error) {
 	return d, nil
 }
 
-//Time related. For Mock
-func GetTicker(duration int) Ticker {
-	if IsTesting {
-		if mockTicker == nil {
-			mockTicker = NewMockTicker(duration)
-		} else {
-			mockTicker.SetDuration(duration)
-		}
-		return mockTicker
-	} else {
-		return NewDefaultTicker(duration)
-	}
-}
-
-func GetTimer(duration int) Timer {
-	if IsTesting {
-		if mockTimer == nil {
-			mockTimer = NewMockTimer(duration)
-		} else {
-			mockTimer.SetDuration(duration)
-		}
-		return mockTimer
-	} else {
-		return NewDefaultTimer(duration)
-	}
-}
-
-func GetNowInMilli() int64 {
-	if IsTesting {
-		return GetMockNow()
-	} else {
-		return TimeToUnixMilli(time.Now())
-	}
-}
-
 func ProcessPath(p string) (string, error) {
 	if abs, err := filepath.Abs(p); err != nil {
 		return "", nil
@@ -334,29 +290,6 @@ func ProcessPath(p string) (string, error) {
 	}
 }
 
-/****** For Test Only ********/
-func GetMockTicker() *MockTicker {
-	return mockTicker
-}
-
-func ResetMockTicker() {
-	if mockTicker != nil {
-		mockTicker.lastTick = 0
-	}
-}
-
-func GetMockTimer() *MockTimer {
-	return mockTimer
-}
-
-func SetMockNow(now int64) {
-	mockNow = now
-}
-
-func GetMockNow() int64 {
-	return mockNow
-}
-
 /*********** Type Cast Utilities *****/
 //TODO datetime type
 func ToString(input interface{}) string {

+ 1 - 1
go.mod

@@ -1,7 +1,7 @@
 module github.com/emqx/kuiper
 
 require (
-	github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
+	github.com/benbjohnson/clock v1.0.0
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
 	github.com/eclipse/paho.mqtt.golang v1.2.0
 	github.com/go-yaml/yaml v2.1.0+incompatible

+ 1 - 1
xsql/plans/project_test.go

@@ -39,7 +39,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			},
 			result: []map[string]interface{}{{
-				"ts": "2019-09-19T00:56:13.000000431Z",
+				"ts": "2019-09-19T00:56:13.431Z",
 			}},
 		},
 		{

+ 46 - 110
xsql/processors/xsql_processor_test.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/test"
@@ -554,41 +555,14 @@ func TestSingleSQL(t *testing.T) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
 			continue
 		}
-		keys, values := tp.GetMetrics()
-		//for i, k := range keys{
-		//	log.Printf("%s:%v", k, values[i])
-		//}
-		for k, v := range tt.m {
-			var (
-				index   int
-				key     string
-				matched bool
-			)
-			for index, key = range keys {
-				if k == key {
-					if values[index] == v {
-						matched = true
-					}
-					break
-				}
-			}
-			if matched {
-				continue
-			}
-			//do not find
-			if index < len(values) {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
-			} else {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
-			}
-			break
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil{
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
 		}
 		tp.Cancel()
 	}
 }
 
 func TestWindow(t *testing.T) {
-	common.IsTesting = true
 	var tests = []struct {
 		name string
 		sql  string
@@ -976,8 +950,8 @@ func TestWindow(t *testing.T) {
 	defer dropStreams(t)
 	done := make(chan struct{})
 	defer close(done)
-	common.ResetMockTicker()
 	for i, tt := range tests {
+		test.ResetClock(1541152486000)
 		p := NewRuleProcessor(DbDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))
 		var sources []*nodes.SourceNode
@@ -1017,7 +991,6 @@ func TestWindow(t *testing.T) {
 						log.Info("stream stopping")
 						return
 					}
-				default:
 				}
 			}
 		}()
@@ -1035,31 +1008,8 @@ func TestWindow(t *testing.T) {
 		if !reflect.DeepEqual(tt.r, maps) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
 		}
-		keys, values := tp.GetMetrics()
-		for k, v := range tt.m {
-			var (
-				index   int
-				key     string
-				matched bool
-			)
-			for index, key = range keys {
-				if k == key {
-					if values[index] == v {
-						matched = true
-					}
-					break
-				}
-			}
-			if matched {
-				continue
-			}
-			//do not find
-			if index < len(values) {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
-			} else {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
-			}
-			break
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil{
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
 		}
 		tp.Cancel()
 	}
@@ -1348,7 +1298,6 @@ func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.Sour
 }
 
 func TestEventWindow(t *testing.T) {
-	common.IsTesting = true
 	var tests = []struct {
 		name string
 		sql  string
@@ -1711,31 +1660,8 @@ func TestEventWindow(t *testing.T) {
 	defer dropEventStreams(t)
 	done := make(chan struct{})
 	defer close(done)
-	common.ResetMockTicker()
-	//mock ticker
-	realTicker := time.NewTicker(500 * time.Millisecond)
-	tickerDone := make(chan bool)
-	go func() {
-		ticker := common.GetTicker(1000).(*common.MockTicker)
-		timer := common.GetTimer(1000).(*common.MockTimer)
-		for {
-			select {
-			case <-tickerDone:
-				log.Infof("real ticker exiting...")
-				return
-			case t := <-realTicker.C:
-				ts := common.TimeToUnixMilli(t)
-				if ticker != nil {
-					go ticker.DoTick(ts)
-				}
-				if timer != nil {
-					go timer.DoTick(ts)
-				}
-			}
-		}
-
-	}()
 	for i, tt := range tests {
+		test.ResetClock(1541152486000)
 		p := NewRuleProcessor(DbDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))
 		var sources []*nodes.SourceNode
@@ -1781,7 +1707,6 @@ func TestEventWindow(t *testing.T) {
 						log.Info("stream stopping")
 						return
 					}
-				default:
 				}
 			}
 		}()
@@ -1799,40 +1724,51 @@ func TestEventWindow(t *testing.T) {
 		if !reflect.DeepEqual(tt.r, maps) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
 		}
-		keys, values := tp.GetMetrics()
-		//for i, k := range keys{
-		//	log.Printf("%s:%v", k, values[i])
-		//}
-		for k, v := range tt.m {
-			var (
-				index   int
-				key     string
-				matched bool
-			)
-			for index, key = range keys {
-				if k == key {
-					if values[index] == v {
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil{
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
+func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
+	keys, values := tp.GetMetrics()
+	//for i, k := range keys{
+	//	log.Printf("%s:%v", k, values[i])
+	//}
+	for k, v := range m {
+		var (
+			index   int
+			key     string
+			matched bool
+		)
+		for index, key = range keys {
+			if k == key {
+				if strings.HasSuffix(k, "process_latency_ms") {
+					if values[index].(int64) >= v.(int64) {
 						matched = true
+						continue
+					} else {
+						break
 					}
-					break
 				}
+				if values[index] == v {
+					matched = true
+				}
+				break
 			}
-			if matched {
-				continue
-			}
-			//do not find
-			if index < len(values) {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
-			} else {
-				t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
-			}
-			break
 		}
-		tp.Cancel()
+		if matched {
+			continue
+		}
+		//do not find
+		if index < len(values) {
+			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
+		} else {
+			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
+		}
 	}
-	realTicker.Stop()
-	tickerDone <- true
-	close(tickerDone)
+	return nil
 }
 
 func errstring(err error) string {

+ 2 - 2
xstream/nodes/sink_cache.go

@@ -85,7 +85,7 @@ func (c *Cache) run(ctx api.StreamContext) {
 		return
 	}
 
-	ticker := common.NewDefaultTicker(c.saveInterval)
+	ticker := common.GetTicker(c.saveInterval)
 	for {
 		select {
 		case item := <-c.in:
@@ -100,7 +100,7 @@ func (c *Cache) run(ctx api.StreamContext) {
 		case index := <-c.Complete:
 			c.pending.delete(index)
 			c.changed = true
-		case <-ticker.GetC():
+		case <-ticker.C:
 			if c.pending.length() == 0 {
 				c.pending.reset()
 			}

+ 1 - 0
xstream/nodes/source_node.go

@@ -97,6 +97,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					m.sources = append(m.sources, source)
 					m.mutex.Unlock()
 				} else {
+					logger.Infof("use instance %d of %d sources", instance, len(m.sources))
 					source = m.sources[instance]
 				}
 				stats, err := NewStatManager("source", ctx)

+ 3 - 2
xstream/operators/watermark.go

@@ -3,6 +3,7 @@ package operators
 import (
 	"context"
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
@@ -30,7 +31,7 @@ type WatermarkGenerator struct {
 	window          *WindowConfig
 	lateTolerance   int64
 	interval        int
-	ticker          common.Ticker
+	ticker          *clock.Ticker
 	stream          chan<- interface{}
 }
 
@@ -85,7 +86,7 @@ func (w *WatermarkGenerator) start(ctx api.StreamContext) {
 	var c <-chan time.Time
 
 	if w.ticker != nil {
-		c = w.ticker.GetC()
+		c = w.ticker.C
 	}
 	for {
 		select {

+ 6 - 5
xstream/operators/window_op.go

@@ -2,6 +2,7 @@ package operators
 
 import (
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
@@ -20,7 +21,7 @@ type WindowOperator struct {
 	input              chan interface{}
 	outputs            map[string]chan<- interface{}
 	name               string
-	ticker             common.Ticker //For processing time only
+	ticker             *clock.Ticker //For processing time only
 	window             *WindowConfig
 	interval           int
 	triggerTime        int64
@@ -122,12 +123,12 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 	var (
 		inputs        []*xsql.Tuple
 		c             <-chan time.Time
-		timeoutTicker common.Timer
+		timeoutTicker *clock.Timer
 		timeout       <-chan time.Time
 	)
 
 	if o.ticker != nil {
-		c = o.ticker.GetC()
+		c = o.ticker.C
 	}
 
 	for {
@@ -158,7 +159,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 						timeoutTicker.Reset(time.Duration(o.window.Interval) * time.Millisecond)
 					} else {
 						timeoutTicker = common.GetTimer(o.window.Interval)
-						timeout = timeoutTicker.GetC()
+						timeout = timeoutTicker.C
 					}
 				}
 			}
@@ -204,7 +205,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 
 func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.StreamContext) ([]*xsql.Tuple, bool) {
 	log := ctx.GetLogger()
-	log.Debugf("window %s triggered at %s", o.name, time.Unix(triggerTime/1000, triggerTime%1000))
+	log.Debugf("window %s triggered at %s(%d)", o.name, time.Unix(triggerTime/1000, triggerTime%1000), triggerTime)
 	var delta int64
 	if o.window.Type == xsql.HOPPING_WINDOW || o.window.Type == xsql.SLIDING_WINDOW {
 		delta = o.calDelta(triggerTime, delta, log)

+ 16 - 0
xstream/test/clock.go

@@ -0,0 +1,16 @@
+package test
+
+import (
+	"github.com/benbjohnson/clock"
+	"github.com/emqx/kuiper/common"
+)
+
+func ResetClock(t int64){
+	mock := clock.NewMock()
+	mock.Set(common.TimeFromUnixMilli(t))
+	common.Clock = mock
+}
+
+func GetMockClock() *clock.Mock{
+	return common.Clock.(*clock.Mock)
+}

+ 8 - 18
xstream/test/mock_source.go

@@ -25,32 +25,22 @@ func NewMockSource(data []*xsql.Tuple, done chan<- struct{}, isEventTime bool) *
 
 func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
 	log := ctx.GetLogger()
-
+	mockClock := GetMockClock()
 	log.Debugln("mock source starts")
 	go func() {
 		for _, d := range m.data {
 			log.Debugf("mock source is sending data %s", d)
 			if !m.isEventTime {
-				common.SetMockNow(d.Timestamp)
-				ticker := common.GetMockTicker()
-				timer := common.GetMockTimer()
-				if ticker != nil {
-					ticker.DoTick(d.Timestamp)
-				}
-				if timer != nil {
-					timer.DoTick(d.Timestamp)
-				}
+				mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
+			}else {
+				mockClock.Add(1000 * time.Millisecond)
 			}
 			consume(d.Message, nil)
-			if m.isEventTime{
-				time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
-			}else{
-				time.Sleep(50 * time.Millisecond) //Let window run to make sure timers are set
-			}
+			time.Sleep(1)
 		}
-		if !m.isEventTime {
-			//reset now for the next test
-			common.SetMockNow(0)
+		if m.isEventTime{
+			mockClock.Add(1000 * time.Millisecond)
+			time.Sleep(1)
 		}
 		m.done <- struct{}{}
 	}()