aggregate_operator.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package plans
  2. import (
  3. "github.com/emqx/kuiper/xsql"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "fmt"
  6. )
  7. type AggregatePlan struct {
  8. Dimensions xsql.Dimensions
  9. }
  10. /**
  11. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  12. * output: xsql.GroupedTuplesSet
  13. */
  14. func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  15. log := ctx.GetLogger()
  16. log.Debugf("aggregate plan receive %s", data)
  17. var ms []xsql.DataValuer
  18. switch input := data.(type) {
  19. case xsql.DataValuer:
  20. ms = append(ms, input)
  21. case xsql.WindowTuplesSet:
  22. if len(input) != 1 {
  23. log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
  24. return nil
  25. }
  26. ms = make([]xsql.DataValuer, len(input[0].Tuples))
  27. for i, m := range input[0].Tuples {
  28. //this is needed or it will always point to the last
  29. t := m
  30. ms[i] = &t
  31. }
  32. case xsql.JoinTupleSets:
  33. ms = make([]xsql.DataValuer, len(input))
  34. for i, m := range input {
  35. t := m
  36. ms[i] = &t
  37. }
  38. default:
  39. log.Errorf("Expect xsql.Valuer or its array type.")
  40. return nil
  41. }
  42. result := make(map[string]xsql.GroupedTuples)
  43. for _, m := range ms {
  44. var name string
  45. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
  46. for _, d := range p.Dimensions {
  47. name += fmt.Sprintf("%v,", ve.Eval(d.Expr))
  48. }
  49. if ts, ok := result[name]; !ok{
  50. result[name] = xsql.GroupedTuples{m}
  51. }else{
  52. result[name] = append(ts, m)
  53. }
  54. }
  55. if len(result) > 0{
  56. g := make([]xsql.GroupedTuples, 0, len(result))
  57. for _, v := range result {
  58. g = append(g, v)
  59. }
  60. return xsql.GroupedTuplesSet(g)
  61. }else{
  62. return nil
  63. }
  64. }