Browse Source

Merge pull request #269 from emqx/zmq

Plugin changes
jinfahua 5 years atrás
parent
commit
dcea1a66ef

+ 1 - 1
docs/en_US/extension/function.md

@@ -30,7 +30,7 @@ The main task for a Function is to implement _exec_ method. The method will be l
 Exec(args []interface{}) (interface{}, bool)
 ```  
 
-As the function itself is a plugin, it must be in the main package. Given the function struct name is myFunction. At last of the file, the source must be exported as a symbol as below.
+As the function itself is a plugin, it must be in the main package. Given the function struct name is myFunction. At last of the file, the source must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For function extension, if there is no internal state, it is recommended to export a singleton instance.
 
 ```go
 var MyFunction myFunction

+ 6 - 0
docs/en_US/extension/overview.md

@@ -39,6 +39,12 @@ go build --buildmode=plugin -o plugins/sources/MySource.so plugins/sources/my_so
 ```
 
 
+### Plugin development
+The development of plugins is to implement a specific interface according to the plugin type and export the implementation with a specific name. There are two types of exported symbol supported:
+
+1. Export a constructor function: Kuiper will use the constructor function to create a new instance of the plugin implementation for each load. So each rule will have one instance of the plugin and each instance will be isolated from others. This is the recommended way.
+
+2. Export an instance: Kuiper will use the instance as singleton for all plugin load. So all rules will share the same instance. For such implementation, the developer will need to handle the shared states to avoid any potential multi-thread problems. This mode is recommended where there are no shared states and the performance is critical. Especially, function extension is usually functional without internal state which is suitable for this mode.
 
 Please read below for how to realize the different extensions.
 

+ 4 - 2
docs/en_US/extension/sink.md

@@ -35,10 +35,12 @@ The last method to implement is _Close_ which literally close the connection. It
 Close(ctx StreamContext) error
 ```
 
-As the sink itself is a plugin, it must be in the main package. Given the sink struct name is mySink. At last of the file, the sink must be exported as a symbol as below.
+As the sink itself is a plugin, it must be in the main package. Given the sink struct name is mySink. At last of the file, the sink must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For sink extension, states are usually needed, so it is recommended to export a constructor function.
 
 ```go
-var MySink mySink
+func MySink() api.Sink {
+	return &mySink{}
+}
 ```
 
 The [Memory Sink](../../../plugins/sinks/memory.go) is a good example.

+ 4 - 2
docs/en_US/extension/source.md

@@ -30,10 +30,12 @@ The last method to implement is _Close_ which literally close the connection. It
 Close(ctx StreamContext) error
 ```
 
-As the source itself is a plugin, it must be in the main package. Given the source struct name is mySource. At last of the file, the source must be exported as a symbol as below.
+As the source itself is a plugin, it must be in the main package. Given the source struct name is mySource. At last of the file, the source must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For source extension, states are usually needed, so it is recommended to export a constructor function.
 
 ```go
-var MySource mySource
+function MySource() api.Source{
+    return &mySource{}
+}
 ```
 
 The [Randome Source](../../../plugins/sources/random.go) is a good example.

+ 12 - 7
docs/en_US/rules/overview.md

@@ -86,37 +86,42 @@ In sendSingle=true mode:
 - Print out the whole record
 
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
-- Print out the the ab field
+- Print out the ab field
 
 ```
-"dataTemplate": `{"content":{{.ab}}}`,
+"dataTemplate": "{\"content\":{{.ab}}}",
+```
+
+if the ab field is a string, add the quotes
+```
+"dataTemplate": "{\"content\":\"{{.ab}}\"}",
 ```
 
 In sendSingle=false mode:
 - Print out the whole record array
 
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 
 - Print out the first record
 
 ```
-"dataTemplate": `{"content":{{json (index . 0)}}}`,
+"dataTemplate": "{\"content\":{{json (index . 0)}}}",
 ```
 
 - Print out the field ab of the first record
 
 ```
-"dataTemplate": `{"content":{{index . 0 "ab"}}}`,
+"dataTemplate": "{\"content\":{{index . 0 \"ab\"}}}",
 ```
 
 - Print out field ab of each record in the array to html format
 
 ```
-"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+"dataTemplate": "<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>",
 ```
 
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.

+ 12 - 7
docs/zh_CN/rules/overview.md

@@ -76,38 +76,43 @@ In sendSingle=true mode:
 - Print out the whole record
 
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 
-- Print out the the ab field
+- Print out the ab field
 
 ```
-"dataTemplate": `{"content":{{.ab}}}`,
+"dataTemplate": "{\"content\":{{.ab}}}",
+```
+
+if the ab field is a string, add the quotes
+```
+"dataTemplate": "{\"content\":\"{{.ab}}\"}",
 ```
 
 In sendSingle=false mode:
 - Print out the whole record array
 
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 
 - Print out the first record
 
 ```
-"dataTemplate": `{"content":{{json (index . 0)}}}`,
+"dataTemplate": "{\"content\":{{json (index . 0)}}}",
 ```
 
 - Print out the field ab of the first record
 
 ```
-"dataTemplate": `{"content":{{index . 0 "ab"}}}`,
+"dataTemplate": "{\"content\":{{index . 0 \"ab\"}}}",
 ```
 
 - Print out field ab of each record in the array to html format
 
 ```
-"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+"dataTemplate": "<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>",
 ```
 
 

+ 53 - 1
plugins/manager.go

@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -88,7 +89,7 @@ func (rr *Registry) Get(t PluginType, name string) (string, bool) {
 
 var symbolRegistry = make(map[string]plugin.Symbol)
 
-func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
+func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
 	ut := ucFirst(t)
 	ptype := PluginTypes[pt]
 	key := ptype + "/" + t
@@ -121,6 +122,57 @@ func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
 	return nf, nil
 }
 
+func GetSource(t string) (api.Source, error) {
+	nf, err := getPlugin(t, SOURCE)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Source
+	switch t := nf.(type) {
+	case api.Source:
+		s = t
+	case func() api.Source:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
+	}
+	return s, nil
+}
+
+func GetSink(t string) (api.Sink, error) {
+	nf, err := getPlugin(t, SINK)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Sink
+	switch t := nf.(type) {
+	case api.Sink:
+		s = t
+	case func() api.Sink:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
+	}
+	return s, nil
+}
+
+func GetFunction(t string) (api.Function, error) {
+	nf, err := getPlugin(t, FUNCTION)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Function
+	switch t := nf.(type) {
+	case api.Function:
+		s = t
+	case func() api.Function:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
+	}
+	return s, nil
+}
+
 type Manager struct {
 	pluginDir string
 	etcDir    string

+ 3 - 1
plugins/sinks/file.go

@@ -115,4 +115,6 @@ func (m *fileSink) Close(ctx api.StreamContext) error {
 	return nil
 }
 
-var File fileSink
+func File() api.Sink {
+	return &fileSink{}
+}

+ 3 - 1
plugins/sinks/memory.go

@@ -33,4 +33,6 @@ func (m *memory) Configure(props map[string]interface{}) error {
 	return nil
 }
 
-var Memory memory
+func Memory() api.Sink {
+	return &memory{}
+}

+ 3 - 1
plugins/sinks/zmq.go

@@ -79,4 +79,6 @@ func (m *zmqSink) Close(ctx api.StreamContext) error {
 	return nil
 }
 
-var Zmq zmqSink
+func Zmq() api.Sink {
+	return &zmqSink{}
+}

+ 7 - 5
plugins/sources/random.go

@@ -16,8 +16,8 @@ type randomSource struct {
 }
 
 func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
-	if i, ok := props["interval"].(float64); ok {
-		s.interval = int(i)
+	if i, ok := props["interval"].(int); ok {
+		s.interval = i
 	} else {
 		s.interval = 1000
 	}
@@ -27,8 +27,8 @@ func (s *randomSource) Configure(topic string, props map[string]interface{}) err
 		s.pattern = make(map[string]interface{})
 		s.pattern["count"] = 50
 	}
-	if i, ok := props["seed"].(float64); ok {
-		s.seed = int(i)
+	if i, ok := props["seed"].(int); ok {
+		s.seed = i
 	} else {
 		s.seed = 1
 	}
@@ -66,4 +66,6 @@ func (s *randomSource) Close(ctx api.StreamContext) error {
 	return nil
 }
 
-var Random randomSource
+func Random() api.Source {
+	return &randomSource{}
+}

+ 3 - 1
plugins/sources/zmq.go

@@ -86,4 +86,6 @@ func (s *zmqSource) Close(ctx api.StreamContext) error {
 	return nil
 }
 
-var Zmq zmqSource
+func Zmq() api.Source {
+	return &zmqSource{}
+}

+ 29 - 21
xsql/ast.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
-	"github.com/emqx/kuiper/xstream/api"
 	"math"
 	"reflect"
 	"sort"
@@ -493,6 +492,7 @@ type CallValuer interface {
 type AggregateCallValuer interface {
 	CallValuer
 	GetAllTuples() AggregateData
+	GetSingleCallValuer() CallValuer
 }
 
 type Wildcarder interface {
@@ -532,7 +532,7 @@ func (wv *WildcardValuer) Meta(key string) (interface{}, bool) {
  */
 
 type AggregateData interface {
-	AggregateEval(expr Expr) []interface{}
+	AggregateEval(expr Expr, v CallValuer) []interface{}
 }
 
 // Message is a valuer that substitutes values for the mapped interface.
@@ -601,8 +601,8 @@ func (t *Tuple) All(stream string) (interface{}, bool) {
 	return t.Message, true
 }
 
-func (t *Tuple) AggregateEval(expr Expr) []interface{} {
-	return []interface{}{Eval(expr, t)}
+func (t *Tuple) AggregateEval(expr Expr, v CallValuer) []interface{} {
+	return []interface{}{Eval(expr, t, v)}
 }
 
 func (t *Tuple) GetTimestamp() int64 {
@@ -683,13 +683,13 @@ func (w WindowTuplesSet) Sort() {
 	}
 }
 
-func (w WindowTuplesSet) AggregateEval(expr Expr) []interface{} {
+func (w WindowTuplesSet) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	if len(w) != 1 { //should never happen
 		return nil
 	}
 	for _, t := range w[0].Tuples {
-		result = append(result, Eval(expr, &t))
+		result = append(result, Eval(expr, &t, v))
 	}
 	return result
 }
@@ -787,20 +787,20 @@ func (s JoinTupleSets) Len() int           { return len(s) }
 func (s JoinTupleSets) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s JoinTupleSets) Index(i int) Valuer { return &(s[i]) }
 
-func (s JoinTupleSets) AggregateEval(expr Expr) []interface{} {
+func (s JoinTupleSets) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	for _, t := range s {
-		result = append(result, Eval(expr, &t))
+		result = append(result, Eval(expr, &t, v))
 	}
 	return result
 }
 
 type GroupedTuples []DataValuer
 
-func (s GroupedTuples) AggregateEval(expr Expr) []interface{} {
+func (s GroupedTuples) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	for _, t := range s {
-		result = append(result, Eval(expr, t))
+		result = append(result, Eval(expr, t, v))
 	}
 	return result
 }
@@ -821,14 +821,16 @@ type SortingData interface {
 type MultiSorter struct {
 	SortingData
 	fields SortFields
+	valuer CallValuer
 	values []map[string]interface{}
 }
 
 // OrderedBy returns a Sorter that sorts using the less functions, in order.
 // Call its Sort method to sort the data.
-func OrderedBy(fields SortFields) *MultiSorter {
+func OrderedBy(fields SortFields, fv *FunctionValuer) *MultiSorter {
 	return &MultiSorter{
 		fields: fields,
+		valuer: fv,
 	}
 }
 
@@ -840,7 +842,7 @@ func OrderedBy(fields SortFields) *MultiSorter {
 // exercise for the reader.
 func (ms *MultiSorter) Less(i, j int) bool {
 	p, q := ms.values[i], ms.values[j]
-	v := &ValuerEval{Valuer: MultiValuer(&FunctionValuer{})}
+	v := &ValuerEval{Valuer: MultiValuer(ms.valuer)}
 	for _, field := range ms.fields {
 		n := field.Name
 		vp, _ := p[n]
@@ -880,7 +882,7 @@ func (ms *MultiSorter) Sort(data SortingData) error {
 	for i := 0; i < data.Len(); i++ {
 		ms.values[i] = make(map[string]interface{})
 		p := data.Index(i)
-		vep := &ValuerEval{Valuer: MultiValuer(p, &FunctionValuer{})}
+		vep := &ValuerEval{Valuer: MultiValuer(p, ms.valuer)}
 		for j, field := range ms.fields {
 			n := field.Name
 			vp, _ := vep.Valuer.Value(n)
@@ -948,8 +950,8 @@ type EvalResultMessage struct {
 type ResultsAndMessages []EvalResultMessage
 
 // Eval evaluates expr against a map.
-func Eval(expr Expr, m Valuer) interface{} {
-	eval := ValuerEval{Valuer: MultiValuer(m, &FunctionValuer{})}
+func Eval(expr Expr, m Valuer, v CallValuer) interface{} {
+	eval := ValuerEval{Valuer: MultiValuer(m, v)}
 	return eval.Eval(expr)
 }
 
@@ -1004,12 +1006,14 @@ func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 type multiAggregateValuer struct {
 	data AggregateData
 	multiValuer
+	singleCallValuer CallValuer
 }
 
-func MultiAggregateValuer(data AggregateData, valuers ...Valuer) Valuer {
+func MultiAggregateValuer(data AggregateData, singleCallValuer CallValuer, valuers ...Valuer) Valuer {
 	return &multiAggregateValuer{
-		data:        data,
-		multiValuer: valuers,
+		data:             data,
+		multiValuer:      valuers,
+		singleCallValuer: singleCallValuer,
 	}
 }
 
@@ -1046,6 +1050,10 @@ func (a *multiAggregateValuer) GetAllTuples() AggregateData {
 	return a.data
 }
 
+func (a *multiAggregateValuer) GetSingleCallValuer() CallValuer {
+	return a.singleCallValuer
+}
+
 type BracketEvalResult struct {
 	Start, End int
 }
@@ -1086,7 +1094,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 				args = make([]interface{}, len(expr.Args))
 				if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
 					for i := range expr.Args {
-						args[i] = aggreValuer.GetAllTuples().AggregateEval(expr.Args[i])
+						args[i] = aggreValuer.GetAllTuples().AggregateEval(expr.Args[i], aggreValuer.GetSingleCallValuer())
 					}
 				} else {
 					for i := range expr.Args {
@@ -1700,8 +1708,8 @@ func isAggFunc(f *Call) bool {
 	} else if _, ok := mathFuncMap[fn]; ok {
 		return false
 	} else {
-		if nf, err := plugins.GetPlugin(f.Name, plugins.FUNCTION); err == nil {
-			if ef, ok := nf.(api.Function); ok && ef.IsAggregate() {
+		if nf, err := plugins.GetFunction(f.Name); err == nil {
+			if nf.IsAggregate() {
 				return true
 			}
 		}

+ 42 - 17
xsql/funcs_aggregate.go

@@ -9,18 +9,36 @@ import (
 )
 
 type AggregateFunctionValuer struct {
-	Data AggregateData
+	data    AggregateData
+	fv      *FunctionValuer
+	plugins map[string]api.Function
 }
 
-func (v AggregateFunctionValuer) Value(key string) (interface{}, bool) {
+//Should only be called by stream to make sure a single instance for an operation
+func NewAggregateFunctionValuers() (*FunctionValuer, *AggregateFunctionValuer) {
+	fv := &FunctionValuer{}
+	return fv, &AggregateFunctionValuer{
+		fv: fv,
+	}
+}
+
+func (v *AggregateFunctionValuer) SetData(data AggregateData) {
+	v.data = data
+}
+
+func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer {
+	return v.fv
+}
+
+func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 }
 
-func (v AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
 	return nil, false
 }
 
-func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	switch lowerName {
 	case "avg":
@@ -140,25 +158,32 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		return 0, true
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
-		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
-			return nil, false
-		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return nil, false
-			}
-			if !f.IsAggregate() {
-				return nil, false
+		if v.plugins == nil {
+			v.plugins = make(map[string]api.Function)
+		}
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = v.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
-			result, ok := f.Exec(args)
-			common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
-			return result, ok
+			v.plugins[name] = nf
+		}
+		if !nf.IsAggregate() {
+			return nil, false
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
+		return result, ok
 	}
 }
 
 func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
-	return v.Data
+	return v.data
 }
 
 func getFirstValidArg(s []interface{}) interface{} {

+ 2 - 7
xsql/funcs_ast_validator.go

@@ -3,7 +3,6 @@ package xsql
 import (
 	"fmt"
 	"github.com/emqx/kuiper/plugins"
-	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
 
@@ -26,18 +25,14 @@ func validateFuncs(funcName string, args []Expr) error {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 		return validateAggFunc(lowerName, args)
 	} else {
-		if nf, err := plugins.GetPlugin(funcName, plugins.FUNCTION); err != nil {
+		if nf, err := plugins.GetFunction(funcName); err != nil {
 			return err
 		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return fmt.Errorf("exported symbol %s is not type of api.Function", funcName)
-			}
 			var targs []interface{}
 			for _, arg := range args {
 				targs = append(targs, arg)
 			}
-			return f.Validate(targs)
+			return nf.Validate(targs)
 		}
 	}
 }

+ 23 - 14
xsql/functions.go

@@ -7,7 +7,9 @@ import (
 	"strings"
 )
 
-type FunctionValuer struct{}
+type FunctionValuer struct {
+	plugins map[string]api.Function
+}
 
 func (*FunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
@@ -60,7 +62,7 @@ var otherFuncMap = map[string]string{"isnull": "",
 	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 }
 
-func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	if _, ok := mathFuncMap[lowerName]; ok {
 		return mathCall(name, args)
@@ -76,19 +78,26 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 		return nil, false
 	} else {
 		common.Log.Debugf("run func %s", name)
-		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
-			return err, false
-		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return nil, false
-			}
-			if f.IsAggregate() {
-				return nil, false
+		if fv.plugins == nil {
+			fv.plugins = make(map[string]api.Function)
+		}
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = fv.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
-			result, ok := f.Exec(args)
-			common.Log.Debugf("run custom function %s, get result %v", name, result)
-			return result, ok
+			fv.plugins[name] = nf
+		}
+		if nf.IsAggregate() {
+			return nil, false
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom function %s, get result %v", name, result)
+		return result, ok
 	}
 }

+ 2 - 2
xsql/plans/aggregate_operator.go

@@ -14,7 +14,7 @@ type AggregatePlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: xsql.GroupedTuplesSet
  */
-func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("aggregate plan receive %s", data)
 	var ms []xsql.DataValuer
@@ -46,7 +46,7 @@ func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface
 	result := make(map[string]xsql.GroupedTuples)
 	for _, m := range ms {
 		var name string
-		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
 		for _, d := range p.Dimensions {
 			r := ve.Eval(d.Expr)
 			if _, ok := r.(error); ok {

+ 4 - 4
xsql/plans/aggregate_test.go

@@ -309,9 +309,9 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
 			t.Errorf("result is not GroupedTuplesSet")
@@ -378,9 +378,9 @@ func TestAggregatePlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 4 - 4
xsql/plans/filter_operator.go

@@ -14,14 +14,14 @@ type FilterPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
-func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("filter plan receive %s", data)
 	switch input := data.(type) {
 	case error:
 		return input
 	case xsql.Valuer:
-		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, &xsql.FunctionValuer{})}
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, fv)}
 		result := ve.Eval(p.Condition)
 		switch r := result.(type) {
 		case error:
@@ -40,7 +40,7 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input[0].Tuples
 		r := ms[:0]
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, fv)}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:
@@ -61,7 +61,7 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input
 		r := ms[:0]
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, fv)}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:

+ 4 - 4
xsql/plans/filter_test.go

@@ -263,9 +263,9 @@ func TestFilterPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &FilterPlan{Condition: stmt.Condition}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -386,9 +386,9 @@ func TestFilterPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &FilterPlan{Condition: stmt.Condition}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 7 - 4
xsql/plans/having_operator.go

@@ -10,7 +10,7 @@ type HavingPlan struct {
 	Condition xsql.Expr
 }
 
-func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("having plan receive %s", data)
 	switch input := data.(type) {
@@ -19,7 +19,8 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	case xsql.GroupedTuplesSet:
 		r := xsql.GroupedTuplesSet{}
 		for _, v := range input {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, v[0], &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v}, &xsql.WildcardValuer{Data: v[0]})}
+			afv.SetData(v)
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, fv, v[0], fv, afv, &xsql.WildcardValuer{Data: v[0]})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:
@@ -41,8 +42,9 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		}
 		ms := input[0].Tuples
 		r := ms[:0]
+		afv.SetData(input)
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:
@@ -62,8 +64,9 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	case xsql.JoinTupleSets:
 		ms := input
 		r := ms[:0]
+		afv.SetData(input)
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:

+ 4 - 4
xsql/plans/having_test.go

@@ -261,9 +261,9 @@ func TestHavingPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &HavingPlan{Condition: stmt.Having}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
@@ -333,9 +333,9 @@ func TestHavingPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			break
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &HavingPlan{Condition: stmt.Having}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 2 - 1
xsql/plans/join_multi_test.go

@@ -396,8 +396,9 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 15 - 15
xsql/plans/join_operator.go

@@ -14,7 +14,7 @@ type JoinPlan struct {
 
 // input:  xsql.WindowTuplesSet from windowOp, window is required for join
 // output: xsql.JoinTupleSets
-func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	var input xsql.WindowTuplesSet
 	switch v := data.(type) {
@@ -29,13 +29,13 @@ func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
 	result := xsql.JoinTupleSets{}
 	for i, join := range jp.Joins {
 		if i == 0 {
-			v, err := jp.evalSet(input, join)
+			v, err := jp.evalSet(input, join, fv)
 			if err != nil {
 				return fmt.Errorf("run Join error: %s", err)
 			}
 			result = v
 		} else {
-			r1, err := jp.evalJoinSets(&result, input, join)
+			r1, err := jp.evalJoinSets(&result, input, join, fv)
 			if err != nil {
 				return fmt.Errorf("run Join error: %s", err)
 			}
@@ -67,7 +67,7 @@ func getStreamNames(join *xsql.Join) ([]string, error) {
 	return srcs, nil
 }
 
-func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var leftStream, rightStream string
 
 	if join.JoinType != xsql.CROSS_JOIN {
@@ -99,7 +99,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	sets := xsql.JoinTupleSets{}
 
 	if join.JoinType == xsql.RIGHT_JOIN {
-		return jp.evalSetWithRightJoin(input, join, false)
+		return jp.evalSetWithRightJoin(input, join, false, fv)
 	}
 	for _, left := range lefts {
 		merged := &xsql.JoinTuple{}
@@ -113,7 +113,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 				temp := &xsql.JoinTuple{}
 				temp.AddTuple(left)
 				temp.AddTuple(right)
-				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
 				result := ve.Eval(join.Expr)
 				switch val := result.(type) {
 				case error:
@@ -140,7 +140,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	}
 
 	if join.JoinType == xsql.FULL_JOIN {
-		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil {
+		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true, fv); err == nil {
 			if len(rightJoinSet) > 0 {
 				for _, jt := range rightJoinSet {
 					sets = append(sets, jt)
@@ -153,7 +153,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	return sets, nil
 }
 
-func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	streams, err := getStreamNames(&join)
 	if err != nil {
 		return nil, err
@@ -176,7 +176,7 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 			temp := &xsql.JoinTuple{}
 			temp.AddTuple(right)
 			temp.AddTuple(left)
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
 			result := ve.Eval(join.Expr)
 			switch val := result.(type) {
 			case error:
@@ -203,7 +203,7 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 	return sets, nil
 }
 
-func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join) (interface{}, error) {
+func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (interface{}, error) {
 	var rightStream string
 	if join.Alias == "" {
 		rightStream = join.Name
@@ -215,7 +215,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 
 	newSets := xsql.JoinTupleSets{}
 	if join.JoinType == xsql.RIGHT_JOIN {
-		return jp.evalRightJoinSets(set, input, join, false)
+		return jp.evalRightJoinSets(set, input, join, false, fv)
 	}
 	for _, left := range *set {
 		merged := &xsql.JoinTuple{}
@@ -227,7 +227,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 			if join.JoinType == xsql.CROSS_JOIN {
 				merged.AddTuple(right)
 			} else {
-				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, &xsql.FunctionValuer{})}
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, fv)}
 				result := ve.Eval(join.Expr)
 				switch val := result.(type) {
 				case error:
@@ -252,7 +252,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 	}
 
 	if join.JoinType == xsql.FULL_JOIN {
-		if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true); err == nil && len(rightJoinSet) > 0 {
+		if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true, fv); err == nil && len(rightJoinSet) > 0 {
 			for _, jt := range rightJoinSet {
 				newSets = append(newSets, jt)
 			}
@@ -262,7 +262,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 	return newSets, nil
 }
 
-func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var rightStream string
 	if join.Alias == "" {
 		rightStream = join.Name
@@ -278,7 +278,7 @@ func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.Window
 		merged.AddTuple(right)
 		isJoint := false
 		for _, left := range *set {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, fv)}
 			result := ve.Eval(join.Expr)
 			switch val := result.(type) {
 			case error:

+ 12 - 6
xsql/plans/join_test.go

@@ -662,8 +662,9 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1127,8 +1128,9 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1310,8 +1312,9 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1560,8 +1563,9 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1690,8 +1694,9 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1763,8 +1768,9 @@ func TestCrossJoinPlanError(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 2 - 1
xsql/plans/math_func_test.go

@@ -467,7 +467,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 3
xsql/plans/misc_func_test.go

@@ -183,7 +183,8 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -235,7 +236,8 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -336,7 +338,8 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 2 - 2
xsql/plans/order_operator.go

@@ -14,10 +14,10 @@ type OrderPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
-func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("order plan receive %s", data)
-	sorter := xsql.OrderedBy(p.SortFields)
+	sorter := xsql.OrderedBy(p.SortFields, fv)
 	switch input := data.(type) {
 	case error:
 		return input

+ 2 - 1
xsql/plans/order_test.go

@@ -434,7 +434,8 @@ func TestOrderPlan_Apply(t *testing.T) {
 		}
 
 		pp := &OrderPlan{SortFields: stmt.SortFields}
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 2 - 2
xsql/plans/preprocessor.go

@@ -41,7 +41,7 @@ func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocesso
  *	input: *xsql.Tuple
  *	output: *xsql.Tuple
  */
-func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	tuple, ok := data.(*xsql.Tuple)
 	if !ok {
@@ -66,7 +66,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	//Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
 	for _, f := range p.fields {
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
 			v := ve.Eval(f.Expr)
 			if _, ok := v.(error); ok {
 				return v

+ 8 - 4
xsql/plans/preprocessor_test.go

@@ -530,7 +530,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
@@ -663,7 +664,8 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
@@ -836,7 +838,8 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
@@ -914,7 +917,8 @@ func TestPreprocessorError(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}

+ 9 - 8
xsql/plans/project_operator.go

@@ -20,7 +20,7 @@ type ProjectPlan struct {
  *  input: *xsql.Tuple from preprocessor or filterOp | xsql.WindowTuplesSet from windowOp or filterOp | xsql.JoinTupleSets from joinOp or filterOp
  *  output: []map[string]interface{}
  */
-func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log.Debugf("project plan receive %s", data)
 	var results []map[string]interface{}
@@ -28,7 +28,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case error:
 		return input
 	case *xsql.Tuple:
-		ve := pp.getVE(input, input)
+		ve := pp.getVE(input, input, fv, afv)
 		if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 			return fmt.Errorf("run Select error: %s", err)
 		} else {
@@ -40,7 +40,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		}
 		ms := input[0].Tuples
 		for _, v := range ms {
-			ve := pp.getVE(&v, input)
+			ve := pp.getVE(&v, input, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
@@ -53,7 +53,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.JoinTupleSets:
 		ms := input
 		for _, v := range ms {
-			ve := pp.getVE(&v, input)
+			ve := pp.getVE(&v, input, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return err
 			} else {
@@ -65,7 +65,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		}
 	case xsql.GroupedTuplesSet:
 		for _, v := range input {
-			ve := pp.getVE(v[0], v)
+			ve := pp.getVE(v[0], v, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
@@ -83,11 +83,12 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	}
 }
 
-func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsql.ValuerEval {
+func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
+	afv.SetData(agg)
 	if pp.IsAggregate {
-		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, tuple, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: agg}, &xsql.WildcardValuer{Data: tuple})}
+		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
 	} else {
-		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{}, &xsql.WildcardValuer{Data: tuple})}
+		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv, &xsql.WildcardValuer{Data: tuple})}
 	}
 }
 

+ 10 - 5
xsql/plans/project_test.go

@@ -389,7 +389,8 @@ func TestProjectPlan_Apply1(t *testing.T) {
 
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -939,7 +940,8 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -1139,7 +1141,8 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -1422,7 +1425,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -1572,7 +1576,8 @@ func TestProjectPlanError(t *testing.T) {
 
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 2 - 1
xsql/plans/str_func_test.go

@@ -419,7 +419,8 @@ func TestStrFunc_Apply1(t *testing.T) {
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 1
xsql/processors/extension_test.go

@@ -52,7 +52,7 @@ func TestExtensions(t *testing.T) {
 	}{
 		{
 			name: `$$test1`,
-			rj:   "{\"sql\": \"SELECT echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\":  {\"path\":\"" + CACHE_FILE + "\"}}]}",
+			rj:   "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\":  {\"path\":\"" + CACHE_FILE + "\"}}]}",
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -105,6 +105,11 @@ func TestExtensions(t *testing.T) {
 				break
 			}
 			r := r[0]
+			c := int((r["c"]).(float64))
+			if c != 1 {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
+				break
+			}
 			e := int((r["e"]).(float64))
 			if e != 50 && e != 51 {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)

+ 6 - 9
xstream/nodes/sink_node.go

@@ -287,7 +287,10 @@ func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval
 }
 
 func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
-	var s api.Sink
+	var (
+		s   api.Sink
+		err error
+	)
 	switch name {
 	case "log":
 		s = sinks.NewLogSink()
@@ -300,18 +303,12 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	case "nop":
 		s = &sinks.NopSink{}
 	default:
-		nf, err := plugins.GetPlugin(name, plugins.SINK)
+		s, err = plugins.GetSink(name)
 		if err != nil {
 			return nil, err
 		}
-		var ok bool
-		s, ok = nf.(api.Sink)
-		if !ok {
-			return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
-		}
 	}
-
-	err := s.Configure(action)
+	err = s.Configure(action)
 	if err != nil {
 		return nil, err
 	}

+ 5 - 7
xstream/nodes/source_node.go

@@ -147,20 +147,18 @@ func (m *SourceNode) reset() {
 }
 
 func doGetSource(t string) (api.Source, error) {
-	var s api.Source
-	var ok bool
+	var (
+		s   api.Source
+		err error
+	)
 	switch t {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
 	default:
-		nf, err := plugins.GetPlugin(t, plugins.SOURCE)
+		s, err = plugins.GetSource(t)
 		if err != nil {
 			return nil, err
 		}
-		s, ok = nf.(api.Source)
-		if !ok {
-			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
-		}
 	}
 	return s, nil
 }

+ 4 - 2
xstream/operators/operations.go

@@ -2,6 +2,7 @@ package operators
 
 import (
 	"fmt"
+	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"sync"
@@ -9,7 +10,7 @@ import (
 
 // UnOperation interface represents unary operations (i.e. Map, Filter, etc)
 type UnOperation interface {
-	Apply(ctx api.StreamContext, data interface{}) interface{}
+	Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
 }
 
 // UnFunc implements UnOperation as type func (context.Context, interface{})
@@ -125,6 +126,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 	o.mutex.Lock()
 	o.statManagers = append(o.statManagers, stats)
 	o.mutex.Unlock()
+	fv, afv := xsql.NewAggregateFunctionValuers()
 
 	for {
 		select {
@@ -132,7 +134,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		case item := <-o.input:
 			stats.IncTotalRecordsIn()
 			stats.ProcessTimeStart()
-			result := o.op.Apply(exeCtx, item)
+			result := o.op.Apply(exeCtx, item, fv, afv)
 
 			switch val := result.(type) {
 			case nil: