windowPlan.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type WindowPlan struct {
  17. baseLogicalPlan
  18. condition ast.Expr
  19. wtype ast.WindowType
  20. length int
  21. interval int // If interval is not set, it is equals to Length
  22. timeUnit ast.Token
  23. limit int // If limit is not positive, there will be no limit
  24. isEventTime bool
  25. }
  26. func (p WindowPlan) Init() *WindowPlan {
  27. p.baseLogicalPlan.self = &p
  28. return &p
  29. }
  30. func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  31. // not time window depends on the event, so should not filter any
  32. if p.wtype == ast.COUNT_WINDOW || p.wtype == ast.SLIDING_WINDOW {
  33. return condition, p
  34. } else if p.isEventTime {
  35. // TODO event time filter, need event window op support
  36. //p.condition = combine(condition, p.condition)
  37. //// push nil condition won't return any
  38. //p.baseLogicalPlan.PushDownPredicate(nil)
  39. // return nil, p
  40. return condition, p
  41. } else {
  42. // Presume window condition are only one table related.
  43. // TODO window condition validation
  44. a := combine(condition, p.condition)
  45. p.condition, _ = p.baseLogicalPlan.PushDownPredicate(a)
  46. return nil, p
  47. }
  48. }
  49. func (p *WindowPlan) PruneColumns(fields []ast.Expr) error {
  50. f := getFields(p.condition)
  51. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  52. }