Browse Source

fix(window): align time based on unit

Window length of 2 minutes will now align to second 0. Which is different with window length of 120 seconds.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 year ago
parent
commit
648cc12d32

+ 3 - 3
internal/conf/time.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -41,11 +41,11 @@ func GetLocalZone() int {
 }
 
 // Time related. For Mock
-func GetTicker(duration int) *clock.Ticker {
+func GetTicker(duration int64) *clock.Ticker {
 	return Clock.Ticker(time.Duration(duration) * time.Millisecond)
 }
 
-func GetTimer(duration int) *clock.Timer {
+func GetTimer(duration int64) *clock.Timer {
 	return Clock.Timer(time.Duration(duration) * time.Millisecond)
 }
 

+ 1 - 1
internal/io/file/file_sink.go

@@ -108,7 +108,7 @@ func (m *fileSink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Debug("Opening file sink")
 	// Check if the files have opened longer than the rolling interval, if so close it and create a new one
 	if *m.c.CheckInterval > 0 {
-		t := conf.GetTicker(int(*m.c.CheckInterval))
+		t := conf.GetTicker(*m.c.CheckInterval)
 		go func() {
 			defer t.Stop()
 			for {

+ 2 - 2
internal/io/sink/send_manager.go

@@ -65,7 +65,7 @@ func (sm *SendManager) Run(ctx context.Context) {
 }
 
 func (sm *SendManager) runWithTicker(ctx context.Context) {
-	ticker := conf.GetTicker(sm.lingerInterval)
+	ticker := conf.GetTicker(int64(sm.lingerInterval))
 	defer ticker.Stop()
 	for {
 		select {
@@ -91,7 +91,7 @@ func (sm *SendManager) runWithBatchSize(ctx context.Context) {
 }
 
 func (sm *SendManager) runWithTickerAndBatchSize(ctx context.Context) {
-	ticker := conf.GetTicker(sm.lingerInterval)
+	ticker := conf.GetTicker(int64(sm.lingerInterval))
 	defer ticker.Stop()
 	for {
 		select {

+ 2 - 2
internal/topo/checkpoint/coordinator.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -165,7 +165,7 @@ func (c *Coordinator) Activate() error {
 	if c.ticker != nil {
 		c.ticker.Stop()
 	}
-	c.ticker = conf.GetTicker(c.baseInterval)
+	c.ticker = conf.GetTicker(int64(c.baseInterval))
 	tc := c.ticker.C
 	go func() {
 		err := infra.SafeRun(func() error {

+ 2 - 2
internal/topo/lookup/cache/cache.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -50,7 +50,7 @@ func NewCache(expireTime int, cacheMissingKey bool) *Cache {
 }
 
 func (c *Cache) run(ctx context.Context) {
-	ticker := conf.GetTicker(c.expireTime * 2000)
+	ticker := conf.GetTicker(int64(c.expireTime * 2000))
 	for {
 		select {
 		case <-ticker.C:

+ 11 - 7
internal/topo/node/watermark.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"time"
 
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -45,7 +46,9 @@ type WatermarkGenerator struct {
 	topicToTs     map[string]int64
 	window        *WindowConfig
 	lateTolerance int64
-	interval      int
+	interval      int64
+	rawInterval   int
+	timeUnit      ast.Token
 	// ticker          *clock.Ticker
 	stream chan<- interface{}
 	// state
@@ -59,6 +62,8 @@ func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream cha
 		lateTolerance: l,
 		inputTopics:   s,
 		stream:        stream,
+		rawInterval:   window.RawInterval,
+		timeUnit:      window.TimeUnit,
 	}
 	switch window.Type {
 	case ast.NOT_WINDOW:
@@ -125,14 +130,13 @@ func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64,
 	switch w.window.Type {
 	case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
 		if current > 0 {
-			return current + int64(w.interval)
-		} else { // first run without previous window
-			interval := int64(w.interval)
+			return current + w.interval
+		} else { // first run without a previous window
 			nextTs := getEarliestEventTs(inputs, current, watermark)
 			if nextTs == math.MaxInt64 {
 				return nextTs
 			}
-			return getAlignedWindowEndTime(nextTs, interval).UnixMilli()
+			return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.rawInterval, w.timeUnit).UnixMilli()
 		}
 	case ast.SLIDING_WINDOW:
 		nextTs := getEarliestEventTs(inputs, current, watermark)
@@ -144,12 +148,12 @@ func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64,
 
 func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple) (int64, bool) {
 	if len(inputs) > 0 {
-		timeout, duration := int64(w.window.Interval), int64(w.window.Length)
+		timeout, duration := w.window.Interval, w.window.Length
 		sort.SliceStable(inputs, func(i, j int) bool {
 			return inputs[i].Timestamp < inputs[j].Timestamp
 		})
 		et := inputs[0].Timestamp
-		tick := getAlignedWindowEndTime(et, duration).UnixMilli()
+		tick := getAlignedWindowEndTime(time.UnixMilli(et), w.rawInterval, w.timeUnit).UnixMilli()
 		var p int64
 		ticked := false
 		for _, tuple := range inputs {

+ 60 - 31
internal/topo/node/window_op.go

@@ -32,15 +32,17 @@ import (
 )
 
 type WindowConfig struct {
-	Type     ast.WindowType
-	Length   int
-	Interval int // If interval is not set, it is equals to Length
+	Type        ast.WindowType
+	Length      int64
+	Interval    int64 // If the interval is not set, it is equals to Length
+	RawInterval int
+	TimeUnit    ast.Token
 }
 
 type WindowOperator struct {
 	*defaultSinkNode
 	window             *WindowConfig
-	interval           int
+	interval           int64
 	isEventTime        bool
 	watermarkGenerator *WatermarkGenerator // For event time only
 
@@ -166,16 +168,43 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 	}
 }
 
-func getAlignedWindowEndTime(n, interval int64) time.Time {
-	now := time.UnixMilli(n)
-	offset := conf.GetLocalZone()
-	start := now.Truncate(24 * time.Hour).Add(time.Duration(-1*offset) * time.Second)
-	diff := now.Sub(start).Milliseconds()
-	return now.Add(time.Duration(interval-(diff%interval)) * time.Millisecond)
+func getAlignedWindowEndTime(n time.Time, interval int, timeUnit ast.Token) time.Time {
+	switch timeUnit {
+	case ast.DD: // The interval * days starting today
+		return time.Date(n.Year(), n.Month(), n.Day()+interval, 0, 0, 0, 0, n.Location())
+	case ast.HH:
+		gap := interval
+		if n.Hour() > interval {
+			gap = interval * (n.Hour()/interval + 1)
+		}
+		return time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, n.Location()).Add(time.Duration(gap) * time.Hour)
+	case ast.MI:
+		gap := interval
+		if n.Minute() > interval {
+			gap = interval * (n.Minute()/interval + 1)
+		}
+		return time.Date(n.Year(), n.Month(), n.Day(), n.Hour(), 0, 0, 0, n.Location()).Add(time.Duration(gap) * time.Minute)
+	case ast.SS:
+		gap := interval
+		if n.Second() > interval {
+			gap = interval * (n.Second()/interval + 1)
+		}
+		return time.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), 0, 0, n.Location()).Add(time.Duration(gap) * time.Second)
+	case ast.MS:
+		milli := n.Nanosecond() / int(time.Millisecond)
+		gap := interval
+		if milli > interval {
+			gap = interval * (milli/interval + 1)
+		}
+		return time.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), 0, n.Location()).Add(time.Duration(gap) * time.Millisecond)
+	default: // should never happen
+		conf.Log.Errorf("invalid time unit %s", timeUnit)
+		return n
+	}
 }
 
-func getFirstTimer(ctx api.StreamContext, interval int64) (int64, *clock.Timer) {
-	next := getAlignedWindowEndTime(conf.GetNowInMilli(), interval)
+func getFirstTimer(ctx api.StreamContext, rawInerval int, timeUnit ast.Token) (int64, *clock.Timer) {
+	next := getAlignedWindowEndTime(conf.GetNow(), rawInerval, timeUnit)
 	ctx.GetLogger().Infof("align window timer to %v(%d)", next, next.UnixMilli())
 	return next.UnixMilli(), conf.GetTimerByTime(next)
 }
@@ -195,15 +224,15 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 	switch o.window.Type {
 	case ast.NOT_WINDOW:
 	case ast.TUMBLING_WINDOW:
-		firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length))
+		firstTime, firstTicker = getFirstTimer(ctx, o.window.RawInterval, o.window.TimeUnit)
 		o.interval = o.window.Length
 	case ast.HOPPING_WINDOW:
-		firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Interval))
+		firstTime, firstTicker = getFirstTimer(ctx, o.window.RawInterval, o.window.TimeUnit)
 		o.interval = o.window.Interval
 	case ast.SLIDING_WINDOW:
 		o.interval = o.window.Length
 	case ast.SESSION_WINDOW:
-		firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length))
+		firstTime, firstTicker = getFirstTimer(ctx, o.window.RawInterval, o.window.TimeUnit)
 		o.interval = o.window.Interval
 	case ast.COUNT_WINDOW:
 		o.interval = o.window.Interval
@@ -213,12 +242,12 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 		firstC = firstTicker.C
 		// resume previous window
 		if len(inputs) > 0 && o.triggerTime > 0 {
-			nextTick := conf.GetNowInMilli() + int64(o.interval)
+			nextTick := conf.GetNowInMilli() + o.interval
 			next := o.triggerTime
 			switch o.window.Type {
 			case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
 				for {
-					next = next + int64(o.interval)
+					next = next + o.interval
 					if next > nextTick {
 						break
 					}
@@ -228,7 +257,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 					ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
 				}
 			case ast.SESSION_WINDOW:
-				timeout, duration := int64(o.window.Interval), int64(o.window.Length)
+				timeout, duration := o.window.Interval, o.window.Length
 				for {
 					et := inputs[0].Timestamp
 					tick := et + (duration - et%duration)
@@ -307,12 +336,12 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 				case ast.COUNT_WINDOW:
 					o.msgCount++
 					log.Debugf(fmt.Sprintf("msgCount: %d", o.msgCount))
-					if o.msgCount%o.window.Interval != 0 {
+					if int64(o.msgCount)%o.window.Interval != 0 {
 						continue
 					}
 					o.msgCount = 0
 
-					if tl, er := NewTupleList(inputs, o.window.Length); er != nil {
+					if tl, er := NewTupleList(inputs, int(o.window.Length)); er != nil {
 						log.Error(fmt.Sprintf("Found error when trying to "))
 						infra.DrainError(ctx, er, errCh)
 						return
@@ -355,17 +384,17 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 			c = o.ticker.C
 			inputs = o.tick(ctx, inputs, firstTime, log)
 			if o.window.Type == ast.SESSION_WINDOW {
-				nextTime = firstTime + int64(o.window.Length)
+				nextTime = firstTime + o.window.Length
 			} else {
-				nextTime = firstTime + int64(o.interval)
+				nextTime = firstTime + o.interval
 			}
 		case now := <-c:
 			log.Debugf("Successive tick at %v(%d)", now, now.UnixMilli())
 			inputs = o.tick(ctx, inputs, nextTime, log)
 			if o.window.Type == ast.SESSION_WINDOW {
-				nextTime += int64(o.window.Length)
+				nextTime += o.window.Length
 			} else {
-				nextTime += int64(o.interval)
+				nextTime += o.interval
 			}
 		case now := <-timeout:
 			if len(inputs) > 0 {
@@ -394,9 +423,9 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 func (o *WindowOperator) tick(ctx api.StreamContext, inputs []*xsql.Tuple, n int64, log api.Logger) []*xsql.Tuple {
 	if o.window.Type == ast.SESSION_WINDOW {
 		log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs))
-		if len(inputs) == 0 || n-int64(o.window.Length) < inputs[0].Timestamp {
+		if len(inputs) == 0 || n-o.window.Length < inputs[0].Timestamp {
 			if len(inputs) > 0 {
-				log.Debugf("session window last trigger time %d < first tuple %d", n-int64(o.window.Length), inputs[0].Timestamp)
+				log.Debugf("session window last trigger time %d < first tuple %d", n-o.window.Length, inputs[0].Timestamp)
 			}
 			return inputs
 		}
@@ -480,7 +509,7 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 	for _, tuple := range inputs {
 		if o.window.Type == ast.HOPPING_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
 			diff := triggerTime - tuple.Timestamp
-			if diff > int64(o.window.Length)+delta {
+			if diff > o.window.Length+delta {
 				log.Debugf("diff: %d, length: %d, delta: %d", diff, o.window.Length, delta)
 				log.Debugf("tuple %s emitted at %d expired", tuple, tuple.Timestamp)
 				// Expired tuple, remove it by not adding back to inputs
@@ -503,12 +532,12 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 	case ast.TUMBLING_WINDOW, ast.SESSION_WINDOW:
 		windowStart = o.triggerTime
 	case ast.HOPPING_WINDOW:
-		windowStart = o.triggerTime - int64(o.window.Interval)
+		windowStart = o.triggerTime - o.window.Interval
 	case ast.SLIDING_WINDOW:
-		windowStart = triggerTime - int64(o.window.Length)
+		windowStart = triggerTime - o.window.Length
 	}
 	if windowStart <= 0 {
-		windowStart = windowEnd - int64(o.window.Length)
+		windowStart = windowEnd - o.window.Length
 	}
 	results.WindowRange = xsql.NewWindowRange(windowStart, windowEnd)
 	log.Debugf("window %s triggered for %d tuples", o.name, len(inputs))
@@ -531,7 +560,7 @@ func (o *WindowOperator) calDelta(triggerTime int64, log api.Logger) int64 {
 		delta = math.MaxInt16 // max int, all events for the initial window
 	} else {
 		if !o.isEventTime && o.window.Interval > 0 {
-			delta = triggerTime - lastTriggerTime - int64(o.window.Interval)
+			delta = triggerTime - lastTriggerTime - o.window.Interval
 			if delta > 100 {
 				log.Warnf("Possible long computation in window; Previous eviction time: %d, current eviction time: %d", lastTriggerTime, triggerTime)
 			}

+ 34 - 11
internal/topo/node/window_op_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
 var fivet = []*xsql.Tuple{
@@ -54,46 +55,68 @@ var fivet = []*xsql.Tuple{
 func TestTime(t *testing.T) {
 	tests := []struct {
 		interval int
+		unit     ast.Token
 		end      time.Time
 	}{
 		{
 			interval: 10,
+			unit:     ast.MS,
 			end:      time.UnixMilli(1658218371340),
 		}, {
 			interval: 500,
+			unit:     ast.MS,
 			end:      time.UnixMilli(1658218371500),
 		}, {
-			interval: 1000,
+			interval: 1,
+			unit:     ast.SS,
 			end:      time.UnixMilli(1658218372000),
 		}, {
-			interval: 40000, // 4oms
+			interval: 40, // 40 seconds
+			unit:     ast.SS,
 			end:      time.UnixMilli(1658218400000),
 		}, {
-			interval: 60000,
+			interval: 1,
+			unit:     ast.MI,
 			end:      time.UnixMilli(1658218380000),
 		}, {
-			interval: 180000,
+			interval: 3,
+			unit:     ast.MI,
 			end:      time.UnixMilli(1658218500000),
 		}, {
-			interval: 3600000,
+			interval: 1,
+			unit:     ast.HH,
 			end:      time.UnixMilli(1658221200000),
 		}, {
-			interval: 7200000,
+			interval: 2,
+			unit:     ast.HH,
 			end:      time.UnixMilli(1658224800000),
 		}, {
-			interval: 18000000, // 5 hours
+			interval: 5, // 5 hours
+			unit:     ast.HH,
 			end:      time.UnixMilli(1658232000000),
 		}, {
-			interval: 3600000 * 24, // 1 day
+			interval: 1, // 1 day
+			unit:     ast.DD,
 			end:      time.UnixMilli(1658246400000),
 		}, {
-			interval: 3600000 * 24 * 7, // 1 week
+			interval: 7, // 1 week
+			unit:     ast.DD,
 			end:      time.UnixMilli(1658764800000),
 		},
 	}
+	// Set the global timezone
+	location, err := time.LoadLocation("Asia/Shanghai")
+	if err != nil {
+		fmt.Println("Error loading location:", err)
+		return
+	}
+	time.Local = location
+
+	fmt.Println(time.UnixMilli(1658218371337).String())
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+
 	for i, tt := range tests {
-		ae := getAlignedWindowEndTime(1658218371337, int64(tt.interval))
+		ae := getAlignedWindowEndTime(time.UnixMilli(1658218371337), tt.interval, tt.unit)
 		if tt.end.UnixMilli() != ae.UnixMilli() {
 			t.Errorf("%d for interval %d. error mismatch:\n  exp=%s(%d)\n  got=%s(%d)\n\n", i, tt.interval, tt.end, tt.end.UnixMilli(), ae, ae.UnixMilli())
 		}

+ 34 - 5
internal/topo/planner/planner.go

@@ -136,11 +136,20 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			tp.AddOperator(inputs, wfilterOp)
 			inputs = []api.Emitter{wfilterOp}
 		}
-
+		l, i := convertFromDuration(t)
+		var rawInterval int
+		switch t.wtype {
+		case ast.TUMBLING_WINDOW, ast.SESSION_WINDOW:
+			rawInterval = t.length
+		case ast.HOPPING_WINDOW:
+			rawInterval = t.interval
+		}
 		op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), node.WindowConfig{
-			Type:     t.wtype,
-			Length:   t.length,
-			Interval: t.interval,
+			Type:        t.wtype,
+			Length:      l,
+			Interval:    i,
+			RawInterval: rawInterval,
+			TimeUnit:    t.timeUnit,
 		}, streamsFromStmt, options)
 		if err != nil {
 			return nil, 0, err
@@ -178,6 +187,23 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	return op, newIndex, nil
 }
 
+func convertFromDuration(t *WindowPlan) (int64, int64) {
+	var unit int64 = 1
+	switch t.timeUnit {
+	case ast.DD:
+		unit = 24 * 3600 * 1000
+	case ast.HH:
+		unit = 3600 * 1000
+	case ast.MI:
+		unit = 60 * 1000
+	case ast.SS:
+		unit = 1000
+	case ast.MS:
+		unit = 1
+	}
+	return int64(t.length) * unit, int64(t.interval) * unit
+}
+
 func transformSourceNode(t *DataSourcePlan, sources []*node.SourceNode, options *api.RuleOption) (*node.SourceNode, error) {
 	isSchemaless := t.isSchemaless
 	switch t.streamStmt.StreamType {
@@ -300,9 +326,12 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 			if w.Interval != nil {
 				wp.interval = w.Interval.Val
 			} else if w.WindowType == ast.COUNT_WINDOW {
-				// if no interval value is set, and it's count window, then set interval to length value.
+				// if no interval value is set, and it's a count window, then set interval to length value.
 				wp.interval = w.Length.Val
 			}
+			if w.TimeUnit != nil {
+				wp.timeUnit = w.TimeUnit.Val
+			}
 			if w.Filter != nil {
 				wp.condition = w.Filter
 			}

+ 18 - 7
internal/topo/planner/planner_graph.go

@@ -571,9 +571,10 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 		return nil, fmt.Errorf("window size %d is invalid", n.Size)
 	}
 	var (
-		wt       ast.WindowType
-		length   int
-		interval int
+		wt          ast.WindowType
+		length      int
+		interval    int
+		rawInterval int
 	)
 	switch strings.ToLower(n.Type) {
 	case "tumblingwindow":
@@ -581,6 +582,7 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 		if n.Interval != 0 && n.Interval != n.Size {
 			return nil, fmt.Errorf("tumbling window interval must equal to size")
 		}
+		rawInterval = n.Size
 	case "hoppingwindow":
 		wt = ast.HOPPING_WINDOW
 		if n.Interval <= 0 {
@@ -589,11 +591,13 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 		if n.Interval > n.Size {
 			return nil, fmt.Errorf("hopping window interval must be less than size")
 		}
+		rawInterval = n.Interval
 	case "sessionwindow":
 		wt = ast.SESSION_WINDOW
 		if n.Interval <= 0 {
 			return nil, fmt.Errorf("hopping window interval must be greater than 0")
 		}
+		rawInterval = n.Size
 	case "slidingwindow":
 		wt = ast.SLIDING_WINDOW
 		if n.Interval != 0 && n.Interval != n.Size {
@@ -613,6 +617,7 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 	default:
 		return nil, fmt.Errorf("unknown window type %s", n.Type)
 	}
+	var timeUnit ast.Token
 	if wt == ast.COUNT_WINDOW {
 		length = n.Size
 		interval = n.Interval
@@ -621,25 +626,31 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 		switch strings.ToLower(n.Unit) {
 		case "dd":
 			unit = 24 * 3600 * 1000
+			timeUnit = ast.DD
 		case "hh":
 			unit = 3600 * 1000
+			timeUnit = ast.HH
 		case "mi":
 			unit = 60 * 1000
+			timeUnit = ast.MI
 		case "ss":
 			unit = 1000
+			timeUnit = ast.SS
 		case "ms":
 			unit = 1
+			timeUnit = ast.MS
 		default:
 			return nil, fmt.Errorf("Invalid unit %s", n.Unit)
 		}
 		length = n.Size * unit
 		interval = n.Interval * unit
 	}
-
 	return &node.WindowConfig{
-		Type:     wt,
-		Length:   length,
-		Interval: interval,
+		RawInterval: rawInterval,
+		Type:        wt,
+		Length:      int64(length),
+		Interval:    int64(interval),
+		TimeUnit:    timeUnit,
 	}, nil
 }
 

+ 43 - 29
internal/topo/planner/planner_test.go

@@ -36,7 +36,7 @@ func init() {
 }
 
 func Test_createLogicalPlan(t *testing.T) {
-	store, err := store.GetKV("stream")
+	kv, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -73,7 +73,7 @@ func Test_createLogicalPlan(t *testing.T) {
 			t.Error(err)
 			t.Fail()
 		}
-		err = store.Set(name, string(s))
+		err = kv.Set(name, string(s))
 		if err != nil {
 			t.Error(err)
 			t.Fail()
@@ -81,7 +81,7 @@ func Test_createLogicalPlan(t *testing.T) {
 	}
 	streams := make(map[string]*ast.StreamStmt)
 	for n := range streamSqls {
-		streamStmt, err := xsql.GetDataSource(store, n)
+		streamStmt, err := xsql.GetDataSource(kv, n)
 		if err != nil {
 			t.Errorf("fail to get stream %s, please check if stream is created", n)
 			return
@@ -293,7 +293,8 @@ func Test_createLogicalPlan(t *testing.T) {
 							},
 							condition: nil,
 							wtype:     ast.TUMBLING_WINDOW,
-							length:    10000,
+							length:    10,
+							timeUnit:  ast.SS,
 							interval:  0,
 							limit:     0,
 						}.Init(),
@@ -352,7 +353,8 @@ func Test_createLogicalPlan(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -445,7 +447,8 @@ func Test_createLogicalPlan(t *testing.T) {
 							},
 							condition: nil,
 							wtype:     ast.TUMBLING_WINDOW,
-							length:    10000,
+							length:    10,
+							timeUnit:  ast.SS,
 							interval:  0,
 							limit:     0,
 						}.Init(),
@@ -610,7 +613,8 @@ func Test_createLogicalPlan(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -698,7 +702,8 @@ func Test_createLogicalPlan(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -884,7 +889,8 @@ func Test_createLogicalPlan(t *testing.T) {
 													},
 													condition: nil,
 													wtype:     ast.TUMBLING_WINDOW,
-													length:    10000,
+													length:    10,
+													timeUnit:  ast.SS,
 													interval:  0,
 													limit:     0,
 												}.Init(),
@@ -1587,7 +1593,8 @@ func Test_createLogicalPlan(t *testing.T) {
 													},
 													condition: nil,
 													wtype:     ast.SLIDING_WINDOW,
-													length:    10000,
+													length:    10,
+													timeUnit:  ast.SS,
 													interval:  0,
 													limit:     0,
 												}.Init(),
@@ -1808,7 +1815,7 @@ func Test_createLogicalPlan(t *testing.T) {
 			Qos:                0,
 			CheckpointInterval: 0,
 			SendError:          true,
-		}, store)
+		}, kv)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.err, err)
 		} else if !reflect.DeepEqual(tt.p, p) {
@@ -1818,7 +1825,7 @@ func Test_createLogicalPlan(t *testing.T) {
 }
 
 func Test_createLogicalPlanSchemaless(t *testing.T) {
-	store, err := store.GetKV("stream")
+	kv, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -1849,7 +1856,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 			t.Error(err)
 			t.Fail()
 		}
-		err = store.Set(name, string(s))
+		err = kv.Set(name, string(s))
 		if err != nil {
 			t.Error(err)
 			t.Fail()
@@ -1857,7 +1864,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 	}
 	streams := make(map[string]*ast.StreamStmt)
 	for n := range streamSqls {
-		streamStmt, err := xsql.GetDataSource(store, n)
+		streamStmt, err := xsql.GetDataSource(kv, n)
 		if err != nil {
 			t.Errorf("fail to get stream %s, please check if stream is created", n)
 			return
@@ -1933,7 +1940,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 							},
 							condition: nil,
 							wtype:     ast.TUMBLING_WINDOW,
-							length:    10000,
+							length:    10,
+							timeUnit:  ast.SS,
 							interval:  0,
 							limit:     0,
 						}.Init(),
@@ -1985,7 +1993,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -2072,7 +2081,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 							},
 							condition: nil,
 							wtype:     ast.TUMBLING_WINDOW,
-							length:    10000,
+							length:    10,
+							timeUnit:  ast.SS,
 							interval:  0,
 							limit:     0,
 						}.Init(),
@@ -2214,7 +2224,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -2295,7 +2306,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -2473,7 +2485,8 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 													},
 													condition: nil,
 													wtype:     ast.TUMBLING_WINDOW,
-													length:    10000,
+													length:    10,
+													timeUnit:  ast.SS,
 													interval:  0,
 													limit:     0,
 												}.Init(),
@@ -2786,7 +2799,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 			Qos:                0,
 			CheckpointInterval: 0,
 			SendError:          true,
-		}, store)
+		}, kv)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.err, err)
 		} else if !reflect.DeepEqual(tt.p, p) {
@@ -2796,7 +2809,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 }
 
 func Test_createLogicalPlan4Lookup(t *testing.T) {
-	store, err := store.GetKV("stream")
+	kv, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -2820,7 +2833,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 			t.Error(err)
 			t.Fail()
 		}
-		err = store.Set(name, string(s))
+		err = kv.Set(name, string(s))
 		if err != nil {
 			t.Error(err)
 			t.Fail()
@@ -2828,7 +2841,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 	}
 	streams := make(map[string]*ast.StreamStmt)
 	for n := range streamSqls {
-		streamStmt, err := xsql.GetDataSource(store, n)
+		streamStmt, err := xsql.GetDataSource(kv, n)
 		if err != nil {
 			t.Errorf("fail to get stream %s, please check if stream is created", n)
 			return
@@ -3199,7 +3212,8 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 										},
 										condition: nil,
 										wtype:     ast.TUMBLING_WINDOW,
-										length:    10000,
+										length:    10,
+										timeUnit:  ast.SS,
 										interval:  0,
 										limit:     0,
 									}.Init(),
@@ -3264,7 +3278,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 			Qos:                0,
 			CheckpointInterval: 0,
 			SendError:          true,
-		}, store)
+		}, kv)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.sql, tt.err, err)
 		} else if !reflect.DeepEqual(tt.p, p) {
@@ -3327,13 +3341,13 @@ func TestTransformSourceNode(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			node, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
+			sourceNode, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 				return
 			}
-			if !reflect.DeepEqual(node, tc.node) {
-				t.Errorf("unexpected result: got %v, want %v", node, tc.node)
+			if !reflect.DeepEqual(sourceNode, tc.node) {
+				t.Errorf("unexpected result: got %v, want %v", sourceNode, tc.node)
 			}
 		})
 	}

+ 2 - 1
internal/topo/planner/windowPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ type WindowPlan struct {
 	wtype       ast.WindowType
 	length      int
 	interval    int // If interval is not set, it is equals to Length
+	timeUnit    ast.Token
 	limit       int // If limit is not positive, there will be no limit
 	isEventTime bool
 }

+ 2 - 0
internal/topo/rule/ruleState.go

@@ -145,6 +145,7 @@ func (rs *RuleState) run() {
 				if ctx != nil {
 					conf.Log.Warnf("rule %s is already started", rs.RuleId)
 				} else {
+
 					ctx, cancel = context.WithCancel(context.Background())
 					go rs.runTopo(ctx)
 				}
@@ -181,6 +182,7 @@ func (rs *RuleState) runTopo(ctx context.Context) {
 		for {
 			select {
 			case e := <-tp.Open():
+
 				er = e
 				if er != nil { // Only restart rule for errors
 					tp.GetContext().SetError(er)

+ 11 - 17
internal/xsql/parser.go

@@ -1058,25 +1058,19 @@ func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.W
 		}
 		return win, nil
 	}
-	unit := 1
-	v := args[0].(*ast.TimeLiteral).Val
-	switch v {
-	case ast.DD:
-		unit = 24 * 3600 * 1000
-	case ast.HH:
-		unit = 3600 * 1000
-	case ast.MI:
-		unit = 60 * 1000
-	case ast.SS:
-		unit = 1000
-	case ast.MS:
-		unit = 1
-	default:
-		return nil, fmt.Errorf("Invalid timeliteral %s", v)
+	if tl, ok := args[0].(*ast.TimeLiteral); ok {
+		switch tl.Val {
+		case ast.DD, ast.HH, ast.MI, ast.SS, ast.MS:
+			win.TimeUnit = tl
+		default:
+			return nil, fmt.Errorf("Invalid timeliteral %s", tl.Val)
+		}
+	} else {
+		return nil, fmt.Errorf("Invalid timeliteral %s", tl.Val)
 	}
-	win.Length = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val * unit}
+	win.Length = &ast.IntegerLiteral{Val: args[1].(*ast.IntegerLiteral).Val}
 	if len(args) > 2 {
-		win.Interval = &ast.IntegerLiteral{Val: args[2].(*ast.IntegerLiteral).Val * unit}
+		win.Interval = &ast.IntegerLiteral{Val: args[2].(*ast.IntegerLiteral).Val}
 	} else {
 		win.Interval = &ast.IntegerLiteral{Val: 0}
 	}

+ 9 - 5
internal/xsql/parser_test.go

@@ -3052,8 +3052,9 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 					ast.Dimension{
 						Expr: &ast.Window{
 							WindowType: ast.TUMBLING_WINDOW,
-							Length:     &ast.IntegerLiteral{Val: 10000},
+							Length:     &ast.IntegerLiteral{Val: 10},
 							Interval:   &ast.IntegerLiteral{Val: 0},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.SS},
 						},
 					},
 				},
@@ -3075,8 +3076,9 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 					ast.Dimension{
 						Expr: &ast.Window{
 							WindowType: ast.HOPPING_WINDOW,
-							Length:     &ast.IntegerLiteral{Val: 3e5},
-							Interval:   &ast.IntegerLiteral{Val: 6e4},
+							Length:     &ast.IntegerLiteral{Val: 5},
+							Interval:   &ast.IntegerLiteral{Val: 1},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.MI},
 						},
 					},
 				},
@@ -3098,8 +3100,9 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 					ast.Dimension{
 						Expr: &ast.Window{
 							WindowType: ast.SESSION_WINDOW,
-							Length:     &ast.IntegerLiteral{Val: 1.8e7},
-							Interval:   &ast.IntegerLiteral{Val: 3.6e6},
+							Length:     &ast.IntegerLiteral{Val: 5},
+							Interval:   &ast.IntegerLiteral{Val: 1},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.HH},
 						},
 					},
 				},
@@ -3123,6 +3126,7 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 							WindowType: ast.SLIDING_WINDOW,
 							Length:     &ast.IntegerLiteral{Val: 5},
 							Interval:   &ast.IntegerLiteral{Val: 0},
+							TimeUnit:   &ast.TimeLiteral{Val: ast.MS},
 						},
 					},
 				},

+ 2 - 1
pkg/ast/statement.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -175,6 +175,7 @@ type Window struct {
 	WindowType WindowType
 	Length     *IntegerLiteral
 	Interval   *IntegerLiteral
+	TimeUnit   *TimeLiteral
 	Filter     Expr
 	Expr
 }