|
@@ -1,4 +1,4 @@
|
|
|
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
|
|
|
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
|
|
|
//
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
// you may not use this file except in compliance with the License.
|
|
@@ -17,7 +17,11 @@ package node
|
|
|
import (
|
|
|
"encoding/gob"
|
|
|
"fmt"
|
|
|
+ "math"
|
|
|
+ "time"
|
|
|
+
|
|
|
"github.com/benbjohnson/clock"
|
|
|
+
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
|
|
|
"github.com/lf-edge/ekuiper/internal/xsql"
|
|
@@ -25,8 +29,6 @@ import (
|
|
|
"github.com/lf-edge/ekuiper/pkg/ast"
|
|
|
"github.com/lf-edge/ekuiper/pkg/cast"
|
|
|
"github.com/lf-edge/ekuiper/pkg/infra"
|
|
|
- "math"
|
|
|
- "time"
|
|
|
)
|
|
|
|
|
|
type WindowConfig struct {
|
|
@@ -366,7 +368,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
|
|
|
o.statManager.ProcessTimeStart()
|
|
|
log.Debugf("triggered by timeout")
|
|
|
inputs = o.scan(inputs, cast.TimeToUnixMilli(now), ctx)
|
|
|
- //expire all inputs, so that when timer scan there is no item
|
|
|
+ _ = inputs
|
|
|
+ // expire all inputs, so that when timer scan there is no item
|
|
|
inputs = make([]*xsql.Tuple, 0)
|
|
|
o.statManager.ProcessTimeEnd()
|
|
|
ctx.PutState(WINDOW_INPUTS_KEY, inputs)
|
|
@@ -463,7 +466,7 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
|
|
|
windowEnd = triggerTime
|
|
|
)
|
|
|
if o.window.Type == ast.HOPPING_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
|
|
|
- delta = o.calDelta(triggerTime, delta, log)
|
|
|
+ delta = o.calDelta(triggerTime, log)
|
|
|
}
|
|
|
results := &xsql.WindowTuples{
|
|
|
Content: make([]xsql.TupleRow, 0),
|
|
@@ -517,7 +520,8 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
|
|
|
return inputs[:i]
|
|
|
}
|
|
|
|
|
|
-func (o *WindowOperator) calDelta(triggerTime int64, delta int64, log api.Logger) int64 {
|
|
|
+func (o *WindowOperator) calDelta(triggerTime int64, log api.Logger) int64 {
|
|
|
+ var delta int64
|
|
|
lastTriggerTime := o.triggerTime
|
|
|
if lastTriggerTime <= 0 {
|
|
|
delta = math.MaxInt16 //max int, all events for the initial window
|