aggregate_operator.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. )
  7. type AggregatePlan struct {
  8. Dimensions xsql.Dimensions
  9. }
  10. /**
  11. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  12. * output: xsql.GroupedTuplesSet
  13. */
  14. func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  15. log := ctx.GetLogger()
  16. log.Debugf("aggregate plan receive %s", data)
  17. var ms []xsql.DataValuer
  18. switch input := data.(type) {
  19. case error:
  20. return input
  21. case xsql.DataValuer:
  22. ms = append(ms, input)
  23. case xsql.WindowTuplesSet:
  24. if len(input) != 1 {
  25. log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
  26. return nil
  27. }
  28. ms = make([]xsql.DataValuer, len(input[0].Tuples))
  29. for i, m := range input[0].Tuples {
  30. //this is needed or it will always point to the last
  31. t := m
  32. ms[i] = &t
  33. }
  34. case xsql.JoinTupleSets:
  35. ms = make([]xsql.DataValuer, len(input))
  36. for i, m := range input {
  37. t := m
  38. ms[i] = &t
  39. }
  40. default:
  41. return fmt.Errorf("expect xsql.Valuer or its array type.")
  42. }
  43. result := make(map[string]xsql.GroupedTuples)
  44. for _, m := range ms {
  45. var name string
  46. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
  47. for _, d := range p.Dimensions {
  48. r := ve.Eval(d.Expr)
  49. if _, ok := r.(error); ok {
  50. return r
  51. } else {
  52. name += fmt.Sprintf("%v,", r)
  53. }
  54. }
  55. if ts, ok := result[name]; !ok {
  56. result[name] = xsql.GroupedTuples{m}
  57. } else {
  58. result[name] = append(ts, m)
  59. }
  60. }
  61. if len(result) > 0 {
  62. g := make([]xsql.GroupedTuples, 0, len(result))
  63. for _, v := range result {
  64. g = append(g, v)
  65. }
  66. return xsql.GroupedTuplesSet(g)
  67. } else {
  68. return nil
  69. }
  70. }