aggregate_operator.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. )
  7. type AggregatePlan struct {
  8. Dimensions xsql.Dimensions
  9. }
  10. /**
  11. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  12. * output: xsql.GroupedTuplesSet
  13. */
  14. func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  15. log := ctx.GetLogger()
  16. log.Debugf("aggregate plan receive %s", data)
  17. var ms []xsql.DataValuer
  18. switch input := data.(type) {
  19. case error:
  20. return input
  21. case xsql.DataValuer:
  22. ms = append(ms, input)
  23. case xsql.WindowTuplesSet:
  24. if len(input) != 1 {
  25. return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
  26. }
  27. ms = make([]xsql.DataValuer, len(input[0].Tuples))
  28. for i, m := range input[0].Tuples {
  29. //this is needed or it will always point to the last
  30. t := m
  31. ms[i] = &t
  32. }
  33. case xsql.JoinTupleSets:
  34. ms = make([]xsql.DataValuer, len(input))
  35. for i, m := range input {
  36. t := m
  37. ms[i] = &t
  38. }
  39. default:
  40. return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
  41. }
  42. result := make(map[string]xsql.GroupedTuples)
  43. for _, m := range ms {
  44. var name string
  45. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
  46. for _, d := range p.Dimensions {
  47. r := ve.Eval(d.Expr)
  48. if _, ok := r.(error); ok {
  49. return fmt.Errorf("run Group By error: %s", r)
  50. } else {
  51. name += fmt.Sprintf("%v,", r)
  52. }
  53. }
  54. if ts, ok := result[name]; !ok {
  55. result[name] = xsql.GroupedTuples{m}
  56. } else {
  57. result[name] = append(ts, m)
  58. }
  59. }
  60. if len(result) > 0 {
  61. g := make([]xsql.GroupedTuples, 0, len(result))
  62. for _, v := range result {
  63. g = append(g, v)
  64. }
  65. return xsql.GroupedTuplesSet(g)
  66. } else {
  67. return nil
  68. }
  69. }