join_operator.go 6.7 KB


  1. package plans
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "fmt"
  7. )
// JoinPlan applies the JOIN clauses of a query to one window of tuples.
//
// TODO join expr should only be the equal op between 2 streams like tb1.id = tb2.id
type JoinPlan struct {
	// From is the left-hand source of the joins; for a CROSS JOIN its Alias
	// (or Name when no alias is set) selects the left stream.
	From *xsql.Table
	// Joins are the join clauses in query order. The first one is evaluated
	// directly against the window; later ones against the accumulated result.
	Joins xsql.Joins
}
  13. // input: xsql.WindowTuplesSet from windowOp, window is required for join
  14. // output: xsql.JoinTupleSets
  15. func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  16. log := ctx.GetLogger()
  17. var input xsql.WindowTuplesSet
  18. if d, ok := data.(xsql.WindowTuplesSet); !ok {
  19. log.Errorf("Expect WindowTuplesSet type.\n")
  20. return nil
  21. } else {
  22. log.Debugf("join plan receive %v", d)
  23. input = d
  24. }
  25. result := xsql.JoinTupleSets{}
  26. for i, join := range jp.Joins {
  27. if i == 0 {
  28. v, err := jp.evalSet(input, join)
  29. if err != nil {
  30. fmt.Println(err)
  31. return nil
  32. }
  33. result = v
  34. } else {
  35. r1, _ := jp.evalJoinSets(&result, input, join)
  36. if v1, ok := r1.(xsql.JoinTupleSets); ok {
  37. result = v1
  38. }
  39. }
  40. }
  41. if result.Len() <= 0 {
  42. log.Debugf("join plan yields nothing")
  43. return nil
  44. }
  45. return result
  46. }
  47. func getStreamNames(join *xsql.Join) ([]string, error) {
  48. var srcs []string
  49. xsql.WalkFunc(join, func(node xsql.Node) {
  50. if f,ok := node.(*xsql.FieldRef); ok {
  51. if string(f.StreamName) == "" {
  52. return
  53. }
  54. srcs = append(srcs, string(f.StreamName))
  55. }
  56. })
  57. if len(srcs) != 2 {
  58. return nil, fmt.Errorf("Not correct join expression, it requires exactly 2 sources at ON expression.")
  59. }
  60. return srcs, nil
  61. }
  62. func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.JoinTupleSets, error) {
  63. var leftStream, rightStream string
  64. if join.JoinType != xsql.CROSS_JOIN {
  65. streams, err := getStreamNames(&join)
  66. if err != nil {
  67. return nil, err
  68. }
  69. leftStream = streams[0]
  70. rightStream = streams[1]
  71. } else {
  72. if jp.From.Alias == "" {
  73. leftStream = jp.From.Name
  74. } else {
  75. leftStream = jp.From.Alias
  76. }
  77. if join.Alias == "" {
  78. rightStream = join.Name
  79. } else {
  80. rightStream = join.Alias
  81. }
  82. }
  83. var lefts, rights []xsql.Tuple
  84. lefts = input.GetBySrc(leftStream)
  85. rights = input.GetBySrc(rightStream)
  86. sets := xsql.JoinTupleSets{}
  87. if join.JoinType == xsql.RIGHT_JOIN {
  88. return jp.evalSetWithRightJoin(input, join, false)
  89. }
  90. for _, left := range lefts {
  91. merged := &xsql.JoinTuple{}
  92. if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
  93. merged.AddTuple(left)
  94. }
  95. for _, right := range rights {
  96. if join.JoinType == xsql.CROSS_JOIN {
  97. merged.AddTuple(right)
  98. } else {
  99. temp := &xsql.JoinTuple{}
  100. temp.AddTuple(left)
  101. temp.AddTuple(right)
  102. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
  103. if r, ok := ve.Eval(join.Expr).(bool); ok {
  104. if r {
  105. if join.JoinType == xsql.INNER_JOIN {
  106. merged.AddTuple(left)
  107. merged.AddTuple(right)
  108. sets = append(sets, *merged)
  109. merged = &xsql.JoinTuple{}
  110. }else{
  111. merged.AddTuple(right)
  112. }
  113. }
  114. } else {
  115. common.Log.Infoln("Evaluation error for set.")
  116. }
  117. }
  118. }
  119. if len(merged.Tuples) > 0 {
  120. sets = append(sets, *merged)
  121. }
  122. }
  123. if join.JoinType == xsql.FULL_JOIN {
  124. if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil && len(rightJoinSet) > 0 {
  125. for _, jt := range rightJoinSet {
  126. sets = append(sets, jt)
  127. }
  128. }
  129. }
  130. return sets, nil
  131. }
  132. func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
  133. streams, err := getStreamNames(&join)
  134. if err != nil {
  135. return nil, err
  136. }
  137. leftStream := streams[0]
  138. rightStream := streams[1]
  139. var lefts, rights []xsql.Tuple
  140. lefts = input.GetBySrc(leftStream)
  141. rights = input.GetBySrc(rightStream)
  142. sets := xsql.JoinTupleSets{}
  143. for _, right := range rights {
  144. merged := &xsql.JoinTuple{}
  145. merged.AddTuple(right)
  146. isJoint := false
  147. for _, left := range lefts {
  148. temp := &xsql.JoinTuple{}
  149. temp.AddTuple(right)
  150. temp.AddTuple(left)
  151. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
  152. if r, ok := ve.Eval(join.Expr).(bool); ok {
  153. if r {
  154. merged.AddTuple(left)
  155. isJoint = true
  156. }
  157. } else {
  158. common.Log.Infoln("Evaluation error for set.")
  159. }
  160. }
  161. if excludeJoint {
  162. if len(merged.Tuples) > 0 && (!isJoint) {
  163. sets = append(sets, *merged)
  164. }
  165. } else {
  166. if len(merged.Tuples) > 0 {
  167. sets = append(sets, *merged)
  168. }
  169. }
  170. }
  171. return sets, nil
  172. }
  173. func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join) (interface{}, error) {
  174. var rightStream string
  175. if join.Alias == "" {
  176. rightStream = join.Name
  177. } else {
  178. rightStream = join.Alias
  179. }
  180. rights := input.GetBySrc(rightStream)
  181. newSets := xsql.JoinTupleSets{}
  182. if join.JoinType == xsql.RIGHT_JOIN {
  183. return jp.evalRightJoinSets(set, input, join,false)
  184. }
  185. for _, left := range *set {
  186. merged := &xsql.JoinTuple{}
  187. if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
  188. merged.AddTuples(left.Tuples)
  189. }
  190. innerAppend := false
  191. for _, right := range rights {
  192. if join.JoinType == xsql.CROSS_JOIN {
  193. merged.AddTuple(right)
  194. } else {
  195. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, &xsql.FunctionValuer{})}
  196. if r, ok := ve.Eval(join.Expr).(bool); ok {
  197. if r {
  198. if join.JoinType == xsql.INNER_JOIN && !innerAppend {
  199. merged.AddTuples(left.Tuples)
  200. innerAppend = true
  201. }
  202. merged.AddTuple(right)
  203. }
  204. }
  205. }
  206. }
  207. if len(merged.Tuples) > 0 {
  208. newSets = append(newSets, *merged)
  209. }
  210. }
  211. if join.JoinType == xsql.FULL_JOIN {
  212. if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true); err == nil && len(rightJoinSet) > 0 {
  213. for _, jt := range rightJoinSet {
  214. newSets = append(newSets, jt)
  215. }
  216. }
  217. }
  218. return newSets, nil
  219. }
  220. func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
  221. var rightStream string
  222. if join.Alias == "" {
  223. rightStream = join.Name
  224. } else {
  225. rightStream = join.Alias
  226. }
  227. rights := input.GetBySrc(rightStream)
  228. newSets := xsql.JoinTupleSets{}
  229. for _, right := range rights {
  230. merged := &xsql.JoinTuple{}
  231. merged.AddTuple(right)
  232. isJoint := false
  233. for _, left := range *set {
  234. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, &xsql.FunctionValuer{})}
  235. if r, ok := ve.Eval(join.Expr).(bool); ok {
  236. if r {
  237. isJoint = true
  238. merged.AddTuples(left.Tuples)
  239. }
  240. }
  241. }
  242. if excludeJoint {
  243. if len(merged.Tuples) > 0 && (!isJoint) {
  244. newSets = append(newSets, *merged)
  245. }
  246. } else {
  247. if len(merged.Tuples) > 0 {
  248. newSets = append(newSets, *merged)
  249. }
  250. }
  251. }
  252. return newSets, nil
  253. }