Browse Source

feat: support window function row_number (#2140)

* support row_number

Signed-off-by: yisaer <disxiaofei@163.com>

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

* revise plan

Signed-off-by: yisaer <disxiaofei@163.com>

* fix plan

Signed-off-by: yisaer <disxiaofei@163.com>

* add docs

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comments

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
27d55766ff

+ 8 - 0
docs/directory.json

@@ -611,6 +611,10 @@
 						{
 							"title": "预定义函数插件",
 							"path": "sqls/functions/custom_functions"
+						},
+						{
+							"title": "窗口函数",
+							"path": "sqls/functions/window_funcs"
 						}
 					]
 				},
@@ -1330,6 +1334,10 @@
 						{
 							"title": "Predefined function plugins",
 							"path": "sqls/functions/custom_functions"
+						},
+						{
+							"title": "Window Functions",
+							"path": "sqls/functions/window_functions"
 						}
 					]
 				},

+ 1 - 0
docs/en_US/sqls/functions/overview.md

@@ -15,6 +15,7 @@ eKuiper has many built-in functions for performing calculations on data.
 - [Analytic Functions](./analytic_functions.md)
 - [Multi-Row Functions](./multi_row_functions.md)
 - [Multi-Column Functions](./multi_column_functions.md)
+- [Window Functions](./window_functions.md)
 
 eKuiper also provides a set of functions through shipped plugins. Users need to install the plugins before using them.
 

+ 11 - 0
docs/en_US/sqls/functions/window_functions.md

@@ -0,0 +1,11 @@
+# Window Functions
+
+A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. For now, window functions can only be used in select fields.
+
+## ROW_NUMBER
+
+```text
+row_number()
+```
+
+ROW_NUMBER numbers all rows sequentially (for example 1, 2, 3, 4, 5).

+ 1 - 0
docs/zh_CN/sqls/functions/overview.md

@@ -15,6 +15,7 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 - [分析函数](./analytic_functions.md)
 - [多行函数](./multi_row_functions.md)
 - [多列函数](./multi_column_functions.md)
+- [窗口函数](./window_funcs.md)
 
 eKuiper 也有一组通过插件提供的函数。用户需要在使用之前安装插件。
 

+ 11 - 0
docs/zh_CN/sqls/functions/window_funcs.md

@@ -0,0 +1,11 @@
+# 窗口函数
+
+窗口函数用于对数据进行聚合操作,并将结果添加到每一行数据中。目前,窗口函数只能被用在 select field 中。
+
+## ROW_NUMBER
+
+```text
+row_number()
+```
+
+row_number() 将从 1 开始,为每一条记录返回一个数字。

+ 31 - 0
internal/binder/function/funcs_window.go

@@ -0,0 +1,31 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+// registerWindowFunc adds the window functions to the builtins table.
+func registerWindowFunc() {
+	// Window functions are computed in windowFuncOperator rather than through
+	// exec, so the exec hook registered here is a placeholder returning nil.
+	placeholder := func(_ api.FunctionContext, _ []interface{}) (interface{}, bool) {
+		return nil, true
+	}
+	builtins["row_number"] = builtinFunc{
+		fType: ast.FuncTypeWindow,
+		exec:  placeholder,
+		val:   ValidateNoArg,
+	}
+}

+ 42 - 0
internal/binder/function/funcs_window_test.go

@@ -0,0 +1,42 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+func TestWindowFuncValidate(t *testing.T) {
+	testcases := []struct {
+		name string
+		args []ast.Expr
+	}{
+		{
+			name: "row_number",
+			args: nil,
+		},
+	}
+
+	for _, tc := range testcases {
+		f, ok := builtins[tc.name]
+		require.True(t, ok)
+		err := f.val(nil, tc.args)
+		require.NoError(t, err)
+	}
+}

+ 10 - 0
internal/binder/function/function.go

@@ -54,6 +54,7 @@ func init() {
 	registerObjectFunc()
 	registerGlobalStateFunc()
 	registerGlobalAggFunc()
+	registerWindowFunc()
 }
 
 //var funcWithAsteriskSupportMap = map[string]string{
@@ -73,8 +74,17 @@ var analyticFuncs = map[string]struct{}{
 	"acc_count":   {},
 }
 
+// windowFuncs is the set of function names recognised as window functions.
+var windowFuncs = map[string]struct{}{
+	"row_number": {},
+}
+
 const AnalyticPrefix = "$$a"
 
+// IsWindowFunc reports whether name is a key of windowFuncs.
+func IsWindowFunc(name string) bool {
+	_, registered := windowFuncs[name]
+	return registered
+}
+
 func IsAnalyticFunc(name string) bool {
 	_, ok := analyticFuncs[name]
 	return ok

+ 11 - 0
internal/topo/operator/project_operator.go

@@ -29,6 +29,7 @@ type ProjectOp struct {
 	AliasNames       []string   // list of alias name
 	ExprNames        []string   // list of expr name
 	ExceptNames      []string   // list of except name
+	WindowFuncNames  map[string]struct{}
 	AllWildcard      bool
 	WildcardEmitters map[string]bool
 	AliasFields      ast.Fields
@@ -152,6 +153,11 @@ func (pp *ProjectOp) project(row xsql.Row, ve *xsql.ValuerEval) error {
 	// Do not set value during calculations
 
 	for _, f := range pp.ExprFields {
+		if _, ok := pp.WindowFuncNames[f.Name]; ok {
+			vi, _ := row.Value(f.Name, "")
+			pp.kvs = append(pp.kvs, f.Name, vi)
+			continue
+		}
 		vi := ve.Eval(f.Expr)
 		if e, ok := vi.(error); ok {
 			return e
@@ -168,6 +174,11 @@ func (pp *ProjectOp) project(row xsql.Row, ve *xsql.ValuerEval) error {
 		}
 	}
 	for _, f := range pp.AliasFields {
+		if _, ok := pp.WindowFuncNames[f.AName]; ok {
+			vi, _ := row.Value(f.AName, "")
+			pp.kvs = append(pp.kvs, f.AName, vi)
+			continue
+		}
 		vi := ve.Eval(f.Expr)
 		if e, ok := vi.(error); ok {
 			return e

+ 84 - 0
internal/topo/operator/windowfunc_operator.go

@@ -0,0 +1,84 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package operator
+
+import (
+	"fmt"
+
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+type (
+	// WindowFuncOperator evaluates the window function fields of a select
+	// statement; WindowFuncFields lists the fields that Apply fills in.
+	WindowFuncOperator struct {
+		WindowFuncFields ast.Fields
+	}
+
+	// windowFuncHandle computes one window function, either for a single
+	// tuple or for every row of a collection.
+	windowFuncHandle interface {
+		handleTuple(input xsql.TupleRow)
+		handleCollection(input xsql.Collection)
+	}
+
+	// rowNumberFuncHandle implements row_number; name is the column that
+	// receives the generated number.
+	rowNumberFuncHandle struct {
+		name string
+	}
+)
+
+func (rh *rowNumberFuncHandle) handleTuple(input xsql.TupleRow) {
+	input.Set(rh.name, 1)
+}
+
+func (rh *rowNumberFuncHandle) handleCollection(input xsql.Collection) {
+	index := 1
+	input.RangeSet(func(i int, r xsql.Row) (bool, error) {
+		r.Set(rh.name, index)
+		index++
+		return true, nil
+	})
+}
+
+// Apply fills in the value of every window function field on data and returns
+// data. A single xsql.TupleRow and a whole xsql.Collection are handled; any
+// other input type is returned untouched. When the window function behind a
+// field cannot be resolved, the error is returned as the result instead.
+func (wf *WindowFuncOperator) Apply(_ api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+	for _, windowFuncField := range wf.WindowFuncFields {
+		// The result is stored under the alias when one is given.
+		name := windowFuncField.Name
+		if windowFuncField.AName != "" {
+			name = windowFuncField.AName
+		}
+		var funcName string
+		switch c := windowFuncField.Expr.(type) {
+		case *ast.Call:
+			funcName = c.Name
+		case *ast.FieldRef:
+			// An aliased field reaches the call through AliasRef. Check every
+			// step so that a malformed field produces an error, not a panic.
+			if c.AliasRef == nil {
+				return fmt.Errorf("window function field %q has no alias reference", name)
+			}
+			call, ok := c.AliasRef.Expression.(*ast.Call)
+			if !ok {
+				return fmt.Errorf("window function field %q does not refer to a function call", name)
+			}
+			funcName = call.Name
+		}
+		wh, err := getWindowFuncHandle(funcName, name)
+		if err != nil {
+			return err
+		}
+		switch input := data.(type) {
+		case xsql.TupleRow:
+			wh.handleTuple(input)
+		case xsql.Collection:
+			wh.handleCollection(input)
+		}
+	}
+	return data
+}
+
+// getWindowFuncHandle returns the handle implementing the window function
+// funcName; colName is the column the handle writes its result to. When
+// funcName is not a supported window function, the returned error names it.
+func getWindowFuncHandle(funcName, colName string) (windowFuncHandle, error) {
+	switch funcName {
+	case "row_number":
+		return &rowNumberFuncHandle{name: colName}, nil
+	default:
+		return nil, fmt.Errorf("unsupported window function %q", funcName)
+	}
+}

+ 37 - 6
internal/topo/planner/planner.go

@@ -177,9 +177,11 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *OrderPlan:
 		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
-		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta, LimitCount: t.limitCount, EnableLimit: t.enableLimit}, fmt.Sprintf("%d_project", newIndex), options)
+		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta, LimitCount: t.limitCount, EnableLimit: t.enableLimit, WindowFuncNames: t.windowFuncNames}, fmt.Sprintf("%d_project", newIndex), options)
 	case *ProjectSetPlan:
 		op = Transform(&operator.ProjectSetOperator{SrfMapping: t.SrfMapping, LimitCount: t.limitCount, EnableLimit: t.enableLimit}, fmt.Sprintf("%d_projectset", newIndex), options)
+	case *WindowFuncPlan:
+		op = Transform(&operator.WindowFuncOperator{WindowFuncFields: t.windowFuncFields}, fmt.Sprintf("%d_windowFunc", newIndex), options)
 	default:
 		err = fmt.Errorf("unknown logical plan %v", t)
 	}
@@ -463,6 +465,14 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 		p.SetChildren(children)
 		children = []LogicalPlan{p}
 	}
+	windowFuncFields, windowFuncsNames := extractWindowFuncFields(stmt)
+	if len(windowFuncFields) > 0 {
+		p = WindowFuncPlan{
+			windowFuncFields: windowFuncFields,
+		}.Init()
+		p.SetChildren(children)
+		children = []LogicalPlan{p}
+	}
 	srfMapping := extractSRFMapping(stmt)
 	if stmt.Fields != nil {
 		enableLimit := false
@@ -472,11 +482,12 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 			limitCount = stmt.Limit.(*ast.LimitExpr).LimitCount.Val
 		}
 		p = ProjectPlan{
-			fields:      stmt.Fields,
-			isAggregate: xsql.WithAggFields(stmt),
-			sendMeta:    opt.SendMetaToSink,
-			enableLimit: enableLimit,
-			limitCount:  limitCount,
+			windowFuncNames: windowFuncsNames,
+			fields:          stmt.Fields,
+			isAggregate:     xsql.WithAggFields(stmt),
+			sendMeta:        opt.SendMetaToSink,
+			enableLimit:     enableLimit,
+			limitCount:      limitCount,
 		}.Init()
 		p.SetChildren(children)
 		children = []LogicalPlan{p}
@@ -525,3 +536,23 @@ func Transform(op node.UnOperation, name string, options *api.RuleOption) *node.
 	unaryOperator.SetOperation(op)
 	return unaryOperator
 }
+
+func extractWindowFuncFields(stmt *ast.SelectStatement) (ast.Fields, map[string]struct{}) {
+	windowFuncsName := make(map[string]struct{})
+	windowFuncFields := make([]ast.Field, 0)
+	for _, field := range stmt.Fields {
+		if wf, ok := field.Expr.(*ast.Call); ok && wf.FuncType == ast.FuncTypeWindow {
+			windowFuncFields = append(windowFuncFields, field)
+			windowFuncsName[wf.Name] = struct{}{}
+			continue
+		}
+		if ref, ok := field.Expr.(*ast.FieldRef); ok && ref.AliasRef != nil {
+			if wf, ok := ref.AliasRef.Expression.(*ast.Call); ok && wf.FuncType == ast.FuncTypeWindow {
+				windowFuncFields = append(windowFuncFields, field)
+				windowFuncsName[ref.Name] = struct{}{}
+				continue
+			}
+		}
+	}
+	return windowFuncFields, windowFuncsName
+}

+ 122 - 0
internal/topo/planner/planner_test.go

@@ -92,12 +92,134 @@ func Test_createLogicalPlan(t *testing.T) {
 	// boolTrue = true
 	boolFalse := false
 
+	ref := &ast.AliasRef{
+		Expression: &ast.Call{
+			Name:     "row_number",
+			FuncType: ast.FuncTypeWindow,
+		},
+	}
+	ref.SetRefSource([]string{})
+
 	tests := []struct {
 		sql string
 		p   LogicalPlan
 		err string
 	}{
 		{
+			sql: "select name, row_number() as index from src1",
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowFuncPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										baseLogicalPlan: baseLogicalPlan{},
+										name:            "src1",
+										streamFields: map[string]*ast.JsonStreamField{
+											"name": {
+												Type: "string",
+											},
+										},
+										streamStmt:  streams["src1"],
+										metaFields:  []string{},
+										pruneFields: []string{},
+									}.Init(),
+								},
+							},
+							windowFuncFields: []ast.Field{
+								{
+									Name:  "row_number",
+									AName: "index",
+									Expr: &ast.FieldRef{
+										StreamName: ast.AliasStream,
+										Name:       "index",
+										AliasRef:   ref,
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []ast.Field{
+					{
+						Name:  "row_number",
+						AName: "index",
+						Expr: &ast.FieldRef{
+							StreamName: ast.AliasStream,
+							Name:       "index",
+							AliasRef:   ref,
+						},
+					},
+					{
+						Name: "name",
+						Expr: &ast.FieldRef{
+							StreamName: "src1",
+							Name:       "name",
+						},
+					},
+				},
+				windowFuncNames: map[string]struct{}{
+					"index": {},
+				},
+			}.Init(),
+		},
+		{
+			sql: "select name, row_number() from src1",
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						WindowFuncPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										baseLogicalPlan: baseLogicalPlan{},
+										name:            "src1",
+										streamFields: map[string]*ast.JsonStreamField{
+											"name": {
+												Type: "string",
+											},
+										},
+										streamStmt:  streams["src1"],
+										metaFields:  []string{},
+										pruneFields: []string{},
+									}.Init(),
+								},
+							},
+							windowFuncFields: []ast.Field{
+								{
+									Name: "row_number",
+									Expr: &ast.Call{
+										Name:     "row_number",
+										FuncType: ast.FuncTypeWindow,
+									},
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []ast.Field{
+					{
+						Name: "name",
+						Expr: &ast.FieldRef{
+							StreamName: "src1",
+							Name:       "name",
+						},
+					},
+					{
+						Name: "row_number",
+						Expr: &ast.Call{
+							Name:     "row_number",
+							FuncType: ast.FuncTypeWindow,
+						},
+					},
+				},
+				windowFuncNames: map[string]struct{}{
+					"row_number": {},
+				},
+			}.Init(),
+		},
+		{
 			sql: "select name from src1 where true limit 1",
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{

+ 4 - 0
internal/topo/planner/projectPlan.go

@@ -26,6 +26,7 @@ type ProjectPlan struct {
 	aliasNames       []string
 	exprNames        []string
 	exceptNames      []string
+	windowFuncNames  map[string]struct{}
 	wildcardEmitters map[string]bool
 	aliasFields      ast.Fields
 	exprFields       ast.Fields
@@ -62,6 +63,9 @@ func (p ProjectPlan) Init() *ProjectPlan {
 		}
 	}
 	p.baseLogicalPlan.self = &p
+	if len(p.windowFuncNames) < 1 {
+		p.windowFuncNames = nil
+	}
 	return &p
 }
 

+ 27 - 0
internal/topo/planner/windowFuncPlan.go

@@ -0,0 +1,27 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package planner
+
+import "github.com/lf-edge/ekuiper/pkg/ast"
+
+// WindowFuncPlan is the logical plan node carrying the window function
+// fields of a select statement.
+type WindowFuncPlan struct {
+	baseLogicalPlan
+	windowFuncFields ast.Fields
+}
+
+// Init points the embedded baseLogicalPlan's self at the plan copy and
+// returns a pointer to that copy.
+func (p WindowFuncPlan) Init() *WindowFuncPlan {
+	plan := &p
+	plan.baseLogicalPlan.self = plan
+	return plan
+}

+ 95 - 0
internal/topo/topotest/rule_test.go

@@ -22,6 +22,101 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
+// TestWindowFuncSQL runs row_number() rules against the demo stream, once
+// without a window and once grouped by countWindow(5), under each of the
+// rule option sets below.
+func TestWindowFuncSQL(t *testing.T) {
+	// Reset
+	streamList := []string{"demo"}
+	HandleStream(false, streamList, t)
+	// row builds one expected output record.
+	row := func(rowNumber, size float64) map[string]interface{} {
+		return map[string]interface{}{
+			"row_number": rowNumber,
+			"size":       size,
+		}
+	}
+	tests := []RuleTest{
+		{
+			Name: "TestRowNumber1",
+			Sql:  `select size, row_number() from demo`,
+			// Each tuple is emitted on its own here, so it is always row 1.
+			R: [][]map[string]interface{}{
+				{row(1, 3)},
+				{row(1, 6)},
+				{row(1, 2)},
+				{row(1, 4)},
+				{row(1, 1)},
+			},
+		},
+		{
+			Name: "TestRowNumber2",
+			Sql:  `select size, row_number() from demo group by countWindow(5)`,
+			// The five tuples arrive as one collection and are numbered 1..5.
+			R: [][]map[string]interface{}{
+				{row(1, 3), row(2, 6), row(3, 2), row(4, 4), row(5, 1)},
+			},
+		},
+	}
+	// Data setup
+	HandleStream(true, streamList, t)
+	options := []*api.RuleOption{
+		{BufferLength: 100, SendError: true},
+		{BufferLength: 100, SendError: true, Qos: api.AtLeastOnce, CheckpointInterval: 5000},
+		{BufferLength: 100, SendError: true, Qos: api.ExactlyOnce, CheckpointInterval: 5000},
+	}
+	for j := range options {
+		DoRuleTest(t, tests, j, options[j], 0)
+	}
+}
+
 func TestAccAggSQL(t *testing.T) {
 	// Reset
 	streamList := []string{"demo"}

+ 1 - 1
internal/xsql/parser.go

@@ -1668,7 +1668,7 @@ func (p *Parser) parseOver(c *ast.Call) error {
 	if tok, _ := p.scanIgnoreWhitespace(); tok != ast.OVER {
 		p.unscan()
 		return nil
-	} else if function.IsAnalyticFunc(c.Name) {
+	} else if function.IsAnalyticFunc(c.Name) || function.IsWindowFunc(c.Name) {
 		if tok1, _ := p.scanIgnoreWhitespace(); tok1 == ast.LPAREN {
 			if t, _ := p.scanIgnoreWhitespace(); t == ast.PARTITION {
 				if t1, l1 := p.scanIgnoreWhitespace(); t1 == ast.BY {

+ 1 - 0
pkg/ast/expr.go

@@ -193,6 +193,7 @@ const (
 	FuncTypeAgg
 	FuncTypeCols
 	FuncTypeSrf
+	FuncTypeWindow
 )
 
 type Call struct {