Sfoglia il codice sorgente

feat(plugin): create function instance for each function call separately

ngjaying 5 anni fa
parent
commit
d8f7cd7664
2 ha cambiato i file con 43 aggiunte e 22 eliminazioni
  1. 22 12
      xsql/funcs_aggregate.go
  2. 21 10
      xsql/functions.go

+ 22 - 12
xsql/funcs_aggregate.go

@@ -4,22 +4,24 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
+	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
 
 type AggregateFunctionValuer struct {
-	Data AggregateData
+	Data    AggregateData
+	plugins map[string]api.Function
 }
 
-func (v AggregateFunctionValuer) Value(key string) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 }
 
-func (v AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
 	return nil, false
 }
 
-func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	switch lowerName {
 	case "avg":
@@ -139,16 +141,24 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		return 0, true
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
-		if nf, err := plugins.GetFunction(name); err != nil {
-			return nil, false
-		} else {
-			if !nf.IsAggregate() {
-				return nil, false
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = v.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
-			result, ok := nf.Exec(args)
-			common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
-			return result, ok
+			v.plugins[name] = nf
+		}
+		if !nf.IsAggregate() {
+			return nil, false
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
+		return result, ok
 	}
 }
 

+ 21 - 10
xsql/functions.go

@@ -3,10 +3,13 @@ package xsql
 import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
+	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
 
-type FunctionValuer struct{}
+type FunctionValuer struct {
+	plugins map[string]api.Function
+}
 
 func (*FunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
@@ -59,7 +62,7 @@ var otherFuncMap = map[string]string{"isnull": "",
 	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 }
 
-func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	if _, ok := mathFuncMap[lowerName]; ok {
 		return mathCall(name, args)
@@ -75,15 +78,23 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 		return nil, false
 	} else {
 		common.Log.Debugf("run func %s", name)
-		if nf, err := plugins.GetFunction(name); err != nil {
-			return err, false
-		} else {
-			if nf.IsAggregate() {
-				return nil, false
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = fv.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
-			result, ok := nf.Exec(args)
-			common.Log.Debugf("run custom function %s, get result %v", name, result)
-			return result, ok
+			fv.plugins[name] = nf
+		}
+		if nf.IsAggregate() {
+			return nil, false
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom function %s, get result %v", name, result)
+		return result, ok
 	}
 }