Browse Source

fix (function): make aggregate functions return the same value as the SQL for empty Windows (#1690)

* fix aggregate function

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix bug

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix dedup()

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 2 years ago
parent
commit
5d2045be83
2 changed files with 52 additions and 13 deletions
  1. 33 11
      internal/binder/function/funcs_agg.go
  2. 19 2
      internal/binder/function/funcs_agg_test.go

+ 33 - 11
internal/binder/function/funcs_agg.go

@@ -16,6 +16,7 @@ package function
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -49,7 +50,7 @@ func registerAggFunc() {
 					return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
 					return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
 				}
 				}
 			}
 			}
-			return 0, true
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -98,7 +99,7 @@ func registerAggFunc() {
 					return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
 					return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
 				}
 				}
 			}
 			}
-			return fmt.Errorf("run max function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -139,7 +140,7 @@ func registerAggFunc() {
 					return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
 					return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
 				}
 				}
 			}
 			}
-			return fmt.Errorf("run min function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -168,14 +169,17 @@ func registerAggFunc() {
 					return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
 					return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
 				}
 				}
 			}
 			}
-			return 0, true
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
 	builtins["collect"] = builtinFunc{
 	builtins["collect"] = builtinFunc{
 		fType: ast.FuncTypeAgg,
 		fType: ast.FuncTypeAgg,
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
 		exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
-			return args[0], true
+			if len(args) > 0 {
+				return args[0], true
+			}
+			return make([]interface{}, 0), true
 		},
 		},
 		val: ValidateOneArg,
 		val: ValidateOneArg,
 	}
 	}
@@ -219,11 +223,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.StandardDeviation(float64Slice)
 				deviation, err := stats.StandardDeviation(float64Slice)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("StandardDeviation exec with error: %v", err), false
 					return fmt.Errorf("StandardDeviation exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run stddev function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -238,11 +245,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.StandardDeviationSample(float64Slice)
 				deviation, err := stats.StandardDeviationSample(float64Slice)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("StandardDeviationSample exec with error: %v", err), false
 					return fmt.Errorf("StandardDeviationSample exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run stddevs function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -257,11 +267,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.Variance(float64Slice)
 				deviation, err := stats.Variance(float64Slice)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("PopulationVariance exec with error: %v", err), false
 					return fmt.Errorf("PopulationVariance exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run var function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -276,11 +289,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.SampleVariance(float64Slice)
 				deviation, err := stats.SampleVariance(float64Slice)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("SampleVariance exec with error: %v", err), false
 					return fmt.Errorf("SampleVariance exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run vars function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateOneNumberArg,
 		val: ValidateOneNumberArg,
 	}
 	}
@@ -309,11 +325,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.Percentile(float64Slice, arg1Float64*100)
 				deviation, err := stats.Percentile(float64Slice, arg1Float64*100)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("percentile exec with error: %v", err), false
 					return fmt.Errorf("percentile exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run percentile_cont function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateTwoNumberArg,
 		val: ValidateTwoNumberArg,
 	}
 	}
@@ -341,11 +360,14 @@ func registerAggFunc() {
 				}
 				}
 				deviation, err := stats.PercentileNearestRank(float64Slice, arg1Float64*100)
 				deviation, err := stats.PercentileNearestRank(float64Slice, arg1Float64*100)
 				if err != nil {
 				if err != nil {
+					if err == stats.EmptyInputErr {
+						return nil, true
+					}
 					return fmt.Errorf("PopulationVariance exec with error: %v", err), false
 					return fmt.Errorf("PopulationVariance exec with error: %v", err), false
 				}
 				}
 				return deviation, true
 				return deviation, true
 			}
 			}
-			return fmt.Errorf("run percentile_cont function error: empty data"), false
+			return nil, true
 		},
 		},
 		val: ValidateTwoNumberArg,
 		val: ValidateTwoNumberArg,
 	}
 	}

+ 19 - 2
internal/binder/function/funcs_agg_test.go

@@ -16,12 +16,13 @@ package function
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"reflect"
-	"testing"
 )
 )
 
 
 func TestAggExec(t *testing.T) {
 func TestAggExec(t *testing.T) {
@@ -125,6 +126,17 @@ func TestAggExec(t *testing.T) {
 			stddevs: float64(50),
 			stddevs: float64(50),
 			var1:    1666.6666666666667,
 			var1:    1666.6666666666667,
 			vars:    float64(2500),
 			vars:    float64(2500),
+		}, { // 4
+			args: []interface{}{
+				[]interface{}{},
+			},
+			avg:     nil,
+			max:     nil,
+			min:     nil,
+			stddev:  nil,
+			stddevs: nil,
+			var1:    nil,
+			vars:    nil,
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -229,6 +241,11 @@ func TestPercentileExec(t *testing.T) {
 			},
 			},
 			pCont: float64(125),
 			pCont: float64(125),
 			pDisc: float64(150),
 			pDisc: float64(150),
+		}, { //5
+			args: []interface{}{[]interface{}{},
+				[]interface{}{}},
+			pCont: nil,
+			pDisc: nil,
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {