aggregate_operator.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. type AggregateOp struct {
  22. Dimensions ast.Dimensions
  23. }
  24. /**
  25. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  26. * output: xsql.GroupedTuplesSet
  27. */
  28. func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  29. log := ctx.GetLogger()
  30. log.Debugf("aggregate plan receive %s", data)
  31. grouped := data
  32. var wr *xsql.WindowRange
  33. if p.Dimensions != nil {
  34. var ms []xsql.Row
  35. switch input := data.(type) {
  36. case error:
  37. return input
  38. case xsql.Row:
  39. ms = append(ms, input)
  40. case xsql.WindowTuplesSet:
  41. if len(input.Content) != 1 {
  42. return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
  43. }
  44. ms = make([]xsql.Row, len(input.Content[0].Tuples))
  45. for i, m := range input.Content[0].Tuples {
  46. //this is needed or it will always point to the last
  47. t := m
  48. ms[i] = &t
  49. }
  50. wr = input.WindowRange
  51. case *xsql.JoinTupleSets:
  52. ms = make([]xsql.Row, len(input.Content))
  53. for i, m := range input.Content {
  54. t := m
  55. ms[i] = &t
  56. }
  57. wr = input.WindowRange
  58. default:
  59. return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
  60. }
  61. result := make(map[string]*xsql.GroupedTuples)
  62. for _, m := range ms {
  63. var name string
  64. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.WindowRangeValuer{WindowRange: wr}, fv)}
  65. for _, d := range p.Dimensions {
  66. r := ve.Eval(d.Expr)
  67. if _, ok := r.(error); ok {
  68. return fmt.Errorf("run Group By error: %s", r)
  69. } else {
  70. name += fmt.Sprintf("%v,", r)
  71. }
  72. }
  73. if ts, ok := result[name]; !ok {
  74. result[name] = &xsql.GroupedTuples{Content: []xsql.Row{m}, WindowRange: wr}
  75. } else {
  76. ts.Content = append(ts.Content, m)
  77. }
  78. }
  79. if len(result) > 0 {
  80. g := make([]xsql.GroupedTuples, 0, len(result))
  81. for _, v := range result {
  82. g = append(g, *v)
  83. }
  84. grouped = xsql.GroupedTuplesSet(g)
  85. } else {
  86. grouped = nil
  87. }
  88. }
  89. return grouped
  90. }