Browse Source

fix(function): window_start and window_end should be universal for non-aggregate and aggregate

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 years ago
parent
commit
4be9eb49c2

+ 2 - 2
docs/en_US/sqls/built-in_functions.md

@@ -16,8 +16,6 @@ Aggregate functions perform a calculation on a set of values and return a single
 | sum      | sum(col1)   | The sum of all the values in a group. The null values will be ignored.           |
 | sum      | sum(col1)   | The sum of all the values in a group. The null values will be ignored.           |
 | collect   | collect(*), collect(col1)   | Returns an array with all column or the whole record (when the parameter is *) values from the group.  |
 | collect   | collect(*), collect(col1)   | Returns an array with all column or the whole record (when the parameter is *) values from the group.  |
 | deduplicate| deduplicate(col, false)   | Returns the deduplicate results in the group, usually a window. The first argument is the column as the key to deduplicate; the second argument is whether to return all items or just the latest item which is not duplicate. If the latest item is a duplicate, the sink will receive an empty map. Set the sink property [omitIfEmpty](../rules/overview.md#sink_actions) to the sink to not triggering the action.   |
 | deduplicate| deduplicate(col, false)   | Returns the deduplicate results in the group, usually a window. The first argument is the column as the key to deduplicate; the second argument is whether to return all items or just the latest item which is not duplicate. If the latest item is a duplicate, the sink will receive an empty map. Set the sink property [omitIfEmpty](../rules/overview.md#sink_actions) to the sink to not triggering the action.   |
-| window_start| window_start()   | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.   |
-| window_end| window_end()   | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.  |
 
 
 ### Collect() Examples
 ### Collect() Examples
 
 
@@ -170,3 +168,5 @@ When casting to datetime type, the supported column type and casting rule are:
 | tstamp      | tstamp()          | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 |
 | tstamp      | tstamp()          | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 |
 | mqtt        | mqtt(topic)       | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src1.topic)``<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src2.messageid)`` |
 | mqtt        | mqtt(topic)       | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src1.topic)``<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src2.messageid)`` |
 | meta        | meta(topic)       | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as ``meta(device)``<br />- A qualified key to specify the stream, such as ``meta(src1.device)`` <br />- A key with arrow for multi level meta data, such as ``meta(src1.reading->device->name)`` This assumes reading is a map structure meta data. |
 | meta        | meta(topic)       | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as ``meta(device)``<br />- A qualified key to specify the stream, such as ``meta(src1.device)`` <br />- A key with arrow for multi level meta data, such as ``meta(src1.reading->device->name)`` This assumes reading is a map structure meta data. |
+| window_start| window_start()   | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.   |
+| window_end| window_end()   | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.  |

+ 2 - 2
docs/zh_CN/sqls/built-in_functions.md

@@ -16,8 +16,6 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 | sum      | sum(col1) | 组中所有值的总和。空值不参与计算。 |
 | sum      | sum(col1) | 组中所有值的总和。空值不参与计算。 |
 | collect   | collect(*), collect(col1)   | 返回组中指定的列或整个消息(参数为*时)的值组成的数组。    |
 | collect   | collect(*), collect(col1)   | 返回组中指定的列或整个消息(参数为*时)的值组成的数组。    |
 | deduplicate| deduplicate(col, false)   | 返回当前组去重的结果,通常用在窗口中。其中,第一个参数指定用于去重的列;第二个参数指定是否返回全部结果。若为 false ,则仅返回最近的未重复的项;若最近的项有重复,则返回空数组;此时可以设置 sink 参数 [omitIfEmpty](../rules/overview.md#sink_actions),使得 sink 接到空结果后不触发。   |
 | deduplicate| deduplicate(col, false)   | 返回当前组去重的结果,通常用在窗口中。其中,第一个参数指定用于去重的列;第二个参数指定是否返回全部结果。若为 false ,则仅返回最近的未重复的项;若最近的项有重复,则返回空数组;此时可以设置 sink 参数 [omitIfEmpty](../rules/overview.md#sink_actions),使得 sink 接到空结果后不触发。   |
-| window_start| window_start()   | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。   |
-| window_end| window_end()   | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。   |
 
 
 ### Collect() 示例
 ### Collect() 示例
 
 
@@ -170,3 +168,5 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 | tstamp      | tstamp()          | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。 |
 | tstamp      | tstamp()          | 返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。 |
 | mqtt        | mqtt(topic)       | 返回指定键的 MQTT 元数据。 当前支持的键包括<br />-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src1.topic)`<br />- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src2.messageid)` |
 | mqtt        | mqtt(topic)       | 返回指定键的 MQTT 元数据。 当前支持的键包括<br />-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src1.topic)`<br />- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 `mqtt(src2.messageid)` |
 | meta        | meta(topic)       | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | meta        | meta(topic)       | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
+| window_start| window_start()   | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。   |
+| window_end| window_end()   | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。   |

+ 2 - 2
internal/topo/operator/aggregate_operator.go

@@ -29,7 +29,7 @@ type AggregateOp struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: xsql.GroupedTuplesSet
  *  output: xsql.GroupedTuplesSet
  */
  */
-func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
+func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("aggregate plan receive %s", data)
 	log.Debugf("aggregate plan receive %s", data)
 	grouped := data
 	grouped := data
@@ -66,7 +66,7 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
 		result := make(map[string]*xsql.GroupedTuples)
 		result := make(map[string]*xsql.GroupedTuples)
 		for _, m := range ms {
 		for _, m := range ms {
 			var name string
 			var name string
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
 			for _, d := range p.Dimensions {
 			for _, d := range p.Dimensions {
 				r := ve.Eval(d.Expr)
 				r := ve.Eval(d.Expr)
 				if _, ok := r.(error); ok {
 				if _, ok := r.(error); ok {

+ 10 - 0
internal/topo/operator/project_operator.go

@@ -105,6 +105,16 @@ func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xs
 	if pp.IsAggregate {
 	if pp.IsAggregate {
 		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
 		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
 	} else {
 	} else {
+		var wr *xsql.WindowRange
+		switch input := agg.(type) {
+		case xsql.WindowTuplesSet:
+			wr = input.WindowRange
+		case *xsql.JoinTupleSets:
+			wr = input.WindowRange
+		}
+		if wr != nil {
+			return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.WindowRangeValuer{WindowRange: wr}, fv, &xsql.WildcardValuer{Data: tuple})}
+		}
 		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv, &xsql.WildcardValuer{Data: tuple})}
 		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv, &xsql.WildcardValuer{Data: tuple})}
 	}
 	}
 }
 }

+ 44 - 0
internal/topo/topotest/window_rule_test.go

@@ -566,11 +566,19 @@ func TestWindow(t *testing.T) {
 					"color":        "red",
 					"color":        "red",
 					"window_start": float64(1541152485000),
 					"window_start": float64(1541152485000),
 					"window_end":   float64(1541152487000),
 					"window_end":   float64(1541152487000),
+				}, {
+					"color":        "blue",
+					"window_start": float64(1541152485000),
+					"window_end":   float64(1541152487000),
 				}},
 				}},
 				{{
 				{{
 					"color":        "red",
 					"color":        "red",
 					"window_start": float64(1541152486000),
 					"window_start": float64(1541152486000),
 					"window_end":   float64(1541152488000),
 					"window_end":   float64(1541152488000),
+				}, {
+					"color":        "blue",
+					"window_start": float64(1541152486000),
+					"window_end":   float64(1541152488000),
 				}},
 				}},
 				{{
 				{{
 					"color":        "yellow",
 					"color":        "yellow",
@@ -1199,10 +1207,34 @@ func TestEventWindow(t *testing.T) {
 					"temp":         28.1,
 					"temp":         28.1,
 					"window_start": float64(1541152487932),
 					"window_start": float64(1541152487932),
 					"window_end":   float64(1541152490000),
 					"window_end":   float64(1541152490000),
+				}, {
+					"temp":         27.4,
+					"window_start": float64(1541152487932),
+					"window_end":   float64(1541152490000),
+				}, {
+					"temp":         25.5,
+					"window_start": float64(1541152487932),
+					"window_end":   float64(1541152490000),
 				}}, {{
 				}}, {{
 					"temp":         26.2,
 					"temp":         26.2,
 					"window_start": float64(1541152490000),
 					"window_start": float64(1541152490000),
 					"window_end":   float64(1541152494000),
 					"window_end":   float64(1541152494000),
+				}, {
+					"temp":         26.8,
+					"window_start": float64(1541152490000),
+					"window_end":   float64(1541152494000),
+				}, {
+					"temp":         28.9,
+					"window_start": float64(1541152490000),
+					"window_end":   float64(1541152494000),
+				}, {
+					"temp":         29.1,
+					"window_start": float64(1541152490000),
+					"window_end":   float64(1541152494000),
+				}, {
+					"temp":         32.2,
+					"window_start": float64(1541152490000),
+					"window_end":   float64(1541152494000),
 				}}, {{
 				}}, {{
 					"temp":         30.9,
 					"temp":         30.9,
 					"window_start": float64(1541152494000),
 					"window_start": float64(1541152494000),
@@ -1246,15 +1278,27 @@ func TestEventWindow(t *testing.T) {
 					"color":        "red",
 					"color":        "red",
 					"window_start": float64(1541152486000),
 					"window_start": float64(1541152486000),
 					"window_end":   float64(1541152488000),
 					"window_end":   float64(1541152488000),
+				}, {
+					"color":        "blue",
+					"window_start": float64(1541152486000),
+					"window_end":   float64(1541152488000),
 				}},
 				}},
 				{{
 				{{
 					"color":        "blue",
 					"color":        "blue",
 					"window_start": float64(1541152487000),
 					"window_start": float64(1541152487000),
 					"window_end":   float64(1541152489000),
 					"window_end":   float64(1541152489000),
+				}, {
+					"color":        "yellow",
+					"window_start": float64(1541152487000),
+					"window_end":   float64(1541152489000),
 				}}, {{
 				}}, {{
 					"color":        "yellow",
 					"color":        "yellow",
 					"window_start": float64(1541152488000),
 					"window_start": float64(1541152488000),
 					"window_end":   float64(1541152490000),
 					"window_end":   float64(1541152490000),
+				}, {
+					"color":        "red",
+					"window_start": float64(1541152488000),
+					"window_end":   float64(1541152490000),
 				}}, {{
 				}}, {{
 					"color":        "red",
 					"color":        "red",
 					"window_start": float64(1541152489000),
 					"window_start": float64(1541152489000),

+ 25 - 16
internal/xsql/collections.go

@@ -27,8 +27,6 @@ import (
 
 
 type AggregateData interface {
 type AggregateData interface {
 	AggregateEval(expr ast.Expr, v CallValuer) []interface{}
 	AggregateEval(expr ast.Expr, v CallValuer) []interface{}
-	GetWindowStart() int64
-	GetWindowEnd() int64
 }
 }
 
 
 // Message is a valuer that substitutes values for the mapped interface.
 // Message is a valuer that substitutes values for the mapped interface.
@@ -164,14 +162,6 @@ func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
 	return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
 }
 }
 
 
-func (t *Tuple) GetWindowStart() int64 {
-	return 0
-}
-
-func (t *Tuple) GetWindowEnd() int64 {
-	return 0
-}
-
 func (t *Tuple) GetTimestamp() int64 {
 func (t *Tuple) GetTimestamp() int64 {
 	return t.Timestamp
 	return t.Timestamp
 }
 }
@@ -211,17 +201,36 @@ type WindowTuples struct {
 	Tuples  []Tuple
 	Tuples  []Tuple
 }
 }
 
 
+type WindowRangeValuer struct {
+	*WindowRange
+}
+
+func (r *WindowRangeValuer) Value(_ string) (interface{}, bool) {
+	return nil, false
+}
+
+func (r *WindowRangeValuer) Meta(_ string) (interface{}, bool) {
+	return nil, false
+}
+
+func (r *WindowRangeValuer) AppendAlias(_ string, _ interface{}) bool {
+	return false
+}
+
 type WindowRange struct {
 type WindowRange struct {
 	WindowStart int64
 	WindowStart int64
 	WindowEnd   int64
 	WindowEnd   int64
 }
 }
 
 
-func (r *WindowRange) GetWindowStart() int64 {
-	return r.WindowStart
-}
-
-func (r *WindowRange) GetWindowEnd() int64 {
-	return r.WindowEnd
+func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
+	switch key {
+	case "window_start":
+		return r.WindowStart, true
+	case "window_end":
+		return r.WindowEnd, true
+	default:
+		return nil, false
+	}
 }
 }
 
 
 type WindowTuplesSet struct {
 type WindowTuplesSet struct {

+ 8 - 0
internal/xsql/funcsAggregate.go

@@ -54,6 +54,14 @@ func (v *AggregateFunctionValuer) Value(string) (interface{}, bool) {
 func (v *AggregateFunctionValuer) Meta(string) (interface{}, bool) {
 func (v *AggregateFunctionValuer) Meta(string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
+
+func (v *AggregateFunctionValuer) FuncValue(key string) (interface{}, bool) {
+	if vv, ok := v.data.(FuncValuer); ok {
+		return vv.FuncValue(key)
+	}
+	return nil, false
+}
+
 func (*AggregateFunctionValuer) AppendAlias(string, interface{}) bool {
 func (*AggregateFunctionValuer) AppendAlias(string, interface{}) bool {
 	return false
 	return false
 }
 }

+ 29 - 11
internal/xsql/valuer.go

@@ -25,6 +25,11 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+var implicitValueFuncs = map[string]bool{
+	"window_start": true,
+	"window_end":   true,
+}
+
 // Valuer is the interface that wraps the Value() method.
 // Valuer is the interface that wraps the Value() method.
 type Valuer interface {
 type Valuer interface {
 	// Value returns the value and existence flag for a given key.
 	// Value returns the value and existence flag for a given key.
@@ -41,6 +46,11 @@ type CallValuer interface {
 	Call(name string, args []interface{}) (interface{}, bool)
 	Call(name string, args []interface{}) (interface{}, bool)
 }
 }
 
 
+// FuncValuer can calculate function type value like window_start and window_end
+type FuncValuer interface {
+	FuncValue(key string) (interface{}, bool)
+}
+
 type AggregateCallValuer interface {
 type AggregateCallValuer interface {
 	CallValuer
 	CallValuer
 	GetAllTuples() AggregateData
 	GetAllTuples() AggregateData
@@ -272,6 +282,17 @@ func (a multiValuer) AppendAlias(key string, value interface{}) bool {
 	return false
 	return false
 }
 }
 
 
+func (a multiValuer) FuncValue(key string) (interface{}, bool) {
+	for _, valuer := range a {
+		if vv, ok := valuer.(FuncValuer); ok {
+			if r, ok := vv.FuncValue(key); ok {
+				return r, true
+			}
+		}
+	}
+	return nil, false
+}
+
 func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	for _, valuer := range a {
 	for _, valuer := range a {
 		if valuer, ok := valuer.(CallValuer); ok {
 		if valuer, ok := valuer.(CallValuer); ok {
@@ -371,18 +392,15 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 		}
 		}
 		return &BracketEvalResult{Start: ii, End: ii}
 		return &BracketEvalResult{Start: ii, End: ii}
 	case *ast.Call:
 	case *ast.Call:
-		if valuer, ok := v.Valuer.(CallValuer); ok {
-			switch expr.Name {
-			case "window_start", "window_end":
-				if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
-					ad := aggreValuer.GetAllTuples()
-					if expr.Name == "window_start" {
-						return ad.GetWindowStart()
-					} else {
-						return ad.GetWindowEnd()
-					}
+		if _, ok := implicitValueFuncs[expr.Name]; ok {
+			if vv, ok := v.Valuer.(FuncValuer); ok {
+				val, ok := vv.FuncValue(expr.Name)
+				if ok {
+					return val
 				}
 				}
-			default:
+			}
+		} else {
+			if valuer, ok := v.Valuer.(CallValuer); ok {
 				var args []interface{}
 				var args []interface{}
 				if len(expr.Args) > 0 {
 				if len(expr.Args) > 0 {
 					args = make([]interface{}, len(expr.Args))
 					args = make([]interface{}, len(expr.Args))

+ 5 - 5
pkg/ast/functions.go

@@ -40,11 +40,9 @@ var maps = []map[string]string{
 var aggFuncMap = map[string]string{"avg": "",
 var aggFuncMap = map[string]string{"avg": "",
 	"count": "",
 	"count": "",
 	"max":   "", "min": "",
 	"max":   "", "min": "",
-	"sum":          "",
-	"collect":      "",
-	"deduplicate":  "",
-	"window_start": "",
-	"window_end":   "",
+	"sum":         "",
+	"collect":     "",
+	"deduplicate": "",
 }
 }
 
 
 var funcWithAsteriskSupportMap = map[string]string{
 var funcWithAsteriskSupportMap = map[string]string{
@@ -91,6 +89,8 @@ var jsonFuncMap = map[string]string{
 
 
 var otherFuncMap = map[string]string{"isnull": "",
 var otherFuncMap = map[string]string{"isnull": "",
 	"newuuid": "", "tstamp": "", "mqtt": "", "meta": "", "cardinality": "",
 	"newuuid": "", "tstamp": "", "mqtt": "", "meta": "", "cardinality": "",
+	"window_start": "",
+	"window_end":   "",
 }
 }
 
 
 type FuncRuntime interface {
 type FuncRuntime interface {