aggregate_operator.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package plans
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. "fmt"
  7. )
  8. type AggregatePlan struct {
  9. Dimensions xsql.Dimensions
  10. }
  11. /**
  12. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  13. * output: xsql.GroupedTuplesSet
  14. */
  15. func (p *AggregatePlan) Apply(ctx context.Context, data interface{}) interface{} {
  16. log := common.GetLogger(ctx)
  17. log.Debugf("aggregate plan receive %s", data)
  18. var ms []xsql.DataValuer
  19. switch input := data.(type) {
  20. case xsql.DataValuer:
  21. ms = append(ms, input)
  22. case xsql.WindowTuplesSet:
  23. if len(input) != 1 {
  24. log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
  25. return nil
  26. }
  27. ms = make([]xsql.DataValuer, len(input[0].Tuples))
  28. for i, m := range input[0].Tuples {
  29. //this is needed or it will always point to the last
  30. t := m
  31. ms[i] = &t
  32. }
  33. case xsql.JoinTupleSets:
  34. ms = make([]xsql.DataValuer, len(input))
  35. for i, m := range input {
  36. t := m
  37. ms[i] = &t
  38. }
  39. default:
  40. log.Errorf("Expect xsql.Valuer or its array type.")
  41. return nil
  42. }
  43. result := make(map[string]xsql.GroupedTuples)
  44. for _, m := range ms {
  45. var name string
  46. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
  47. for _, d := range p.Dimensions {
  48. name += fmt.Sprintf("%v,", ve.Eval(d.Expr))
  49. }
  50. if ts, ok := result[name]; !ok{
  51. result[name] = xsql.GroupedTuples{m}
  52. }else{
  53. result[name] = append(ts, m)
  54. }
  55. }
  56. if len(result) > 0{
  57. g := make([]xsql.GroupedTuples, 0, len(result))
  58. for _, v := range result {
  59. g = append(g, v)
  60. }
  61. return xsql.GroupedTuplesSet(g)
  62. }else{
  63. return nil
  64. }
  65. }