lookupPlan.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "github.com/lf-edge/ekuiper/pkg/ast"
  17. )
  18. // LookupPlan is the plan for table lookup and then merged/joined
  19. type LookupPlan struct {
  20. baseLogicalPlan
  21. joinExpr ast.Join
  22. keys []string
  23. fields []string
  24. valvars []ast.Expr
  25. options *ast.Options
  26. conditions ast.Expr
  27. }
  28. // Init must run validateAndExtractCondition before this func
  29. func (p LookupPlan) Init() *LookupPlan {
  30. p.baseLogicalPlan.self = &p
  31. return &p
  32. }
  33. // PushDownPredicate do not deal with conditions, push down or return up
  34. func (p *LookupPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  35. a := combine(condition, p.conditions)
  36. if len(p.children) == 0 {
  37. return a, p.self
  38. }
  39. rest, _ := p.baseLogicalPlan.PushDownPredicate(a)
  40. // Swallow all filter conditions. If there are other filter plans, there may have multiple filters
  41. if rest != nil {
  42. // Add a filter plan for children
  43. f := FilterPlan{
  44. condition: rest,
  45. }.Init()
  46. f.SetChildren([]LogicalPlan{p})
  47. return nil, f
  48. }
  49. return nil, p.self
  50. }
  51. // validateAndExtractCondition Make sure the join condition is equi-join and extreact other conditions
  52. func (p *LookupPlan) validateAndExtractCondition() bool {
  53. equi, conditions := flatConditions(p.joinExpr.Expr)
  54. // No equal predict condition found
  55. if len(equi) == 0 {
  56. return false
  57. }
  58. if len(conditions) > 0 {
  59. p.conditions = conditions[0]
  60. for _, c := range conditions[1:] {
  61. p.conditions = &ast.BinaryExpr{OP: ast.AND, LHS: p.conditions, RHS: c}
  62. }
  63. }
  64. strName := p.joinExpr.Name
  65. kset := make(map[string]struct{})
  66. // Extract equi-join condition
  67. for _, c := range equi {
  68. lref, lok := c.LHS.(*ast.FieldRef)
  69. rref, rok := c.RHS.(*ast.FieldRef)
  70. if lok && rok {
  71. if lref.StreamName == rref.StreamName {
  72. continue
  73. }
  74. if string(lref.StreamName) == strName {
  75. if _, ok := kset[lref.Name]; ok {
  76. return false
  77. }
  78. kset[lref.Name] = struct{}{}
  79. p.valvars = append(p.valvars, rref)
  80. } else if string(rref.StreamName) == strName {
  81. if _, ok := kset[rref.Name]; ok {
  82. return false
  83. }
  84. kset[rref.Name] = struct{}{}
  85. p.valvars = append(p.valvars, lref)
  86. } else {
  87. continue
  88. }
  89. } else if lok {
  90. if string(lref.StreamName) == strName {
  91. if _, ok := kset[lref.Name]; ok {
  92. return false
  93. }
  94. kset[lref.Name] = struct{}{}
  95. p.valvars = append(p.valvars, c.RHS)
  96. } else {
  97. continue
  98. }
  99. } else if rok {
  100. if string(rref.StreamName) == strName {
  101. if _, ok := kset[rref.Name]; ok {
  102. return false
  103. }
  104. kset[rref.Name] = struct{}{}
  105. p.valvars = append(p.valvars, c.LHS)
  106. } else {
  107. continue
  108. }
  109. } else {
  110. continue
  111. }
  112. }
  113. if len(kset) > 0 {
  114. p.keys = make([]string, 0, len(kset))
  115. for k := range kset {
  116. p.keys = append(p.keys, k)
  117. }
  118. return true
  119. }
  120. return false
  121. }
  122. // flatConditions flat the join condition. Only binary condition of EQ and AND are allowed
  123. func flatConditions(condition ast.Expr) ([]*ast.BinaryExpr, []ast.Expr) {
  124. if be, ok := condition.(*ast.BinaryExpr); ok {
  125. switch be.OP {
  126. case ast.EQ:
  127. return []*ast.BinaryExpr{be}, []ast.Expr{}
  128. case ast.AND:
  129. e1, e2 := flatConditions(be.LHS)
  130. e3, e4 := flatConditions(be.RHS)
  131. return append(e1, e3...), append(e2, e4...)
  132. default:
  133. return []*ast.BinaryExpr{}, []ast.Expr{condition}
  134. }
  135. }
  136. return []*ast.BinaryExpr{}, []ast.Expr{condition}
  137. }
  138. func (p *LookupPlan) PruneColumns(fields []ast.Expr) error {
  139. newFields := make([]ast.Expr, 0, len(fields))
  140. isWildcard := false
  141. strName := p.joinExpr.Name
  142. fieldMap := make(map[string]struct{})
  143. for _, field := range fields {
  144. switch f := field.(type) {
  145. case *ast.Wildcard:
  146. isWildcard = true
  147. case *ast.FieldRef:
  148. if !isWildcard && (f.StreamName == ast.DefaultStream || string(f.StreamName) == strName) {
  149. if f.Name == "*" {
  150. isWildcard = true
  151. } else {
  152. fieldMap[f.Name] = struct{}{}
  153. }
  154. continue
  155. }
  156. case *ast.SortField:
  157. if !isWildcard {
  158. fieldMap[f.Name] = struct{}{}
  159. continue
  160. }
  161. }
  162. newFields = append(newFields, field)
  163. }
  164. if !isWildcard {
  165. p.fields = make([]string, 0, len(fieldMap))
  166. for k := range fieldMap {
  167. p.fields = append(p.fields, k)
  168. }
  169. }
  170. return p.baseLogicalPlan.PruneColumns(newFields)
  171. }