join_operator.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. package operator
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/xsql"
  5. "github.com/lf-edge/ekuiper/pkg/api"
  6. "github.com/lf-edge/ekuiper/pkg/ast"
  7. )
// JoinOp joins the tuples of the FROM source with the tuples of each JOIN
// clause. The JOIN clauses are applied one after another, in the order they
// are listed in Joins.
//
//TODO join expr should only be the equal op between 2 streams like tb1.id = tb2.id
type JoinOp struct {
	// From is the FROM source; when it has an alias, the alias (not the name)
	// identifies its tuples in the window.
	From *ast.Table
	// Joins lists the JOIN clauses, applied in order by Apply.
	Joins ast.Joins
}
  13. // input: xsql.WindowTuplesSet from windowOp, window is required for join
  14. // output: xsql.JoinTupleSets
  15. func (jp *JoinOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  16. log := ctx.GetLogger()
  17. var input xsql.WindowTuplesSet
  18. switch v := data.(type) {
  19. case error:
  20. return v
  21. case xsql.WindowTuplesSet:
  22. input = v
  23. log.Debugf("join plan receive %v", data)
  24. default:
  25. return fmt.Errorf("run Join error: join is only supported in window")
  26. }
  27. result := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  28. for i, join := range jp.Joins {
  29. if i == 0 {
  30. v, err := jp.evalSet(input, join, fv)
  31. if err != nil {
  32. return fmt.Errorf("run Join error: %s", err)
  33. }
  34. result = v
  35. } else {
  36. r1, err := jp.evalJoinSets(result, input, join, fv)
  37. if err != nil {
  38. return fmt.Errorf("run Join error: %s", err)
  39. }
  40. if v1, ok := r1.(*xsql.JoinTupleSets); ok {
  41. result = v1
  42. }
  43. }
  44. }
  45. if result.Len() <= 0 {
  46. log.Debugf("join plan yields nothing")
  47. return nil
  48. }
  49. result.WindowRange = input.WindowRange
  50. return result
  51. }
  52. func (jp *JoinOp) getStreamNames(join *ast.Join) ([]string, error) {
  53. var srcs []string
  54. keys := make(map[ast.StreamName]bool)
  55. ast.WalkFunc(join, func(node ast.Node) bool {
  56. if f, ok := node.(*ast.FieldRef); ok {
  57. for _, v := range f.RefSources() {
  58. if _, ok := keys[v]; !ok {
  59. srcs = append(srcs, string(v))
  60. keys[v] = true
  61. }
  62. }
  63. }
  64. return true
  65. })
  66. if len(srcs) != 2 {
  67. if jp.From.Alias != "" {
  68. srcs = append(srcs, jp.From.Alias)
  69. } else {
  70. srcs = append(srcs, jp.From.Name)
  71. }
  72. if join.Alias != "" {
  73. srcs = append(srcs, join.Alias)
  74. } else {
  75. srcs = append(srcs, join.Name)
  76. }
  77. }
  78. return srcs, nil
  79. }
  80. func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  81. var leftStream, rightStream string
  82. if join.JoinType != ast.CROSS_JOIN {
  83. streams, err := jp.getStreamNames(&join)
  84. if err != nil {
  85. return nil, err
  86. }
  87. leftStream = streams[0]
  88. rightStream = streams[1]
  89. } else {
  90. if jp.From.Alias == "" {
  91. leftStream = jp.From.Name
  92. } else {
  93. leftStream = jp.From.Alias
  94. }
  95. if join.Alias == "" {
  96. rightStream = join.Name
  97. } else {
  98. rightStream = join.Alias
  99. }
  100. }
  101. var lefts, rights []xsql.Tuple
  102. lefts = input.GetBySrc(leftStream)
  103. rights = input.GetBySrc(rightStream)
  104. sets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  105. if join.JoinType == ast.RIGHT_JOIN {
  106. return jp.evalSetWithRightJoin(input, join, false, fv)
  107. }
  108. for _, left := range lefts {
  109. leftJoined := false
  110. for index, right := range rights {
  111. tupleJoined := false
  112. merged := &xsql.JoinTuple{}
  113. if join.JoinType == ast.LEFT_JOIN || join.JoinType == ast.FULL_JOIN || join.JoinType == ast.CROSS_JOIN {
  114. merged.AddTuple(left)
  115. }
  116. if join.JoinType == ast.CROSS_JOIN {
  117. tupleJoined = true
  118. merged.AddTuple(right)
  119. } else {
  120. temp := &xsql.JoinTuple{}
  121. temp.AddTuple(left)
  122. temp.AddTuple(right)
  123. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
  124. result := evalOn(join, ve, &left, &right)
  125. merged.AliasMap = temp.AliasMap
  126. switch val := result.(type) {
  127. case error:
  128. return nil, val
  129. case bool:
  130. if val {
  131. leftJoined = true
  132. tupleJoined = true
  133. if join.JoinType == ast.INNER_JOIN {
  134. merged.AddTuple(left)
  135. merged.AddTuple(right)
  136. } else {
  137. merged.AddTuple(right)
  138. }
  139. }
  140. default:
  141. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  142. }
  143. }
  144. if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
  145. leftJoined = true
  146. sets.Content = append(sets.Content, *merged)
  147. }
  148. }
  149. // If no messages in the right
  150. if !leftJoined && join.JoinType != ast.INNER_JOIN {
  151. merged := &xsql.JoinTuple{}
  152. merged.AddTuple(left)
  153. sets.Content = append(sets.Content, *merged)
  154. }
  155. }
  156. if join.JoinType == ast.FULL_JOIN {
  157. if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true, fv); err == nil {
  158. if len(rightJoinSet.Content) > 0 {
  159. for _, jt := range rightJoinSet.Content {
  160. sets.Content = append(sets.Content, jt)
  161. }
  162. }
  163. } else {
  164. return nil, err
  165. }
  166. }
  167. return sets, nil
  168. }
  169. func evalOn(join ast.Join, ve *xsql.ValuerEval, left interface{}, right *xsql.Tuple) interface{} {
  170. var result interface{}
  171. if join.Expr != nil {
  172. result = ve.Eval(join.Expr)
  173. } else if join.JoinType == ast.INNER_JOIN { // if no on expression
  174. result = left != nil && right != nil
  175. } else {
  176. result = true
  177. }
  178. return result
  179. }
  180. func (jp *JoinOp) evalSetWithRightJoin(input xsql.WindowTuplesSet, join ast.Join, excludeJoint bool, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  181. streams, err := jp.getStreamNames(&join)
  182. if err != nil {
  183. return nil, err
  184. }
  185. leftStream := streams[0]
  186. rightStream := streams[1]
  187. var lefts, rights []xsql.Tuple
  188. lefts = input.GetBySrc(leftStream)
  189. rights = input.GetBySrc(rightStream)
  190. sets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  191. for _, right := range rights {
  192. isJoint := false
  193. for index, left := range lefts {
  194. tupleJoined := false
  195. merged := &xsql.JoinTuple{}
  196. merged.AddTuple(right)
  197. temp := &xsql.JoinTuple{}
  198. temp.AddTuple(right)
  199. temp.AddTuple(left)
  200. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
  201. result := evalOn(join, ve, &left, &right)
  202. merged.AliasMap = temp.AliasMap
  203. switch val := result.(type) {
  204. case error:
  205. return nil, val
  206. case bool:
  207. if val {
  208. merged.AddTuple(left)
  209. isJoint = true
  210. tupleJoined = true
  211. }
  212. default:
  213. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  214. }
  215. if !excludeJoint && (tupleJoined || (!isJoint && index == len(lefts)-1 && len(merged.Tuples) > 0)) {
  216. isJoint = true
  217. sets.Content = append(sets.Content, *merged)
  218. }
  219. }
  220. if !isJoint {
  221. merged := &xsql.JoinTuple{}
  222. merged.AddTuple(right)
  223. sets.Content = append(sets.Content, *merged)
  224. }
  225. }
  226. return sets, nil
  227. }
  228. func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join ast.Join, fv *xsql.FunctionValuer) (interface{}, error) {
  229. var rightStream string
  230. if join.Alias == "" {
  231. rightStream = join.Name
  232. } else {
  233. rightStream = join.Alias
  234. }
  235. rights := input.GetBySrc(rightStream)
  236. newSets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  237. if join.JoinType == ast.RIGHT_JOIN {
  238. return jp.evalRightJoinSets(set, input, join, false, fv)
  239. }
  240. for _, left := range set.Content {
  241. leftJoined := false
  242. innerAppend := false
  243. for index, right := range rights {
  244. tupleJoined := false
  245. merged := &xsql.JoinTuple{}
  246. if join.JoinType == ast.LEFT_JOIN || join.JoinType == ast.FULL_JOIN || join.JoinType == ast.CROSS_JOIN {
  247. merged.AddTuples(left.Tuples)
  248. }
  249. if join.JoinType == ast.CROSS_JOIN {
  250. tupleJoined = true
  251. merged.AddTuple(right)
  252. } else {
  253. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, fv)}
  254. result := evalOn(join, ve, &left, &right)
  255. merged.AliasMap = left.AliasMap
  256. switch val := result.(type) {
  257. case error:
  258. return nil, val
  259. case bool:
  260. if val {
  261. leftJoined = true
  262. tupleJoined = true
  263. if join.JoinType == ast.INNER_JOIN && !innerAppend {
  264. merged.AddTuples(left.Tuples)
  265. innerAppend = true
  266. }
  267. merged.AddTuple(right)
  268. }
  269. default:
  270. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  271. }
  272. }
  273. if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
  274. leftJoined = true
  275. newSets.Content = append(newSets.Content, *merged)
  276. }
  277. }
  278. if !leftJoined && join.JoinType != ast.INNER_JOIN {
  279. merged := &xsql.JoinTuple{}
  280. merged.AddTuples(left.Tuples)
  281. newSets.Content = append(newSets.Content, *merged)
  282. }
  283. }
  284. if join.JoinType == ast.FULL_JOIN {
  285. if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true, fv); err == nil && len(rightJoinSet.Content) > 0 {
  286. for _, jt := range rightJoinSet.Content {
  287. newSets.Content = append(newSets.Content, jt)
  288. }
  289. }
  290. }
  291. return newSets, nil
  292. }
  293. func (jp *JoinOp) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join ast.Join, excludeJoint bool, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  294. var rightStream string
  295. if join.Alias == "" {
  296. rightStream = join.Name
  297. } else {
  298. rightStream = join.Alias
  299. }
  300. rights := input.GetBySrc(rightStream)
  301. newSets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  302. for _, right := range rights {
  303. isJoint := false
  304. for index, left := range set.Content {
  305. tupleJoined := false
  306. merged := &xsql.JoinTuple{}
  307. merged.AddTuple(right)
  308. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, fv)}
  309. result := evalOn(join, ve, &left, &right)
  310. merged.AliasMap = left.AliasMap
  311. switch val := result.(type) {
  312. case error:
  313. return nil, val
  314. case bool:
  315. if val {
  316. isJoint = true
  317. tupleJoined = true
  318. merged.AddTuples(left.Tuples)
  319. }
  320. default:
  321. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  322. }
  323. if !excludeJoint && (tupleJoined || (!isJoint && index == len(set.Content)-1 && len(merged.Tuples) > 0)) {
  324. isJoint = true
  325. newSets.Content = append(newSets.Content, *merged)
  326. }
  327. }
  328. if !isJoint {
  329. merged := &xsql.JoinTuple{}
  330. merged.AddTuple(right)
  331. newSets.Content = append(newSets.Content, *merged)
  332. }
  333. }
  334. return newSets, nil
  335. }