join_multi_test.go 12 KB


  1. package operator
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/conf"
  5. "github.com/emqx/kuiper/internal/topo/context"
  6. "github.com/emqx/kuiper/internal/xsql"
  7. "github.com/emqx/kuiper/pkg/ast"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMultiJoinPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data xsql.WindowTuplesSet
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 left join src3 on src2.id2 = src3.id3",
  20. data: xsql.WindowTuplesSet{
  21. Content: []xsql.WindowTuples{
  22. {
  23. Emitter: "src1",
  24. Tuples: []xsql.Tuple{
  25. {
  26. Emitter: "src1",
  27. Message: xsql.Message{"id1": 1, "f1": "v1"},
  28. }, {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 3, "f1": "v3"},
  31. },
  32. },
  33. },
  34. {
  35. Emitter: "src2",
  36. Tuples: []xsql.Tuple{
  37. {
  38. Emitter: "src2",
  39. Message: xsql.Message{"id2": 1, "f2": "w1"},
  40. }, {
  41. Emitter: "src2",
  42. Message: xsql.Message{"id2": 4, "f2": "w3"},
  43. },
  44. },
  45. },
  46. {
  47. Emitter: "src3",
  48. Tuples: []xsql.Tuple{
  49. {
  50. Emitter: "src3",
  51. Message: xsql.Message{"id3": 1, "f3": "x1"},
  52. }, {
  53. Emitter: "src3",
  54. Message: xsql.Message{"id3": 5, "f3": "x5"},
  55. },
  56. },
  57. },
  58. },
  59. },
  60. result: &xsql.JoinTupleSets{
  61. Content: []xsql.JoinTuple{
  62. {
  63. Tuples: []xsql.Tuple{
  64. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  65. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  66. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  67. },
  68. },
  69. {
  70. Tuples: []xsql.Tuple{
  71. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  72. },
  73. },
  74. },
  75. },
  76. },
  77. {
  78. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 inner join src3 on src2.id2 = src3.id3",
  79. data: xsql.WindowTuplesSet{
  80. Content: []xsql.WindowTuples{
  81. {
  82. Emitter: "src1",
  83. Tuples: []xsql.Tuple{
  84. {
  85. Emitter: "src1",
  86. Message: xsql.Message{"id1": 1, "f1": "v1"},
  87. }, {
  88. Emitter: "src1",
  89. Message: xsql.Message{"id1": 3, "f1": "v3"},
  90. },
  91. },
  92. },
  93. {
  94. Emitter: "src2",
  95. Tuples: []xsql.Tuple{
  96. {
  97. Emitter: "src2",
  98. Message: xsql.Message{"id2": 1, "f2": "w1"},
  99. }, {
  100. Emitter: "src2",
  101. Message: xsql.Message{"id2": 4, "f2": "w3"},
  102. },
  103. },
  104. },
  105. {
  106. Emitter: "src3",
  107. Tuples: []xsql.Tuple{
  108. {
  109. Emitter: "src3",
  110. Message: xsql.Message{"id3": 1, "f3": "x1"},
  111. }, {
  112. Emitter: "src3",
  113. Message: xsql.Message{"id3": 5, "f3": "x5"},
  114. },
  115. },
  116. },
  117. },
  118. WindowRange: &xsql.WindowRange{
  119. WindowStart: 1541152486013,
  120. WindowEnd: 1541152487013,
  121. },
  122. },
  123. result: &xsql.JoinTupleSets{
  124. Content: []xsql.JoinTuple{
  125. {
  126. Tuples: []xsql.Tuple{
  127. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  128. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  129. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  130. },
  131. },
  132. },
  133. WindowRange: &xsql.WindowRange{
  134. WindowStart: 1541152486013,
  135. WindowEnd: 1541152487013,
  136. },
  137. },
  138. },
  139. {
  140. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 inner join src3 on src1.id1 = src3.id3",
  141. data: xsql.WindowTuplesSet{
  142. Content: []xsql.WindowTuples{
  143. {
  144. Emitter: "src1",
  145. Tuples: []xsql.Tuple{
  146. {
  147. Emitter: "src1",
  148. Message: xsql.Message{"id1": 1, "f1": "v1"},
  149. }, {
  150. Emitter: "src1",
  151. Message: xsql.Message{"id1": 5, "f1": "v5"},
  152. },
  153. },
  154. },
  155. {
  156. Emitter: "src2",
  157. Tuples: []xsql.Tuple{
  158. {
  159. Emitter: "src2",
  160. Message: xsql.Message{"id2": 1, "f2": "w1"},
  161. }, {
  162. Emitter: "src2",
  163. Message: xsql.Message{"id2": 4, "f2": "w3"},
  164. },
  165. },
  166. },
  167. {
  168. Emitter: "src3",
  169. Tuples: []xsql.Tuple{
  170. {
  171. Emitter: "src3",
  172. Message: xsql.Message{"id3": 2, "f3": "x1"},
  173. }, {
  174. Emitter: "src3",
  175. Message: xsql.Message{"id3": 5, "f3": "x5"},
  176. },
  177. },
  178. },
  179. },
  180. },
  181. result: &xsql.JoinTupleSets{
  182. Content: []xsql.JoinTuple{
  183. {
  184. Tuples: []xsql.Tuple{
  185. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  186. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  187. },
  188. },
  189. },
  190. },
  191. },
  192. {
  193. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 full join src3 on src1.id1 = src3.id3",
  194. data: xsql.WindowTuplesSet{
  195. Content: []xsql.WindowTuples{
  196. {
  197. Emitter: "src1",
  198. Tuples: []xsql.Tuple{
  199. {
  200. Emitter: "src1",
  201. Message: xsql.Message{"id1": 1, "f1": "v1"},
  202. }, {
  203. Emitter: "src1",
  204. Message: xsql.Message{"id1": 5, "f1": "v5"},
  205. },
  206. },
  207. },
  208. {
  209. Emitter: "src2",
  210. Tuples: []xsql.Tuple{
  211. {
  212. Emitter: "src2",
  213. Message: xsql.Message{"id2": 1, "f2": "w1"},
  214. }, {
  215. Emitter: "src2",
  216. Message: xsql.Message{"id2": 4, "f2": "w3"},
  217. },
  218. },
  219. },
  220. {
  221. Emitter: "src3",
  222. Tuples: []xsql.Tuple{
  223. {
  224. Emitter: "src3",
  225. Message: xsql.Message{"id3": 2, "f3": "x1"},
  226. }, {
  227. Emitter: "src3",
  228. Message: xsql.Message{"id3": 5, "f3": "x5"},
  229. },
  230. },
  231. },
  232. },
  233. },
  234. result: &xsql.JoinTupleSets{
  235. Content: []xsql.JoinTuple{
  236. {
  237. Tuples: []xsql.Tuple{
  238. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  239. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  240. },
  241. },
  242. {
  243. Tuples: []xsql.Tuple{
  244. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  245. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  246. },
  247. },
  248. {
  249. Tuples: []xsql.Tuple{
  250. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  251. },
  252. },
  253. },
  254. },
  255. },
  256. {
  257. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 right join src3 on src2.id2 = src3.id3",
  258. data: xsql.WindowTuplesSet{
  259. Content: []xsql.WindowTuples{
  260. {
  261. Emitter: "src1",
  262. Tuples: []xsql.Tuple{
  263. {
  264. Emitter: "src1",
  265. Message: xsql.Message{"id1": 1, "f1": "v1"},
  266. }, {
  267. Emitter: "src1",
  268. Message: xsql.Message{"id1": 3, "f1": "v3"},
  269. },
  270. },
  271. },
  272. {
  273. Emitter: "src2",
  274. Tuples: []xsql.Tuple{
  275. {
  276. Emitter: "src2",
  277. Message: xsql.Message{"id2": 1, "f2": "w1"},
  278. }, {
  279. Emitter: "src2",
  280. Message: xsql.Message{"id2": 4, "f2": "w3"},
  281. },
  282. },
  283. },
  284. {
  285. Emitter: "src3",
  286. Tuples: []xsql.Tuple{
  287. {
  288. Emitter: "src3",
  289. Message: xsql.Message{"id3": 1, "f3": "x1"},
  290. }, {
  291. Emitter: "src3",
  292. Message: xsql.Message{"id3": 5, "f3": "x5"},
  293. },
  294. },
  295. },
  296. },
  297. },
  298. result: &xsql.JoinTupleSets{
  299. Content: []xsql.JoinTuple{
  300. {
  301. Tuples: []xsql.Tuple{
  302. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  303. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  304. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  305. },
  306. },
  307. {
  308. Tuples: []xsql.Tuple{
  309. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  310. },
  311. },
  312. },
  313. },
  314. },
  315. {
  316. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 right join src3 on src2.id2 = src3.id3",
  317. data: xsql.WindowTuplesSet{
  318. Content: []xsql.WindowTuples{
  319. {
  320. Emitter: "src1",
  321. Tuples: []xsql.Tuple{
  322. {
  323. Emitter: "src1",
  324. Message: xsql.Message{"id1": 1, "f1": "v1"},
  325. }, {
  326. Emitter: "src1",
  327. Message: xsql.Message{"id1": 1, "f1": "v3"},
  328. },
  329. },
  330. },
  331. {
  332. Emitter: "src2",
  333. Tuples: []xsql.Tuple{
  334. {
  335. Emitter: "src2",
  336. Message: xsql.Message{"id2": 1, "f2": "w1"},
  337. }, {
  338. Emitter: "src2",
  339. Message: xsql.Message{"id2": 1, "f2": "w3"},
  340. },
  341. },
  342. },
  343. {
  344. Emitter: "src3",
  345. Tuples: []xsql.Tuple{
  346. {
  347. Emitter: "src3",
  348. Message: xsql.Message{"id3": 1, "f3": "x1"},
  349. }, {
  350. Emitter: "src3",
  351. Message: xsql.Message{"id3": 5, "f3": "x5"},
  352. },
  353. },
  354. },
  355. },
  356. },
  357. result: &xsql.JoinTupleSets{
  358. Content: []xsql.JoinTuple{
  359. {
  360. Tuples: []xsql.Tuple{
  361. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  362. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  363. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  364. },
  365. },
  366. {
  367. Tuples: []xsql.Tuple{
  368. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  369. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  370. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
  371. },
  372. },
  373. {
  374. Tuples: []xsql.Tuple{
  375. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  376. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  377. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  378. },
  379. },
  380. {
  381. Tuples: []xsql.Tuple{
  382. {Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
  383. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  384. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
  385. },
  386. },
  387. {
  388. Tuples: []xsql.Tuple{
  389. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  390. },
  391. },
  392. },
  393. },
  394. },
  395. {
  396. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 cross join src3",
  397. data: xsql.WindowTuplesSet{
  398. Content: []xsql.WindowTuples{
  399. {
  400. Emitter: "src1",
  401. Tuples: []xsql.Tuple{
  402. {
  403. Emitter: "src1",
  404. Message: xsql.Message{"id1": 1, "f1": "v1"},
  405. }, {
  406. Emitter: "src1",
  407. Message: xsql.Message{"id1": 5, "f1": "v5"},
  408. },
  409. },
  410. },
  411. {
  412. Emitter: "src2",
  413. Tuples: []xsql.Tuple{
  414. {
  415. Emitter: "src2",
  416. Message: xsql.Message{"id2": 1, "f2": "w1"},
  417. }, {
  418. Emitter: "src2",
  419. Message: xsql.Message{"id2": 4, "f2": "w3"},
  420. },
  421. },
  422. },
  423. {
  424. Emitter: "src3",
  425. Tuples: []xsql.Tuple{
  426. {
  427. Emitter: "src3",
  428. Message: xsql.Message{"id3": 2, "f3": "x1"},
  429. }, {
  430. Emitter: "src3",
  431. Message: xsql.Message{"id3": 5, "f3": "x5"},
  432. },
  433. },
  434. },
  435. },
  436. },
  437. result: &xsql.JoinTupleSets{
  438. Content: []xsql.JoinTuple{
  439. {
  440. Tuples: []xsql.Tuple{
  441. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  442. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  443. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  444. },
  445. },
  446. {
  447. Tuples: []xsql.Tuple{
  448. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  449. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}}, {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  450. },
  451. },
  452. {
  453. Tuples: []xsql.Tuple{
  454. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  455. {Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
  456. },
  457. },
  458. {
  459. Tuples: []xsql.Tuple{
  460. {Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
  461. {Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
  462. },
  463. },
  464. },
  465. },
  466. },
  467. }
  468. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  469. contextLogger := conf.Log.WithField("rule", "TestMultiJoinPlan_Apply")
  470. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  471. for i, tt := range tests {
  472. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  473. if err != nil {
  474. t.Errorf("statement parse error %s", err)
  475. break
  476. }
  477. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  478. t.Errorf("statement source is not a table")
  479. } else {
  480. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  481. pp := &JoinOp{Joins: stmt.Joins, From: table}
  482. result := pp.Apply(ctx, tt.data, fv, afv)
  483. if !reflect.DeepEqual(tt.result, result) {
  484. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  485. }
  486. }
  487. }
  488. }