aggregate_operator.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package operator
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/xsql"
  5. "github.com/lf-edge/ekuiper/pkg/api"
  6. "github.com/lf-edge/ekuiper/pkg/ast"
  7. )
  8. type AggregateOp struct {
  9. Dimensions ast.Dimensions
  10. }
  11. /**
  12. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  13. * output: xsql.GroupedTuplesSet
  14. */
  15. func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
  16. log := ctx.GetLogger()
  17. log.Debugf("aggregate plan receive %s", data)
  18. grouped := data
  19. var wr *xsql.WindowRange
  20. if p.Dimensions != nil {
  21. var ms []xsql.DataValuer
  22. switch input := data.(type) {
  23. case error:
  24. return input
  25. case xsql.DataValuer:
  26. ms = append(ms, input)
  27. case xsql.WindowTuplesSet:
  28. if len(input.Content) != 1 {
  29. return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
  30. }
  31. ms = make([]xsql.DataValuer, len(input.Content[0].Tuples))
  32. for i, m := range input.Content[0].Tuples {
  33. //this is needed or it will always point to the last
  34. t := m
  35. ms[i] = &t
  36. }
  37. wr = input.WindowRange
  38. case *xsql.JoinTupleSets:
  39. ms = make([]xsql.DataValuer, len(input.Content))
  40. for i, m := range input.Content {
  41. t := m
  42. ms[i] = &t
  43. }
  44. wr = input.WindowRange
  45. default:
  46. return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
  47. }
  48. result := make(map[string]*xsql.GroupedTuples)
  49. for _, m := range ms {
  50. var name string
  51. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
  52. for _, d := range p.Dimensions {
  53. r := ve.Eval(d.Expr)
  54. if _, ok := r.(error); ok {
  55. return fmt.Errorf("run Group By error: %s", r)
  56. } else {
  57. name += fmt.Sprintf("%v,", r)
  58. }
  59. }
  60. if ts, ok := result[name]; !ok {
  61. result[name] = &xsql.GroupedTuples{Content: []xsql.DataValuer{m}, WindowRange: wr}
  62. } else {
  63. ts.Content = append(ts.Content, m)
  64. }
  65. }
  66. if len(result) > 0 {
  67. g := make([]xsql.GroupedTuples, 0, len(result))
  68. for _, v := range result {
  69. g = append(g, *v)
  70. }
  71. grouped = xsql.GroupedTuplesSet(g)
  72. } else {
  73. grouped = nil
  74. }
  75. }
  76. return grouped
  77. }