having_operator.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package operator
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/xsql"
  5. "github.com/emqx/kuiper/pkg/api"
  6. "github.com/emqx/kuiper/pkg/ast"
  7. )
  8. type HavingOp struct {
  9. Condition ast.Expr
  10. }
  11. func (p *HavingOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
  12. log := ctx.GetLogger()
  13. log.Debugf("having plan receive %s", data)
  14. switch input := data.(type) {
  15. case error:
  16. return input
  17. case xsql.GroupedTuplesSet:
  18. r := xsql.GroupedTuplesSet{}
  19. for _, v := range input {
  20. afv.SetData(v)
  21. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, fv, v.Content[0], fv, afv, &xsql.WildcardValuer{Data: v.Content[0]})}
  22. result := ve.Eval(p.Condition)
  23. switch val := result.(type) {
  24. case error:
  25. return fmt.Errorf("run Having error: %s", val)
  26. case bool:
  27. if val {
  28. r = append(r, v)
  29. }
  30. default:
  31. return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
  32. }
  33. }
  34. if len(r) > 0 {
  35. return r
  36. }
  37. case xsql.WindowTuplesSet:
  38. if len(input.Content) != 1 {
  39. return fmt.Errorf("run Having error: input WindowTuplesSet with multiple tuples cannot be evaluated")
  40. }
  41. ms := input.Content[0].Tuples
  42. v := ms[0]
  43. afv.SetData(input)
  44. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
  45. result := ve.Eval(p.Condition)
  46. switch val := result.(type) {
  47. case error:
  48. return fmt.Errorf("run Having error: %s", val)
  49. case bool:
  50. if val {
  51. return input
  52. }
  53. default:
  54. return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
  55. }
  56. case *xsql.JoinTupleSets:
  57. ms := input.Content
  58. r := ms[:0]
  59. afv.SetData(input)
  60. for _, v := range ms {
  61. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
  62. result := ve.Eval(p.Condition)
  63. switch val := result.(type) {
  64. case error:
  65. return fmt.Errorf("run Having error: %s", val)
  66. case bool:
  67. if val {
  68. r = append(r, v)
  69. }
  70. default:
  71. return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
  72. }
  73. }
  74. input.Content = r
  75. if len(r) > 0 {
  76. return input
  77. }
  78. default:
  79. return fmt.Errorf("run Having error: invalid input %[1]T(%[1]v)", input)
  80. }
  81. return nil
  82. }