joinPlan.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package planner
  2. import "github.com/emqx/kuiper/xsql"
  3. type JoinPlan struct {
  4. baseLogicalPlan
  5. from *xsql.Table
  6. joins xsql.Joins
  7. }
  8. func (p JoinPlan) Init() *JoinPlan {
  9. p.baseLogicalPlan.self = &p
  10. return &p
  11. }
  12. func (p *JoinPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
  13. //TODO multiple join support
  14. //Assume only one join
  15. j := p.joins[0]
  16. switch j.JoinType {
  17. case xsql.INNER_JOIN:
  18. a := combine(condition, j.Expr)
  19. multipleSourcesCondition, singleSourceCondition := extractCondition(a)
  20. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  21. j.Expr = combine(multipleSourcesCondition, rest) //always swallow all conditions
  22. p.joins[0] = j
  23. return nil, p
  24. default: //TODO fine grain handling for left/right join
  25. multipleSourcesCondition, singleSourceCondition := extractCondition(condition)
  26. rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition)
  27. // never swallow anything
  28. return combine(multipleSourcesCondition, rest), p
  29. }
  30. }
  31. // Return the unpushable condition and pushable condition
  32. func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) {
  33. s := getRefSources(condition)
  34. if len(s) < 2 {
  35. pushable = condition
  36. return
  37. } else {
  38. if be, ok := condition.(*xsql.BinaryExpr); ok && be.OP == xsql.AND {
  39. ul, pl := extractCondition(be.LHS)
  40. ur, pr := extractCondition(be.RHS)
  41. unpushable = combine(ul, ur)
  42. pushable = combine(pl, pr)
  43. return
  44. }
  45. }
  46. //default case: all condition are unpushable
  47. return condition, nil
  48. }
  49. func (p *JoinPlan) PruneColumns(fields []xsql.Expr) error {
  50. f := getFields(p.joins)
  51. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  52. }