having_operator.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package plans
  2. import (
  3. "github.com/emqx/kuiper/xsql"
  4. "github.com/emqx/kuiper/xstream/api"
  5. )
  6. type HavingPlan struct {
  7. Condition xsql.Expr
  8. }
  9. func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  10. log := ctx.GetLogger()
  11. log.Debugf("having plan receive %s", data)
  12. switch input := data.(type) {
  13. case xsql.GroupedTuplesSet:
  14. r := xsql.GroupedTuplesSet{}
  15. for _, v := range input {
  16. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v})}
  17. result, ok := ve.Eval(p.Condition).(bool)
  18. if ok {
  19. if result {
  20. r = append(r, v)
  21. }
  22. } else {
  23. log.Errorf("invalid condition that returns non-bool value")
  24. return nil
  25. }
  26. }
  27. if len(r) > 0 {
  28. return r
  29. }
  30. case xsql.WindowTuplesSet:
  31. if len(input) != 1 {
  32. log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
  33. return nil
  34. }
  35. ms := input[0].Tuples
  36. r := ms[:0]
  37. for _, v := range ms {
  38. //ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
  39. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
  40. result, ok := ve.Eval(p.Condition).(bool)
  41. if ok {
  42. if result {
  43. r = append(r, v)
  44. }
  45. } else {
  46. log.Errorf("invalid condition that returns non-bool value")
  47. return nil
  48. }
  49. }
  50. if len(r) > 0 {
  51. input[0].Tuples = r
  52. return input
  53. }
  54. case xsql.JoinTupleSets:
  55. ms := input
  56. r := ms[:0]
  57. for _, v := range ms {
  58. //ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
  59. ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
  60. result, ok := ve.Eval(p.Condition).(bool)
  61. if ok {
  62. if result {
  63. r = append(r, v)
  64. }
  65. } else {
  66. log.Errorf("invalid condition that returns non-bool value")
  67. return nil
  68. }
  69. }
  70. if len(r) > 0{
  71. return r
  72. }
  73. default:
  74. log.Errorf("Expect xsql.Valuer or its array type.")
  75. return nil
  76. }
  77. return nil
  78. }