aggregate_operator.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  // http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. type AggregateOp struct {
  22. Dimensions ast.Dimensions
  23. }
  24. // Apply
  25. /* input: Collection
  26. * output: Collection
  27. */
  28. func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  29. log := ctx.GetLogger()
  30. log.Debugf("aggregate plan receive %s", data)
  31. grouped := data
  32. if p.Dimensions != nil {
  33. switch input := data.(type) {
  34. case error:
  35. return input
  36. case xsql.SingleCollection:
  37. wr := input.GetWindowRange()
  38. result := make(map[string]*xsql.GroupedTuples)
  39. err := input.Range(func(i int, ir xsql.ReadonlyRow) (bool, error) {
  40. var name string
  41. tr := ir.(xsql.TupleRow)
  42. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tr, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
  43. for _, d := range p.Dimensions {
  44. r := ve.Eval(d.Expr)
  45. if _, ok := r.(error); ok {
  46. return false, fmt.Errorf("run Group By error: %s", r)
  47. } else {
  48. name += fmt.Sprintf("%v,", r)
  49. }
  50. }
  51. if ts, ok := result[name]; !ok {
  52. result[name] = &xsql.GroupedTuples{Content: []xsql.TupleRow{tr}, WindowRange: wr}
  53. } else {
  54. ts.Content = append(ts.Content, tr)
  55. }
  56. return true, nil
  57. })
  58. if err != nil {
  59. return err
  60. }
  61. if len(result) > 0 {
  62. g := make([]*xsql.GroupedTuples, 0, len(result))
  63. for _, v := range result {
  64. g = append(g, v)
  65. }
  66. grouped = &xsql.GroupedTuplesSet{Groups: g}
  67. } else {
  68. grouped = nil
  69. }
  70. return grouped
  71. default:
  72. return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
  73. }
  74. }
  75. return grouped
  76. }