joinPlan.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type JoinPlan struct {
  17. baseLogicalPlan
  18. from *ast.Table
  19. joins ast.Joins
  20. }
  21. func (p JoinPlan) Init() *JoinPlan {
  22. p.baseLogicalPlan.self = &p
  23. return &p
  24. }
  25. func (p *JoinPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  26. //TODO multiple join support
  27. //Assume only one join
  28. j := p.joins[0]
  29. switch j.JoinType {
  30. case ast.INNER_JOIN:
  31. a := combine(condition, j.Expr)
  32. multipleSourcesCondition, singleSourceCondition := extractCondition(a)
  33. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  34. j.Expr = combine(multipleSourcesCondition, rest) //always swallow all conditions
  35. p.joins[0] = j
  36. return nil, p
  37. default: //TODO fine grain handling for left/right join
  38. multipleSourcesCondition, singleSourceCondition := extractCondition(condition)
  39. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  40. // never swallow anything
  41. return combine(multipleSourcesCondition, rest), p
  42. }
  43. }
  44. // Return the unpushable condition and pushable condition
  45. func extractCondition(condition ast.Expr) (unpushable ast.Expr, pushable ast.Expr) {
  46. s, hasDefault := getRefSources(condition)
  47. l := len(s)
  48. if hasDefault {
  49. l += 1
  50. }
  51. if l == 0 || (l == 1 && s[0] != ast.DefaultStream) {
  52. pushable = condition
  53. return
  54. }
  55. if be, ok := condition.(*ast.BinaryExpr); ok && be.OP == ast.AND {
  56. ul, pl := extractCondition(be.LHS)
  57. ur, pr := extractCondition(be.RHS)
  58. unpushable = combine(ul, ur)
  59. pushable = combine(pl, pr)
  60. return
  61. }
  62. //default case: all condition are unpushable
  63. return condition, nil
  64. }
  65. func (p *JoinPlan) PruneColumns(fields []ast.Expr) error {
  66. f := getFields(p.joins)
  67. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  68. }