windowPlan.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type WindowPlan struct {
  17. baseLogicalPlan
  18. condition ast.Expr
  19. wtype ast.WindowType
  20. length int
  21. interval int //If interval is not set, it is equals to Length
  22. limit int //If limit is not positive, there will be no limit
  23. isEventTime bool
  24. }
  25. func (p WindowPlan) Init() *WindowPlan {
  26. p.baseLogicalPlan.self = &p
  27. return &p
  28. }
  29. func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  30. if p.wtype == ast.COUNT_WINDOW {
  31. return condition, p
  32. } else if p.isEventTime {
  33. // TODO event time filter, need event window op support
  34. //p.condition = combine(condition, p.condition)
  35. //// push nil condition won't return any
  36. //p.baseLogicalPlan.PushDownPredicate(nil)
  37. // return nil, p
  38. return condition, p
  39. } else {
  40. //Presume window condition are only one table related.
  41. // TODO window condition validation
  42. a := combine(condition, p.condition)
  43. p.condition, _ = p.baseLogicalPlan.PushDownPredicate(a)
  44. return nil, p
  45. }
  46. }
  47. func (p *WindowPlan) PruneColumns(fields []ast.Expr) error {
  48. f := getFields(p.condition)
  49. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  50. }