1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- // Copyright 2021 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package planner
- import "github.com/lf-edge/ekuiper/pkg/ast"
- type JoinPlan struct {
- baseLogicalPlan
- from *ast.Table
- joins ast.Joins
- }
- func (p JoinPlan) Init() *JoinPlan {
- p.baseLogicalPlan.self = &p
- return &p
- }
- func (p *JoinPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
- // TODO multiple join support
- // Assume only one join
- j := p.joins[0]
- switch j.JoinType {
- case ast.INNER_JOIN:
- a := combine(condition, j.Expr)
- multipleSourcesCondition, singleSourceCondition := extractCondition(a)
- rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
- j.Expr = combine(multipleSourcesCondition, rest) // always swallow all conditions
- p.joins[0] = j
- return nil, p
- default: // TODO fine grain handling for left/right join
- multipleSourcesCondition, singleSourceCondition := extractCondition(condition)
- rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
- // never swallow anything
- return combine(multipleSourcesCondition, rest), p
- }
- }
- // Return the unpushable condition and pushable condition
- func extractCondition(condition ast.Expr) (unpushable ast.Expr, pushable ast.Expr) {
- s, hasDefault := getRefSources(condition)
- l := len(s)
- if hasDefault {
- l += 1
- }
- if l == 0 || (l == 1 && s[0] != ast.DefaultStream) {
- pushable = condition
- return
- }
- if be, ok := condition.(*ast.BinaryExpr); ok && be.OP == ast.AND {
- ul, pl := extractCondition(be.LHS)
- ur, pr := extractCondition(be.RHS)
- unpushable = combine(ul, ur)
- pushable = combine(pl, pr)
- return
- }
- // default case: all condition are unpushable
- return condition, nil
- }
- func (p *JoinPlan) PruneColumns(fields []ast.Expr) error {
- f := getFields(p.joins)
- return p.baseLogicalPlan.PruneColumns(append(fields, f...))
- }
|