瀏覽代碼

feat(state): support function state

Make API changes to function extension
ngjaying 4 年之前
父節點
當前提交
0d3c5a14a0

+ 1 - 0
.github/workflows/run_test_case.yaml

@@ -32,6 +32,7 @@ jobs:
             go build --buildmode=plugin -o plugins/sinks/File@v1.0.0.so plugins/sinks/file.go
             go build --buildmode=plugin -o plugins/sinks/File@v1.0.0.so plugins/sinks/file.go
             go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo.go
             go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo.go
             go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so plugins/functions/countPlusOne.go
             go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so plugins/functions/countPlusOne.go
+            go build --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so plugins/functions/accumulateWordCount.go
             go test ./...
             go test ./...
             go test --tags=edgex ./...
             go test --tags=edgex ./...
     
     

+ 24 - 0
docs/en_US/extension/overview.md

@@ -52,3 +52,27 @@ Please read below for how to realize the different extensions.
 - [Sink/Action extension](sink.md)
 - [Sink/Action extension](sink.md)
 - [Function extension](function.md)
 - [Function extension](function.md)
 
 
+### State Storage
+
+Kuiper extensions export a key value state storage interface for Source/Sink/Function through the context.
+
+States are key-value pairs, where the key is a string and the value is arbitrary data. Keys are scoped to an individual extension.
+
+You can access states within extensions using the putState, getState, incrCounter, getCounter and deleteState calls on the context object.
+
+Below is an example of a function extension to access states. It will record the accumulate word count across a range of function calls.
+
+```go
+func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
+    logger := ctx.GetLogger()    
+	err := ctx.IncrCounter("allwordcount", len(strings.Split(args[0], args[1])))
+	if err != nil {
+		return err, false
+	}
+	if c, err := ctx.GetCounter("allwordcount"); err != nil   {
+		return err, false
+	} else {
+		return c, true
+	}
+}
+```

+ 24 - 0
docs/zh_CN/extension/overview.md

@@ -12,3 +12,27 @@ Kuiper允许用户自定义不同类型的扩展。
 - [Sink/Action 扩展](#)
 - [Sink/Action 扩展](#)
 - [函数扩展](#)
 - [函数扩展](#)
 
 
+### 状态存储
+
+Kuiper扩展通过context参数暴露了一个基于键值对的状态存储接口,可用于所有类型的扩展,包括Source,Sink和Function扩展.
+
+状态为键值对,其中键为string类型而值为任意数据。键的作用域仅为当前扩展的实例。
+
+用户可通过context对象访问状态存储。状态相关方法包括putState, getState, incrCounter, getCounter and deleteState。
+
+以下代码为函数扩展访问状态的实例。该函数将计算传入的单词数,并将累积数目保存在状态中。
+
+```go
+func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
+    logger := ctx.GetLogger()    
+	err := ctx.IncrCounter("allwordcount", len(strings.Split(args[0], args[1])))
+	if err != nil {
+		return err, false
+	}
+	if c, err := ctx.GetCounter("allwordcount"); err != nil   {
+		return err, false
+	} else {
+		return c, true
+	}
+}
+```

+ 67 - 0
plugins/functions/accumulateWordCount.go

@@ -0,0 +1,67 @@
+package main
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/api"
+	"strings"
+)
+
+/**
+ **	A function which will count how many words had been received from the beginning
+ ** to demonstrate how to use states
+ ** There are 2 arguments:
+ **  0: column, the column to be calculated. The column value type must be string
+ **  1: separator, a string literal for word separator
+ **/
+
+type accumulateWordCountFunc struct {
+}
+
+func (f *accumulateWordCountFunc) Validate(args []interface{}) error {
+	if len(args) != 2 {
+		return fmt.Errorf("wordCount function only supports 2 parameter but got %d", len(args))
+	}
+	if arg1, ok := args[1].(xsql.Expr); ok {
+		if _, ok := arg1.(*xsql.StringLiteral); !ok {
+			return fmt.Errorf("the second parameter of wordCount function must be a string literal")
+		}
+	}
+	return nil
+}
+
+func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
+	logger := ctx.GetLogger()
+	fmt.Printf("Exec accumulate")
+	col, ok := args[0].(string)
+	if !ok {
+		logger.Debugf("Exec accumulateWordCountFunc with arg0 %s", col)
+		return fmt.Errorf("args[0] is not a string, got %v", args[0]), false
+	}
+
+	sep, ok := args[1].(string)
+	if !ok {
+		logger.Debugf("Exec accumulateWordCountFunc with arg1 %s", sep)
+		return fmt.Errorf("args[1] is not a string, got %v", args[0]), false
+	}
+
+	err := ctx.IncrCounter("allwordcount", len(strings.Split(col, sep)))
+	if err != nil {
+		logger.Debugf("call accumulateWordCountFunc incrCounter error %s", err)
+		return err, false
+	}
+	if c, err := ctx.GetCounter("allwordcount"); err != nil {
+		logger.Debugf("call accumulateWordCountFunc getCounter error %s", err)
+		return err, false
+	} else {
+		return c, true
+	}
+}
+
+func (f *accumulateWordCountFunc) IsAggregate() bool {
+	return false
+}
+
+func AccumulateWordCount() api.Function {
+	return &accumulateWordCountFunc{}
+}

+ 5 - 2
plugins/functions/countPlusOne.go

@@ -1,6 +1,9 @@
 package main
 package main
 
 
-import "fmt"
+import (
+	"fmt"
+	"github.com/emqx/kuiper/xstream/api"
+)
 
 
 type countPlusOneFunc struct {
 type countPlusOneFunc struct {
 }
 }
@@ -12,7 +15,7 @@ func (f *countPlusOneFunc) Validate(args []interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
-func (f *countPlusOneFunc) Exec(args []interface{}) (interface{}, bool) {
+func (f *countPlusOneFunc) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	arg, ok := args[0].([]interface{})
 	arg, ok := args[0].([]interface{})
 	if !ok {
 	if !ok {
 		return fmt.Errorf("arg is not a slice, got %v", args[0]), false
 		return fmt.Errorf("arg is not a slice, got %v", args[0]), false

+ 2 - 1
plugins/functions/echo.go

@@ -2,6 +2,7 @@ package main
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/emqx/kuiper/xstream/api"
 )
 )
 
 
 type echo struct {
 type echo struct {
@@ -14,7 +15,7 @@ func (f *echo) Validate(args []interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
-func (f *echo) Exec(args []interface{}) (interface{}, bool) {
+func (f *echo) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	result := args[0]
 	result := args[0]
 	return result, true
 	return result, true
 }
 }

+ 1 - 1
plugins/manager_test.go

@@ -111,7 +111,7 @@ func TestManager_List(t *testing.T) {
 			r: []string{"file", "file2"},
 			r: []string{"file", "file2"},
 		}, {
 		}, {
 			t: FUNCTION,
 			t: FUNCTION,
-			r: []string{"countPlusOne", "echo", "echo2"},
+			r: []string{"accumulateWordCount", "countPlusOne", "echo", "echo2"},
 		},
 		},
 	}
 	}
 	manager, err := NewPluginManager()
 	manager, err := NewPluginManager()

+ 16 - 22
xsql/funcs_aggregate.go

@@ -3,22 +3,27 @@ package xsql
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 	"strings"
 )
 )
 
 
 type AggregateFunctionValuer struct {
 type AggregateFunctionValuer struct {
-	data    AggregateData
-	fv      *FunctionValuer
-	plugins map[string]api.Function
+	data        AggregateData
+	fv          *FunctionValuer
+	funcPlugins *funcPlugins
+}
+
+func NewFunctionValuersForOp(ctx api.StreamContext) (*FunctionValuer, *AggregateFunctionValuer) {
+	p := NewFuncPlugins(ctx)
+	return NewAggregateFunctionValuers(p)
 }
 }
 
 
 //Should only be called by stream to make sure a single instance for an operation
 //Should only be called by stream to make sure a single instance for an operation
-func NewAggregateFunctionValuers() (*FunctionValuer, *AggregateFunctionValuer) {
-	fv := &FunctionValuer{}
+func NewAggregateFunctionValuers(p *funcPlugins) (*FunctionValuer, *AggregateFunctionValuer) {
+	fv := NewFunctionValuer(p)
 	return fv, &AggregateFunctionValuer{
 	return fv, &AggregateFunctionValuer{
-		fv: fv,
+		fv:          fv,
+		funcPlugins: p,
 	}
 	}
 }
 }
 
 
@@ -167,25 +172,14 @@ func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interfa
 		return 0, true
 		return 0, true
 	default:
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
 		common.Log.Debugf("run aggregate func %s", name)
-		if v.plugins == nil {
-			v.plugins = make(map[string]api.Function)
-		}
-		var (
-			nf  api.Function
-			ok  bool
-			err error
-		)
-		if nf, ok = v.plugins[name]; !ok {
-			nf, err = plugins.GetFunction(name)
-			if err != nil {
-				return err, false
-			}
-			v.plugins[name] = nf
+		nf, fctx, err := v.funcPlugins.GetFuncFromPlugin(name)
+		if err != nil {
+			return err, false
 		}
 		}
 		if !nf.IsAggregate() {
 		if !nf.IsAggregate() {
 			return nil, false
 			return nil, false
 		}
 		}
-		result, ok := nf.Exec(args)
+		result, ok := nf.Exec(args, fctx)
 		common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
 		common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
 		return result, ok
 		return result, ok
 	}
 	}

+ 45 - 0
xsql/function_plugin.go

@@ -0,0 +1,45 @@
+package xsql
+
+import (
+	"github.com/emqx/kuiper/plugins"
+	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/contexts"
+)
+
+//Manage the function plugin instances
+//Each operator has a single instance of this
+type funcPlugins struct {
+	plugins   map[string]*funcReg
+	parentCtx api.StreamContext
+}
+
+type funcReg struct {
+	ins api.Function
+	ctx api.FunctionContext
+}
+
+func NewFuncPlugins(ctx api.StreamContext) *funcPlugins {
+	return &funcPlugins{
+		parentCtx: ctx,
+	}
+}
+
+func (fp *funcPlugins) GetFuncFromPlugin(name string) (api.Function, api.FunctionContext, error) {
+	if fp.plugins == nil {
+		fp.plugins = make(map[string]*funcReg)
+	}
+	if reg, ok := fp.plugins[name]; !ok {
+		nf, err := plugins.GetFunction(name)
+		if err != nil {
+			return nil, nil, err
+		}
+		fctx := contexts.NewDefaultFuncContext(fp.parentCtx, len(fp.plugins))
+		fp.plugins[name] = &funcReg{
+			ins: nf,
+			ctx: fctx,
+		}
+		return nf, fctx, nil
+	} else {
+		return reg.ins, reg.ctx, nil
+	}
+}

+ 19 - 23
xsql/functions.go

@@ -1,21 +1,27 @@
 package xsql
 package xsql
 
 
 import (
 import (
-	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/plugins"
-	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 	"strings"
 )
 )
 
 
+// ONLY use NewFunctionValuer function to initialize
 type FunctionValuer struct {
 type FunctionValuer struct {
-	plugins map[string]api.Function
+	funcPlugins *funcPlugins
 }
 }
 
 
-func (*FunctionValuer) Value(key string) (interface{}, bool) {
+//Should only be called by stream to make sure a single instance for an operation
+func NewFunctionValuer(p *funcPlugins) *FunctionValuer {
+	fv := &FunctionValuer{
+		funcPlugins: p,
+	}
+	return fv
+}
+
+func (*FunctionValuer) Value(_ string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
-func (*FunctionValuer) Meta(key string) (interface{}, bool) {
+func (*FunctionValuer) Meta(_ string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
@@ -83,27 +89,17 @@ func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bo
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 		return nil, false
 		return nil, false
 	} else {
 	} else {
-		common.Log.Debugf("run func %s", name)
-		if fv.plugins == nil {
-			fv.plugins = make(map[string]api.Function)
-		}
-		var (
-			nf  api.Function
-			ok  bool
-			err error
-		)
-		if nf, ok = fv.plugins[name]; !ok {
-			nf, err = plugins.GetFunction(name)
-			if err != nil {
-				return err, false
-			}
-			fv.plugins[name] = nf
+		nf, fctx, err := fv.funcPlugins.GetFuncFromPlugin(name)
+		if err != nil {
+			return err, false
 		}
 		}
 		if nf.IsAggregate() {
 		if nf.IsAggregate() {
 			return nil, false
 			return nil, false
 		}
 		}
-		result, ok := nf.Exec(args)
-		common.Log.Debugf("run custom function %s, get result %v", name, result)
+		logger := fctx.GetLogger()
+		logger.Debugf("run func %s", name)
+		result, ok := nf.Exec(args, fctx)
+		logger.Debugf("run custom function %s, get result %v", name, result)
 		return result, ok
 		return result, ok
 	}
 	}
 }
 }

+ 4 - 4
xsql/plans/aggregate_test.go

@@ -309,7 +309,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		gr, ok := result.(xsql.GroupedTuplesSet)
@@ -551,7 +551,7 @@ func TestAggregatePlanGroupAlias_Apply(t *testing.T) {
 				}
 				}
 			}
 			}
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		gr, ok := result.(xsql.GroupedTuplesSet)
@@ -698,7 +698,7 @@ func TestAggregatePlanAlias_Apply(t *testing.T) {
 				}
 				}
 			}
 			}
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
@@ -751,7 +751,7 @@ func TestAggregatePlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {

+ 2 - 2
xsql/plans/filter_test.go

@@ -333,7 +333,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &FilterPlan{Condition: stmt.Condition}
 		pp := &FilterPlan{Condition: stmt.Condition}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
@@ -456,7 +456,7 @@ func TestFilterPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &FilterPlan{Condition: stmt.Condition}
 		pp := &FilterPlan{Condition: stmt.Condition}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {

+ 3 - 3
xsql/plans/having_test.go

@@ -261,7 +261,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &HavingPlan{Condition: stmt.Having}
 		pp := &HavingPlan{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
@@ -414,7 +414,7 @@ func TestHavingPlanAlias_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &HavingPlan{Condition: stmt.Having}
 		pp := &HavingPlan{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
@@ -486,7 +486,7 @@ func TestHavingPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		pp := &HavingPlan{Condition: stmt.Having}
 		pp := &HavingPlan{Condition: stmt.Having}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {

+ 1 - 1
xsql/plans/join_multi_test.go

@@ -396,7 +396,7 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {

+ 6 - 6
xsql/plans/join_test.go

@@ -662,7 +662,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
@@ -1128,7 +1128,7 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
@@ -1312,7 +1312,7 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
@@ -1563,7 +1563,7 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
@@ -1694,7 +1694,7 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
@@ -1768,7 +1768,7 @@ func TestCrossJoinPlanError(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {

+ 1 - 1
xsql/plans/math_func_test.go

@@ -467,7 +467,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {

+ 4 - 4
xsql/plans/misc_func_test.go

@@ -183,7 +183,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -236,7 +236,7 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -353,7 +353,7 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -699,7 +699,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {

+ 1 - 1
xsql/plans/order_test.go

@@ -473,7 +473,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 		}
 		}
 
 
 		pp := &OrderPlan{SortFields: stmt.SortFields}
 		pp := &OrderPlan{SortFields: stmt.SortFields}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 4 - 4
xsql/plans/preprocessor_test.go

@@ -530,7 +530,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
@@ -664,7 +664,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 			if rt, ok := result.(*xsql.Tuple); ok {
@@ -838,7 +838,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 			if rt, ok := result.(*xsql.Tuple); ok {
@@ -917,7 +917,7 @@ func TestPreprocessorError(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			fv, afv := xsql.NewAggregateFunctionValuers()
+			fv, afv := xsql.NewFunctionValuersForOp(nil)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)

+ 5 - 5
xsql/plans/project_test.go

@@ -409,7 +409,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields, SendMeta: true}
 		pp := &ProjectPlan{Fields: stmt.Fields, SendMeta: true}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -960,7 +960,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -1161,7 +1161,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -1610,7 +1610,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true, isTest: true}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true, isTest: true}
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
@@ -1761,7 +1761,7 @@ func TestProjectPlanError(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)

+ 1 - 1
xsql/plans/str_func_test.go

@@ -419,7 +419,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		fv, afv := xsql.NewAggregateFunctionValuers()
+		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {

+ 271 - 6
xsql/processors/extension_test.go

@@ -1,5 +1,3 @@
-// +build !windows
-
 package processors
 package processors
 
 
 import (
 import (
@@ -7,8 +5,15 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream"
+	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/nodes"
+	"github.com/emqx/kuiper/xstream/test"
 	"os"
 	"os"
 	"path"
 	"path"
+	"reflect"
+	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -101,10 +106,6 @@ func TestExtensions(t *testing.T) {
 		time.Sleep(1000 * time.Millisecond)
 		time.Sleep(1000 * time.Millisecond)
 		log.Printf("exit main program after a second")
 		log.Printf("exit main program after a second")
 		results := getResults()
 		results := getResults()
-		if len(results) == 0 {
-			t.Errorf("no result found")
-			continue
-		}
 		log.Infof("get results %v", results)
 		log.Infof("get results %v", results)
 		os.Remove(CACHE_FILE)
 		os.Remove(CACHE_FILE)
 		var maps [][]map[string]interface{}
 		var maps [][]map[string]interface{}
@@ -173,3 +174,267 @@ func getResults() []string {
 	f.Close()
 	f.Close()
 	return result
 	return result
 }
 }
+
+func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
+	var data []*xsql.Tuple
+	switch name {
+	case "text":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "Impossible is nothing",
+					"brand":  "Adidas",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "Stronger than dirt",
+					"brand":  "Ajax",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "Belong anywhere",
+					"brand":  "Airbnb",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "I can't believe I ate the whole thing",
+					"brand":  "Alka Seltzer",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "You're in good hands",
+					"brand":  "Allstate",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "Don't leave home without it",
+					"brand":  "American Express",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "Think different",
+					"brand":  "Apple",
+				},
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"slogan": "We try harder",
+					"brand":  "Avis",
+				},
+			},
+		}
+
+	}
+	return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
+		"DATASOURCE": name,
+	})
+}
+
+func setup2() *RuleProcessor {
+	log := common.Log
+
+	dbDir, err := common.GetAndCreateDataLoc("test")
+	if err != nil {
+		log.Panic(err)
+	}
+	log.Infof("db location is %s", dbDir)
+
+	p := NewStreamProcessor(path.Join(dbDir, "stream"))
+	demo := `DROP STREAM text`
+	p.ExecStmt(demo)
+
+	demo = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
+	_, err = p.ExecStmt(demo)
+	if err != nil {
+		panic(err)
+	}
+
+	rp := NewRuleProcessor(dbDir)
+	return rp
+}
+
+func TestFuncState(t *testing.T) {
+	var tests = []struct {
+		name string
+		sql  string
+		r    [][]map[string]interface{}
+		s    string
+		m    map[string]interface{}
+	}{
+		{
+			name: `rule1`,
+			sql:  `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
+			r: [][]map[string]interface{}{
+				{{
+					"wc": float64(3),
+				}},
+				{{
+					"wc": float64(6),
+				}},
+				{{
+					"wc": float64(8),
+				}},
+				{{
+					"wc": float64(16),
+				}},
+				{{
+					"wc": float64(20),
+				}},
+				{{
+					"wc": float64(25),
+				}},
+				{{
+					"wc": float64(27),
+				}},
+				{{
+					"wc": float64(30),
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_text_0_exceptions_total":   int64(0),
+				"op_preprocessor_text_0_process_latency_ms": int64(0),
+				"op_preprocessor_text_0_records_in_total":   int64(8),
+				"op_preprocessor_text_0_records_out_total":  int64(8),
+
+				"op_project_0_exceptions_total":   int64(0),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(8),
+				"op_project_0_records_out_total":  int64(8),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(8),
+				"sink_mockSink_0_records_out_total": int64(8),
+
+				"source_text_0_exceptions_total":  int64(0),
+				"source_text_0_records_in_total":  int64(8),
+				"source_text_0_records_out_total": int64(8),
+			},
+			s: "sink_mockSink_0_records_out_total",
+		},
+	}
+	p := setup2()
+	for i, tt := range tests {
+		p.ExecDrop(tt.name)
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
+		var (
+			sources []*nodes.SourceNode
+			syncs   []chan int
+		)
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getExtMockSource(stream, next, 8)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
+			"bufferLength": float64(100),
+		}}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < 8; i++ {
+				syncs[i%len(syncs)] <- i
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			for retry := 100; retry > 0; retry-- {
+				if err := compareMetrics2(tp, tt.m, tt.sql); err == nil {
+					break
+				}
+				time.Sleep(time.Duration(retry) * time.Millisecond)
+			}
+		}()
+		results := mockSink.GetResults()
+		var maps [][]map[string]interface{}
+		for _, v := range results {
+			var mapRes []map[string]interface{}
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map")
+				continue
+			}
+			maps = append(maps, mapRes)
+		}
+		if !reflect.DeepEqual(tt.r, maps) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+			continue
+		}
+		if err := compareMetrics2(tp, tt.m, tt.sql); err != nil {
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
+func compareMetrics2(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
+	keys, values := tp.GetMetrics()
+	//for i, k := range keys {
+	//	log.Printf("%s:%v", k, values[i])
+	//}
+	for k, v := range m {
+		var (
+			index   int
+			key     string
+			matched bool
+		)
+		for index, key = range keys {
+			if k == key {
+				if strings.HasSuffix(k, "process_latency_ms") {
+					if values[index].(int64) >= v.(int64) {
+						matched = true
+						continue
+					} else {
+						break
+					}
+				}
+				if values[index] == v {
+					matched = true
+				}
+				break
+			}
+		}
+		if matched {
+			continue
+		}
+		//do not find
+		if index < len(values) {
+			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
+		} else {
+			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
+		}
+	}
+	return nil
+}

+ 2 - 2
xsql/valuer_eval_test.go

@@ -143,7 +143,7 @@ func TestComparison(t *testing.T) {
 	for i, tt := range data {
 	for i, tt := range data {
 		for j, c := range conditions {
 		for j, c := range conditions {
 			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
 			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
-			ve := &ValuerEval{Valuer: MultiValuer(tuple, &FunctionValuer{})}
+			ve := &ValuerEval{Valuer: MultiValuer(tuple)}
 			result := ve.Eval(c)
 			result := ve.Eval(c)
 			if !reflect.DeepEqual(tt.r[j], result) {
 			if !reflect.DeepEqual(tt.r[j], result) {
 				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)
 				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)
@@ -231,7 +231,7 @@ func TestCalculation(t *testing.T) {
 	for i, tt := range data {
 	for i, tt := range data {
 		for j, c := range projects {
 		for j, c := range projects {
 			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
 			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
-			ve := &ValuerEval{Valuer: MultiValuer(tuple, &FunctionValuer{})}
+			ve := &ValuerEval{Valuer: MultiValuer(tuple)}
 			result := ve.Eval(c)
 			result := ve.Eval(c)
 			if !reflect.DeepEqual(tt.r[j], result) {
 			if !reflect.DeepEqual(tt.r[j], result) {
 				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)
 				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)

+ 6 - 1
xstream/api/stream.go

@@ -111,12 +111,17 @@ type Operator interface {
 	GetMetrics() [][]interface{}
 	GetMetrics() [][]interface{}
 }
 }
 
 
+type FunctionContext interface {
+	StreamContext
+	GetFuncId() int
+}
+
 type Function interface {
 type Function interface {
 	//The argument is a list of xsql.Expr
 	//The argument is a list of xsql.Expr
 	Validate(args []interface{}) error
 	Validate(args []interface{}) error
 	//Execute the function, return the result and if execution is successful.
 	//Execute the function, return the result and if execution is successful.
 	//If execution fails, return the error and false.
 	//If execution fails, return the error and false.
-	Exec(args []interface{}) (interface{}, bool)
+	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
 	//If this function is an aggregate function. Each parameter of an aggregate function will be a slice
 	//If this function is an aggregate function. Each parameter of an aggregate function will be a slice
 	IsAggregate() bool
 	IsAggregate() bool
 }
 }

+ 46 - 0
xstream/contexts/func_context.go

@@ -0,0 +1,46 @@
+package contexts
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/xstream/api"
+)
+
+type DefaultFuncContext struct {
+	api.StreamContext
+	funcId int
+}
+
+func NewDefaultFuncContext(ctx api.StreamContext, id int) *DefaultFuncContext {
+	return &DefaultFuncContext{
+		StreamContext: ctx,
+		funcId:        id,
+	}
+}
+
+func (c *DefaultFuncContext) IncrCounter(key string, amount int) error {
+	return c.StreamContext.IncrCounter(c.convertKey(key), amount)
+}
+
+func (c *DefaultFuncContext) GetCounter(key string) (int, error) {
+	return c.StreamContext.GetCounter(c.convertKey(key))
+}
+
+func (c *DefaultFuncContext) PutState(key string, value interface{}) error {
+	return c.StreamContext.PutState(c.convertKey(key), value)
+}
+
+func (c *DefaultFuncContext) GetState(key string) (interface{}, error) {
+	return c.StreamContext.GetState(c.convertKey(key))
+}
+
+func (c *DefaultFuncContext) DeleteState(key string) error {
+	return c.StreamContext.DeleteState(c.convertKey(key))
+}
+
+func (c *DefaultFuncContext) GetFuncId() int {
+	return c.funcId
+}
+
+func (c *DefaultFuncContext) convertKey(key string) string {
+	return fmt.Sprintf("$$func%d_%s", c.funcId, key)
+}

+ 1 - 1
xstream/operators/operations.go

@@ -126,7 +126,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 	o.mutex.Lock()
 	o.mutex.Lock()
 	o.statManagers = append(o.statManagers, stats)
 	o.statManagers = append(o.statManagers, stats)
 	o.mutex.Unlock()
 	o.mutex.Unlock()
-	fv, afv := xsql.NewAggregateFunctionValuers()
+	fv, afv := xsql.NewFunctionValuersForOp(exeCtx)
 
 
 	for {
 	for {
 		select {
 		select {

+ 16 - 9
xstream/states/memory_state.go

@@ -3,45 +3,52 @@ package states
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"sync"
 )
 )
 
 
 type MemoryState struct {
 type MemoryState struct {
-	storage map[string]interface{}
+	storage sync.Map
+}
+
+func newMemoryState() *MemoryState {
+	return &MemoryState{
+		storage: sync.Map{},
+	}
 }
 }
 
 
 func (s *MemoryState) IncrCounter(key string, amount int) error {
 func (s *MemoryState) IncrCounter(key string, amount int) error {
-	if v, ok := s.storage[key]; ok {
+	if v, ok := s.storage.Load(key); ok {
 		if vi, err := common.ToInt(v); err != nil {
 		if vi, err := common.ToInt(v); err != nil {
 			return fmt.Errorf("state[%s] must be an int", key)
 			return fmt.Errorf("state[%s] must be an int", key)
 		} else {
 		} else {
-			s.storage[key] = vi + amount
+			s.storage.Store(key, vi+amount)
 		}
 		}
 	} else {
 	} else {
-		s.storage[key] = amount
+		s.storage.Store(key, amount)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
 func (s *MemoryState) GetCounter(key string) (int, error) {
 func (s *MemoryState) GetCounter(key string) (int, error) {
-	if v, ok := s.storage[key]; ok {
+	if v, ok := s.storage.Load(key); ok {
 		if vi, err := common.ToInt(v); err != nil {
 		if vi, err := common.ToInt(v); err != nil {
 			return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
 			return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
 		} else {
 		} else {
 			return vi, nil
 			return vi, nil
 		}
 		}
 	} else {
 	} else {
-		s.storage[key] = 0
+		s.storage.Store(key, 0)
 		return 0, nil
 		return 0, nil
 	}
 	}
 }
 }
 
 
 func (s *MemoryState) PutState(key string, value interface{}) error {
 func (s *MemoryState) PutState(key string, value interface{}) error {
-	s.storage[key] = value
+	s.storage.Store(key, value)
 	return nil
 	return nil
 }
 }
 
 
 func (s *MemoryState) GetState(key string) (interface{}, error) {
 func (s *MemoryState) GetState(key string) (interface{}, error) {
-	if v, ok := s.storage[key]; ok {
+	if v, ok := s.storage.Load(key); ok {
 		return v, nil
 		return v, nil
 	} else {
 	} else {
 		return nil, nil
 		return nil, nil
@@ -49,6 +56,6 @@ func (s *MemoryState) GetState(key string) (interface{}, error) {
 }
 }
 
 
 func (s *MemoryState) DeleteState(key string) error {
 func (s *MemoryState) DeleteState(key string) error {
-	delete(s.storage, key)
+	s.storage.Delete(key)
 	return nil
 	return nil
 }
 }

+ 5 - 7
xstream/states/state_context.go

@@ -1,6 +1,8 @@
 package states
 package states
 
 
-import "github.com/emqx/kuiper/xstream/api"
+import (
+	"github.com/emqx/kuiper/xstream/api"
+)
 
 
 type StateType int
 type StateType int
 
 
@@ -20,14 +22,10 @@ type StateContext interface {
 func NewStateContext(st StateType, logger api.Logger) StateContext {
 func NewStateContext(st StateType, logger api.Logger) StateContext {
 	switch st {
 	switch st {
 	case MEMORY:
 	case MEMORY:
-		return &MemoryState{
-			storage: make(map[string]interface{}),
-		}
+		return newMemoryState()
 	default:
 	default:
 		logger.Warnf("request for invalid state type %d, return MemoryState instead", st)
 		logger.Warnf("request for invalid state type %d, return MemoryState instead", st)
-		return &MemoryState{
-			storage: make(map[string]interface{}),
-		}
+		return newMemoryState()
 	}
 	}
 	return nil
 	return nil
 }
 }