Browse files

feat: support event time in continuous query

Split the watermark op out;
The window op no longer needs to calculate the watermark

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 year ago
parent
commit
e46348ec33

+ 206 - 0
internal/topo/node/event_window_trigger.go

@@ -0,0 +1,206 @@
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package node
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+// EventTimeTrigger scans the input tuples and finds the tuples that belong to the current window
+// The inputs are sorted by the watermark op
+type EventTimeTrigger struct {
+	window   *WindowConfig
+	interval int64
+}
+
+func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error) {
+	w := &EventTimeTrigger{
+		window: window,
+	}
+	switch window.Type {
+	case ast.NOT_WINDOW:
+	case ast.TUMBLING_WINDOW:
+		w.interval = window.Length
+	case ast.HOPPING_WINDOW:
+		w.interval = window.Interval
+	case ast.SLIDING_WINDOW:
+		w.interval = window.Length
+	case ast.SESSION_WINDOW:
+		// Use timeout to update watermark
+		w.interval = window.Interval
+	default:
+		return nil, fmt.Errorf("unsupported window type %d", window.Type)
+	}
+	return w, nil
+}
+
+// If the window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
+func (w *EventTimeTrigger) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
+	switch w.window.Type {
+	case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
+		if current > 0 {
+			return current + w.interval
+		} else { // first run without a previous window
+			nextTs := getEarliestEventTs(inputs, current, watermark)
+			if nextTs == math.MaxInt64 {
+				return nextTs
+			}
+			return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
+		}
+	case ast.SLIDING_WINDOW:
+		nextTs := getEarliestEventTs(inputs, current, watermark)
+		return nextTs
+	default:
+		return math.MaxInt64
+	}
+}
+
+func (w *EventTimeTrigger) getNextSessionWindow(inputs []*xsql.Tuple, now int64) (int64, bool) {
+	if len(inputs) > 0 {
+		timeout, duration := w.window.Interval, w.window.Length
+		et := inputs[0].Timestamp
+		tick := getAlignedWindowEndTime(time.UnixMilli(et), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
+		var p int64
+		ticked := false
+		for _, tuple := range inputs {
+			var r int64 = math.MaxInt64
+			if p > 0 {
+				if tuple.Timestamp-p > timeout {
+					r = p + timeout
+				}
+			}
+			if tuple.Timestamp > tick {
+				if tick-duration > et && tick < r {
+					r = tick
+					ticked = true
+				}
+				tick += duration
+			}
+			if r < math.MaxInt64 {
+				return r, ticked
+			}
+			p = tuple.Timestamp
+		}
+		if p > 0 {
+			if now-p > timeout {
+				return p + timeout, ticked
+			}
+		}
+	}
+	return math.MaxInt64, false
+}
+
+func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, _ chan<- error) {
+	log := ctx.GetLogger()
+	var (
+		nextWindowEndTs int64
+		prevWindowEndTs int64
+		lastTicked      bool
+	)
+	for {
+		select {
+		// process incoming item
+		case item, opened := <-o.input:
+			if !opened {
+				o.statManager.IncTotalExceptions("input channel closed")
+				break
+			}
+			if _, ok := item.(*xsql.WatermarkTuple); !ok {
+				processed := false
+				if item, processed = o.preprocess(item); processed {
+					break
+				}
+			}
+			switch d := item.(type) {
+			case error:
+				o.statManager.IncTotalRecordsIn()
+				_ = o.Broadcast(d)
+				o.statManager.IncTotalExceptions(d.Error())
+			case *xsql.WatermarkTuple:
+				ctx.GetLogger().Debug("WatermarkTuple", d.GetTimestamp())
+				watermarkTs := d.GetTimestamp()
+				windowEndTs := nextWindowEndTs
+				ticked := false
+				// The session window needs to recalculate the window end because it depends on the inputs
+				if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
+					if o.window.Type == ast.SESSION_WINDOW {
+						windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
+					} else {
+						windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
+					}
+				}
+				for windowEndTs <= watermarkTs && windowEndTs >= 0 {
+					log.Debugf("Current input count %d", len(inputs))
+					// scan all events and find the events that belong to the current window
+					if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
+						o.triggerTime = inputs[0].Timestamp
+					}
+					if windowEndTs > 0 {
+						inputs = o.scan(inputs, windowEndTs, ctx)
+					}
+					prevWindowEndTs = windowEndTs
+					lastTicked = ticked
+					if o.window.Type == ast.SESSION_WINDOW {
+						windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
+					} else {
+						windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
+					}
+					log.Debugf("Window end ts %d Watermark ts %d\n", windowEndTs, watermarkTs)
+				}
+				nextWindowEndTs = windowEndTs
+				log.Debugf("next window end %d", nextWindowEndTs)
+			case *xsql.Tuple:
+				ctx.GetLogger().Debug("Tuple", d.GetTimestamp())
+				o.statManager.ProcessTimeStart()
+				o.statManager.IncTotalRecordsIn()
+				log.Debugf("event window receive tuple %s", d.Message)
+				// first tuple, set the window start time, which will be set to triggerTime
+				if o.triggerTime == 0 {
+					o.triggerTime = d.Timestamp
+				}
+				inputs = append(inputs, d)
+				o.statManager.ProcessTimeEnd()
+				_ = ctx.PutState(WindowInputsKey, inputs)
+			default:
+				e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
+				_ = o.Broadcast(e)
+				o.statManager.IncTotalExceptions(e.Error())
+			}
+		// is cancelling
+		case <-ctx.Done():
+			log.Infoln("Cancelling window....")
+			if o.ticker != nil {
+				o.ticker.Stop()
+			}
+			return
+		}
+	}
+}
+
+func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
+	var minTs int64 = math.MaxInt64
+	for _, t := range inputs {
+		if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
+			minTs = t.Timestamp
+		}
+	}
+	return minTs
+}

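For reference, below is a rough, self-contained sketch of the tumbling/hopping branch of getNextWindow above: on the first run there is no previous window end, so the trigger takes the earliest event timestamp up to the watermark and aligns the first window end to an interval boundary; every later end is just the previous end plus the interval. The alignTo helper is only a stand-in for the real getAlignedWindowEndTime and assumes plain millisecond intervals with no calendar units.

package main

import (
	"fmt"
	"math"
)

// earliestEventTs mirrors getEarliestEventTs above: the smallest event
// timestamp strictly after startTs and no later than endTs (the watermark).
func earliestEventTs(events []int64, startTs, endTs int64) int64 {
	minTs := int64(math.MaxInt64)
	for _, ts := range events {
		if ts > startTs && ts <= endTs && ts < minTs {
			minTs = ts
		}
	}
	return minTs
}

// alignTo is a stand-in for getAlignedWindowEndTime: the first interval
// boundary after ts, assuming the interval is a plain duration in milliseconds.
func alignTo(ts, interval int64) int64 {
	return (ts/interval + 1) * interval
}

// nextTumblingWindowEnd is the simplified tumbling/hopping case of
// EventTimeTrigger.getNextWindow.
func nextTumblingWindowEnd(events []int64, current, watermark, interval int64) int64 {
	if current > 0 { // a previous window exists, just step forward by one interval
		return current + interval
	}
	ts := earliestEventTs(events, current, watermark)
	if ts == math.MaxInt64 { // nothing to window yet, recalculate on the next watermark
		return ts
	}
	return alignTo(ts, interval)
}

func main() {
	events := []int64{1357, 1499, 2210} // event timestamps in ms
	interval := int64(1000)
	first := nextTumblingWindowEnd(events, 0, 1500, interval)
	fmt.Println(first)            // 2000: aligned up from the earliest event at 1357
	fmt.Println(first + interval) // 3000: subsequent ends simply add the interval
}
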
+ 5 - 5
internal/topo/node/join_align_node.go

@@ -105,7 +105,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					}
 					switch d := item.(type) {
 					case error:
-						n.Broadcast(d)
+						_ = n.Broadcast(d)
 						n.statManager.IncTotalExceptions(d.Error())
 					case *xsql.Tuple:
 						log.Debugf("JoinAlignNode receive tuple input %s", d)
@@ -125,16 +125,16 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 							_, ok := n.batch[emitter]
 							if !ok {
 								e := fmt.Errorf("run JoinAlignNode error: receive batch input from unknown emitter %[1]T(%[1]v)", d)
-								n.Broadcast(e)
+								_ = n.Broadcast(e)
 								n.statManager.IncTotalExceptions(e.Error())
 								break
 							}
 							n.batch[emitter] = d.Content
-							ctx.PutState(BatchKey, n.batch)
+							_ = ctx.PutState(BatchKey, n.batch)
 						}
 					default:
 						e := fmt.Errorf("run JoinAlignNode error: invalid input type but got %[1]T(%[1]v)", d)
-						n.Broadcast(e)
+						_ = n.Broadcast(e)
 						n.statManager.IncTotalExceptions(e.Error())
 					}
 				case <-ctx.Done():
@@ -157,7 +157,7 @@ func (n *JoinAlignNode) alignBatch(_ api.StreamContext, w *xsql.WindowTuples) {
 		}
 	}
 
-	n.Broadcast(w)
+	_ = n.Broadcast(w)
 	n.statManager.ProcessTimeEnd()
 	n.statManager.IncTotalRecordsOut()
 	n.statManager.SetBufferLength(int64(len(n.input)))

+ 8 - 3
internal/topo/node/node.go

@@ -168,7 +168,7 @@ func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
 		b, ok := data.(*checkpoint.BufferOrEvent)
 		if ok {
 			logger.Debugf("data is BufferOrEvent, start barrier handler")
-			// if it is barrier return true and ignore the further processing
+			// if it is a barrier, return true and ignore the further processing
 			// if it is blocked(align handler), return true and then write back to the channel later
 			if o.barrierHandler.Process(b, o.ctx) {
 				return nil, true
@@ -177,6 +177,11 @@ func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
 			}
 		}
 	}
+	// Filter all the watermark tuples.
+	// Only the event time window op needs them, and it checks for watermark tuples itself before calling preprocess
+	if _, ok := data.(*xsql.WatermarkTuple); ok {
+		return nil, true
+	}
 	return data, false
 }
 
@@ -221,7 +226,7 @@ func SourceOpen(sourceType string, config map[string]interface{}) error {
 
 		contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
 		ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-		lns.Close(ctx)
+		_ = lns.Close(ctx)
 	} else {
 		err = ns.Configure(dataSource, config)
 		if err != nil {
@@ -230,7 +235,7 @@ func SourceOpen(sourceType string, config map[string]interface{}) error {
 
 		contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
 		ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-		ns.Close(ctx)
+		_ = ns.Close(ctx)
 	}
 
 	return nil

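The new preprocess branch above means ordinary operators silently drop watermark tuples, while the event-time window op checks for them before calling preprocess (see execEventWindow). A minimal sketch of that filtering pattern, using local stand-in types instead of the xsql ones:

package main

import "fmt"

// Stand-ins for xsql.Tuple and xsql.WatermarkTuple, for illustration only.
type tuple struct{ ts int64 }
type watermarkTuple struct{ ts int64 }

// preprocess mirrors the defaultSinkNode.preprocess change: watermark tuples
// are swallowed here, so operators that do not care about them never see them.
func preprocess(data interface{}) (interface{}, bool) {
	if _, ok := data.(*watermarkTuple); ok {
		return nil, true // treated as processed: dropped
	}
	return data, false
}

func main() {
	for _, item := range []interface{}{&tuple{ts: 100}, &watermarkTuple{ts: 120}} {
		if d, done := preprocess(item); !done {
			fmt.Printf("operator processes %T\n", d)
		} else {
			fmt.Println("watermark tuple filtered out")
		}
	}
}
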
+ 4 - 4
internal/topo/node/operations.go

@@ -44,7 +44,7 @@ type UnaryOperator struct {
 	cancelled bool
 }
 
-// NewUnary creates *UnaryOperator value
+// New creates a *UnaryOperator value
 func New(name string, options *api.RuleOption) *UnaryOperator {
 	return &UnaryOperator{
 		defaultSinkNode: &defaultSinkNode{
@@ -136,19 +136,19 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 				continue
 			case error:
 				logger.Errorf("Operation %s error: %s", ctx.GetOpId(), val)
-				o.Broadcast(val)
+				_ = o.Broadcast(val)
 				stats.IncTotalExceptions(val.Error())
 				continue
 			case []xsql.TupleRow:
 				stats.ProcessTimeEnd()
 				for _, v := range val {
-					o.Broadcast(v)
+					_ = o.Broadcast(v)
 					stats.IncTotalRecordsOut()
 				}
 				stats.SetBufferLength(int64(len(o.input)))
 			default:
 				stats.ProcessTimeEnd()
-				o.Broadcast(val)
+				_ = o.Broadcast(val)
 				stats.IncTotalRecordsOut()
 				stats.SetBufferLength(int64(len(o.input)))
 			}

+ 0 - 293
internal/topo/node/watermark.go

@@ -1,293 +0,0 @@
-// Copyright 2021-2023 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package node
-
-import (
-	"context"
-	"fmt"
-	"math"
-	"sort"
-	"time"
-
-	"github.com/lf-edge/ekuiper/internal/xsql"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/ast"
-	"github.com/lf-edge/ekuiper/pkg/infra"
-)
-
-type WatermarkTuple struct {
-	Timestamp int64
-}
-
-func (t *WatermarkTuple) GetTimestamp() int64 {
-	return t.Timestamp
-}
-
-func (t *WatermarkTuple) IsWatermark() bool {
-	return true
-}
-
-const WATERMARK_KEY = "$$wartermark"
-
-type WatermarkGenerator struct {
-	inputTopics   []string
-	topicToTs     map[string]int64
-	window        *WindowConfig
-	lateTolerance int64
-	interval      int64
-	rawInterval   int
-	timeUnit      ast.Token
-	// ticker          *clock.Ticker
-	stream chan<- interface{}
-	// state
-	lastWatermarkTs int64
-}
-
-func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
-	w := &WatermarkGenerator{
-		window:        window,
-		topicToTs:     make(map[string]int64),
-		lateTolerance: l,
-		inputTopics:   s,
-		stream:        stream,
-		rawInterval:   window.RawInterval,
-		timeUnit:      window.TimeUnit,
-	}
-	switch window.Type {
-	case ast.NOT_WINDOW:
-	case ast.TUMBLING_WINDOW:
-		w.interval = window.Length
-	case ast.HOPPING_WINDOW:
-		w.interval = window.Interval
-	case ast.SLIDING_WINDOW:
-		w.interval = window.Length
-	case ast.SESSION_WINDOW:
-		// Use timeout to update watermark
-		w.interval = window.Interval
-	default:
-		return nil, fmt.Errorf("unsupported window type %d", window.Type)
-	}
-	return w, nil
-}
-
-func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
-	log := ctx.GetLogger()
-	log.Debugf("watermark generator track event from topic %s at %d", s, ts)
-	currentVal, ok := w.topicToTs[s]
-	if !ok || ts > currentVal {
-		w.topicToTs[s] = ts
-	}
-	r := ts >= w.lastWatermarkTs
-	if r {
-		w.trigger(ctx)
-	}
-	return r
-}
-
-func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
-	log := ctx.GetLogger()
-	watermark := w.computeWatermarkTs(ctx)
-	log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
-	if watermark > w.lastWatermarkTs {
-		t := &WatermarkTuple{Timestamp: watermark}
-		select {
-		case w.stream <- t:
-		default: // TODO need to set buffer
-		}
-		w.lastWatermarkTs = watermark
-		ctx.PutState(WATERMARK_KEY, w.lastWatermarkTs)
-		log.Debugf("scan watermark event at %d", watermark)
-	}
-}
-
-func (w *WatermarkGenerator) computeWatermarkTs(_ context.Context) int64 {
-	var ts int64
-	if len(w.topicToTs) >= len(w.inputTopics) {
-		ts = math.MaxInt64
-		for _, key := range w.inputTopics {
-			if ts > w.topicToTs[key] {
-				ts = w.topicToTs[key]
-			}
-		}
-	}
-	return ts - w.lateTolerance
-}
-
-// If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
-func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
-	switch w.window.Type {
-	case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
-		if current > 0 {
-			return current + w.interval
-		} else { // first run without a previous window
-			nextTs := getEarliestEventTs(inputs, current, watermark)
-			if nextTs == math.MaxInt64 {
-				return nextTs
-			}
-			return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.rawInterval, w.timeUnit).UnixMilli()
-		}
-	case ast.SLIDING_WINDOW:
-		nextTs := getEarliestEventTs(inputs, current, watermark)
-		return nextTs
-	default:
-		return math.MaxInt64
-	}
-}
-
-func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple) (int64, bool) {
-	if len(inputs) > 0 {
-		timeout, duration := w.window.Interval, w.window.Length
-		sort.SliceStable(inputs, func(i, j int) bool {
-			return inputs[i].Timestamp < inputs[j].Timestamp
-		})
-		et := inputs[0].Timestamp
-		tick := getAlignedWindowEndTime(time.UnixMilli(et), w.rawInterval, w.timeUnit).UnixMilli()
-		var p int64
-		ticked := false
-		for _, tuple := range inputs {
-			var r int64 = math.MaxInt64
-			if p > 0 {
-				if tuple.Timestamp-p > timeout {
-					r = p + timeout
-				}
-			}
-			if tuple.Timestamp > tick {
-				if tick-duration > et && tick < r {
-					r = tick
-					ticked = true
-				}
-				tick += duration
-			}
-			if r < math.MaxInt64 {
-				return r, ticked
-			}
-			p = tuple.Timestamp
-		}
-	}
-	return math.MaxInt64, false
-}
-
-func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
-	log := ctx.GetLogger()
-	var (
-		nextWindowEndTs int64
-		prevWindowEndTs int64
-		lastTicked      bool
-	)
-
-	o.watermarkGenerator.lastWatermarkTs = 0
-	if s, err := ctx.GetState(WATERMARK_KEY); err == nil && s != nil {
-		if si, ok := s.(int64); ok {
-			o.watermarkGenerator.lastWatermarkTs = si
-		} else {
-			infra.DrainError(ctx, fmt.Errorf("restore window state `lastWatermarkTs` %v error, invalid type", s), errCh)
-			return
-		}
-	}
-	log.Infof("Start with window state lastWatermarkTs: %d", o.watermarkGenerator.lastWatermarkTs)
-	for {
-		select {
-		// process incoming item
-		case item, opened := <-o.input:
-			processed := false
-			if item, processed = o.preprocess(item); processed {
-				break
-			}
-			o.statManager.ProcessTimeStart()
-			if !opened {
-				o.statManager.IncTotalExceptions("input channel closed")
-				break
-			}
-			switch d := item.(type) {
-			case error:
-				o.statManager.IncTotalRecordsIn()
-				o.Broadcast(d)
-				o.statManager.IncTotalExceptions(d.Error())
-			case xsql.Event:
-				if d.IsWatermark() {
-					watermarkTs := d.GetTimestamp()
-					windowEndTs := nextWindowEndTs
-					ticked := false
-					// Session window needs a recalculation of window because its window end depends on the inputs
-					if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
-						if o.window.Type == ast.SESSION_WINDOW {
-							windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs)
-						} else {
-							windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
-						}
-					}
-					for windowEndTs <= watermarkTs && windowEndTs >= 0 {
-						log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
-						log.Debugf("Current input count %d", len(inputs))
-						// scan all events and find out the event in the current window
-						if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
-							o.triggerTime = inputs[0].Timestamp
-						}
-						if windowEndTs > 0 {
-							inputs = o.scan(inputs, windowEndTs, ctx)
-						}
-						prevWindowEndTs = windowEndTs
-						lastTicked = ticked
-						if o.window.Type == ast.SESSION_WINDOW {
-							windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs)
-						} else {
-							windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
-						}
-					}
-					nextWindowEndTs = windowEndTs
-					log.Debugf("next window end %d", nextWindowEndTs)
-				} else {
-					o.statManager.IncTotalRecordsIn()
-					tuple, ok := d.(*xsql.Tuple)
-					if !ok {
-						log.Debugf("receive non tuple element %v", d)
-					}
-					log.Debugf("event window receive tuple %s", tuple.Message)
-					// first tuple, set the window start time, which will set to triggerTime
-					if o.triggerTime == 0 {
-						o.triggerTime = tuple.Timestamp
-					}
-					if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
-						inputs = append(inputs, tuple)
-					}
-				}
-				o.statManager.ProcessTimeEnd()
-				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-			default:
-				o.statManager.IncTotalRecordsIn()
-				e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
-				o.Broadcast(e)
-				o.statManager.IncTotalExceptions(e.Error())
-			}
-		// is cancelling
-		case <-ctx.Done():
-			log.Infoln("Cancelling window....")
-			if o.ticker != nil {
-				o.ticker.Stop()
-			}
-			return
-		}
-	}
-}
-
-func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
-	var minTs int64 = math.MaxInt64
-	for _, t := range inputs {
-		if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
-			minTs = t.Timestamp
-		}
-	}
-	return minTs
-}

+ 224 - 0
internal/topo/node/watermark_op.go

@@ -0,0 +1,224 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package node
+
+import (
+	"fmt"
+	"math"
+	"sort"
+
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/infra"
+)
+
+// WatermarkOp is used when event time is enabled.
+// It aligns the event time of the input streams
+// and sends out the data in time order together with watermark tuples.
+type WatermarkOp struct {
+	*defaultSinkNode
+	statManager metric.StatManager
+	// config
+	lateTolerance int64
+	// state
+	events          []*xsql.Tuple // All the cached events in order
+	streamWMs       map[string]int64
+	lastWatermarkTs int64
+}
+
+var _ OperatorNode = &WatermarkOp{}
+
+const (
+	WatermarkKey  = "$$wartermark"
+	EventInputKey = "$$eventinputs"
+	StreamWMKey   = "$$streamwms"
+)
+
+func NewWatermarkOp(name string, streams []string, options *api.RuleOption) *WatermarkOp {
+	wms := make(map[string]int64, len(streams))
+	return &WatermarkOp{
+		defaultSinkNode: &defaultSinkNode{
+			input: make(chan interface{}, options.BufferLength),
+			defaultNode: &defaultNode{
+				outputs:   make(map[string]chan<- interface{}),
+				name:      name,
+				sendError: options.SendError,
+			},
+		},
+		lateTolerance: options.LateTol,
+		streamWMs:     wms,
+	}
+}
+
+func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error) {
+	ctx.GetLogger().Debugf("watermark node %s is started", w.name)
+	if len(w.outputs) <= 0 {
+		infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
+		return
+	}
+	stats, err := metric.NewStatManager(ctx, "op")
+	if err != nil {
+		infra.DrainError(ctx, fmt.Errorf("fail to create stat manager"), errCh)
+		return
+	}
+	w.statManager = stats
+	w.ctx = ctx
+	// restore state
+	if s, err := ctx.GetState(WatermarkKey); err == nil && s != nil {
+		if si, ok := s.(int64); ok {
+			w.lastWatermarkTs = si
+		} else {
+			infra.DrainError(ctx, fmt.Errorf("restore watermark state `lastWatermarkTs` %v error, invalid type", s), errCh)
+			return
+		}
+	}
+	if s, err := ctx.GetState(EventInputKey); err == nil {
+		switch st := s.(type) {
+		case []*xsql.Tuple:
+			w.events = st
+			ctx.GetLogger().Infof("Restore watermark events state %+v", st)
+		case nil:
+			ctx.GetLogger().Debugf("Restore watermark events state, nothing")
+		default:
+			infra.DrainError(ctx, fmt.Errorf("restore watermark event state %v error, invalid type", st), errCh)
+			return
+		}
+	} else {
+		ctx.GetLogger().Warnf("Restore watermark event state fails: %s", err)
+	}
+	if s, err := ctx.GetState(StreamWMKey); err == nil && s != nil {
+		if si, ok := s.(map[string]int64); ok {
+			w.streamWMs = si
+		} else {
+			infra.DrainError(ctx, fmt.Errorf("restore watermark stream keys state %v error, invalid type", s), errCh)
+			return
+		}
+	}
+
+	ctx.GetLogger().Infof("Start with state lastWatermarkTs: %d", w.lastWatermarkTs)
+	go func() {
+		err := infra.SafeRun(func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					ctx.GetLogger().Infof("watermark node %s is finished", w.name)
+					return nil
+				case item, opened := <-w.input:
+					if !opened {
+						w.statManager.IncTotalExceptions("input channel closed")
+						break
+					}
+					processed := false
+					if item, processed = w.preprocess(item); processed {
+						break
+					}
+					switch d := item.(type) {
+					case error:
+						_ = w.Broadcast(d)
+						w.statManager.IncTotalExceptions(d.Error())
+					case *xsql.Tuple:
+						w.statManager.IncTotalRecordsIn()
+						// Start the first event processing.
+						// Later a series of events may be sent out in order
+						w.statManager.ProcessTimeStart()
+						// whether to drop the late event
+						if w.track(ctx, d.Emitter, d.GetTimestamp()) {
+							// If not drop, check if it can be sent out
+							w.addAndTrigger(ctx, d)
+						}
+					default:
+						e := fmt.Errorf("run watermark op error: expect *xsql.Tuple type but got %[1]T(%[1]v)", d)
+						_ = w.Broadcast(e)
+						w.statManager.IncTotalExceptions(e.Error())
+					}
+				}
+			}
+		})
+		if err != nil {
+			infra.DrainError(ctx, err, errCh)
+		}
+	}()
+}
+
+func (w *WatermarkOp) track(ctx api.StreamContext, emitter string, ts int64) bool {
+	ctx.GetLogger().Debugf("watermark generator track event from topic %s at %d", emitter, ts)
+	watermark, ok := w.streamWMs[emitter]
+	if !ok || ts > watermark {
+		w.streamWMs[emitter] = ts
+		_ = ctx.PutState(StreamWMKey, w.streamWMs)
+	}
+	r := ts >= w.lastWatermarkTs
+	return r
+}
+
+// Add an event and check whether the watermark advances
+// If yes, send out all events at or before the watermark
+func (w *WatermarkOp) addAndTrigger(ctx api.StreamContext, d *xsql.Tuple) {
+	w.events = append(w.events, d)
+	watermark := w.computeWatermarkTs()
+	ctx.GetLogger().Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
+	if watermark > w.lastWatermarkTs {
+		sort.SliceStable(w.events, func(i, j int) bool {
+			return w.events[i].GetTimestamp() < w.events[j].GetTimestamp()
+		})
+
+		// Find out the last event to send in this watermark change
+		c := len(w.events)
+		for i, e := range w.events {
+			if e.GetTimestamp() > watermark {
+				c = i
+				break
+			}
+		}
+		// Send out all events before the watermark
+		for i := 0; i < c; i++ {
+			if i > 0 { // The first event's processing time starts when the event is received
+				w.statManager.ProcessTimeStart()
+			}
+			_ = w.Broadcast(w.events[i])
+			ctx.GetLogger().Debug("send out event", w.events[i].GetTimestamp())
+			w.statManager.IncTotalRecordsOut()
+			w.statManager.ProcessTimeEnd()
+		}
+		w.events = w.events[c:]
+		_ = ctx.PutState(EventInputKey, w.events)
+		_ = w.Broadcast(&xsql.WatermarkTuple{Timestamp: watermark})
+		w.lastWatermarkTs = watermark
+		_ = ctx.PutState(WatermarkKey, w.lastWatermarkTs)
+		ctx.GetLogger().Debugf("scan watermark event at %d", watermark)
+	}
+}
+
+// The watermark is the minimum timestamp across all input streams, minus the late tolerance
+func (w *WatermarkOp) computeWatermarkTs() int64 {
+	var ts int64 = math.MaxInt64
+	for _, wm := range w.streamWMs {
+		if ts > wm {
+			ts = wm
+		}
+	}
+	return ts - w.lateTolerance
+}
+
+func (w *WatermarkOp) GetMetrics() [][]interface{} {
+	if w.statManager != nil {
+		return [][]interface{}{
+			w.statManager.GetMetrics(),
+		}
+	} else {
+		return nil
+	}
+}

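The core of the new operator is track plus addAndTrigger: remember the largest timestamp seen per stream, take the minimum over those maxima minus the late tolerance as the watermark, and whenever the watermark advances, emit every buffered event at or before it in timestamp order (the real op then broadcasts a WatermarkTuple and persists its state). A simplified, self-contained sketch of that flow, using local stand-in types instead of the eKuiper ones:

package main

import (
	"fmt"
	"math"
	"sort"
)

// event is a stand-in for *xsql.Tuple: just an emitter name and a timestamp.
type event struct {
	emitter string
	ts      int64
}

type watermarker struct {
	lateTolerance int64
	streamWMs     map[string]int64 // largest timestamp seen per stream
	buffer        []event          // cached events not yet released
	lastWatermark int64
}

// computeWatermark mirrors computeWatermarkTs: the minimum of the per-stream
// maxima, minus the late tolerance.
func (w *watermarker) computeWatermark() int64 {
	ts := int64(math.MaxInt64)
	for _, wm := range w.streamWMs {
		if wm < ts {
			ts = wm
		}
	}
	return ts - w.lateTolerance
}

// ingest mirrors track + addAndTrigger: drop late events, buffer the rest, and
// when the watermark advances, release everything at or before it in order.
func (w *watermarker) ingest(e event) (released []event, wm int64, advanced bool) {
	if cur, ok := w.streamWMs[e.emitter]; !ok || e.ts > cur {
		w.streamWMs[e.emitter] = e.ts
	}
	if e.ts < w.lastWatermark {
		return nil, w.lastWatermark, false // late event, dropped
	}
	w.buffer = append(w.buffer, e)
	wm = w.computeWatermark()
	if wm <= w.lastWatermark {
		return nil, w.lastWatermark, false
	}
	sort.SliceStable(w.buffer, func(i, j int) bool { return w.buffer[i].ts < w.buffer[j].ts })
	cut := len(w.buffer)
	for i, ev := range w.buffer {
		if ev.ts > wm {
			cut = i
			break
		}
	}
	released, w.buffer = w.buffer[:cut], w.buffer[cut:]
	w.lastWatermark = wm
	return released, wm, true
}

func main() {
	w := &watermarker{streamWMs: map[string]int64{}}
	// The second event (ts 5) arrives after the watermark has advanced to 10
	// and is dropped as late; the others are released once the minimum
	// per-stream timestamp passes them.
	for _, e := range []event{{"demo", 10}, {"demo1", 5}, {"demo", 20}, {"demo1", 30}} {
		if out, wm, ok := w.ingest(e); ok {
			fmt.Printf("watermark %d advanced, release %v\n", wm, out)
		}
	}
}
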
+ 34 - 37
internal/topo/node/window_op.go

@@ -32,19 +32,19 @@ import (
 )
 
 type WindowConfig struct {
-	Type        ast.WindowType
-	Length      int64
-	Interval    int64 // If the interval is not set, it is equals to Length
+	Type     ast.WindowType
+	Length   int64
+	Interval int64 // If the interval is not set, it is equals to Length
 	RawInterval int
 	TimeUnit    ast.Token
 }
 
 type WindowOperator struct {
 	*defaultSinkNode
-	window             *WindowConfig
-	interval           int64
-	isEventTime        bool
-	watermarkGenerator *WatermarkGenerator // For event time only
+	window      *WindowConfig
+	interval    int64
+	isEventTime bool
+	trigger     *EventTimeTrigger // For event time only
 
 	statManager metric.StatManager
 	ticker      *clock.Ticker // For processing time only
@@ -54,9 +54,9 @@ type WindowOperator struct {
 }
 
 const (
-	WINDOW_INPUTS_KEY = "$$windowInputs"
-	TRIGGER_TIME_KEY  = "$$triggerTime"
-	MSG_COUNT_KEY     = "$$msgCount"
+	WindowInputsKey = "$$windowInputs"
+	TriggerTimeKey  = "$$triggerTime"
+	MsgCountKey     = "$$msgCount"
 )
 
 func init() {
@@ -64,7 +64,7 @@ func init() {
 	gob.Register([]map[string]interface{}{})
 }
 
-func NewWindowOp(name string, w WindowConfig, streams []string, options *api.RuleOption) (*WindowOperator, error) {
+func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowOperator, error) {
 	o := new(WindowOperator)
 
 	o.defaultSinkNode = &defaultSinkNode{
@@ -78,15 +78,15 @@ func NewWindowOp(name string, w WindowConfig, streams []string, options *api.Rul
 	o.isEventTime = options.IsEventTime
 	o.window = &w
 	if o.window.Interval == 0 && o.window.Type == ast.COUNT_WINDOW {
-		// if no interval value is set and it's count window, then set interval to length value.
+		// if no interval value is set, and it's a count window, then set interval to length value.
 		o.window.Interval = o.window.Length
 	}
 	if options.IsEventTime {
 		// Create watermark generator
-		if w, err := NewWatermarkGenerator(o.window, options.LateTol, streams, o.input); err != nil {
+		if w, err := NewEventTimeTrigger(o.window); err != nil {
 			return nil, err
 		} else {
-			o.watermarkGenerator = w
+			o.trigger = w
 		}
 	}
 	return o, nil
@@ -111,7 +111,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 	}
 	o.statManager = stats
 	var inputs []*xsql.Tuple
-	if s, err := ctx.GetState(WINDOW_INPUTS_KEY); err == nil {
+	if s, err := ctx.GetState(WindowInputsKey); err == nil {
 		switch st := s.(type) {
 		case []*xsql.Tuple:
 			inputs = st
@@ -128,7 +128,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 	if !o.isEventTime {
 		o.triggerTime = conf.GetNowInMilli()
 	}
-	if s, err := ctx.GetState(TRIGGER_TIME_KEY); err == nil && s != nil {
+	if s, err := ctx.GetState(TriggerTimeKey); err == nil && s != nil {
 		if si, ok := s.(int64); ok {
 			o.triggerTime = si
 		} else {
@@ -136,7 +136,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		}
 	}
 	o.msgCount = 0
-	if s, err := ctx.GetState(MSG_COUNT_KEY); err == nil && s != nil {
+	if s, err := ctx.GetState(MsgCountKey); err == nil && s != nil {
 		if si, ok := s.(int); ok {
 			o.msgCount = si
 		} else {
@@ -240,7 +240,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 
 	if firstTicker != nil {
 		firstC = firstTicker.C
-		// resume previous window
+		// resume the previous window
 		if len(inputs) > 0 && o.triggerTime > 0 {
 			nextTick := conf.GetNowInMilli() + o.interval
 			next := o.triggerTime
@@ -253,8 +253,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 					}
 					log.Debugf("triggered by restore inputs")
 					inputs = o.scan(inputs, next, ctx)
-					ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-					ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
+					_ = ctx.PutState(WindowInputsKey, inputs)
+					_ = ctx.PutState(TriggerTimeKey, o.triggerTime)
 				}
 			case ast.SESSION_WINDOW:
 				timeout, duration := o.window.Interval, o.window.Length
@@ -289,8 +289,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 					}
 					log.Debugf("triggered by restore inputs")
 					inputs = o.scan(inputs, next, ctx)
-					ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-					ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
+					_ = ctx.PutState(WindowInputsKey, inputs)
+					_ = ctx.PutState(TriggerTimeKey, o.triggerTime)
 				}
 			}
 		}
@@ -312,7 +312,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 			}
 			switch d := item.(type) {
 			case error:
-				o.Broadcast(d)
+				_ = o.Broadcast(d)
 				o.statManager.IncTotalExceptions(d.Error())
 			case *xsql.Tuple:
 				log.Debugf("Event window receive tuple %s", d.Message)
@@ -330,7 +330,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 						timeoutTicker = conf.GetTimer(o.window.Interval)
 						timeout = timeoutTicker.C
 						o.triggerTime = d.Timestamp
-						ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
+						_ = ctx.PutState(TriggerTimeKey, o.triggerTime)
 						log.Debugf("Session window set start time %d", o.triggerTime)
 					}
 				case ast.COUNT_WINDOW:
@@ -355,7 +355,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 							windowEnd := triggerTime
 							tsets.WindowRange = xsql.NewWindowRange(windowStart, windowEnd)
 							log.Debugf("Sent: %v", tsets)
-							o.Broadcast(tsets)
+							_ = o.Broadcast(tsets)
 							o.statManager.IncTotalRecordsOut()
 						}
 						inputs = tl.getRestTuples()
@@ -363,11 +363,11 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 				}
 				o.statManager.ProcessTimeEnd()
 				o.statManager.SetBufferLength(int64(len(o.input)))
-				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-				ctx.PutState(MSG_COUNT_KEY, o.msgCount)
+				_ = ctx.PutState(WindowInputsKey, inputs)
+				_ = ctx.PutState(MsgCountKey, o.msgCount)
 			default:
 				e := fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d)
-				o.Broadcast(e)
+				_ = o.Broadcast(e)
 				o.statManager.IncTotalExceptions(e.Error())
 			}
 		case now := <-firstC:
@@ -402,11 +402,11 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
 				log.Debugf("triggered by timeout")
 				inputs = o.scan(inputs, cast.TimeToUnixMilli(now), ctx)
 				_ = inputs
-				// expire all inputs, so that when timer scan there is no item
+				// expire all inputs, so that when the timer scans there is no item
 				inputs = make([]*xsql.Tuple, 0)
 				o.statManager.ProcessTimeEnd()
-				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-				ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
+				_ = ctx.PutState(WindowInputsKey, inputs)
+				_ = ctx.PutState(TriggerTimeKey, o.triggerTime)
 				timeoutTicker = nil
 			}
 		// is cancelling
@@ -434,8 +434,8 @@ func (o *WindowOperator) tick(ctx api.StreamContext, inputs []*xsql.Tuple, n int
 	log.Debugf("triggered by ticker at %d", n)
 	inputs = o.scan(inputs, n, ctx)
 	o.statManager.ProcessTimeEnd()
-	ctx.PutState(WINDOW_INPUTS_KEY, inputs)
-	ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
+	_ = ctx.PutState(WindowInputsKey, inputs)
+	_ = ctx.PutState(TriggerTimeKey, o.triggerTime)
 	return inputs
 }
 
@@ -541,11 +541,8 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 	}
 	results.WindowRange = xsql.NewWindowRange(windowStart, windowEnd)
 	log.Debugf("window %s triggered for %d tuples", o.name, len(inputs))
-	if o.isEventTime {
-		results.Sort()
-	}
 	log.Debugf("Sent: %v", results)
-	o.Broadcast(results)
+	_ = o.Broadcast(results)
 	o.statManager.IncTotalRecordsOut()
 
 	o.triggerTime = triggerTime

+ 2 - 2
internal/topo/planner/joinAlignPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ func (p JoinAlignPlan) Init() *JoinAlignPlan {
 	return &p
 }
 
-// Push down to table first, then push to window
+// PushDownPredicate pushes down to the table first, then to the window
 func (p *JoinAlignPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
 	if len(p.children) == 0 {
 		return condition, p.self

+ 3 - 3
internal/topo/planner/logicalPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,9 +21,9 @@ type LogicalPlan interface {
 	SetChildren(children []LogicalPlan)
 	// PushDownPredicate pushes down the filter in the filter/where/on/having clauses as deeply as possible.
 	// It will accept a condition that is an expression slice, and return the expressions that can't be pushed.
-	// It also return the new tree of plan as it can possibly change the tree
+	// It also returns the new plan tree, as it can possibly change the tree
 	PushDownPredicate(ast.Expr) (ast.Expr, LogicalPlan)
-	// Prune the unused columns in the data source level, by pushing all needed columns down
+	// PruneColumns prunes the unused columns at the data source level by pushing all needed columns down
 	PruneColumns(fields []ast.Expr) error
 }
 

+ 13 - 2
internal/topo/planner/planner.go

@@ -58,7 +58,7 @@ func PlanSQLWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sink
 	if err != nil {
 		return nil, err
 	}
-	// Create logical plan and optimize. Logical plans are a linked list
+	// Create the logical plan and optimize. Logical plans are a linked list
 	lp, err := createLogicalPlan(stmt, rule.Options, store)
 	if err != nil {
 		return nil, err
@@ -127,6 +127,8 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 		tp.AddSrc(srcNode)
 		inputs = []api.Emitter{srcNode}
 		op = srcNode
+	case *WatermarkPlan:
+		op = node.NewWatermarkOp(fmt.Sprintf("%d_watermark", newIndex), t.Emitters, options)
 	case *AnalyticFuncsPlan:
 		op = Transform(&operator.AnalyticFuncsOp{Funcs: t.funcs}, fmt.Sprintf("%d_analytic", newIndex), options)
 	case *WindowPlan:
@@ -150,7 +152,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			Interval:    i,
 			RawInterval: rawInterval,
 			TimeUnit:    t.timeUnit,
-		}, streamsFromStmt, options)
+		}, options)
 		if err != nil {
 			return nil, 0, err
 		}
@@ -273,6 +275,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 		lookupTableChildren map[string]*ast.Options
 		scanTableChildren   []LogicalPlan
 		scanTableEmitters   []string
+		streamEmitters      []string
 		w                   *ast.Window
 		ds                  ast.Dimensions
 	)
@@ -299,12 +302,20 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 			}.Init()
 			if sInfo.stmt.StreamType == ast.TypeStream {
 				children = append(children, p)
+				streamEmitters = append(streamEmitters, string(sInfo.stmt.Name))
 			} else {
 				scanTableChildren = append(scanTableChildren, p)
 				scanTableEmitters = append(scanTableEmitters, string(sInfo.stmt.Name))
 			}
 		}
 	}
+	if opt.IsEventTime {
+		p = WatermarkPlan{
+			Emitters: streamEmitters,
+		}.Init()
+		p.SetChildren(children)
+		children = []LogicalPlan{p}
+	}
 	if len(analyticFuncs) > 0 {
 		p = AnalyticFuncsPlan{
 			funcs: analyticFuncs,

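With the planner change above, when IsEventTime is set, all stream children are wrapped in a single WatermarkPlan before the analytic and window plans are added, so the physical operator chain gains a dedicated watermark op between the sources and the window op. That is why the operator indices in the test metrics further below shift by one; for a single-stream event-time rule the chain looks roughly like:

source_demoE -> op_2_watermark -> op_3_window -> op_4_project -> sink_mockSink
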
+ 1 - 1
internal/topo/planner/planner_graph.go

@@ -144,7 +144,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				if err != nil {
 					return nil, fmt.Errorf("parse window conf %s with %v error: %w", nodeName, gn.Props, err)
 				}
-				op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
+				op, err := node.NewWindowOp(nodeName, *wconf, rule.Options)
 				if err != nil {
 					return nil, fmt.Errorf("parse window %s with %v error: %w", nodeName, gn.Props, err)
 				}

+ 44 - 0
internal/topo/planner/watermark_plan.go

@@ -0,0 +1,44 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package planner
+
+import "github.com/lf-edge/ekuiper/pkg/ast"
+
+type WatermarkPlan struct {
+	baseLogicalPlan
+	Emitters []string
+}
+
+func (p WatermarkPlan) Init() *WatermarkPlan {
+	p.baseLogicalPlan.self = &p
+	return &p
+}
+
+// PushDownPredicate Push down all the conditions to the data source.
+// The condition here must be safe to push down, or it will be caught by a planner above, such as the countWindow planner.
+func (p *WatermarkPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
+	if len(p.children) == 0 {
+		return condition, p.self
+	}
+	rest := condition
+	for i, child := range p.children {
+		if _, ok := child.(*DataSourcePlan); ok {
+			var newChild LogicalPlan
+			rest, newChild = child.PushDownPredicate(rest)
+			p.children[i] = newChild
+		}
+	}
+	return rest, p.self
+}

+ 2 - 5
internal/topo/topotest/mocknode/mock_sink.go

@@ -15,8 +15,6 @@
 package mocknode
 
 import (
-	"fmt"
-
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
@@ -38,7 +36,6 @@ func (m *MockSink) Open(ctx api.StreamContext) error {
 
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
-	fmt.Println("mock sink receive ", item)
 	if v, _, err := ctx.TransformOutput(item); err == nil {
 		logger.Debugf("mock sink receive %s", item)
 		m.results = append(m.results, v)
@@ -48,12 +45,12 @@ func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	return nil
 }
 
-func (m *MockSink) Close(ctx api.StreamContext) error {
+func (m *MockSink) Close(_ api.StreamContext) error {
 	// do nothing
 	return nil
 }
 
-func (m *MockSink) Configure(props map[string]interface{}) error {
+func (m *MockSink) Configure(_ map[string]interface{}) error {
 	return nil
 }
 

+ 94 - 91
internal/topo/topotest/window_rule_test.go

@@ -771,10 +771,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_3_project_0_exceptions_total":   int64(0),
-				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(5),
-				"op_3_project_0_records_out_total":  int64(5),
+				"op_4_project_0_exceptions_total":   int64(0),
+				"op_4_project_0_process_latency_us": int64(0),
+				"op_4_project_0_records_in_total":   int64(5),
+				"op_4_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -784,10 +784,13 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_in_total":  int64(6),
 				"source_demoE_0_records_out_total": int64(6),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(6),
-				"op_2_window_0_records_out_total":  int64(5),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_out_total":  int64(5),
+
+				"op_2_watermark_0_records_in_total":  int64(6),
+				"op_2_watermark_0_records_out_total": int64(4),
 			},
 		}, {
 			Name: `TestEventWindowRule2`,
@@ -807,10 +810,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_4_project_0_exceptions_total":   int64(0),
-				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(2),
-				"op_4_project_0_records_out_total":  int64(2),
+				"op_5_project_0_exceptions_total":   int64(0),
+				"op_5_project_0_process_latency_us": int64(0),
+				"op_5_project_0_records_in_total":   int64(2),
+				"op_5_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(2),
@@ -820,15 +823,15 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_in_total":  int64(6),
 				"source_demoE_0_records_out_total": int64(6),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(6),
-				"op_2_window_0_records_out_total":  int64(5),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_out_total":  int64(5),
 
-				"op_3_filter_0_exceptions_total":   int64(0),
-				"op_3_filter_0_process_latency_us": int64(0),
-				"op_3_filter_0_records_in_total":   int64(5),
-				"op_3_filter_0_records_out_total":  int64(2),
+				"op_4_filter_0_exceptions_total":   int64(0),
+				"op_4_filter_0_process_latency_us": int64(0),
+				"op_4_filter_0_records_in_total":   int64(5),
+				"op_4_filter_0_records_out_total":  int64(2),
 			},
 		}, {
 			Name: `TestEventWindowRule3`,
@@ -865,10 +868,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_5_project_0_exceptions_total":   int64(0),
-				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(5),
-				"op_5_project_0_records_out_total":  int64(5),
+				"op_6_project_0_exceptions_total":   int64(0),
+				"op_6_project_0_process_latency_us": int64(0),
+				"op_6_project_0_records_in_total":   int64(5),
+				"op_6_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -882,15 +885,15 @@ func TestEventWindow(t *testing.T) {
 				"source_demo1E_0_records_in_total":  int64(6),
 				"source_demo1E_0_records_out_total": int64(6),
 
-				"op_3_window_0_exceptions_total":   int64(0),
-				"op_3_window_0_process_latency_us": int64(0),
-				"op_3_window_0_records_in_total":   int64(12),
-				"op_3_window_0_records_out_total":  int64(5),
+				"op_4_window_0_exceptions_total":   int64(0),
+				"op_4_window_0_process_latency_us": int64(0),
+				"op_4_window_0_records_in_total":   int64(9),
+				"op_4_window_0_records_out_total":  int64(5),
 
-				"op_4_join_0_exceptions_total":   int64(0),
-				"op_4_join_0_process_latency_us": int64(0),
-				"op_4_join_0_records_in_total":   int64(5),
-				"op_4_join_0_records_out_total":  int64(5),
+				"op_5_join_0_exceptions_total":   int64(0),
+				"op_5_join_0_process_latency_us": int64(0),
+				"op_5_join_0_records_in_total":   int64(5),
+				"op_5_join_0_records_out_total":  int64(5),
 			},
 		}, {
 			Name: `TestEventWindowRule4`,
@@ -931,10 +934,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_5_project_0_exceptions_total":   int64(0),
-				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(4),
-				"op_5_project_0_records_out_total":  int64(4),
+				"op_6_project_0_exceptions_total":   int64(0),
+				"op_6_project_0_process_latency_us": int64(0),
+				"op_6_project_0_records_in_total":   int64(4),
+				"op_6_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(4),
@@ -944,20 +947,20 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_in_total":  int64(6),
 				"source_demoE_0_records_out_total": int64(6),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(6),
-				"op_2_window_0_records_out_total":  int64(4),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_out_total":  int64(4),
 
-				"op_3_aggregate_0_exceptions_total":   int64(0),
-				"op_3_aggregate_0_process_latency_us": int64(0),
-				"op_3_aggregate_0_records_in_total":   int64(4),
-				"op_3_aggregate_0_records_out_total":  int64(4),
+				"op_4_aggregate_0_exceptions_total":   int64(0),
+				"op_4_aggregate_0_process_latency_us": int64(0),
+				"op_4_aggregate_0_records_in_total":   int64(4),
+				"op_4_aggregate_0_records_out_total":  int64(4),
 
-				"op_4_order_0_exceptions_total":   int64(0),
-				"op_4_order_0_process_latency_us": int64(0),
-				"op_4_order_0_records_in_total":   int64(4),
-				"op_4_order_0_records_out_total":  int64(4),
+				"op_5_order_0_exceptions_total":   int64(0),
+				"op_5_order_0_process_latency_us": int64(0),
+				"op_5_order_0_records_in_total":   int64(4),
+				"op_5_order_0_records_out_total":  int64(4),
 			},
 		}, {
 			Name: `TestEventWindowRule5`,
@@ -986,10 +989,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_3_project_0_exceptions_total":   int64(0),
-				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(4),
-				"op_3_project_0_records_out_total":  int64(4),
+				"op_4_project_0_exceptions_total":   int64(0),
+				"op_4_project_0_process_latency_us": int64(0),
+				"op_4_project_0_records_in_total":   int64(4),
+				"op_4_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(4),
@@ -999,10 +1002,10 @@ func TestEventWindow(t *testing.T) {
 				"source_sessionDemoE_0_records_in_total":  int64(12),
 				"source_sessionDemoE_0_records_out_total": int64(12),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(12),
-				"op_2_window_0_records_out_total":  int64(4),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(10),
+				"op_3_window_0_records_out_total":  int64(4),
 			},
 		}, {
 			Name: `TestEventWindowRule6`,
@@ -1026,10 +1029,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_5_project_0_exceptions_total":   int64(0),
-				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(5),
-				"op_5_project_0_records_out_total":  int64(5),
+				"op_6_project_0_exceptions_total":   int64(0),
+				"op_6_project_0_process_latency_us": int64(0),
+				"op_6_project_0_records_in_total":   int64(5),
+				"op_6_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -1043,14 +1046,14 @@ func TestEventWindow(t *testing.T) {
 				"source_demo1E_0_records_in_total":  int64(6),
 				"source_demo1E_0_records_out_total": int64(6),
 
-				"op_3_window_0_exceptions_total":  int64(0),
-				"op_3_window_0_records_in_total":  int64(12),
-				"op_3_window_0_records_out_total": int64(5),
+				"op_4_window_0_exceptions_total":  int64(0),
+				"op_4_window_0_records_in_total":  int64(9),
+				"op_4_window_0_records_out_total": int64(5),
 
-				"op_4_join_0_exceptions_total":   int64(0),
-				"op_4_join_0_process_latency_us": int64(0),
-				"op_4_join_0_records_in_total":   int64(5),
-				"op_4_join_0_records_out_total":  int64(5),
+				"op_5_join_0_exceptions_total":   int64(0),
+				"op_5_join_0_process_latency_us": int64(0),
+				"op_5_join_0_records_in_total":   int64(5),
+				"op_5_join_0_records_out_total":  int64(5),
 			},
 		}, {
 			Name: `TestEventWindowRule7`,
@@ -1090,10 +1093,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_3_project_0_exceptions_total":   int64(1),
-				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(6),
-				"op_3_project_0_records_out_total":  int64(5),
+				"op_4_project_0_exceptions_total":   int64(1),
+				"op_4_project_0_process_latency_us": int64(0),
+				"op_4_project_0_records_in_total":   int64(6),
+				"op_4_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(6),
@@ -1103,10 +1106,10 @@ func TestEventWindow(t *testing.T) {
 				"source_demoErr_0_records_in_total":  int64(6),
 				"source_demoErr_0_records_out_total": int64(6),
 
-				"op_2_window_0_exceptions_total":   int64(1),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(6),
-				"op_2_window_0_records_out_total":  int64(5),
+				"op_3_window_0_exceptions_total":   int64(1),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_out_total":  int64(5),
 			},
 		}, {
 			Name: `TestEventWindowRule8`,
@@ -1155,10 +1158,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_3_project_0_exceptions_total":   int64(0),
-				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(4),
-				"op_3_project_0_records_out_total":  int64(4),
+				"op_4_project_0_exceptions_total":   int64(0),
+				"op_4_project_0_process_latency_us": int64(0),
+				"op_4_project_0_records_in_total":   int64(4),
+				"op_4_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(4),
@@ -1168,10 +1171,10 @@ func TestEventWindow(t *testing.T) {
 				"source_sessionDemoE_0_records_in_total":  int64(12),
 				"source_sessionDemoE_0_records_out_total": int64(12),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(12),
-				"op_2_window_0_records_out_total":  int64(4),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(10),
+				"op_3_window_0_records_out_total":  int64(4),
 			},
 		}, {
 			Name: `TestEventWindowRule9`,
@@ -1216,10 +1219,10 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_3_project_0_exceptions_total":   int64(0),
-				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(5),
-				"op_3_project_0_records_out_total":  int64(5),
+				"op_4_project_0_exceptions_total":   int64(0),
+				"op_4_project_0_process_latency_us": int64(0),
+				"op_4_project_0_records_in_total":   int64(5),
+				"op_4_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
 				"sink_mockSink_0_records_in_total":  int64(5),
@@ -1229,10 +1232,10 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_in_total":  int64(6),
 				"source_demoE_0_records_out_total": int64(6),
 
-				"op_2_window_0_exceptions_total":   int64(0),
-				"op_2_window_0_process_latency_us": int64(0),
-				"op_2_window_0_records_in_total":   int64(6),
-				"op_2_window_0_records_out_total":  int64(5),
+				"op_3_window_0_exceptions_total":   int64(0),
+				"op_3_window_0_process_latency_us": int64(0),
+				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_out_total":  int64(5),
 			},
 		},
 	}

+ 0 - 10
internal/xsql/collection.go

@@ -15,8 +15,6 @@
 package xsql
 
 import (
-	"sort"
-
 	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
@@ -187,14 +185,6 @@ func (w *WindowTuples) AddTuple(tuple *Tuple) *WindowTuples {
 	return w
 }
 
-// Sort by tuple timestamp
-func (w *WindowTuples) Sort() {
-	w.cachedMap = nil
-	sort.SliceStable(w.Content, func(i, j int) bool {
-		return w.Content[i].(Event).GetTimestamp() < w.Content[j].(Event).GetTimestamp()
-	})
-}
-
 func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	for _, t := range w.Content {

+ 13 - 1
internal/xsql/row.go

@@ -229,7 +229,7 @@ type Alias struct {
 }
 
 /*
- *   All row types definitions, watermark, barrier
+ * All row type definitions, watermark, barrier
  */
 
 // Tuple The input row, produced by the source
@@ -246,6 +246,18 @@ type Tuple struct {
 
 var _ TupleRow = &Tuple{}
 
+type WatermarkTuple struct {
+	Timestamp int64
+}
+
+func (t *WatermarkTuple) GetTimestamp() int64 {
+	return t.Timestamp
+}
+
+func (t *WatermarkTuple) IsWatermark() bool {
+	return true
+}
+
 // JoinTuple is a row produced by a join operation
 type JoinTuple struct {
 	Tuples []TupleRow // The content is immutable, but the slice may be add or removed