浏览代码

feat(function): add stats functions to the built-in functions

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 年之前
父节点
当前提交
68e922983f
共有 4 个文件被更改,包括 286 次插入16 次删除
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 131 0
      internal/binder/function/funcs_agg.go
  4. 152 16
      internal/binder/function/funcs_agg_test.go

+ 1 - 0
go.mod

@@ -24,6 +24,7 @@ require (
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
 	github.com/mattn/go-sqlite3 v1.14.12
 	github.com/mattn/go-sqlite3 v1.14.12
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/mitchellh/mapstructure v1.5.0
+	github.com/montanaflynn/stats v0.6.6
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/pebbe/zmq4 v1.2.7
 	github.com/pebbe/zmq4 v1.2.7
 	github.com/prometheus/client_golang v1.11.0
 	github.com/prometheus/client_golang v1.11.0

+ 2 - 0
go.sum

@@ -211,6 +211,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ=
+github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
 github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b h1:6XGbs6qqORYTSLxOWS7JeWnO79Xvucus6RKHicqMoF4=
 github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b h1:6XGbs6qqORYTSLxOWS7JeWnO79Xvucus6RKHicqMoF4=
 github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b/go.mod h1:YUWGR1lA7NRnzRjOmYpSgNVip20OCp6NMBwiVFPPhZ8=
 github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b/go.mod h1:YUWGR1lA7NRnzRjOmYpSgNVip20OCp6NMBwiVFPPhZ8=
 github.com/msgpack/msgpack-go v0.0.0-20130625150338-8224460e6fa3 h1:6pY2f1fJC+u27cqhH0sPkXRquVmGF0VOkLKqraRMYfg=
 github.com/msgpack/msgpack-go v0.0.0-20130625150338-8224460e6fa3 h1:6pY2f1fJC+u27cqhH0sPkXRquVmGF0VOkLKqraRMYfg=

+ 131 - 0
internal/binder/function/funcs_agg.go

@@ -19,6 +19,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/montanaflynn/stats"
 )
 )
 
 
 func registerAggFunc() {
 func registerAggFunc() {
@@ -207,6 +208,136 @@ func registerAggFunc() {
 			return nil
 			return nil
 		},
 		},
 	}
 	}
+	// stddev: aggregate function that converts the collected values to a
+	// float64 slice and delegates to stats.StandardDeviation. Failures are
+	// reported as (error, false).
+	builtins["stddev"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			values := args[0].([]interface{})
+			// Nothing was aggregated, so there is nothing to compute.
+			if len(values) == 0 {
+				return fmt.Errorf("run stddev function error: empty data"), false
+			}
+			samples, err := cast.ToFloat64Slice(values, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", values), false
+			}
+			result, err := stats.StandardDeviation(samples)
+			if err != nil {
+				return fmt.Errorf("StandardDeviation exec with error: %v", err), false
+			}
+			return result, true
+		},
+		val: ValidateOneNumberArg,
+	}
+	// stddevs: aggregate function that converts the collected values to a
+	// float64 slice and delegates to stats.StandardDeviationSample. Failures
+	// are reported as (error, false).
+	builtins["stddevs"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			values := args[0].([]interface{})
+			// Nothing was aggregated, so there is nothing to compute.
+			if len(values) == 0 {
+				return fmt.Errorf("run stddevs function error: empty data"), false
+			}
+			samples, err := cast.ToFloat64Slice(values, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", values), false
+			}
+			result, err := stats.StandardDeviationSample(samples)
+			if err != nil {
+				return fmt.Errorf("StandardDeviationSample exec with error: %v", err), false
+			}
+			return result, true
+		},
+		val: ValidateOneNumberArg,
+	}
+	// var: aggregate function that converts the collected values to a float64
+	// slice and delegates to stats.Variance. Failures are reported as
+	// (error, false).
+	builtins["var"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			values := args[0].([]interface{})
+			// Nothing was aggregated, so there is nothing to compute.
+			if len(values) == 0 {
+				return fmt.Errorf("run var function error: empty data"), false
+			}
+			samples, err := cast.ToFloat64Slice(values, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", values), false
+			}
+			variance, err := stats.Variance(samples)
+			if err != nil {
+				return fmt.Errorf("PopulationVariance exec with error: %v", err), false
+			}
+			return variance, true
+		},
+		val: ValidateOneNumberArg,
+	}
+	// vars: aggregate function that converts the collected values to a float64
+	// slice and delegates to stats.SampleVariance. Failures are reported as
+	// (error, false).
+	builtins["vars"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			values := args[0].([]interface{})
+			// Nothing was aggregated, so there is nothing to compute.
+			if len(values) == 0 {
+				return fmt.Errorf("run vars function error: empty data"), false
+			}
+			samples, err := cast.ToFloat64Slice(values, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", values), false
+			}
+			variance, err := stats.SampleVariance(samples)
+			if err != nil {
+				return fmt.Errorf("SampleVariance exec with error: %v", err), false
+			}
+			return variance, true
+		},
+		val: ValidateOneNumberArg,
+	}
+	// percentile_cont: two-argument aggregate function. args[0] holds the
+	// collected values and args[1] the requested percentile, which is scaled
+	// by 100 before being handed to stats.Percentile (presumably callers pass
+	// a fraction such as 0.5 - confirm against the SQL documentation).
+	// Failures are reported as (error, false).
+	builtins["percentile_cont"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			if err := ValidateLen(2, len(args)); err != nil {
+				return err, false
+			}
+			values := args[0].([]interface{})
+			rawPercent := args[1]
+			// The percentile argument is validated before the data itself.
+			percent, err := cast.ToFloat64(rawPercent, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("the second parameter requires float64 but found %[1]T(%[1]v)", rawPercent), false
+			}
+			if len(values) == 0 {
+				return fmt.Errorf("run percentile_cont function error: empty data"), false
+			}
+			samples, err := cast.ToFloat64Slice(values, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", values), false
+			}
+			result, err := stats.Percentile(samples, percent*100)
+			if err != nil {
+				return fmt.Errorf("percentile exec with error: %v", err), false
+			}
+			return result, true
+		},
+		val: ValidateTwoNumberArg,
+	}
+	// percentile_disc: two-argument aggregate function. args[0] holds the
+	// collected values and args[1] the requested percentile, which is scaled
+	// by 100 before being handed to stats.PercentileNearestRank (presumably
+	// callers pass a fraction such as 0.5 - confirm against the SQL
+	// documentation). Failures are reported as (error, false).
+	builtins["percentile_disc"] = builtinFunc{
+		fType: ast.FuncTypeAgg,
+		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
+			if err := ValidateLen(2, len(args)); err != nil {
+				return err, false
+			}
+			arg0 := args[0].([]interface{})
+			arg1 := args[1]
+			// The percentile argument is validated before the data itself.
+			arg1Float64, err := cast.ToFloat64(arg1, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("the second parameter requires float64 but found %[1]T(%[1]v)", arg1), false
+			}
+			if len(arg0) == 0 {
+				// Name this function, not percentile_cont, so the failing call can be identified.
+				return fmt.Errorf("run percentile_disc function error: empty data"), false
+			}
+			float64Slice, err := cast.ToFloat64Slice(arg0, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return fmt.Errorf("requires float64 slice but found %[1]T(%[1]v)", arg0), false
+			}
+			percentile, err := stats.PercentileNearestRank(float64Slice, arg1Float64*100)
+			if err != nil {
+				// Report the routine that actually failed.
+				return fmt.Errorf("PercentileNearestRank exec with error: %v", err), false
+			}
+			return percentile, true
+		},
+		val: ValidateTwoNumberArg,
+	}
 }
 }
 
 
 func getCount(s []interface{}) int {
 func getCount(s []interface{}) int {

+ 152 - 16
internal/binder/function/funcs_agg_test.go

@@ -37,15 +37,35 @@ func TestAggExec(t *testing.T) {
 	if !ok {
 	if !ok {
 		t.Fatal("builtin not found")
 		t.Fatal("builtin not found")
 	}
 	}
+	fStddev, ok := builtins["stddev"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	fStddevs, ok := builtins["stddevs"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	fVar, ok := builtins["var"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	fVars, ok := builtins["vars"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
 	contextLogger := conf.Log.WithField("rule", "testExec")
 	contextLogger := conf.Log.WithField("rule", "testExec")
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
 	var tests = []struct {
 	var tests = []struct {
-		args []interface{}
-		avg  interface{}
-		max  interface{}
-		min  interface{}
+		args    []interface{}
+		avg     interface{}
+		max     interface{}
+		min     interface{}
+		stddev  interface{}
+		stddevs interface{}
+		var1    interface{}
+		vars    interface{}
 	}{
 	}{
 		{ // 0
 		{ // 0
 			args: []interface{}{
 			args: []interface{}{
@@ -55,9 +75,13 @@ func TestAggExec(t *testing.T) {
 					"self",
 					"self",
 				},
 				},
 			},
 			},
-			avg: fmt.Errorf("run avg function error: found invalid arg string(foo)"),
-			max: "self",
-			min: "bar",
+			avg:     fmt.Errorf("run avg function error: found invalid arg string(foo)"),
+			max:     "self",
+			min:     "bar",
+			stddev:  fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
+			stddevs: fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
+			var1:    fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
+			vars:    fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
 		}, { // 1
 		}, { // 1
 			args: []interface{}{
 			args: []interface{}{
 				[]interface{}{
 				[]interface{}{
@@ -66,9 +90,13 @@ func TestAggExec(t *testing.T) {
 					int64(200),
 					int64(200),
 				},
 				},
 			},
 			},
-			avg: int64(150),
-			max: int64(200),
-			min: int64(100),
+			avg:     int64(150),
+			max:     int64(200),
+			min:     int64(100),
+			stddev:  40.824829046386306,
+			stddevs: float64(50),
+			var1:    1666.6666666666667,
+			vars:    float64(2500),
 		}, { // 2
 		}, { // 2
 			args: []interface{}{
 			args: []interface{}{
 				[]interface{}{
 				[]interface{}{
@@ -77,18 +105,26 @@ func TestAggExec(t *testing.T) {
 					float64(200),
 					float64(200),
 				},
 				},
 			},
 			},
-			avg: float64(150),
-			max: float64(200),
-			min: float64(100),
+			avg:     float64(150),
+			max:     float64(200),
+			min:     float64(100),
+			stddev:  40.824829046386306,
+			stddevs: float64(50),
+			var1:    1666.6666666666667,
+			vars:    float64(2500),
 		}, { // 3
 		}, { // 3
 			args: []interface{}{
 			args: []interface{}{
 				[]interface{}{
 				[]interface{}{
 					100, 150, 200,
 					100, 150, 200,
 				},
 				},
 			},
 			},
-			avg: int64(150),
-			max: int64(200),
-			min: int64(100),
+			avg:     int64(150),
+			max:     int64(200),
+			min:     int64(100),
+			stddev:  40.824829046386306,
+			stddevs: float64(50),
+			var1:    1666.6666666666667,
+			vars:    float64(2500),
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -104,5 +140,105 @@ func TestAggExec(t *testing.T) {
 		if !reflect.DeepEqual(rMin, tt.min) {
 		if !reflect.DeepEqual(rMin, tt.min) {
 			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rMin, tt.min)
 			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rMin, tt.min)
 		}
 		}
+		rStddev, _ := fStddev.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rStddev, tt.stddev) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rStddev, tt.stddev)
+		}
+		rStddevs, _ := fStddevs.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rStddevs, tt.stddevs) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rStddevs, tt.stddevs)
+		}
+		rVar, _ := fVar.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rVar, tt.var1) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rVar, tt.var1)
+		}
+		rVars, _ := fVars.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rVars, tt.vars) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rVars, tt.vars)
+		}
+	}
+}
+
+// TestPercentileExec runs the percentile_cont and percentile_disc builtins
+// against a table of inputs and compares the returned value (or error) with
+// the expected one using reflect.DeepEqual.
+func TestPercentileExec(t *testing.T) {
+	pCont, ok := builtins["percentile_cont"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	pDisc, ok := builtins["percentile_disc"]
+	if !ok {
+		t.Fatal("builtin not found")
+	}
+	contextLogger := conf.Log.WithField("rule", "testExec")
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
+	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
+	var tests = []struct {
+		args  []interface{}
+		pCont interface{}
+		pDisc interface{}
+	}{
+		{ // 0: non-numeric data
+			args: []interface{}{
+				[]interface{}{
+					"foo",
+					"bar",
+					"self",
+				},
+				0.25,
+			},
+			pCont: fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
+			pDisc: fmt.Errorf("requires float64 slice but found []interface {}([foo bar self])"),
+		}, { // 1: missing percentile argument
+			args: []interface{}{
+				[]interface{}{
+					int64(100),
+					int64(150),
+					int64(200),
+				},
+			},
+			pCont: fmt.Errorf("Expect 2 arguments but found 1."),
+			pDisc: fmt.Errorf("Expect 2 arguments but found 1."),
+		}, { // 2: int64 data
+			args: []interface{}{
+				[]interface{}{
+					int64(100),
+					int64(150),
+					int64(200),
+				},
+				0.5,
+			},
+			pCont: float64(125),
+			pDisc: float64(150),
+		}, { // 3: float64 data
+			args: []interface{}{
+				[]interface{}{
+					float64(100),
+					float64(150),
+					float64(200),
+				},
+				0.5,
+			},
+			pCont: float64(125),
+			pDisc: float64(150),
+		}, { // 4: untyped int data
+			args: []interface{}{
+				[]interface{}{
+					100, 150, 200,
+				},
+				0.5,
+			},
+			pCont: float64(125),
+			pDisc: float64(150),
+		},
+	}
+	for i, tt := range tests {
+		rCont, _ := pCont.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rCont, tt.pCont) {
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rCont, tt.pCont)
+		}
+		rDisc, _ := pDisc.exec(fctx, tt.args)
+		if !reflect.DeepEqual(rDisc, tt.pDisc) {
+			// Report the percentile_disc expectation, not the percentile_cont one.
+			t.Errorf("%d result mismatch,\ngot:\t%v \nwant:\t%v", i, rDisc, tt.pDisc)
+		}
 	}
 	}
 }
 }