aggregate_operator.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. type AggregateOp struct {
  22. Dimensions ast.Dimensions
  23. }
  24. // Apply
  25. /* input: Collection
  26. * output: Collection
  27. */
  28. func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  29. log := ctx.GetLogger()
  30. log.Debugf("aggregate plan receive %s", data)
  31. grouped := data
  32. if p.Dimensions != nil {
  33. switch input := data.(type) {
  34. case error:
  35. return input
  36. case xsql.SingleCollection:
  37. wr := input.GetWindowRange()
  38. result := make(map[string]*xsql.GroupedTuples)
  39. err := input.Range(func(i int, r xsql.TupleRow) (bool, error) {
  40. var name string
  41. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(r, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
  42. for _, d := range p.Dimensions {
  43. r := ve.Eval(d.Expr)
  44. if _, ok := r.(error); ok {
  45. return false, fmt.Errorf("run Group By error: %s", r)
  46. } else {
  47. name += fmt.Sprintf("%v,", r)
  48. }
  49. }
  50. if ts, ok := result[name]; !ok {
  51. result[name] = &xsql.GroupedTuples{Content: []xsql.TupleRow{r}, WindowRange: wr}
  52. } else {
  53. ts.Content = append(ts.Content, r)
  54. }
  55. return true, nil
  56. })
  57. if err != nil {
  58. return err
  59. }
  60. if len(result) > 0 {
  61. g := make([]*xsql.GroupedTuples, 0, len(result))
  62. for _, v := range result {
  63. g = append(g, v)
  64. }
  65. grouped = &xsql.GroupedTuplesSet{Groups: g}
  66. } else {
  67. grouped = nil
  68. }
  69. return grouped
  70. default:
  71. return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
  72. }
  73. }
  74. return grouped
  75. }