join_operator.go 6.8 KB


  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. )
  8. //TODO join expr should only be the equal op between 2 streams like tb1.id = tb2.id
  9. type JoinPlan struct {
  10. From *xsql.Table
  11. Joins xsql.Joins
  12. }
  13. // input: xsql.WindowTuplesSet from windowOp, window is required for join
  14. // output: xsql.JoinTupleSets
  15. func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  16. log := ctx.GetLogger()
  17. var input xsql.WindowTuplesSet
  18. switch v := data.(type) {
  19. case error:
  20. return input
  21. case xsql.WindowTuplesSet:
  22. input = v
  23. log.Debugf("join plan receive %v", data)
  24. default:
  25. return fmt.Errorf("join is only supported in window")
  26. }
  27. result := xsql.JoinTupleSets{}
  28. for i, join := range jp.Joins {
  29. if i == 0 {
  30. v, err := jp.evalSet(input, join)
  31. if err != nil {
  32. fmt.Println(err)
  33. return nil
  34. }
  35. result = v
  36. } else {
  37. r1, _ := jp.evalJoinSets(&result, input, join)
  38. if v1, ok := r1.(xsql.JoinTupleSets); ok {
  39. result = v1
  40. }
  41. }
  42. }
  43. if result.Len() <= 0 {
  44. log.Debugf("join plan yields nothing")
  45. return nil
  46. }
  47. return result
  48. }
  49. func getStreamNames(join *xsql.Join) ([]string, error) {
  50. var srcs []string
  51. xsql.WalkFunc(join, func(node xsql.Node) {
  52. if f, ok := node.(*xsql.FieldRef); ok {
  53. if string(f.StreamName) == "" {
  54. return
  55. }
  56. srcs = append(srcs, string(f.StreamName))
  57. }
  58. })
  59. if len(srcs) != 2 {
  60. return nil, fmt.Errorf("Not correct join expression, it requires exactly 2 sources at ON expression.")
  61. }
  62. return srcs, nil
  63. }
  64. func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.JoinTupleSets, error) {
  65. var leftStream, rightStream string
  66. if join.JoinType != xsql.CROSS_JOIN {
  67. streams, err := getStreamNames(&join)
  68. if err != nil {
  69. return nil, err
  70. }
  71. leftStream = streams[0]
  72. rightStream = streams[1]
  73. } else {
  74. if jp.From.Alias == "" {
  75. leftStream = jp.From.Name
  76. } else {
  77. leftStream = jp.From.Alias
  78. }
  79. if join.Alias == "" {
  80. rightStream = join.Name
  81. } else {
  82. rightStream = join.Alias
  83. }
  84. }
  85. var lefts, rights []xsql.Tuple
  86. lefts = input.GetBySrc(leftStream)
  87. rights = input.GetBySrc(rightStream)
  88. sets := xsql.JoinTupleSets{}
  89. if join.JoinType == xsql.RIGHT_JOIN {
  90. return jp.evalSetWithRightJoin(input, join, false)
  91. }
  92. for _, left := range lefts {
  93. merged := &xsql.JoinTuple{}
  94. if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
  95. merged.AddTuple(left)
  96. }
  97. for _, right := range rights {
  98. if join.JoinType == xsql.CROSS_JOIN {
  99. merged.AddTuple(right)
  100. } else {
  101. temp := &xsql.JoinTuple{}
  102. temp.AddTuple(left)
  103. temp.AddTuple(right)
  104. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
  105. if r, ok := ve.Eval(join.Expr).(bool); ok {
  106. if r {
  107. if join.JoinType == xsql.INNER_JOIN {
  108. merged.AddTuple(left)
  109. merged.AddTuple(right)
  110. sets = append(sets, *merged)
  111. merged = &xsql.JoinTuple{}
  112. } else {
  113. merged.AddTuple(right)
  114. }
  115. }
  116. } else {
  117. common.Log.Infoln("Evaluation error for set.")
  118. }
  119. }
  120. }
  121. if len(merged.Tuples) > 0 {
  122. sets = append(sets, *merged)
  123. }
  124. }
  125. if join.JoinType == xsql.FULL_JOIN {
  126. if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil && len(rightJoinSet) > 0 {
  127. for _, jt := range rightJoinSet {
  128. sets = append(sets, jt)
  129. }
  130. }
  131. }
  132. return sets, nil
  133. }
  134. func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
  135. streams, err := getStreamNames(&join)
  136. if err != nil {
  137. return nil, err
  138. }
  139. leftStream := streams[0]
  140. rightStream := streams[1]
  141. var lefts, rights []xsql.Tuple
  142. lefts = input.GetBySrc(leftStream)
  143. rights = input.GetBySrc(rightStream)
  144. sets := xsql.JoinTupleSets{}
  145. for _, right := range rights {
  146. merged := &xsql.JoinTuple{}
  147. merged.AddTuple(right)
  148. isJoint := false
  149. for _, left := range lefts {
  150. temp := &xsql.JoinTuple{}
  151. temp.AddTuple(right)
  152. temp.AddTuple(left)
  153. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
  154. if r, ok := ve.Eval(join.Expr).(bool); ok {
  155. if r {
  156. merged.AddTuple(left)
  157. isJoint = true
  158. }
  159. } else {
  160. common.Log.Infoln("Evaluation error for set.")
  161. }
  162. }
  163. if excludeJoint {
  164. if len(merged.Tuples) > 0 && (!isJoint) {
  165. sets = append(sets, *merged)
  166. }
  167. } else {
  168. if len(merged.Tuples) > 0 {
  169. sets = append(sets, *merged)
  170. }
  171. }
  172. }
  173. return sets, nil
  174. }
  175. func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join) (interface{}, error) {
  176. var rightStream string
  177. if join.Alias == "" {
  178. rightStream = join.Name
  179. } else {
  180. rightStream = join.Alias
  181. }
  182. rights := input.GetBySrc(rightStream)
  183. newSets := xsql.JoinTupleSets{}
  184. if join.JoinType == xsql.RIGHT_JOIN {
  185. return jp.evalRightJoinSets(set, input, join, false)
  186. }
  187. for _, left := range *set {
  188. merged := &xsql.JoinTuple{}
  189. if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
  190. merged.AddTuples(left.Tuples)
  191. }
  192. innerAppend := false
  193. for _, right := range rights {
  194. if join.JoinType == xsql.CROSS_JOIN {
  195. merged.AddTuple(right)
  196. } else {
  197. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, &xsql.FunctionValuer{})}
  198. if r, ok := ve.Eval(join.Expr).(bool); ok {
  199. if r {
  200. if join.JoinType == xsql.INNER_JOIN && !innerAppend {
  201. merged.AddTuples(left.Tuples)
  202. innerAppend = true
  203. }
  204. merged.AddTuple(right)
  205. }
  206. }
  207. }
  208. }
  209. if len(merged.Tuples) > 0 {
  210. newSets = append(newSets, *merged)
  211. }
  212. }
  213. if join.JoinType == xsql.FULL_JOIN {
  214. if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true); err == nil && len(rightJoinSet) > 0 {
  215. for _, jt := range rightJoinSet {
  216. newSets = append(newSets, jt)
  217. }
  218. }
  219. }
  220. return newSets, nil
  221. }
  222. func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
  223. var rightStream string
  224. if join.Alias == "" {
  225. rightStream = join.Name
  226. } else {
  227. rightStream = join.Alias
  228. }
  229. rights := input.GetBySrc(rightStream)
  230. newSets := xsql.JoinTupleSets{}
  231. for _, right := range rights {
  232. merged := &xsql.JoinTuple{}
  233. merged.AddTuple(right)
  234. isJoint := false
  235. for _, left := range *set {
  236. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, &xsql.FunctionValuer{})}
  237. if r, ok := ve.Eval(join.Expr).(bool); ok {
  238. if r {
  239. isJoint = true
  240. merged.AddTuples(left.Tuples)
  241. }
  242. }
  243. }
  244. if excludeJoint {
  245. if len(merged.Tuples) > 0 && (!isJoint) {
  246. newSets = append(newSets, *merged)
  247. }
  248. } else {
  249. if len(merged.Tuples) > 0 {
  250. newSets = append(newSets, *merged)
  251. }
  252. }
  253. }
  254. return newSets, nil
  255. }