join_operator.go 858 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package plans
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. )
  7. type JoinPlan struct {
  8. Joins xsql.Joins
  9. }
  10. func (jp *JoinPlan) Apply(ctx context.Context, data interface{}) interface{} {
  11. var log = common.Log
  12. var input xsql.MultiEmitterTuples
  13. if d, ok := data.(xsql.MultiEmitterTuples ); !ok {
  14. log.Errorf("Expect MultiEmitterTuples type.\n")
  15. return nil
  16. } else {
  17. input = d
  18. }
  19. result := xsql.MergedEmitterTupleSets{}
  20. for _, join := range jp.Joins {
  21. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, &xsql.FunctionValuer{}), JoinType: join.JoinType}
  22. v := ve.Eval(join.Expr)
  23. if v1, ok := v.(xsql.MergedEmitterTupleSets); ok {
  24. result = jp.mergeSet(v1, result)
  25. }
  26. }
  27. return result
  28. }
  29. func (jp *JoinPlan) mergeSet(set1 xsql.MergedEmitterTupleSets, set2 xsql.MergedEmitterTupleSets) xsql.MergedEmitterTupleSets {
  30. return set1
  31. }