join_operator.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. //TODO join expr should only be the equal op between 2 streams like tb1.id = tb2.id
  22. type JoinOp struct {
  23. From *ast.Table
  24. Joins ast.Joins
  25. }
  26. // input: xsql.WindowTuplesSet from windowOp, window is required for join
  27. // output: xsql.JoinTupleSets
  28. func (jp *JoinOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  29. log := ctx.GetLogger()
  30. var input xsql.WindowTuplesSet
  31. switch v := data.(type) {
  32. case error:
  33. return v
  34. case xsql.WindowTuplesSet:
  35. input = v
  36. log.Debugf("join plan receive %v", data)
  37. default:
  38. return fmt.Errorf("run Join error: join is only supported in window")
  39. }
  40. result := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  41. for i, join := range jp.Joins {
  42. if i == 0 {
  43. v, err := jp.evalSet(input, join, fv)
  44. if err != nil {
  45. return fmt.Errorf("run Join error: %s", err)
  46. }
  47. result = v
  48. } else {
  49. r1, err := jp.evalJoinSets(result, input, join, fv)
  50. if err != nil {
  51. return fmt.Errorf("run Join error: %s", err)
  52. }
  53. if v1, ok := r1.(*xsql.JoinTupleSets); ok {
  54. result = v1
  55. }
  56. }
  57. }
  58. if result.Len() <= 0 {
  59. log.Debugf("join plan yields nothing")
  60. return nil
  61. }
  62. result.WindowRange = input.WindowRange
  63. return result
  64. }
  65. func (jp *JoinOp) getStreamNames(join *ast.Join) ([]string, error) {
  66. var srcs []string
  67. keys := make(map[ast.StreamName]bool)
  68. ast.WalkFunc(join, func(node ast.Node) bool {
  69. if f, ok := node.(*ast.FieldRef); ok {
  70. for _, v := range f.RefSources() {
  71. if _, ok := keys[v]; !ok {
  72. srcs = append(srcs, string(v))
  73. keys[v] = true
  74. }
  75. }
  76. }
  77. return true
  78. })
  79. if len(srcs) != 2 {
  80. if jp.From.Alias != "" {
  81. srcs = append(srcs, jp.From.Alias)
  82. } else {
  83. srcs = append(srcs, jp.From.Name)
  84. }
  85. if join.Alias != "" {
  86. srcs = append(srcs, join.Alias)
  87. } else {
  88. srcs = append(srcs, join.Name)
  89. }
  90. }
  91. return srcs, nil
  92. }
  93. func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  94. var leftStream, rightStream string
  95. if join.JoinType != ast.CROSS_JOIN {
  96. streams, err := jp.getStreamNames(&join)
  97. if err != nil {
  98. return nil, err
  99. }
  100. leftStream = streams[0]
  101. rightStream = streams[1]
  102. } else {
  103. if jp.From.Alias == "" {
  104. leftStream = jp.From.Name
  105. } else {
  106. leftStream = jp.From.Alias
  107. }
  108. if join.Alias == "" {
  109. rightStream = join.Name
  110. } else {
  111. rightStream = join.Alias
  112. }
  113. }
  114. var lefts, rights []xsql.Tuple
  115. lefts = input.GetBySrc(leftStream)
  116. rights = input.GetBySrc(rightStream)
  117. sets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  118. if join.JoinType == ast.RIGHT_JOIN {
  119. return jp.evalSetWithRightJoin(input, join, false, fv)
  120. }
  121. for _, left := range lefts {
  122. leftJoined := false
  123. for index, right := range rights {
  124. tupleJoined := false
  125. merged := &xsql.JoinTuple{}
  126. if join.JoinType == ast.LEFT_JOIN || join.JoinType == ast.FULL_JOIN || join.JoinType == ast.CROSS_JOIN {
  127. merged.AddTuple(left)
  128. }
  129. if join.JoinType == ast.CROSS_JOIN {
  130. tupleJoined = true
  131. merged.AddTuple(right)
  132. } else {
  133. temp := &xsql.JoinTuple{}
  134. temp.AddTuple(left)
  135. temp.AddTuple(right)
  136. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
  137. result := evalOn(join, ve, &left, &right)
  138. merged.AliasMap = temp.AliasMap
  139. switch val := result.(type) {
  140. case error:
  141. return nil, val
  142. case bool:
  143. if val {
  144. leftJoined = true
  145. tupleJoined = true
  146. if join.JoinType == ast.INNER_JOIN {
  147. merged.AddTuple(left)
  148. merged.AddTuple(right)
  149. } else {
  150. merged.AddTuple(right)
  151. }
  152. }
  153. default:
  154. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  155. }
  156. }
  157. if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
  158. leftJoined = true
  159. sets.Content = append(sets.Content, *merged)
  160. }
  161. }
  162. // If no messages in the right
  163. if !leftJoined && join.JoinType != ast.INNER_JOIN {
  164. merged := &xsql.JoinTuple{}
  165. merged.AddTuple(left)
  166. sets.Content = append(sets.Content, *merged)
  167. }
  168. }
  169. if join.JoinType == ast.FULL_JOIN {
  170. if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true, fv); err == nil {
  171. if len(rightJoinSet.Content) > 0 {
  172. for _, jt := range rightJoinSet.Content {
  173. sets.Content = append(sets.Content, jt)
  174. }
  175. }
  176. } else {
  177. return nil, err
  178. }
  179. }
  180. return sets, nil
  181. }
  182. func evalOn(join ast.Join, ve *xsql.ValuerEval, left interface{}, right *xsql.Tuple) interface{} {
  183. var result interface{}
  184. if join.Expr != nil {
  185. result = ve.Eval(join.Expr)
  186. } else if join.JoinType == ast.INNER_JOIN { // if no on expression
  187. result = left != nil && right != nil
  188. } else {
  189. result = true
  190. }
  191. return result
  192. }
  193. func (jp *JoinOp) evalSetWithRightJoin(input xsql.WindowTuplesSet, join ast.Join, excludeJoint bool, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  194. streams, err := jp.getStreamNames(&join)
  195. if err != nil {
  196. return nil, err
  197. }
  198. leftStream := streams[0]
  199. rightStream := streams[1]
  200. var lefts, rights []xsql.Tuple
  201. lefts = input.GetBySrc(leftStream)
  202. rights = input.GetBySrc(rightStream)
  203. sets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  204. for _, right := range rights {
  205. isJoint := false
  206. for index, left := range lefts {
  207. tupleJoined := false
  208. merged := &xsql.JoinTuple{}
  209. merged.AddTuple(right)
  210. temp := &xsql.JoinTuple{}
  211. temp.AddTuple(right)
  212. temp.AddTuple(left)
  213. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
  214. result := evalOn(join, ve, &left, &right)
  215. merged.AliasMap = temp.AliasMap
  216. switch val := result.(type) {
  217. case error:
  218. return nil, val
  219. case bool:
  220. if val {
  221. merged.AddTuple(left)
  222. isJoint = true
  223. tupleJoined = true
  224. }
  225. default:
  226. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  227. }
  228. if !excludeJoint && (tupleJoined || (!isJoint && index == len(lefts)-1 && len(merged.Tuples) > 0)) {
  229. isJoint = true
  230. sets.Content = append(sets.Content, *merged)
  231. }
  232. }
  233. if !isJoint {
  234. merged := &xsql.JoinTuple{}
  235. merged.AddTuple(right)
  236. sets.Content = append(sets.Content, *merged)
  237. }
  238. }
  239. return sets, nil
  240. }
  241. func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join ast.Join, fv *xsql.FunctionValuer) (interface{}, error) {
  242. var rightStream string
  243. if join.Alias == "" {
  244. rightStream = join.Name
  245. } else {
  246. rightStream = join.Alias
  247. }
  248. rights := input.GetBySrc(rightStream)
  249. newSets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  250. if join.JoinType == ast.RIGHT_JOIN {
  251. return jp.evalRightJoinSets(set, input, join, false, fv)
  252. }
  253. for _, left := range set.Content {
  254. leftJoined := false
  255. innerAppend := false
  256. for index, right := range rights {
  257. tupleJoined := false
  258. merged := &xsql.JoinTuple{}
  259. if join.JoinType == ast.LEFT_JOIN || join.JoinType == ast.FULL_JOIN || join.JoinType == ast.CROSS_JOIN {
  260. merged.AddTuples(left.Tuples)
  261. }
  262. if join.JoinType == ast.CROSS_JOIN {
  263. tupleJoined = true
  264. merged.AddTuple(right)
  265. } else {
  266. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, fv)}
  267. result := evalOn(join, ve, &left, &right)
  268. merged.AliasMap = left.AliasMap
  269. switch val := result.(type) {
  270. case error:
  271. return nil, val
  272. case bool:
  273. if val {
  274. leftJoined = true
  275. tupleJoined = true
  276. if join.JoinType == ast.INNER_JOIN && !innerAppend {
  277. merged.AddTuples(left.Tuples)
  278. innerAppend = true
  279. }
  280. merged.AddTuple(right)
  281. }
  282. default:
  283. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  284. }
  285. }
  286. if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
  287. leftJoined = true
  288. newSets.Content = append(newSets.Content, *merged)
  289. }
  290. }
  291. if !leftJoined && join.JoinType != ast.INNER_JOIN {
  292. merged := &xsql.JoinTuple{}
  293. merged.AddTuples(left.Tuples)
  294. newSets.Content = append(newSets.Content, *merged)
  295. }
  296. }
  297. if join.JoinType == ast.FULL_JOIN {
  298. if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true, fv); err == nil && len(rightJoinSet.Content) > 0 {
  299. for _, jt := range rightJoinSet.Content {
  300. newSets.Content = append(newSets.Content, jt)
  301. }
  302. }
  303. }
  304. return newSets, nil
  305. }
  306. func (jp *JoinOp) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join ast.Join, excludeJoint bool, fv *xsql.FunctionValuer) (*xsql.JoinTupleSets, error) {
  307. var rightStream string
  308. if join.Alias == "" {
  309. rightStream = join.Name
  310. } else {
  311. rightStream = join.Alias
  312. }
  313. rights := input.GetBySrc(rightStream)
  314. newSets := &xsql.JoinTupleSets{Content: make([]xsql.JoinTuple, 0)}
  315. for _, right := range rights {
  316. isJoint := false
  317. for index, left := range set.Content {
  318. tupleJoined := false
  319. merged := &xsql.JoinTuple{}
  320. merged.AddTuple(right)
  321. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, fv)}
  322. result := evalOn(join, ve, &left, &right)
  323. merged.AliasMap = left.AliasMap
  324. switch val := result.(type) {
  325. case error:
  326. return nil, val
  327. case bool:
  328. if val {
  329. isJoint = true
  330. tupleJoined = true
  331. merged.AddTuples(left.Tuples)
  332. }
  333. default:
  334. return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
  335. }
  336. if !excludeJoint && (tupleJoined || (!isJoint && index == len(set.Content)-1 && len(merged.Tuples) > 0)) {
  337. isJoint = true
  338. newSets.Content = append(newSets.Content, *merged)
  339. }
  340. }
  341. if !isJoint {
  342. merged := &xsql.JoinTuple{}
  343. merged.AddTuple(right)
  344. newSets.Content = append(newSets.Content, *merged)
  345. }
  346. }
  347. return newSets, nil
  348. }