join_multi_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  // Copyright 2021 EMQ Technologies Co., Ltd.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/ast"
  21. "reflect"
  22. "strings"
  23. "testing"
  24. )
  25. func TestMultiJoinPlan_Apply(t *testing.T) {
  26. var tests = []struct {
  27. sql string
  28. data xsql.WindowTuplesSet
  29. result interface{}
  30. }{
  31. {
  32. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 left join src3 on src2.id2 = src3.id3",
  33. data: xsql.WindowTuplesSet{
  34. Content: []xsql.WindowTuples{
  35. {
  36. Emitter: "src1",
  37. Tuples: []xsql.Tuple{
  38. {
  39. Emitter: "src1",
  40. Message: xsql.Message{"id1": 1, "f1": "v1"},
  41. }, {
  42. Emitter: "src1",
  43. Message: xsql.Message{"id1": 3, "f1": "v3"},
  44. },
  45. },
  46. },
  47. {
  48. Emitter: "src2",
  49. Tuples: []xsql.Tuple{
  50. {
  51. Emitter: "src2",
  52. Message: xsql.Message{"id2": 1, "f2": "w1"},
  53. }, {
  54. Emitter: "src2",
  55. Message: xsql.Message{"id2": 4, "f2": "w3"},
  56. },
  57. },
  58. },
  59. {
  60. Emitter: "src3",
  61. Tuples: []xsql.Tuple{
  62. {
  63. Emitter: "src3",
  64. Message: xsql.Message{"id3": 1, "f3": "x1"},
  65. }, {
  66. Emitter: "src3",
  67. Message: xsql.Message{"id3": 5, "f3": "x5"},
  68. },
  69. },
  70. },
  71. },
  72. },
  73. result: &xsql.JoinTupleSets{
  74. Content: []xsql.JoinTuple{
  75. {
  76. Tuples: []xsql.Tuple{
  77. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  78. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  79. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  80. },
  81. },
  82. {
  83. Tuples: []xsql.Tuple{
  84. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  85. },
  86. },
  87. },
  88. },
  89. },
  90. {
  91. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 inner join src3 on src2.id2 = src3.id3",
  92. data: xsql.WindowTuplesSet{
  93. Content: []xsql.WindowTuples{
  94. {
  95. Emitter: "src1",
  96. Tuples: []xsql.Tuple{
  97. {
  98. Emitter: "src1",
  99. Message: xsql.Message{"id1": 1, "f1": "v1"},
  100. }, {
  101. Emitter: "src1",
  102. Message: xsql.Message{"id1": 3, "f1": "v3"},
  103. },
  104. },
  105. },
  106. {
  107. Emitter: "src2",
  108. Tuples: []xsql.Tuple{
  109. {
  110. Emitter: "src2",
  111. Message: xsql.Message{"id2": 1, "f2": "w1"},
  112. }, {
  113. Emitter: "src2",
  114. Message: xsql.Message{"id2": 4, "f2": "w3"},
  115. },
  116. },
  117. },
  118. {
  119. Emitter: "src3",
  120. Tuples: []xsql.Tuple{
  121. {
  122. Emitter: "src3",
  123. Message: xsql.Message{"id3": 1, "f3": "x1"},
  124. }, {
  125. Emitter: "src3",
  126. Message: xsql.Message{"id3": 5, "f3": "x5"},
  127. },
  128. },
  129. },
  130. },
  131. WindowRange: &xsql.WindowRange{
  132. WindowStart: 1541152486013,
  133. WindowEnd: 1541152487013,
  134. },
  135. },
  136. result: &xsql.JoinTupleSets{
  137. Content: []xsql.JoinTuple{
  138. {
  139. Tuples: []xsql.Tuple{
  140. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  141. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  142. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  143. },
  144. },
  145. },
  146. WindowRange: &xsql.WindowRange{
  147. WindowStart: 1541152486013,
  148. WindowEnd: 1541152487013,
  149. },
  150. },
  151. },
  152. {
  153. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 inner join src3 on src1.id1 = src3.id3",
  154. data: xsql.WindowTuplesSet{
  155. Content: []xsql.WindowTuples{
  156. {
  157. Emitter: "src1",
  158. Tuples: []xsql.Tuple{
  159. {
  160. Emitter: "src1",
  161. Message: xsql.Message{"id1": 1, "f1": "v1"},
  162. }, {
  163. Emitter: "src1",
  164. Message: xsql.Message{"id1": 5, "f1": "v5"},
  165. },
  166. },
  167. },
  168. {
  169. Emitter: "src2",
  170. Tuples: []xsql.Tuple{
  171. {
  172. Emitter: "src2",
  173. Message: xsql.Message{"id2": 1, "f2": "w1"},
  174. }, {
  175. Emitter: "src2",
  176. Message: xsql.Message{"id2": 4, "f2": "w3"},
  177. },
  178. },
  179. },
  180. {
  181. Emitter: "src3",
  182. Tuples: []xsql.Tuple{
  183. {
  184. Emitter: "src3",
  185. Message: xsql.Message{"id3": 2, "f3": "x1"},
  186. }, {
  187. Emitter: "src3",
  188. Message: xsql.Message{"id3": 5, "f3": "x5"},
  189. },
  190. },
  191. },
  192. },
  193. },
  194. result: &xsql.JoinTupleSets{
  195. Content: []xsql.JoinTuple{
  196. {
  197. Tuples: []xsql.Tuple{
  198. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  199. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  200. },
  201. },
  202. },
  203. },
  204. },
  205. {
  206. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 full join src3 on src1.id1 = src3.id3",
  207. data: xsql.WindowTuplesSet{
  208. Content: []xsql.WindowTuples{
  209. {
  210. Emitter: "src1",
  211. Tuples: []xsql.Tuple{
  212. {
  213. Emitter: "src1",
  214. Message: xsql.Message{"id1": 1, "f1": "v1"},
  215. }, {
  216. Emitter: "src1",
  217. Message: xsql.Message{"id1": 5, "f1": "v5"},
  218. },
  219. },
  220. },
  221. {
  222. Emitter: "src2",
  223. Tuples: []xsql.Tuple{
  224. {
  225. Emitter: "src2",
  226. Message: xsql.Message{"id2": 1, "f2": "w1"},
  227. }, {
  228. Emitter: "src2",
  229. Message: xsql.Message{"id2": 4, "f2": "w3"},
  230. },
  231. },
  232. },
  233. {
  234. Emitter: "src3",
  235. Tuples: []xsql.Tuple{
  236. {
  237. Emitter: "src3",
  238. Message: xsql.Message{"id3": 2, "f3": "x1"},
  239. }, {
  240. Emitter: "src3",
  241. Message: xsql.Message{"id3": 5, "f3": "x5"},
  242. },
  243. },
  244. },
  245. },
  246. },
  247. result: &xsql.JoinTupleSets{
  248. Content: []xsql.JoinTuple{
  249. {
  250. Tuples: []xsql.Tuple{
  251. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  252. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  253. },
  254. },
  255. {
  256. Tuples: []xsql.Tuple{
  257. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  258. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  259. },
  260. },
  261. {
  262. Tuples: []xsql.Tuple{
  263. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  264. },
  265. },
  266. },
  267. },
  268. },
  269. {
  270. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 right join src3 on src2.id2 = src3.id3",
  271. data: xsql.WindowTuplesSet{
  272. Content: []xsql.WindowTuples{
  273. {
  274. Emitter: "src1",
  275. Tuples: []xsql.Tuple{
  276. {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 1, "f1": "v1"},
  279. }, {
  280. Emitter: "src1",
  281. Message: xsql.Message{"id1": 3, "f1": "v3"},
  282. },
  283. },
  284. },
  285. {
  286. Emitter: "src2",
  287. Tuples: []xsql.Tuple{
  288. {
  289. Emitter: "src2",
  290. Message: xsql.Message{"id2": 1, "f2": "w1"},
  291. }, {
  292. Emitter: "src2",
  293. Message: xsql.Message{"id2": 4, "f2": "w3"},
  294. },
  295. },
  296. },
  297. {
  298. Emitter: "src3",
  299. Tuples: []xsql.Tuple{
  300. {
  301. Emitter: "src3",
  302. Message: xsql.Message{"id3": 1, "f3": "x1"},
  303. }, {
  304. Emitter: "src3",
  305. Message: xsql.Message{"id3": 5, "f3": "x5"},
  306. },
  307. },
  308. },
  309. },
  310. },
  311. result: &xsql.JoinTupleSets{
  312. Content: []xsql.JoinTuple{
  313. {
  314. Tuples: []xsql.Tuple{
  315. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  316. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  317. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  318. },
  319. },
  320. {
  321. Tuples: []xsql.Tuple{
  322. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  323. },
  324. },
  325. },
  326. },
  327. },
  328. {
  329. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 right join src3 on src2.id2 = src3.id3",
  330. data: xsql.WindowTuplesSet{
  331. Content: []xsql.WindowTuples{
  332. {
  333. Emitter: "src1",
  334. Tuples: []xsql.Tuple{
  335. {
  336. Emitter: "src1",
  337. Message: xsql.Message{"id1": 1, "f1": "v1"},
  338. }, {
  339. Emitter: "src1",
  340. Message: xsql.Message{"id1": 1, "f1": "v3"},
  341. },
  342. },
  343. },
  344. {
  345. Emitter: "src2",
  346. Tuples: []xsql.Tuple{
  347. {
  348. Emitter: "src2",
  349. Message: xsql.Message{"id2": 1, "f2": "w1"},
  350. }, {
  351. Emitter: "src2",
  352. Message: xsql.Message{"id2": 1, "f2": "w3"},
  353. },
  354. },
  355. },
  356. {
  357. Emitter: "src3",
  358. Tuples: []xsql.Tuple{
  359. {
  360. Emitter: "src3",
  361. Message: xsql.Message{"id3": 1, "f3": "x1"},
  362. }, {
  363. Emitter: "src3",
  364. Message: xsql.Message{"id3": 5, "f3": "x5"},
  365. },
  366. },
  367. },
  368. },
  369. },
  370. result: &xsql.JoinTupleSets{
  371. Content: []xsql.JoinTuple{
  372. {
  373. Tuples: []xsql.Tuple{
  374. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  375. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  376. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  377. },
  378. },
  379. {
  380. Tuples: []xsql.Tuple{
  381. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  382. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  383. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
  384. },
  385. },
  386. {
  387. Tuples: []xsql.Tuple{
  388. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  389. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  390. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  391. },
  392. },
  393. {
  394. Tuples: []xsql.Tuple{
  395. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  396. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  397. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
  398. },
  399. },
  400. {
  401. Tuples: []xsql.Tuple{
  402. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  403. },
  404. },
  405. },
  406. },
  407. },
  408. {
  409. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 cross join src3",
  410. data: xsql.WindowTuplesSet{
  411. Content: []xsql.WindowTuples{
  412. {
  413. Emitter: "src1",
  414. Tuples: []xsql.Tuple{
  415. {
  416. Emitter: "src1",
  417. Message: xsql.Message{"id1": 1, "f1": "v1"},
  418. }, {
  419. Emitter: "src1",
  420. Message: xsql.Message{"id1": 5, "f1": "v5"},
  421. },
  422. },
  423. },
  424. {
  425. Emitter: "src2",
  426. Tuples: []xsql.Tuple{
  427. {
  428. Emitter: "src2",
  429. Message: xsql.Message{"id2": 1, "f2": "w1"},
  430. }, {
  431. Emitter: "src2",
  432. Message: xsql.Message{"id2": 4, "f2": "w3"},
  433. },
  434. },
  435. },
  436. {
  437. Emitter: "src3",
  438. Tuples: []xsql.Tuple{
  439. {
  440. Emitter: "src3",
  441. Message: xsql.Message{"id3": 2, "f3": "x1"},
  442. }, {
  443. Emitter: "src3",
  444. Message: xsql.Message{"id3": 5, "f3": "x5"},
  445. },
  446. },
  447. },
  448. },
  449. },
  450. result: &xsql.JoinTupleSets{
  451. Content: []xsql.JoinTuple{
  452. {
  453. Tuples: []xsql.Tuple{
  454. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  455. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  456. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  457. },
  458. },
  459. {
  460. Tuples: []xsql.Tuple{
  461. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  462. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}}, {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  463. },
  464. },
  465. {
  466. Tuples: []xsql.Tuple{
  467. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  468. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  469. },
  470. },
  471. {
  472. Tuples: []xsql.Tuple{
  473. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  474. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  475. },
  476. },
  477. },
  478. },
  479. },
  480. }
  481. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  482. contextLogger := conf.Log.WithField("rule", "TestMultiJoinPlan_Apply")
  483. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  484. for i, tt := range tests {
  485. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  486. if err != nil {
  487. t.Errorf("statement parse error %s", err)
  488. break
  489. }
  490. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  491. t.Errorf("statement source is not a table")
  492. } else {
  493. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  494. pp := &JoinOp{Joins: stmt.Joins, From: table}
  495. result := pp.Apply(ctx, tt.data, fv, afv)
  496. if !reflect.DeepEqual(tt.result, result) {
  497. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  498. }
  499. }
  500. }
  501. }