123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package planner
- import (
- "github.com/lf-edge/ekuiper/pkg/ast"
- )
- // LookupPlan is the plan for table lookup and then merged/joined
- type LookupPlan struct {
- baseLogicalPlan
- joinExpr ast.Join
- keys []string
- fields []string
- valvars []ast.Expr
- options *ast.Options
- conditions ast.Expr
- }
- // Init must run validateAndExtractCondition before this func
- func (p LookupPlan) Init() *LookupPlan {
- p.baseLogicalPlan.self = &p
- return &p
- }
- // PushDownPredicate do not deal with conditions, push down or return up
- func (p *LookupPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
- a := combine(condition, p.conditions)
- if len(p.children) == 0 {
- return a, p.self
- }
- rest, _ := p.baseLogicalPlan.PushDownPredicate(a)
- // Swallow all filter conditions. If there are other filter plans, there may have multiple filters
- if rest != nil {
- // Add a filter plan for children
- f := FilterPlan{
- condition: rest,
- }.Init()
- f.SetChildren([]LogicalPlan{p})
- return nil, f
- }
- return nil, p.self
- }
- // validateAndExtractCondition Make sure the join condition is equi-join and extreact other conditions
- func (p *LookupPlan) validateAndExtractCondition() bool {
- equi, conditions := flatConditions(p.joinExpr.Expr)
- // No equal predict condition found
- if len(equi) == 0 {
- return false
- }
- if len(conditions) > 0 {
- p.conditions = conditions[0]
- for _, c := range conditions[1:] {
- p.conditions = &ast.BinaryExpr{OP: ast.AND, LHS: p.conditions, RHS: c}
- }
- }
- strName := p.joinExpr.Name
- kset := make(map[string]struct{})
- // Extract equi-join condition
- for _, c := range equi {
- lref, lok := c.LHS.(*ast.FieldRef)
- rref, rok := c.RHS.(*ast.FieldRef)
- if lok && rok {
- if lref.StreamName == rref.StreamName {
- continue
- }
- if string(lref.StreamName) == strName {
- if _, ok := kset[lref.Name]; ok {
- return false
- }
- kset[lref.Name] = struct{}{}
- p.valvars = append(p.valvars, rref)
- } else if string(rref.StreamName) == strName {
- if _, ok := kset[rref.Name]; ok {
- return false
- }
- kset[rref.Name] = struct{}{}
- p.valvars = append(p.valvars, lref)
- } else {
- continue
- }
- } else if lok {
- if string(lref.StreamName) == strName {
- if _, ok := kset[lref.Name]; ok {
- return false
- }
- kset[lref.Name] = struct{}{}
- p.valvars = append(p.valvars, c.RHS)
- } else {
- continue
- }
- } else if rok {
- if string(rref.StreamName) == strName {
- if _, ok := kset[rref.Name]; ok {
- return false
- }
- kset[rref.Name] = struct{}{}
- p.valvars = append(p.valvars, c.LHS)
- } else {
- continue
- }
- } else {
- continue
- }
- }
- if len(kset) > 0 {
- p.keys = make([]string, 0, len(kset))
- for k := range kset {
- p.keys = append(p.keys, k)
- }
- return true
- }
- return false
- }
- // flatConditions flat the join condition. Only binary condition of EQ and AND are allowed
- func flatConditions(condition ast.Expr) ([]*ast.BinaryExpr, []ast.Expr) {
- if be, ok := condition.(*ast.BinaryExpr); ok {
- switch be.OP {
- case ast.EQ:
- return []*ast.BinaryExpr{be}, []ast.Expr{}
- case ast.AND:
- e1, e2 := flatConditions(be.LHS)
- e3, e4 := flatConditions(be.RHS)
- return append(e1, e3...), append(e2, e4...)
- default:
- return []*ast.BinaryExpr{}, []ast.Expr{condition}
- }
- }
- return []*ast.BinaryExpr{}, []ast.Expr{condition}
- }
- func (p *LookupPlan) PruneColumns(fields []ast.Expr) error {
- newFields := make([]ast.Expr, 0, len(fields))
- isWildcard := false
- strName := p.joinExpr.Name
- fieldMap := make(map[string]struct{})
- for _, field := range fields {
- switch f := field.(type) {
- case *ast.Wildcard:
- isWildcard = true
- case *ast.FieldRef:
- if !isWildcard && (f.StreamName == ast.DefaultStream || string(f.StreamName) == strName) {
- if f.Name == "*" {
- isWildcard = true
- } else {
- fieldMap[f.Name] = struct{}{}
- }
- continue
- }
- case *ast.SortField:
- if !isWildcard {
- fieldMap[f.Name] = struct{}{}
- continue
- }
- }
- newFields = append(newFields, field)
- }
- if !isWildcard {
- p.fields = make([]string, 0, len(fieldMap))
- for k := range fieldMap {
- p.fields = append(p.fields, k)
- }
- }
- return p.baseLogicalPlan.PruneColumns(newFields)
- }
|