Pārlūkot izejas kodu

Support partition for analytic functions (#1432)

* feat(parser): Parse over partition to call expr

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(func): partition by for analytic functions runtime

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* doc(func):  analytic functions

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 gadi atpakaļ
vecāks
revīzija
0ebeed5e5b

+ 30 - 3
docs/en_US/sqls/built-in_functions.md

@@ -160,6 +160,36 @@ When casting to datetime type, the supported column type and casting rule are:
 
 
 **Please refer to [json path functions](./json_expr.md#json-path-functions) for how to compose a json path.**  
 **Please refer to [json path functions](./json_expr.md#json-path-functions) for how to compose a json path.**  
 
 
+## Analytic Functions
+
+Analytic functions always use state to do analytic jobs. In streaming processing, analytic functions are evaluated first so that they are not affected by predicates in WHERE clause.
+
+Analytic function computations are performed over all the input events of the current query input, optionally you can limit analytic function to only consider events that match the partition_by_clause.
+
+The syntax is like:
+
+```text
+AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
+```
+
+| Function     | Example                              | Description                                                                                                                                                                                                                                                                                                                                                                                       |
+|--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lag          | lag(expr, [offset], [default value]) | Return the former result of expression at offset, if not found, return the default value specified , if default value not set, return nil. if offset and default value not specified, offset is 1 and default value is nil                                                                                                                                                                        |
+| changed_col  | changed_col(true, col)               | Return the column value if it has changed from the last execution.                                                                                                                                                                                                                                                                                                                                |
+| had_changed  | had_changed(true, expr1, expr2, ...) | Return if any of the columns had changed since the last run. The expression could be * to easily detect the change status of all columns.                                                                                                                                                                                                                                                         |
+
+Example function call to get the previous temperature value:
+
+```text
+lag(temperature)
+```
+
+Example function call to get the previous temperature value with the same device id:
+
+```text
+lag(temperature) OVER (PARTITION BY deviceId)
+```
+
 ## Other Functions
 ## Other Functions
 | Function     | Example                              | Description                                                                                                                                                                                                                                                                                                                                                                                       |
 | Function     | Example                              | Description                                                                                                                                                                                                                                                                                                                                                                                       |
 |--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 |--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
@@ -171,9 +201,6 @@ When casting to datetime type, the supported column type and casting rule are:
 | meta         | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
 | meta         | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
 | window_start | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
 | window_start | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
 | window_end   | window_end()                         | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
 | window_end   | window_end()                         | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
-| lag          | lag(expr, [offset], [default value]) | Return the former result of expression at offset, if not found, return the default value specified , if default value not set, return nil. if offset and default value not specified, offset is 1 and default value is nil                                                                                                                                                                        |
-| changed_col  | changed_col(true, col)               | Return the column value if it has changed from the last execution.                                                                                                                                                                                                                                                                                                                                |
-| had_changed  | had_changed(true, expr1, expr2, ...) | Return if any of the columns had changed since the last run. The expression could be * to easily detect the change status of all columns.                                                                                                                                                                                                                                                         |
 
 
 ## Multiple Column Functions
 ## Multiple Column Functions
 
 

+ 2 - 2
docs/en_US/sqls/lexical_elements.md

@@ -31,7 +31,7 @@ SELECT `a-b`, `hello world`, `中文Chinese` from demo
 **Reserved keywords for rule SQL**: If you'd like to use the following keyword in rule SQL, you will have to use backtick to enclose them.
 **Reserved keywords for rule SQL**: If you'd like to use the following keyword in rule SQL, you will have to use backtick to enclose them.
 
 
 ```
 ```
-SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC, AND, OR, CASE, WHEN, THEN, ELSE, END
+SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC, AND, OR, CASE, WHEN, THEN, ELSE, END, IN, NOT, BETWEEN, LIKE, OVER, PARTITION
 ```
 ```
 
 
 The following is an example for using a stream named `from`, which is a reserved keyword in eKuiper.
 The following is an example for using a stream named `from`, which is a reserved keyword in eKuiper.
@@ -43,7 +43,7 @@ SELECT * FROM demo1 where `from`="device1"
 **Reserved keywords for streams management**: If you'd like to use the following keywords in stream management command, you will have to use backtick to enclose them.
 **Reserved keywords for streams management**: If you'd like to use the following keywords in stream management command, you will have to use backtick to enclose them.
 
 
 ```
 ```
-CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
+CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, KIND, SCHEMAID, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
 ```
 ```
 
 
 The following is an example for how to use reserved keywords in stream creation statement.
 The following is an example for how to use reserved keywords in stream creation statement.

+ 30 - 3
docs/zh_CN/sqls/built-in_functions.md

@@ -161,6 +161,36 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 
 
 **请参阅 [json 路径函数](./json_expr.md#Json-路径函数) 了解如何编写json路径。**
 **请参阅 [json 路径函数](./json_expr.md#Json-路径函数) 了解如何编写json路径。**
 
 
+## 分析函数
+
+分析函数会保持状态来做分析工作。在流式处理规则中,分析函数会首先被执行,这样它们就不会受到 WHERE 子句的影响而必不更新状态。
+
+分析函数的计算是在当前查询输入的所有输入事件上进行的,可以选择限制分析函数只考虑符合 PARTITION BY 子句的事件。
+
+分析函数可以使用 PARTITION BY 子句,语法如下:
+
+```text
+AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
+```
+
+| Function    | Example                              | Description                                                                                        |
+|-------------|--------------------------------------|----------------------------------------------------------------------------------------------------|
+| lag         | lag(expr, [offset], [default value]) | 返回表达式前一个值在偏移 offset 处的结果,如果没有找到,则返回默认值,如果没有指定默认值则返回 nil。如果除 expression 外其余参数均未指定,偏移量默认为 1,默认值为 nil |
+| changed_col | changed_col(true, col)               | 返回列的相比上次执行后的变化值。若未变化则返回 null 。                                                                     |
+| had_changed | had_changed(true, expr1, expr2, ...) | 返回是否上次运行后列的值有变化。 其参数可以为 * 以方便地监测所有列。                                                               |
+
+示例1:获取之前温度值的函数
+
+```text
+lag(temperature)
+```
+
+示例2:获取相同设备之前温度值的函数
+
+```text
+lag(temperature) OVER (PARTITION BY deviceId)
+```
+
 ## 其它函数
 ## 其它函数
 | 函数           | 示例                                   | 说明                                                                                                                                                                                |
 | 函数           | 示例                                   | 说明                                                                                                                                                                                |
 |--------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 |--------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
@@ -172,9 +202,6 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 | meta         | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | meta         | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | window_start | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_start | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_end   | window_end()                         | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_end   | window_end()                         | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
-| lag          | lag(expr, [offset], [default value]) | 返回表达式前一个值在偏移 offset 处的结果,如果没有找到,则返回默认值,如果没有指定默认值则返回 nil。如果除 expression 外其余参数均未指定,偏移量默认为 1,默认值为 nil                                                                                |
-| changed_col  | changed_col(true, col)               | 返回列的相比上次执行后的变化值。若未变化则返回 null 。                                                                                                                                                    |
-| had_changed  | had_changed(true, expr1, expr2, ...) | 返回是否上次运行后列的值有变化。 其参数可以为 * 以方便地监测所有列。                                                                                                                                              |
 
 
 ## 多列函数
 ## 多列函数
 
 

+ 2 - 2
docs/zh_CN/sqls/lexical_elements.md

@@ -31,7 +31,7 @@ SELECT `a-b`, `hello world`, `中文Chinese` from demo
 **规则 SQL 的保留关键字**:如果您想在规则 SQL 中使用以下关键字,则必须使用反撇号将其括起来。
 **规则 SQL 的保留关键字**:如果您想在规则 SQL 中使用以下关键字,则必须使用反撇号将其括起来。
 
 
 ```
 ```
-SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC, AND, OR, CASE, WHEN, THEN, ELSE, END
+SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC, AND, OR, CASE, WHEN, THEN, ELSE, END, IN, NOT, BETWEEN, LIKE, OVER, PARTITION
 ```
 ```
 
 
 以下是使用名为 `from` 的流的示例,`from` 是 eKuiper 中的保留关键字。
 以下是使用名为 `from` 的流的示例,`from` 是 eKuiper 中的保留关键字。
@@ -43,7 +43,7 @@ SELECT * FROM demo1 where `from`="device1"
 **用于流管理的保留关键字**:如果您想在流管理命令中使用以下关键字,则必须使用反撇号将其括起来。
 **用于流管理的保留关键字**:如果您想在流管理命令中使用以下关键字,则必须使用反撇号将其括起来。
 
 
 ```
 ```
-CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
+CREATE, RROP, EXPLAIN, DESCRIBE, SHOW, STREAM, STREAMS, WITH, BIGINT, FLOAT, STRING, DATETIME, BOOLEAN, ARRAY, STRUCT, DATASOURCE, KEY, FORMAT,CONF_KEY, TYPE, KIND, SCHEMAID, STRICT_VALIDATION, TIMESTAMP, TIMESTAMP_FORMAT
 ```
 ```
 
 
 以下是如何在流创建语句中使用保留关键字的示例。
 以下是如何在流创建语句中使用保留关键字的示例。

+ 177 - 0
internal/binder/function/funcs_analytic.go

@@ -0,0 +1,177 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"reflect"
+	"strconv"
+)
+
+// registerAnalyticFunc registers the analytic functions
+// The last parameter of the function is always the partition key
+func registerAnalyticFunc() {
+	builtins["changed_col"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			ignoreNull, ok := args[0].(bool)
+			if !ok {
+				return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
+			}
+			if ignoreNull && args[1] == nil {
+				return nil, true
+			}
+			key := args[len(args)-1].(string)
+			lv, err := ctx.GetState(key)
+			if err != nil {
+				return err, false
+			}
+			if !reflect.DeepEqual(args[1], lv) {
+				err := ctx.PutState(key, args[1])
+				if err != nil {
+					return err, false
+				}
+				return args[1], true
+			}
+			return nil, true
+		},
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			if err := ValidateLen(2, len(args)); err != nil {
+				return err
+			}
+			if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
+				return ProduceErrInfo(0, "boolean")
+			}
+			return nil
+		},
+	}
+	builtins["had_changed"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			l := len(args) - 1
+			if l <= 1 {
+				return fmt.Errorf("expect more than one arg but got %d", len(args)), false
+			}
+			ignoreNull, ok := args[0].(bool)
+			if !ok {
+				return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
+			}
+			key := args[l].(string)
+			result := false
+			for i := 1; i < l; i++ {
+				v := args[i]
+				k := key + strconv.Itoa(i)
+				if ignoreNull && v == nil {
+					continue
+				}
+				lv, err := ctx.GetState(k)
+				if err != nil {
+					return fmt.Errorf("error getting state for %s: %v", k, err), false
+				}
+				if !reflect.DeepEqual(v, lv) {
+					result = true
+					err := ctx.PutState(k, v)
+					if err != nil {
+						return fmt.Errorf("error setting state for %s: %v", k, err), false
+					}
+				}
+			}
+			return result, true
+		},
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			if len(args) <= 1 {
+				return fmt.Errorf("expect more than one arg but got %d", len(args))
+			}
+			if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
+				return ProduceErrInfo(0, "bool")
+			}
+			return nil
+		},
+	}
+
+	builtins["lag"] = builtinFunc{
+		fType: ast.FuncTypeScalar,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			l := len(args) - 1
+			key := args[l].(string)
+			if l != 1 && l != 2 && l != 3 {
+				return fmt.Errorf("expect one two or three args but got %d", l), false
+			}
+			v, err := ctx.GetState(key)
+			if err != nil {
+				return fmt.Errorf("error getting state for %s: %v", key, err), false
+			}
+			if v == nil {
+				size := 0
+				var dftVal interface{} = nil
+				if l == 3 {
+					dftVal = args[2]
+				}
+				// first time call, need create state for lag
+				if l == 1 {
+					size = 1
+				} else {
+					siz, ok := args[1].(int)
+					if !ok {
+						return fmt.Errorf("second arg is not a int but got %v", args[1]), false
+					}
+					size = siz
+				}
+
+				rq := newRingqueue(size)
+				rq.fill(dftVal)
+
+				rtnVal, _ := rq.fetch()
+				rq.append(args[0])
+				err := ctx.PutState(key, rq)
+				if err != nil {
+					return fmt.Errorf("error setting state for %s: %v", key, err), false
+				}
+				return rtnVal, true
+			} else {
+				rq, ok := v.(*ringqueue)
+				if !ok {
+					return fmt.Errorf("error getting state for %s: %v", key, err), false
+				}
+				rtnVal, _ := rq.fetch()
+				rq.append(args[0])
+				err := ctx.PutState(key, rq)
+				if err != nil {
+					return fmt.Errorf("error setting state for %s: %v", key, err), false
+				}
+				return rtnVal, true
+			}
+		},
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			l := len(args)
+			if l != 1 && l != 2 && l != 3 {
+				return fmt.Errorf("expect one two or three args but got %d", l)
+			}
+			if l >= 2 {
+				if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) || ast.IsFieldRefArg(args[1]) {
+					return ProduceErrInfo(1, "int")
+				}
+				if s, ok := args[1].(*ast.IntegerLiteral); ok {
+					if s.Val < 0 {
+						return fmt.Errorf("the index should not be a nagtive integer")
+					}
+				}
+			}
+			return nil
+		},
+	}
+}

+ 818 - 0
internal/binder/function/funcs_analytic_test.go

@@ -0,0 +1,818 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"reflect"
+	"testing"
+)
+
+func TestChangedColValidation(t *testing.T) {
+	f, ok := builtins["changed_col"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	var tests = []struct {
+		args []ast.Expr
+		err  error
+	}{
+		{
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+			},
+			err: fmt.Errorf("Expect 2 arguments but found 1."),
+		}, {
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+				&ast.StringLiteral{Val: "bar"},
+			},
+			err: fmt.Errorf("Expect boolean type for parameter 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+				&ast.StringLiteral{Val: "bar"},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: fmt.Errorf("Expect 2 arguments but found 3."),
+		}, {
+			args: []ast.Expr{
+				&ast.BooleanLiteral{Val: true},
+				&ast.StringLiteral{Val: "baz"},
+			},
+		},
+	}
+	for i, tt := range tests {
+		err := f.val(nil, tt.args)
+		if !reflect.DeepEqual(err, tt.err) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, err, tt.err)
+		}
+	}
+}
+
+func TestChangedColExec(t *testing.T) {
+	f, ok := builtins["changed_col"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"self",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 1
+			args: []interface{}{
+				true,
+				"bar",
+				"self",
+			},
+			result: "bar",
+		}, { // 2
+			args: []interface{}{
+				true,
+				"bar",
+				"self",
+			},
+			result: nil,
+		}, { // 3
+			args: []interface{}{
+				true,
+				"baz",
+				"self",
+			},
+			result: "baz",
+		}, { // 4
+			args: []interface{}{
+				false,
+				nil,
+				"self",
+			},
+			result: nil,
+		}, { // 5
+			args: []interface{}{
+				false,
+				"baz",
+				"self",
+			},
+			result: "baz",
+		}, { // 6
+			args: []interface{}{
+				true,
+				"foo",
+				"self",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestChangedColPartition(t *testing.T) {
+	f, ok := builtins["changed_col"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"1",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 1
+			args: []interface{}{
+				true,
+				"bar",
+				"2",
+			},
+			result: "bar",
+		}, { // 2
+			args: []interface{}{
+				true,
+				"bar",
+				"1",
+			},
+			result: "bar",
+		}, { // 3
+			args: []interface{}{
+				true,
+				"baz",
+				"2",
+			},
+			result: "baz",
+		}, { // 4
+			args: []interface{}{
+				false,
+				nil,
+				"1",
+			},
+			result: nil,
+		}, { // 5
+			args: []interface{}{
+				false,
+				"baz",
+				"2",
+			},
+			result: nil,
+		}, { // 6
+			args: []interface{}{
+				true,
+				"foo",
+				"1",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestHadChangedValidation(t *testing.T) {
+	f, ok := builtins["had_changed"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	var tests = []struct {
+		args []ast.Expr
+		err  error
+	}{
+		{
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+			},
+			err: fmt.Errorf("expect more than one arg but got 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+				&ast.StringLiteral{Val: "bar"},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: fmt.Errorf("Expect bool type for parameter 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.IntegerLiteral{Val: 20},
+				&ast.BooleanLiteral{Val: true},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: fmt.Errorf("Expect bool type for parameter 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.FieldRef{
+					StreamName: "demo",
+					Name:       "a",
+					AliasRef:   nil,
+				},
+				&ast.BooleanLiteral{Val: true},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: nil,
+		},
+	}
+	for i, tt := range tests {
+		err := f.val(nil, tt.args)
+		if !reflect.DeepEqual(err, tt.err) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, err, tt.err)
+		}
+	}
+}
+
+func TestHadChangedExec(t *testing.T) {
+	f, ok := builtins["had_changed"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				"self",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"bar",
+				"self",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 2
+			args: []interface{}{
+				true,
+				"bar",
+				20,
+				"self",
+			},
+			result: true,
+		}, { // 3
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"self",
+			},
+			result: true,
+		}, { // 4
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"self",
+			},
+			result: false,
+		}, { // 5
+			args: []interface{}{
+				true,
+				"foo",
+				44,
+				"self",
+			},
+			result: true,
+		}, { // 6
+			args: []interface{}{
+				true,
+				"foo",
+				nil,
+				"self",
+			},
+			result: false,
+		}, { // 7
+			args: []interface{}{
+				true,
+				"foo",
+				44,
+				"self",
+			},
+			result: false,
+		}, { // 8
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"self",
+			},
+			result: true,
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestHadChangedExecAllowNull(t *testing.T) {
+	f, ok := builtins["had_changed"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				"self",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"bar",
+				"self",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 2
+			args: []interface{}{
+				false,
+				"bar",
+				20,
+				"self",
+			},
+			result: true,
+		}, { // 3
+			args: []interface{}{
+				false,
+				"baz",
+				nil,
+				"self",
+			},
+			result: true,
+		}, { // 4
+			args: []interface{}{
+				false,
+				"baz",
+				44,
+				"self",
+			},
+			result: true,
+		}, { // 5
+			args: []interface{}{
+				false,
+				nil,
+				44,
+				"self",
+			},
+			result: true,
+		}, { // 6
+			args: []interface{}{
+				false,
+				"baz",
+				44,
+				"self",
+			},
+			result: true,
+		}, { // 7
+			args: []interface{}{
+				false,
+				"baz",
+				44,
+				"self",
+			},
+			result: false,
+		}, { // 8
+			args: []interface{}{
+				false,
+				nil,
+				nil,
+				"self",
+			},
+			result: true,
+		}, { // 9
+			args: []interface{}{
+				false,
+				"baz",
+				44,
+				"self",
+			},
+			result: true,
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestHadChangedPartition(t *testing.T) {
+	f, ok := builtins["had_changed"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				"1",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"bar",
+				"1",
+			},
+			result: fmt.Errorf("first arg is not a bool but got foo"),
+		}, { // 2
+			args: []interface{}{
+				true,
+				"bar",
+				20,
+				"3",
+			},
+			result: true,
+		}, { // 3
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"2",
+			},
+			result: true,
+		}, { // 4
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"2",
+			},
+			result: false,
+		}, { // 5
+			args: []interface{}{
+				true,
+				"foo",
+				44,
+				"3",
+			},
+			result: true,
+		}, { // 6
+			args: []interface{}{
+				true,
+				"foo",
+				nil,
+				"1",
+			},
+			result: true,
+		}, { // 7
+			args: []interface{}{
+				true,
+				"foo",
+				44,
+				"2",
+			},
+			result: true,
+		}, { // 8
+			args: []interface{}{
+				true,
+				"baz",
+				44,
+				"3",
+			},
+			result: true,
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestLagExec(t *testing.T) {
+	f, ok := builtins["lag"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args:   []interface{}{"self"},
+			result: fmt.Errorf("expect one two or three args but got 0"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"self",
+			},
+			result: nil,
+		},
+		{ // 2
+			args: []interface{}{
+				"bar",
+				"self",
+			},
+			result: "foo",
+		},
+		{ // 3
+			args: []interface{}{
+				"bar",
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"self",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestLagPartition(t *testing.T) {
+	f, ok := builtins["lag"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args:   []interface{}{"self"},
+			result: fmt.Errorf("expect one two or three args but got 0"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"1",
+			},
+			result: nil,
+		},
+		{ // 2
+			args: []interface{}{
+				"bar",
+				"1",
+			},
+			result: "foo",
+		},
+		{ // 3
+			args: []interface{}{
+				"bar",
+				"2",
+			},
+			result: nil,
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"1",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				"2",
+			},
+			result: "bar",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestLagExecIndexWithDefaultValue(t *testing.T) {
+	f, ok := builtins["lag"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				"baw",
+				"self",
+			},
+			result: fmt.Errorf("expect one two or three args but got 4"),
+		}, { // 1
+			args: []interface{}{
+				"bar",
+				2,
+				"no result",
+				"self",
+			},
+			result: "no result",
+		},
+		{ // 2
+			args: []interface{}{
+				"bar",
+				2,
+				"no result",
+				"self",
+			},
+			result: "no result",
+		},
+		{ // 3
+			args: []interface{}{
+				"foo",
+				2,
+				"no result",
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				2,
+				"no result",
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				2,
+				"no result",
+				"self",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestLagExecIndex(t *testing.T) {
+	f, ok := builtins["lag"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				"baw",
+				"self",
+			},
+			result: fmt.Errorf("expect one two or three args but got 4"),
+		}, { // 1
+			args: []interface{}{
+				"bar",
+				2,
+				"self",
+			},
+			result: nil,
+		},
+		{ // 2
+			args: []interface{}{
+				"bar",
+				2,
+				"self",
+			},
+			result: nil,
+		},
+		{ // 3
+			args: []interface{}{
+				"foo",
+				2,
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				2,
+				"self",
+			},
+			result: "bar",
+		},
+		{ // 4
+			args: []interface{}{
+				"foo",
+				2,
+				"self",
+			},
+			result: "foo",
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}

+ 0 - 145
internal/binder/function/funcs_misc.go

@@ -479,151 +479,6 @@ func registerMiscFunc() {
 		exec:  nil, // directly return in the valuer
 		exec:  nil, // directly return in the valuer
 		val:   ValidateNoArg,
 		val:   ValidateNoArg,
 	}
 	}
-	builtins["changed_col"] = builtinFunc{
-		fType: ast.FuncTypeScalar,
-		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
-			ignoreNull, ok := args[0].(bool)
-			if !ok {
-				return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
-			}
-			if ignoreNull && args[1] == nil {
-				return nil, true
-			}
-			lv, err := ctx.GetState("self")
-			if err != nil {
-				return err, false
-			}
-			if !reflect.DeepEqual(args[1], lv) {
-				err := ctx.PutState("self", args[1])
-				if err != nil {
-					return err, false
-				}
-				return args[1], true
-			}
-			return nil, true
-		},
-		val: func(_ api.FunctionContext, args []ast.Expr) error {
-			if err := ValidateLen(2, len(args)); err != nil {
-				return err
-			}
-			if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
-				return ProduceErrInfo(0, "boolean")
-			}
-			return nil
-		},
-	}
-	builtins["had_changed"] = builtinFunc{
-		fType: ast.FuncTypeScalar,
-		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
-			if len(args) <= 1 {
-				return fmt.Errorf("expect more than one arg but got %d", len(args)), false
-			}
-			ignoreNull, ok := args[0].(bool)
-			if !ok {
-				return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
-			}
-			result := false
-			for i := 1; i < len(args); i++ {
-				v := args[i]
-				k := strconv.Itoa(i)
-				if ignoreNull && v == nil {
-					continue
-				}
-				lv, err := ctx.GetState(k)
-				if err != nil {
-					return fmt.Errorf("error getting state for %s: %v", k, err), false
-				}
-				if !reflect.DeepEqual(v, lv) {
-					result = true
-					err := ctx.PutState(k, v)
-					if err != nil {
-						return fmt.Errorf("error setting state for %s: %v", k, err), false
-					}
-				}
-			}
-			return result, true
-		},
-		val: func(_ api.FunctionContext, args []ast.Expr) error {
-			if len(args) <= 1 {
-				return fmt.Errorf("expect more than one arg but got %d", len(args))
-			}
-			if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
-				return ProduceErrInfo(0, "bool")
-			}
-			return nil
-		},
-	}
-
-	builtins["lag"] = builtinFunc{
-		fType: ast.FuncTypeScalar,
-		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
-			if len(args) != 1 && len(args) != 2 && len(args) != 3 {
-				return fmt.Errorf("expect one two or three args but got %d", len(args)), false
-			}
-			lagkey := "self"
-			v, err := ctx.GetState(lagkey)
-			if err != nil {
-				return fmt.Errorf("error getting state for %s: %v", lagkey, err), false
-			}
-			if v == nil {
-				size := 0
-				var dftVal interface{} = nil
-				if len(args) == 3 {
-					dftVal = args[2]
-				}
-				// first time call, need create state for lag
-				if len(args) == 1 {
-					size = 1
-				} else {
-					siz, ok := args[1].(int)
-					if !ok {
-						return fmt.Errorf("second arg is not a int but got %v", args[1]), false
-					}
-					size = siz
-				}
-
-				rq := newRingqueue(size)
-				rq.fill(dftVal)
-
-				rtnVal, _ := rq.fetch()
-				rq.append(args[0])
-				err := ctx.PutState(lagkey, rq)
-				if err != nil {
-					return fmt.Errorf("error setting state for %s: %v", lagkey, err), false
-				}
-				return rtnVal, true
-			} else {
-				rq, ok := v.(*ringqueue)
-				if !ok {
-					return fmt.Errorf("error getting state for %s: %v", lagkey, err), false
-				}
-				rtnVal, _ := rq.fetch()
-				rq.append(args[0])
-				err := ctx.PutState(lagkey, rq)
-				if err != nil {
-					return fmt.Errorf("error setting state for %s: %v", lagkey, err), false
-				}
-				return rtnVal, true
-			}
-		},
-		val: func(_ api.FunctionContext, args []ast.Expr) error {
-			if len(args) != 1 && len(args) != 2 && len(args) != 3 {
-				return fmt.Errorf("expect one two or three args but got %d", len(args))
-			}
-			if len(args) >= 2 {
-				if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) || ast.IsFieldRefArg(args[1]) {
-					return ProduceErrInfo(1, "int")
-				}
-				if s, ok := args[1].(*ast.IntegerLiteral); ok {
-					if s.Val < 0 {
-						return fmt.Errorf("the index should not be a nagtive integer")
-					}
-				}
-			}
-			return nil
-		},
-	}
-
 	builtins["object_construct"] = builtinFunc{
 	builtins["object_construct"] = builtinFunc{
 		fType: ast.FuncTypeScalar,
 		fType: ast.FuncTypeScalar,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {

+ 0 - 524
internal/binder/function/funcs_misc_test.go

@@ -20,118 +20,10 @@ import (
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/ast"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
 )
 )
 
 
-func TestChangedColValidation(t *testing.T) {
-	f, ok := builtins["changed_col"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	var tests = []struct {
-		args []ast.Expr
-		err  error
-	}{
-		{
-			args: []ast.Expr{
-				&ast.StringLiteral{Val: "foo"},
-			},
-			err: fmt.Errorf("Expect 2 arguments but found 1."),
-		}, {
-			args: []ast.Expr{
-				&ast.StringLiteral{Val: "foo"},
-				&ast.StringLiteral{Val: "bar"},
-			},
-			err: fmt.Errorf("Expect boolean type for parameter 1"),
-		}, {
-			args: []ast.Expr{
-				&ast.StringLiteral{Val: "foo"},
-				&ast.StringLiteral{Val: "bar"},
-				&ast.StringLiteral{Val: "baz"},
-			},
-			err: fmt.Errorf("Expect 2 arguments but found 3."),
-		}, {
-			args: []ast.Expr{
-				&ast.BooleanLiteral{Val: true},
-				&ast.StringLiteral{Val: "baz"},
-			},
-		},
-	}
-	for i, tt := range tests {
-		err := f.val(nil, tt.args)
-		if !reflect.DeepEqual(err, tt.err) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, err, tt.err)
-		}
-	}
-}
-
-func TestChangedColExec(t *testing.T) {
-	f, ok := builtins["changed_col"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args: []interface{}{
-				"foo",
-				"bar",
-			},
-			result: fmt.Errorf("first arg is not a bool but got foo"),
-		}, { // 1
-			args: []interface{}{
-				true,
-				"bar",
-			},
-			result: "bar",
-		}, { // 2
-			args: []interface{}{
-				true,
-				"bar",
-			},
-			result: nil,
-		}, { // 3
-			args: []interface{}{
-				true,
-				"baz",
-			},
-			result: "baz",
-		}, { // 4
-			args: []interface{}{
-				false,
-				nil,
-			},
-			result: nil,
-		}, { // 5
-			args: []interface{}{
-				false,
-				"baz",
-			},
-			result: "baz",
-		}, { // 6
-			args: []interface{}{
-				true,
-				"foo",
-			},
-			result: "foo",
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}
-
 func TestToMap(t *testing.T) {
 func TestToMap(t *testing.T) {
 	f, ok := builtins["object_construct"]
 	f, ok := builtins["object_construct"]
 	if !ok {
 	if !ok {
@@ -179,419 +71,3 @@ func TestToMap(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
-
-func TestHadChangedValidation(t *testing.T) {
-	f, ok := builtins["had_changed"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	var tests = []struct {
-		args []ast.Expr
-		err  error
-	}{
-		{
-			args: []ast.Expr{
-				&ast.StringLiteral{Val: "foo"},
-			},
-			err: fmt.Errorf("expect more than one arg but got 1"),
-		}, {
-			args: []ast.Expr{
-				&ast.StringLiteral{Val: "foo"},
-				&ast.StringLiteral{Val: "bar"},
-				&ast.StringLiteral{Val: "baz"},
-			},
-			err: fmt.Errorf("Expect bool type for parameter 1"),
-		}, {
-			args: []ast.Expr{
-				&ast.IntegerLiteral{Val: 20},
-				&ast.BooleanLiteral{Val: true},
-				&ast.StringLiteral{Val: "baz"},
-			},
-			err: fmt.Errorf("Expect bool type for parameter 1"),
-		}, {
-			args: []ast.Expr{
-				&ast.FieldRef{
-					StreamName: "demo",
-					Name:       "a",
-					AliasRef:   nil,
-				},
-				&ast.BooleanLiteral{Val: true},
-				&ast.StringLiteral{Val: "baz"},
-			},
-			err: nil,
-		},
-	}
-	for i, tt := range tests {
-		err := f.val(nil, tt.args)
-		if !reflect.DeepEqual(err, tt.err) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, err, tt.err)
-		}
-	}
-}
-
-func TestHadChangedExec(t *testing.T) {
-	f, ok := builtins["had_changed"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args: []interface{}{
-				"foo",
-				"bar",
-				"baz",
-			},
-			result: fmt.Errorf("first arg is not a bool but got foo"),
-		}, { // 1
-			args: []interface{}{
-				"foo",
-				"bar",
-			},
-			result: fmt.Errorf("first arg is not a bool but got foo"),
-		}, { // 2
-			args: []interface{}{
-				true,
-				"bar",
-				20,
-			},
-			result: true,
-		}, { // 3
-			args: []interface{}{
-				true,
-				"baz",
-				44,
-			},
-			result: true,
-		}, { // 4
-			args: []interface{}{
-				true,
-				"baz",
-				44,
-			},
-			result: false,
-		}, { // 5
-			args: []interface{}{
-				true,
-				"foo",
-				44,
-			},
-			result: true,
-		}, { // 6
-			args: []interface{}{
-				true,
-				"foo",
-				nil,
-			},
-			result: false,
-		}, { // 7
-			args: []interface{}{
-				true,
-				"foo",
-				44,
-			},
-			result: false,
-		}, { // 8
-			args: []interface{}{
-				true,
-				"baz",
-				44,
-			},
-			result: true,
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}
-
-func TestHadChangedExecAllowNull(t *testing.T) {
-	f, ok := builtins["had_changed"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args: []interface{}{
-				"foo",
-				"bar",
-				"baz",
-			},
-			result: fmt.Errorf("first arg is not a bool but got foo"),
-		}, { // 1
-			args: []interface{}{
-				"foo",
-				"bar",
-			},
-			result: fmt.Errorf("first arg is not a bool but got foo"),
-		}, { // 2
-			args: []interface{}{
-				false,
-				"bar",
-				20,
-			},
-			result: true,
-		}, { // 3
-			args: []interface{}{
-				false,
-				"baz",
-				nil,
-			},
-			result: true,
-		}, { // 4
-			args: []interface{}{
-				false,
-				"baz",
-				44,
-			},
-			result: true,
-		}, { // 5
-			args: []interface{}{
-				false,
-				nil,
-				44,
-			},
-			result: true,
-		}, { // 6
-			args: []interface{}{
-				false,
-				"baz",
-				44,
-			},
-			result: true,
-		}, { // 7
-			args: []interface{}{
-				false,
-				"baz",
-				44,
-			},
-			result: false,
-		}, { // 8
-			args: []interface{}{
-				false,
-				nil,
-				nil,
-			},
-			result: true,
-		}, { // 9
-			args: []interface{}{
-				false,
-				"baz",
-				44,
-			},
-			result: true,
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}
-
-func TestLagExec(t *testing.T) {
-	f, ok := builtins["lag"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args:   []interface{}{},
-			result: fmt.Errorf("expect one two or three args but got 0"),
-		}, { // 1
-			args: []interface{}{
-				"foo",
-			},
-			result: nil,
-		},
-		{ // 2
-			args: []interface{}{
-				"bar",
-			},
-			result: "foo",
-		},
-		{ // 3
-			args: []interface{}{
-				"bar",
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-			},
-			result: "foo",
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}
-
-func TestLagExecIndexWithDefaultValue(t *testing.T) {
-	f, ok := builtins["lag"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args: []interface{}{
-				"foo",
-				"bar",
-				"baz",
-				"baw",
-			},
-			result: fmt.Errorf("expect one two or three args but got 4"),
-		}, { // 1
-			args: []interface{}{
-				"bar",
-				2,
-				"no result",
-			},
-			result: "no result",
-		},
-		{ // 2
-			args: []interface{}{
-				"bar",
-				2,
-				"no result",
-			},
-			result: "no result",
-		},
-		{ // 3
-			args: []interface{}{
-				"foo",
-				2,
-				"no result",
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-				2,
-				"no result",
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-				2,
-				"no result",
-			},
-			result: "foo",
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}
-
-func TestLagExecIndex(t *testing.T) {
-	f, ok := builtins["lag"]
-	if !ok {
-		t.Fatal("builtin not found")
-	}
-	contextLogger := conf.Log.WithField("rule", "testExec")
-	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
-	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
-		args   []interface{}
-		result interface{}
-	}{
-		{ // 0
-			args: []interface{}{
-				"foo",
-				"bar",
-				"baz",
-				"baw",
-			},
-			result: fmt.Errorf("expect one two or three args but got 4"),
-		}, { // 1
-			args: []interface{}{
-				"bar",
-				2,
-			},
-			result: nil,
-		},
-		{ // 2
-			args: []interface{}{
-				"bar",
-				2,
-			},
-			result: nil,
-		},
-		{ // 3
-			args: []interface{}{
-				"foo",
-				2,
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-				2,
-			},
-			result: "bar",
-		},
-		{ // 4
-			args: []interface{}{
-				"foo",
-				2,
-			},
-			result: "foo",
-		},
-	}
-	for i, tt := range tests {
-		result, _ := f.exec(fctx, tt.args)
-		if !reflect.DeepEqual(result, tt.result) {
-			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
-		}
-	}
-}

+ 1 - 0
internal/binder/function/function.go

@@ -38,6 +38,7 @@ func init() {
 	registerMathFunc()
 	registerMathFunc()
 	registerStrFunc()
 	registerStrFunc()
 	registerMiscFunc()
 	registerMiscFunc()
+	registerAnalyticFunc()
 	registerColsFunc()
 	registerColsFunc()
 }
 }
 
 

+ 1 - 0
internal/topo/planner/analyzer.go

@@ -130,6 +130,7 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt,
 					FuncType:    f.FuncType,
 					FuncType:    f.FuncType,
 					Args:        f.Args,
 					Args:        f.Args,
 					CachedField: f.CachedField,
 					CachedField: f.CachedField,
+					Partition:   f.Partition,
 				})
 				})
 			}
 			}
 		}
 		}

+ 39 - 0
internal/topo/topotest/rule_test.go

@@ -599,6 +599,45 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 				"source_demo_0_records_out_total": int64(5),
 			},
 			},
 		},
 		},
+		{
+			Name: `TestLagPartition`,
+			Sql:  "SELECT color, lag(size) over (partition by color) as lastSize, size, lastSize/size as changeRate FROM demo",
+			R: [][]map[string]interface{}{
+				{{
+					"color": "red",
+					"size":  float64(3),
+				}},
+				{{
+					"color": "blue",
+					"size":  float64(6),
+				}},
+				{{
+					"color":      "blue",
+					"lastSize":   float64(6),
+					"size":       float64(2),
+					"changeRate": float64(3),
+				}},
+				{{
+					"color": "yellow",
+					"size":  float64(4),
+				}},
+				{{
+					"color":      "red",
+					"lastSize":   float64(3),
+					"size":       float64(1),
+					"changeRate": float64(3),
+				}},
+			},
+			M: map[string]interface{}{
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+			},
+		},
 	}
 	}
 	HandleStream(true, streamList, t)
 	HandleStream(true, streamList, t)
 	options := []*api.RuleOption{
 	options := []*api.RuleOption{

+ 4 - 0
internal/xsql/lexical.go

@@ -221,6 +221,10 @@ func (s *Scanner) ScanIdent() (tok ast.Token, lit string) {
 		return ast.BETWEEN, lit
 		return ast.BETWEEN, lit
 	case "LIKE":
 	case "LIKE":
 		return ast.LIKE, lit
 		return ast.LIKE, lit
+	case "OVER":
+		return ast.OVER, lit
+	case "PARTITION":
+		return ast.PARTITION, lit
 	case "CREATE":
 	case "CREATE":
 		return ast.CREATE, lit
 		return ast.CREATE, lit
 	case "DROP":
 	case "DROP":

+ 51 - 13
internal/xsql/parser.go

@@ -829,12 +829,7 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 	var args []ast.Expr
 	var args []ast.Expr
 	for {
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
-			if valErr := validateFuncs(name, nil); valErr != nil {
-				return nil, valErr
-			}
-			c := &ast.Call{Name: name, Args: args, FuncId: p.fn, FuncType: ft}
-			p.fn += 1
-			return c, nil
+			break
 		} else {
 		} else {
 			p.unscan()
 			p.unscan()
 		}
 		}
@@ -850,15 +845,13 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 			}
 			}
 		}
 		}
 
 
-		if tok, _ := p.scanIgnoreWhitespace(); tok != ast.COMMA {
-			p.unscan()
+		if tok, lit := p.scanIgnoreWhitespace(); tok != ast.COMMA {
+			if tok != ast.RPAREN {
+				return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
+			}
 			break
 			break
 		}
 		}
 	}
 	}
-
-	if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
-		return nil, fmt.Errorf("found function call %q, expected ), but with %q.", name, lit)
-	}
 	if wt, err := validateWindows(name, args); wt == ast.NOT_WINDOW {
 	if wt, err := validateWindows(name, args); wt == ast.NOT_WINDOW {
 		if valErr := validateFuncs(name, args); valErr != nil {
 		if valErr := validateFuncs(name, args); valErr != nil {
 			return nil, valErr
 			return nil, valErr
@@ -869,7 +862,8 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 		}
 		}
 		c := &ast.Call{Name: name, Args: args, FuncId: p.fn, FuncType: ft}
 		c := &ast.Call{Name: name, Args: args, FuncId: p.fn, FuncType: ft}
 		p.fn += 1
 		p.fn += 1
-		return c, nil
+		e := p.parseOverPartition(c)
+		return c, e
 	} else {
 	} else {
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
@@ -1503,3 +1497,47 @@ func (p *Parser) parseAsterisk() (ast.Expr, error) {
 func (p *Parser) inmeta() bool {
 func (p *Parser) inmeta() bool {
 	return p.inFunc == "meta" || p.inFunc == "mqtt"
 	return p.inFunc == "meta" || p.inFunc == "mqtt"
 }
 }
+
+func (p *Parser) parseOverPartition(c *ast.Call) error {
+	if tok, _ := p.scanIgnoreWhitespace(); tok != ast.OVER {
+		p.unscan()
+		return nil
+	} else if function.IsAnalyticFunc(c.Name) {
+		if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
+			if t, _ := p.scanIgnoreWhitespace(); t == ast.PARTITION {
+				if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {
+					pe := &ast.PartitionExpr{}
+					for {
+						if exp, err := p.ParseExpr(); err != nil {
+							return err
+						} else {
+							pe.Exprs = append(pe.Exprs, exp)
+						}
+						if tok, _ := p.scanIgnoreWhitespace(); tok == ast.COMMA {
+							continue
+						} else {
+							p.unscan()
+							break
+						}
+					}
+					if ttt, _ := p.scanIgnoreWhitespace(); ttt != ast.RPAREN {
+						return fmt.Errorf("found %q, expect right parentheses after PARTITION BY", ttt)
+					}
+					if len(pe.Exprs) == 0 {
+						return fmt.Errorf("PARTITION BY must have at least one expression.")
+					}
+					c.Partition = pe
+					return nil
+				} else {
+					return fmt.Errorf("found %q, expected by after partition.", l1)
+				}
+			} else {
+				return fmt.Errorf("Found %q after OVER (, expect partition by.", tok1)
+			}
+		} else {
+			return fmt.Errorf("Found %q after OVER, expect parentheses.", tok1)
+		}
+	} else {
+		return fmt.Errorf("Found OVER after non analytic function %s", c.Name)
+	}
+}

+ 83 - 0
internal/xsql/parser_test.go

@@ -1795,6 +1795,89 @@ func TestParser_ParseStatement(t *testing.T) {
 			s:   `SELECT name FROM tbl WHERE name IN (abc,def OR name in (abc)`,
 			s:   `SELECT name FROM tbl WHERE name IN (abc,def OR name in (abc)`,
 			err: `expect ) for IN expression, but got "EOF"`,
 			err: `expect ) for IN expression, but got "EOF"`,
 		},
 		},
+		{
+			s: `SELECT lag(name) OVER (PARTITION BY device) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr: &ast.Call{
+							Name:   "lag",
+							FuncId: 0,
+							Args: []ast.Expr{
+								&ast.FieldRef{Name: "name", StreamName: ast.DefaultStream},
+							},
+							Partition: &ast.PartitionExpr{
+								Exprs: []ast.Expr{
+									&ast.FieldRef{Name: "device", StreamName: ast.DefaultStream},
+								},
+							},
+						},
+						Name:  "lag",
+						AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
+			s:   `SELECT name OVER (PARTITION BY device) FROM tbl`,
+			err: `found "OVER", expected FROM.`,
+		},
+		{
+			s:   `SELECT avg(name) OVER (PARTITION BY device) FROM tbl`,
+			err: `Found OVER after non analytic function avg`,
+		},
+		{
+			s: `SELECT name FROM tbl WHERE lag(name) OVER (PARTITION BY device, groupName) > 3`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream},
+						Name:  "name",
+						AName: ""},
+				},
+				Condition: &ast.BinaryExpr{
+					LHS: &ast.Call{
+						Name:   "lag",
+						FuncId: 0,
+						Args: []ast.Expr{
+							&ast.FieldRef{Name: "name", StreamName: ast.DefaultStream},
+						},
+						Partition: &ast.PartitionExpr{
+							Exprs: []ast.Expr{
+								&ast.FieldRef{Name: "device", StreamName: ast.DefaultStream},
+								&ast.FieldRef{Name: "groupName", StreamName: ast.DefaultStream},
+							},
+						},
+					},
+					OP:  ast.GT,
+					RHS: &ast.IntegerLiteral{Val: 3},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
+			s: `SELECT lag(name) OVER (PARTITION BY device) as ll FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr: &ast.Call{
+							Name:   "lag",
+							FuncId: 0,
+							Args: []ast.Expr{
+								&ast.FieldRef{Name: "name", StreamName: ast.DefaultStream},
+							},
+							Partition: &ast.PartitionExpr{
+								Exprs: []ast.Expr{
+									&ast.FieldRef{Name: "device", StreamName: ast.DefaultStream},
+								},
+							},
+						},
+						Name:  "lag",
+						AName: "ll"},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
 	}
 	}
 
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 15 - 1
internal/xsql/valuer.go

@@ -346,7 +346,21 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 						// won't happen
 						// won't happen
 						return fmt.Errorf("unknown function type")
 						return fmt.Errorf("unknown function type")
 					}
 					}
-
+				}
+				if function.IsAnalyticFunc(expr.Name) { // analytic func must put the partition key into the args
+					if expr.Partition != nil && len(expr.Partition.Exprs) > 0 {
+						pk := ""
+						for _, pe := range expr.Partition.Exprs {
+							temp := v.Eval(pe)
+							if _, ok := temp.(error); ok {
+								return temp
+							}
+							pk += fmt.Sprintf("%v", temp)
+						}
+						args = append(args, pk)
+					} else {
+						args = append(args, "self")
+					}
 				}
 				}
 				val, _ := valuer.Call(expr.Name, expr.FuncId, args)
 				val, _ := valuer.Call(expr.Name, expr.FuncId, args)
 				return val
 				return val

+ 5 - 0
pkg/ast/expr.go

@@ -141,12 +141,17 @@ type Call struct {
 	// This cachedField cached the new field name and when evaluating, just return the field access evaluated value.
 	// This cachedField cached the new field name and when evaluating, just return the field access evaluated value.
 	CachedField string
 	CachedField string
 	Cached      bool
 	Cached      bool
+	Partition   *PartitionExpr
 }
 }
 
 
 func (c *Call) expr()    {}
 func (c *Call) expr()    {}
 func (c *Call) literal() {}
 func (c *Call) literal() {}
 func (c *Call) node()    {}
 func (c *Call) node()    {}
 
 
+type PartitionExpr struct {
+	Exprs []Expr
+}
+
 type BinaryExpr struct {
 type BinaryExpr struct {
 	OP  Token
 	OP  Token
 	LHS Expr
 	LHS Expr

+ 23 - 19
pkg/ast/token.go

@@ -101,6 +101,8 @@ const (
 	THEN
 	THEN
 	ELSE
 	ELSE
 	END
 	END
+	OVER
+	PARTITION
 
 
 	TRUE
 	TRUE
 	FALSE
 	FALSE
@@ -188,25 +190,27 @@ var Tokens = []string{
 	COLON:     ":",
 	COLON:     ":",
 	COLSEP:    "\007",
 	COLSEP:    "\007",
 
 
-	SELECT: "SELECT",
-	FROM:   "FROM",
-	JOIN:   "JOIN",
-	LEFT:   "LEFT",
-	INNER:  "INNER",
-	ON:     "ON",
-	WHERE:  "WHERE",
-	GROUP:  "GROUP",
-	ORDER:  "ORDER",
-	HAVING: "HAVING",
-	BY:     "BY",
-	ASC:    "ASC",
-	DESC:   "DESC",
-	FILTER: "FILTER",
-	CASE:   "CASE",
-	WHEN:   "WHEN",
-	THEN:   "THEN",
-	ELSE:   "ELSE",
-	END:    "END",
+	SELECT:    "SELECT",
+	FROM:      "FROM",
+	JOIN:      "JOIN",
+	LEFT:      "LEFT",
+	INNER:     "INNER",
+	ON:        "ON",
+	WHERE:     "WHERE",
+	GROUP:     "GROUP",
+	ORDER:     "ORDER",
+	HAVING:    "HAVING",
+	BY:        "BY",
+	ASC:       "ASC",
+	DESC:      "DESC",
+	FILTER:    "FILTER",
+	CASE:      "CASE",
+	WHEN:      "WHEN",
+	THEN:      "THEN",
+	ELSE:      "ELSE",
+	END:       "END",
+	OVER:      "OVER",
+	PARTITION: "PARTITION",
 
 
 	CREATE:   "CREATE",
 	CREATE:   "CREATE",
 	DROP:     "RROP",
 	DROP:     "RROP",