windowPlan.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type WindowPlan struct {
  17. baseLogicalPlan
  18. condition ast.Expr
  19. wtype ast.WindowType
  20. length int
  21. interval int // If interval is not set, it is equals to Length
  22. limit int // If limit is not positive, there will be no limit
  23. isEventTime bool
  24. }
  25. func (p WindowPlan) Init() *WindowPlan {
  26. p.baseLogicalPlan.self = &p
  27. return &p
  28. }
  29. func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  30. // not time window depends on the event, so should not filter any
  31. if p.wtype == ast.COUNT_WINDOW || p.wtype == ast.SLIDING_WINDOW {
  32. return condition, p
  33. } else if p.isEventTime {
  34. // TODO event time filter, need event window op support
  35. //p.condition = combine(condition, p.condition)
  36. //// push nil condition won't return any
  37. //p.baseLogicalPlan.PushDownPredicate(nil)
  38. // return nil, p
  39. return condition, p
  40. } else {
  41. // Presume window condition are only one table related.
  42. // TODO window condition validation
  43. a := combine(condition, p.condition)
  44. p.condition, _ = p.baseLogicalPlan.PushDownPredicate(a)
  45. return nil, p
  46. }
  47. }
  48. func (p *WindowPlan) PruneColumns(fields []ast.Expr) error {
  49. f := getFields(p.condition)
  50. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  51. }