joinPlan.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package planner
  2. import "github.com/lf-edge/ekuiper/pkg/ast"
  3. type JoinPlan struct {
  4. baseLogicalPlan
  5. from *ast.Table
  6. joins ast.Joins
  7. }
  8. func (p JoinPlan) Init() *JoinPlan {
  9. p.baseLogicalPlan.self = &p
  10. return &p
  11. }
  12. func (p *JoinPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  13. //TODO multiple join support
  14. //Assume only one join
  15. j := p.joins[0]
  16. switch j.JoinType {
  17. case ast.INNER_JOIN:
  18. a := combine(condition, j.Expr)
  19. multipleSourcesCondition, singleSourceCondition := extractCondition(a)
  20. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  21. j.Expr = combine(multipleSourcesCondition, rest) //always swallow all conditions
  22. p.joins[0] = j
  23. return nil, p
  24. default: //TODO fine grain handling for left/right join
  25. multipleSourcesCondition, singleSourceCondition := extractCondition(condition)
  26. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  27. // never swallow anything
  28. return combine(multipleSourcesCondition, rest), p
  29. }
  30. }
  31. // Return the unpushable condition and pushable condition
  32. func extractCondition(condition ast.Expr) (unpushable ast.Expr, pushable ast.Expr) {
  33. s, hasDefault := getRefSources(condition)
  34. l := len(s)
  35. if hasDefault {
  36. l += 1
  37. }
  38. if l == 0 || (l == 1 && s[0] != ast.DefaultStream) {
  39. pushable = condition
  40. return
  41. }
  42. if be, ok := condition.(*ast.BinaryExpr); ok && be.OP == ast.AND {
  43. ul, pl := extractCondition(be.LHS)
  44. ur, pr := extractCondition(be.RHS)
  45. unpushable = combine(ul, ur)
  46. pushable = combine(pl, pr)
  47. return
  48. }
  49. //default case: all condition are unpushable
  50. return condition, nil
  51. }
  52. func (p *JoinPlan) PruneColumns(fields []ast.Expr) error {
  53. f := getFields(p.joins)
  54. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  55. }