Przeglądaj źródła

feat(func): Add the multiple cols func framework and implement change_cols func

Allow multiple arguments

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 lat temu
rodzic
commit
34ba9fa438

+ 15 - 0
internal/binder/function/binder.go

@@ -31,6 +31,7 @@ const (
 	FuncTypeUnknown FuncType = iota - 1
 	FuncTypeUnknown FuncType = iota - 1
 	FuncTypeScalar
 	FuncTypeScalar
 	FuncTypeAgg
 	FuncTypeAgg
+	FuncTypeCols
 )
 )
 
 
 func init() {
 func init() {
@@ -105,3 +106,17 @@ func IsAggFunc(funcName string) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+func GetFuncType(funcName string) FuncType {
+	f, _ := Function(funcName)
+	if f != nil {
+		if mf, ok := f.(multiAggFunc); ok {
+			return mf.GetFuncType(funcName)
+		}
+		if f.IsAggregate() {
+			return FuncTypeAgg
+		}
+		return FuncTypeScalar
+	}
+	return FuncTypeUnknown
+}

+ 101 - 0
internal/binder/function/funcs_cols.go

@@ -0,0 +1,101 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+)
+
+type ResultCols map[string]interface{}
+
+// ColFunc Functions which will return columns directly instead of a map
+type ColFunc func(ctx api.FunctionContext, args []interface{}, keys []string) (ResultCols, error)
+
+func wrapColFunc(colFunc ColFunc) funcExe {
+	return func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+		keys, ok := args[len(args)-1].([]string)
+		if !ok {
+			return fmt.Errorf("the last arg is not the key list but got %v", args[len(args)-1]), false
+		}
+		r, err := colFunc(ctx, args[:len(args)-1], keys)
+		if err != nil {
+			return err, false
+		}
+		return r, true
+	}
+}
+
+func registerColsFunc() {
+	builtins["changed_cols"] = builtinFunc{
+		fType: FuncTypeCols,
+		exec:  wrapColFunc(changedFunc),
+		val: func(_ api.FunctionContext, args []ast.Expr) error {
+			if len(args) <= 2 {
+				return fmt.Errorf("expect more than two args but got %d", len(args))
+			}
+			if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
+				return ProduceErrInfo(0, "string")
+			}
+			if ast.IsNumericArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsStringArg(args[1]) {
+				return ProduceErrInfo(1, "bool")
+			}
+			return nil
+		},
+	}
+}
+
+func changedFunc(ctx api.FunctionContext, args []interface{}, keys []string) (ResultCols, error) {
+	// validation
+	if len(args) <= 2 {
+		return nil, fmt.Errorf("expect more than two args but got %d", len(args))
+	}
+	prefix, ok := args[0].(string)
+	if !ok {
+		return nil, fmt.Errorf("first arg is not a string but got %v", args[0])
+	}
+	ignoreNull, ok := args[1].(bool)
+	if !ok {
+		return nil, fmt.Errorf("second arg is not a bool but got %v", args[1])
+	}
+	if len(args) != len(keys) {
+		return nil, fmt.Errorf("the length of keys %d does not match the args %d", len(keys), len(args)-2)
+	}
+
+	var r ResultCols
+	for i := 2; i < len(args); i++ {
+		k := keys[i]
+		v := args[i]
+		if ignoreNull && v == nil {
+			continue
+		}
+		lv, err := ctx.GetState(k)
+		if err != nil {
+			return nil, err
+		}
+		if lv != v {
+			if r == nil {
+				r = make(ResultCols)
+			}
+			r[prefix+k] = v
+			err := ctx.PutState(k, v)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return r, nil
+}

+ 306 - 0
internal/binder/function/funcs_cols_test.go

@@ -0,0 +1,306 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package function
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"reflect"
+	"testing"
+)
+
+func TestValidation(t *testing.T) {
+	f, ok := builtins["changed_cols"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	var tests = []struct {
+		args []ast.Expr
+		err  error
+	}{
+		{
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+			},
+			err: fmt.Errorf("expect more than two args but got 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+				&ast.StringLiteral{Val: "bar"},
+			},
+			err: fmt.Errorf("expect more than two args but got 2"),
+		}, {
+			args: []ast.Expr{
+				&ast.StringLiteral{Val: "foo"},
+				&ast.StringLiteral{Val: "bar"},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: fmt.Errorf("Expect bool type for parameter 2"),
+		}, {
+			args: []ast.Expr{
+				&ast.IntegerLiteral{Val: 20},
+				&ast.BooleanLiteral{Val: true},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: fmt.Errorf("Expect string type for parameter 1"),
+		}, {
+			args: []ast.Expr{
+				&ast.FieldRef{
+					StreamName: "demo",
+					Name:       "a",
+					AliasRef:   nil,
+				},
+				&ast.BooleanLiteral{Val: true},
+				&ast.StringLiteral{Val: "baz"},
+			},
+			err: nil,
+		},
+	}
+	for i, tt := range tests {
+		err := f.val(nil, tt.args)
+		if !reflect.DeepEqual(err, tt.err) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, err, tt.err)
+		}
+	}
+}
+
+func TestExec(t *testing.T) {
+	f, ok := builtins["changed_cols"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	var nilResult ResultCols
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+			},
+			result: fmt.Errorf("the last arg is not the key list but got baz"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"bar",
+				[]string{"baz"},
+			},
+			result: fmt.Errorf("expect more than two args but got 2"),
+		}, { // 2
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				[]string{"baz"},
+			},
+			result: fmt.Errorf("second arg is not a bool but got bar"),
+		}, { // 3
+			args: []interface{}{
+				"ab_",
+				true,
+				"baz",
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "baz",
+				"ab_col2": 44,
+			},
+		}, { // 4
+			args: []interface{}{
+				"ab_",
+				true,
+				"baz",
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: nilResult,
+		}, { // 5
+			args: []interface{}{
+				"cd_",
+				true,
+				"baz",
+				45,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"cd_col2": 45,
+			},
+		}, { // 6
+			args: []interface{}{
+				"ab_",
+				true,
+				"foo",
+				46,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "foo",
+				"ab_col2": 46,
+			},
+		}, { // 7
+			args: []interface{}{
+				"ab_",
+				true,
+				"foo",
+				46,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: nilResult,
+		}, { // 8
+			args: []interface{}{
+				"ab_",
+				true,
+				"baz",
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "baz",
+				"ab_col2": 44,
+			},
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}
+
+func TestExecIgnoreNull(t *testing.T) {
+	f, ok := builtins["changed_cols"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
+	var nilResult ResultCols
+	var tests = []struct {
+		args   []interface{}
+		result interface{}
+	}{
+		{ // 0
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+			},
+			result: fmt.Errorf("the last arg is not the key list but got baz"),
+		}, { // 1
+			args: []interface{}{
+				"foo",
+				"bar",
+				[]string{"baz"},
+			},
+			result: fmt.Errorf("expect more than two args but got 2"),
+		}, { // 2
+			args: []interface{}{
+				"foo",
+				"bar",
+				"baz",
+				[]string{"baz"},
+			},
+			result: fmt.Errorf("second arg is not a bool but got bar"),
+		}, { // 3
+			args: []interface{}{
+				"ab_",
+				false,
+				"baz",
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "baz",
+				"ab_col2": 44,
+			},
+		}, { // 4
+			args: []interface{}{
+				"ab_",
+				false,
+				nil,
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": nil,
+			},
+		}, { // 5
+			args: []interface{}{
+				"cd_",
+				false,
+				"baz",
+				45,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"cd_col1": "baz",
+				"cd_col2": 45,
+			},
+		}, { // 6
+			args: []interface{}{
+				"ab_",
+				true,
+				"foo",
+				46,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "foo",
+				"ab_col2": 46,
+			},
+		}, { // 7
+			args: []interface{}{
+				"ab_",
+				true,
+				"foo",
+				46,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: nilResult,
+		}, { // 8
+			args: []interface{}{
+				"ab_",
+				true,
+				"baz",
+				44,
+				[]string{"a", "b", "col1", "col2"},
+			},
+			result: ResultCols{
+				"ab_col1": "baz",
+				"ab_col2": 44,
+			},
+		},
+	}
+	for i, tt := range tests {
+		result, _ := f.exec(fctx, tt.args)
+		if !reflect.DeepEqual(result, tt.result) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, result, tt.result)
+		}
+	}
+}

+ 1 - 0
internal/binder/function/function.go

@@ -38,6 +38,7 @@ func init() {
 	registerMathFunc()
 	registerMathFunc()
 	registerStrFunc()
 	registerStrFunc()
 	registerMiscFunc()
 	registerMiscFunc()
+	registerColsFunc()
 }
 }
 
 
 //var funcWithAsteriskSupportMap = map[string]string{
 //var funcWithAsteriskSupportMap = map[string]string{

+ 1 - 1
internal/binder/function/validator_funcs.go

@@ -23,7 +23,7 @@ import (
 // ProduceErrInfo Index is starting from 0
 // ProduceErrInfo Index is starting from 0
 func ProduceErrInfo(index int, expect string) (err error) {
 func ProduceErrInfo(index int, expect string) (err error) {
 	index++
 	index++
-	err = fmt.Errorf("Expect %s type for %d parameter", expect, index)
+	err = fmt.Errorf("Expect %s type for parameter %d", expect, index)
 	return
 	return
 }
 }
 
 

+ 102 - 0
internal/topo/operator/cols_func_test.go

@@ -0,0 +1,102 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package operator
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestChangedColsFunc_Apply1(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   []interface{}
+		result [][]map[string]interface{}
+	}{
+		{
+			sql: `SELECT changed_cols("", true, a, b, c) FROM test`,
+			data: []interface{}{
+				&xsql.Tuple{
+					Emitter: "test",
+					Message: xsql.Message{
+						"a": "a1",
+						"b": "b1",
+						"c": "c1",
+					},
+				},
+				&xsql.Tuple{
+					Emitter: "test",
+					Message: xsql.Message{
+						"a": "a1",
+						"b": "b2",
+						"c": "c1",
+					},
+				},
+				&xsql.Tuple{
+					Emitter: "test",
+					Message: xsql.Message{
+						"a": "a1",
+						"c": "c1",
+					},
+				},
+				&xsql.Tuple{
+					Emitter: "test",
+					Message: xsql.Message{
+						"a": "a1",
+						"b": "b2",
+						"c": "c2",
+					},
+				},
+			},
+			result: [][]map[string]interface{}{{{
+				"a": "a1",
+				"b": "b1",
+				"c": "c1",
+			}}, {{
+				"b": "b2",
+			}}, {{}}, {{
+				"c": "c2",
+			}}},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule0", "project", tempStore)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil || stmt == nil {
+			t.Errorf("parse sql %s error %v", tt.sql, err)
+		}
+		pp := &ProjectOp{Fields: stmt.Fields}
+		fv, afv := xsql.NewFunctionValuersForOp(ctx)
+		r := make([][]map[string]interface{}, 0, len(tt.data))
+		for _, d := range tt.data {
+			result := pp.Apply(ctx, d, fv, afv)
+			r = append(r, result.([]map[string]interface{}))
+		}
+		if !reflect.DeepEqual(tt.result, r) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
+		}
+	}
+}

+ 17 - 7
internal/topo/operator/project_operator.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ package operator
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/binder/function"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -116,12 +117,12 @@ func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xs
 func project(fs ast.Fields, ve *xsql.ValuerEval) (map[string]interface{}, error) {
 func project(fs ast.Fields, ve *xsql.ValuerEval) (map[string]interface{}, error) {
 	result := make(map[string]interface{}, len(fs))
 	result := make(map[string]interface{}, len(fs))
 	for _, f := range fs {
 	for _, f := range fs {
-		v := ve.Eval(f.Expr)
-		if e, ok := v.(error); ok {
+		vi := ve.Eval(f.Expr)
+		if e, ok := vi.(error); ok {
 			return nil, e
 			return nil, e
 		}
 		}
 		if _, ok := f.Expr.(*ast.Wildcard); ok || f.Name == "*" {
 		if _, ok := f.Expr.(*ast.Wildcard); ok || f.Name == "*" {
-			switch val := v.(type) {
+			switch val := vi.(type) {
 			case map[string]interface{}:
 			case map[string]interface{}:
 				for k, v := range val {
 				for k, v := range val {
 					if _, ok := result[k]; !ok {
 					if _, ok := result[k]; !ok {
@@ -138,9 +139,18 @@ func project(fs ast.Fields, ve *xsql.ValuerEval) (map[string]interface{}, error)
 				return nil, fmt.Errorf("wildcarder does not return map")
 				return nil, fmt.Errorf("wildcarder does not return map")
 			}
 			}
 		} else {
 		} else {
-			if v != nil {
-				n := assignName(f.Name, f.AName)
-				result[n] = v
+			if vi != nil {
+				switch vt := vi.(type) {
+				case function.ResultCols:
+					for k, v := range vt {
+						if _, ok := result[k]; !ok {
+							result[k] = v
+						}
+					}
+				default:
+					n := assignName(f.Name, f.AName)
+					result[n] = vt
+				}
 			}
 			}
 		}
 		}
 	}
 	}

+ 1 - 1
internal/topo/planner/planner.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.

+ 39 - 1
internal/topo/topotest/rule_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -504,6 +504,44 @@ func TestSingleSQL(t *testing.T) {
 				"source_table1_0_records_in_total":  int64(4),
 				"source_table1_0_records_in_total":  int64(4),
 				"source_table1_0_records_out_total": int64(1),
 				"source_table1_0_records_out_total": int64(1),
 			},
 			},
+		}, {
+			Name: `TestChanged13`,
+			Sql:  "SELECT changed_cols(\"tt_\", true, color, size) FROM demo",
+			R: [][]map[string]interface{}{
+				{{
+					"tt_color": "red",
+					"tt_size":  float64(3),
+				}},
+				{{
+					"tt_color": "blue",
+					"tt_size":  float64(6),
+				}},
+				{{
+					"tt_size": float64(2),
+				}},
+				{{
+					"tt_color": "yellow",
+					"tt_size":  float64(4),
+				}},
+				{{
+					"tt_color": "red",
+					"tt_size":  float64(1),
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(5),
+				"op_2_project_0_records_out_total":  int64(5),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+			},
 		},
 		},
 	}
 	}
 	HandleStream(true, streamList, t)
 	HandleStream(true, streamList, t)

+ 1 - 1
internal/xsql/collections.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.

+ 36 - 36
internal/xsql/funcs_validator_test.go

@@ -60,19 +60,19 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT abs(true) FROM tbl`,
 			s:    `SELECT abs(true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT abs("test") FROM tbl`,
 			s:    `SELECT abs("test") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT abs(ss) FROM tbl`,
 			s:    `SELECT abs(ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		///
 		///
@@ -93,19 +93,19 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT sin(true) FROM tbl`,
 			s:    `SELECT sin(true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT sin("test") FROM tbl`,
 			s:    `SELECT sin("test") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT sin(ss) FROM tbl`,
 			s:    `SELECT sin(ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 		///
 		///
 		{
 		{
@@ -125,19 +125,19 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT tanh(true) FROM tbl`,
 			s:    `SELECT tanh(true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT tanh("test") FROM tbl`,
 			s:    `SELECT tanh("test") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT tanh(ss) FROM tbl`,
 			s:    `SELECT tanh(ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		///
 		///
@@ -151,25 +151,25 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT bitxor(1.1, 2) FROM tbl`,
 			s:    `SELECT bitxor(1.1, 2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 1 parameter",
+			err:  "Expect int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT bitxor(true, 2) FROM tbl`,
 			s:    `SELECT bitxor(true, 2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 1 parameter",
+			err:  "Expect int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT bitxor(1, ss) FROM tbl`,
 			s:    `SELECT bitxor(1, ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 2 parameter",
+			err:  "Expect int type for parameter 2",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT bitxor(1, 2.2) FROM tbl`,
 			s:    `SELECT bitxor(1, 2.2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 2 parameter",
+			err:  "Expect int type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -183,13 +183,13 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT bitnot(1.1) FROM tbl`,
 			s:    `SELECT bitnot(1.1) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 1 parameter",
+			err:  "Expect int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT bitnot(true) FROM tbl`,
 			s:    `SELECT bitnot(true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 1 parameter",
+			err:  "Expect int type for parameter 1",
 		},
 		},
 
 
 		///
 		///
@@ -203,19 +203,19 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT mod("1.1", 2) FROM tbl`,
 			s:    `SELECT mod("1.1", 2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 1 parameter",
+			err:  "Expect number - float or int type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT mod(1.1, true) FROM tbl`,
 			s:    `SELECT mod(1.1, true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 2 parameter",
+			err:  "Expect number - float or int type for parameter 2",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT mod(1, ss) FROM tbl`,
 			s:    `SELECT mod(1, ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect number - float or int type for 2 parameter",
+			err:  "Expect number - float or int type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -229,19 +229,19 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT concat("1.1", 2) FROM tbl`,
 			s:    `SELECT concat("1.1", 2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT concat("1.1", true) FROM tbl`,
 			s:    `SELECT concat("1.1", true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT concat("1", ss) FROM tbl`,
 			s:    `SELECT concat("1", ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -255,13 +255,13 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT regexp_matches(1, "true") FROM tbl`,
 			s:    `SELECT regexp_matches(1, "true") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 1 parameter",
+			err:  "Expect string type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT regexp_matches("1.1", 2) FROM tbl`,
 			s:    `SELECT regexp_matches("1.1", 2) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -275,7 +275,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT regexp_replace(field1, "true", true) FROM tbl`,
 			s:    `SELECT regexp_replace(field1, "true", true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 3 parameter",
+			err:  "Expect string type for parameter 3",
 		},
 		},
 
 
 		///
 		///
@@ -289,7 +289,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT trim(1) FROM tbl`,
 			s:    `SELECT trim(1) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 1 parameter",
+			err:  "Expect string type for parameter 1",
 		},
 		},
 
 
 		///
 		///
@@ -303,7 +303,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT rpad("ff", true) FROM tbl`,
 			s:    `SELECT rpad("ff", true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 2 parameter",
+			err:  "Expect int type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -329,7 +329,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT substring(field, 0, true) FROM tbl`,
 			s:    `SELECT substring(field, 0, true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 3 parameter",
+			err:  "Expect int type for parameter 3",
 		},
 		},
 
 
 		///
 		///
@@ -357,7 +357,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT chr(true) FROM tbl`,
 			s:    `SELECT chr(true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 1 parameter",
+			err:  "Expect int type for parameter 1",
 		},
 		},
 
 
 		///
 		///
@@ -371,7 +371,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT encode(field, true) FROM tbl`,
 			s:    `SELECT encode(field, true) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -385,7 +385,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT trunc(5, ss) FROM tbl`,
 			s:    `SELECT trunc(5, ss) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect int type for 2 parameter",
+			err:  "Expect int type for parameter 2",
 		},
 		},
 
 
 		///
 		///
@@ -399,13 +399,13 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT sha512(20) FROM tbl`,
 			s:    `SELECT sha512(20) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 1 parameter",
+			err:  "Expect string type for parameter 1",
 		},
 		},
 
 
 		{
 		{
 			s:    `SELECT mqtt("topic") FROM tbl`,
 			s:    `SELECT mqtt("topic") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect meta reference type for 1 parameter",
+			err:  "Expect meta reference type for parameter 1",
 		},
 		},
 
 
 		{
 		{
@@ -423,7 +423,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT split_value(topic1, 3, 1) FROM tbl`,
 			s:    `SELECT split_value(topic1, 3, 1) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 		{
 		{
 			s:    `SELECT split_value(topic1, "hello", -1) FROM tbl`,
 			s:    `SELECT split_value(topic1, "hello", -1) FROM tbl`,
@@ -438,7 +438,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT meta("src1.device") FROM tbl`,
 			s:    `SELECT meta("src1.device") FROM tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect meta reference type for 1 parameter",
+			err:  "Expect meta reference type for parameter 1",
 		},
 		},
 		{
 		{
 			s:    `SELECT meta(device) FROM tbl`,
 			s:    `SELECT meta(device) FROM tbl`,
@@ -464,7 +464,7 @@ func TestFuncValidator(t *testing.T) {
 			s: `SELECT json_path_query(data, 44) AS data
 			s: `SELECT json_path_query(data, 44) AS data
     FROM characters;`,
     FROM characters;`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect string type for 2 parameter",
+			err:  "Expect string type for parameter 2",
 		},
 		},
 		{
 		{
 			s:    `SELECT collect() from tbl`,
 			s:    `SELECT collect() from tbl`,
@@ -479,7 +479,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 		{
 			s:    `SELECT deduplicate(temp, "string") from tbl`,
 			s:    `SELECT deduplicate(temp, "string") from tbl`,
 			stmt: nil,
 			stmt: nil,
-			err:  "Expect bool type for 2 parameter",
+			err:  "Expect bool type for parameter 2",
 		},
 		},
 	}
 	}
 
 

+ 31 - 14
internal/xsql/parser.go

@@ -38,6 +38,7 @@ type Parser struct {
 	}
 	}
 	inmeta bool
 	inmeta bool
 	f      int // anonymous field index number
 	f      int // anonymous field index number
+	clause string
 }
 }
 
 
 func (p *Parser) parseCondition() (ast.Expr, error) {
 func (p *Parser) parseCondition() (ast.Expr, error) {
@@ -128,25 +129,25 @@ func (p *Parser) Parse() (*ast.SelectStatement, error) {
 	} else if tok != ast.SELECT {
 	} else if tok != ast.SELECT {
 		return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
 		return nil, fmt.Errorf("Found %q, Expected SELECT.\n", lit)
 	}
 	}
-
+	p.clause = "select"
 	if fields, err := p.parseFields(); err != nil {
 	if fields, err := p.parseFields(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.Fields = fields
 		selects.Fields = fields
 	}
 	}
-
+	p.clause = "from"
 	if src, err := p.parseSource(); err != nil {
 	if src, err := p.parseSource(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.Sources = src
 		selects.Sources = src
 	}
 	}
-
+	p.clause = "join"
 	if joins, err := p.parseJoins(); err != nil {
 	if joins, err := p.parseJoins(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.Joins = joins
 		selects.Joins = joins
 	}
 	}
-
+	p.clause = "where"
 	if exp, err := p.parseCondition(); err != nil {
 	if exp, err := p.parseCondition(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
@@ -154,25 +155,25 @@ func (p *Parser) Parse() (*ast.SelectStatement, error) {
 			selects.Condition = exp
 			selects.Condition = exp
 		}
 		}
 	}
 	}
-
+	p.clause = "groupby"
 	if dims, err := p.parseDimensions(); err != nil {
 	if dims, err := p.parseDimensions(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.Dimensions = dims
 		selects.Dimensions = dims
 	}
 	}
-
+	p.clause = "having"
 	if having, err := p.parseHaving(); err != nil {
 	if having, err := p.parseHaving(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.Having = having
 		selects.Having = having
 	}
 	}
-
+	p.clause = "orderby"
 	if sorts, err := p.parseSorts(); err != nil {
 	if sorts, err := p.parseSorts(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
 		selects.SortFields = sorts
 		selects.SortFields = sorts
 	}
 	}
-
+	p.clause = ""
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.SEMICOLON {
 		p.unscan()
 		p.unscan()
 		return selects, nil
 		return selects, nil
@@ -432,11 +433,7 @@ func (p *Parser) parseField() (*ast.Field, error) {
 	if exp, err := p.ParseExpr(); err != nil {
 	if exp, err := p.ParseExpr(); err != nil {
 		return nil, err
 		return nil, err
 	} else {
 	} else {
-		if e, ok := exp.(*ast.FieldRef); ok {
-			field.Name = e.Name
-		} else if e, ok := exp.(*ast.Call); ok {
-			field.Name = e.Name
-		}
+		field.Name = nameExpr(exp)
 		field.Expr = exp
 		field.Expr = exp
 	}
 	}
 
 
@@ -455,6 +452,17 @@ func (p *Parser) parseField() (*ast.Field, error) {
 	return field, nil
 	return field, nil
 }
 }
 
 
+func nameExpr(exp ast.Expr) string {
+	switch e := exp.(type) {
+	case *ast.FieldRef:
+		return e.Name
+	case *ast.Call:
+		return e.Name
+	default:
+		return ""
+	}
+}
+
 func (p *Parser) parseAlias() (string, error) {
 func (p *Parser) parseAlias() (string, error) {
 	tok, lit := p.scanIgnoreWhitespace()
 	tok, lit := p.scanIgnoreWhitespace()
 	if tok == ast.AS {
 	if tok == ast.AS {
@@ -684,6 +692,10 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 			p.inmeta = false
 			p.inmeta = false
 		}()
 		}()
 	}
 	}
+	ft := function.GetFuncType(name)
+	if ft == function.FuncTypeCols && p.clause != "select" {
+		return nil, fmt.Errorf("function %s can only be used inside the select clause", n)
+	}
 	var args []ast.Expr
 	var args []ast.Expr
 	for {
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == ast.RPAREN {
@@ -709,7 +721,12 @@ func (p *Parser) parseCall(n string) (ast.Expr, error) {
 		if exp, err := p.ParseExpr(); err != nil {
 		if exp, err := p.ParseExpr(); err != nil {
 			return nil, err
 			return nil, err
 		} else {
 		} else {
-			args = append(args, exp)
+			if ft == function.FuncTypeCols {
+				field := &ast.ColFuncField{Expr: exp, Name: nameExpr(exp)}
+				args = append(args, field)
+			} else {
+				args = append(args, exp)
+			}
 		}
 		}
 
 
 		if tok, _ := p.scanIgnoreWhitespace(); tok != ast.COMMA {
 		if tok, _ := p.scanIgnoreWhitespace(); tok != ast.COMMA {

+ 39 - 0
internal/xsql/parser_test.go

@@ -1435,6 +1435,45 @@ func TestParser_ParseStatement(t *testing.T) {
 				},
 				},
 				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
 				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
 			},
 			},
+		}, {
+			s: `SELECT changed_cols("",true,a,b,c) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "changed_cols",
+						Expr: &ast.Call{
+							Name: "changed_cols",
+							Args: []ast.Expr{
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.StringLiteral{Val: ""},
+								},
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.BooleanLiteral{Val: true},
+								},
+								&ast.ColFuncField{Name: "a", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "a",
+								}},
+								&ast.ColFuncField{Name: "b", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "b",
+								}},
+								&ast.ColFuncField{Name: "c", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "c",
+								}},
+							},
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		}, {
+			s:   `SELECT a FROM tbl WHERE changed_cols("",true,a,b,c) > 3`,
+			err: "function changed_cols can only be used inside the select clause",
 		},
 		},
 		{
 		{
 			s:   `SELECT ".*(/)(?!.*\1)" FROM topic/sensor1 AS t1`,
 			s:   `SELECT ".*(/)(?!.*\1)" FROM topic/sensor1 AS t1`,

+ 39 - 7
internal/xsql/valuer.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -407,19 +407,51 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 			}
 			}
 		} else {
 		} else {
 			if valuer, ok := v.Valuer.(CallValuer); ok {
 			if valuer, ok := v.Valuer.(CallValuer); ok {
-				var args []interface{}
-				if len(expr.Args) > 0 {
+				var (
 					args = make([]interface{}, len(expr.Args))
 					args = make([]interface{}, len(expr.Args))
-					for i, arg := range expr.Args {
-						if aggreValuer, ok := valuer.(AggregateCallValuer); function.IsAggFunc(expr.Name) && ok {
-							args[i] = aggreValuer.GetAllTuples().AggregateEval(arg, aggreValuer.GetSingleCallValuer())
-						} else {
+					ft   = function.GetFuncType(expr.Name)
+				)
+				if len(expr.Args) > 0 {
+					switch ft {
+					case function.FuncTypeAgg:
+						for i, arg := range expr.Args {
+							if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
+								args[i] = aggreValuer.GetAllTuples().AggregateEval(arg, aggreValuer.GetSingleCallValuer())
+							} else {
+								args[i] = v.Eval(arg)
+								if _, ok := args[i].(error); ok {
+									return args[i]
+								}
+							}
+						}
+					case function.FuncTypeScalar:
+						for i, arg := range expr.Args {
 							args[i] = v.Eval(arg)
 							args[i] = v.Eval(arg)
 							if _, ok := args[i].(error); ok {
 							if _, ok := args[i].(error); ok {
 								return args[i]
 								return args[i]
 							}
 							}
 						}
 						}
+					case function.FuncTypeCols:
+						keys := make([]string, len(expr.Args))
+						for i, arg := range expr.Args { // In the parser, the col func arguments must be ColField
+							cf, ok := arg.(*ast.ColFuncField)
+							if !ok {
+								// won't happen
+								return fmt.Errorf("expect colFuncField but got %v", arg)
+							}
+							temp := v.Eval(cf.Expr)
+							if _, ok := temp.(error); ok {
+								return temp
+							}
+							args[i] = temp
+							keys[i] = cf.Name
+						}
+						args = append(args, keys)
+					default:
+						// won't happen
+						return fmt.Errorf("unknown function type")
 					}
 					}
+
 				}
 				}
 				val, _ := valuer.Call(expr.Name, args)
 				val, _ := valuer.Call(expr.Name, args)
 				return val
 				return val

+ 8 - 0
pkg/ast/expr.go

@@ -269,3 +269,11 @@ type JsonFieldRef struct {
 
 
 func (fr *JsonFieldRef) expr() {}
 func (fr *JsonFieldRef) expr() {}
 func (fr *JsonFieldRef) node() {}
 func (fr *JsonFieldRef) node() {}
+
+type ColFuncField struct {
+	Name string
+	Expr Expr
+}
+
+func (fr *ColFuncField) expr() {}
+func (fr *ColFuncField) node() {}

+ 4 - 1
pkg/ast/visitor.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -110,6 +110,9 @@ func Walk(v Visitor, node Node) {
 
 
 	case *IndexExpr:
 	case *IndexExpr:
 		Walk(v, n.Index)
 		Walk(v, n.Index)
+
+	case *ColFuncField:
+		Walk(v, n.Expr)
 	}
 	}
 }
 }