windowPlan.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type WindowPlan struct {
  17. baseLogicalPlan
  18. triggerCondition ast.Expr
  19. condition ast.Expr
  20. wtype ast.WindowType
  21. delay int64
  22. length int
  23. interval int // If interval is not set, it is equals to Length
  24. timeUnit ast.Token
  25. limit int // If limit is not positive, there will be no limit
  26. isEventTime bool
  27. }
  28. func (p WindowPlan) Init() *WindowPlan {
  29. p.baseLogicalPlan.self = &p
  30. return &p
  31. }
  32. func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  33. // not time window depends on the event, so should not filter any
  34. if p.wtype == ast.COUNT_WINDOW || p.wtype == ast.SLIDING_WINDOW {
  35. return condition, p
  36. } else if p.isEventTime {
  37. // TODO event time filter, need event window op support
  38. //p.condition = combine(condition, p.condition)
  39. //// push nil condition won't return any
  40. //p.baseLogicalPlan.PushDownPredicate(nil)
  41. // return nil, p
  42. return condition, p
  43. } else {
  44. // Presume window condition are only one table related.
  45. // TODO window condition validation
  46. a := combine(condition, p.condition)
  47. p.condition, _ = p.baseLogicalPlan.PushDownPredicate(a)
  48. return nil, p
  49. }
  50. }
  51. func (p *WindowPlan) PruneColumns(fields []ast.Expr) error {
  52. f := getFields(p.condition)
  53. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  54. }