join_test.go 44 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestLeftJoinPlan_Apply(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data xsql.WindowTuplesSet
  17. result interface{}
  18. }{
  19. {
  20. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  21. data: xsql.WindowTuplesSet{
  22. xsql.WindowTuples{
  23. Emitter: "src1",
  24. Tuples: []xsql.Tuple{
  25. {
  26. Emitter: "src1",
  27. Message: xsql.Message{"id1": 1, "f1": "v1"},
  28. }, {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 2, "f1": "v2"},
  31. }, {
  32. Emitter: "src1",
  33. Message: xsql.Message{"id1": 3, "f1": "v3"},
  34. },
  35. },
  36. },
  37. xsql.WindowTuples{
  38. Emitter: "src2",
  39. Tuples: []xsql.Tuple{
  40. {
  41. Emitter: "src2",
  42. Message: xsql.Message{"id2": 1, "f2": "w1"},
  43. }, {
  44. Emitter: "src2",
  45. Message: xsql.Message{"id2": 2, "f2": "w2"},
  46. }, {
  47. Emitter: "src2",
  48. Message: xsql.Message{"id2": 4, "f2": "w3"},
  49. },
  50. },
  51. },
  52. },
  53. result: xsql.JoinTupleSets{
  54. xsql.JoinTuple{
  55. Tuples: []xsql.Tuple{
  56. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  57. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  58. },
  59. },
  60. xsql.JoinTuple{
  61. Tuples: []xsql.Tuple{
  62. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  63. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  64. },
  65. },
  66. xsql.JoinTuple{
  67. Tuples: []xsql.Tuple{
  68. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  69. },
  70. },
  71. },
  72. },
  73. {
  74. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  75. data: xsql.WindowTuplesSet{
  76. xsql.WindowTuples{
  77. Emitter: "src1",
  78. Tuples: []xsql.Tuple{
  79. {
  80. Emitter: "src1",
  81. Message: xsql.Message{"id1": 1, "f1": "v1"},
  82. }, {
  83. Emitter: "src1",
  84. Message: xsql.Message{"id1": 2, "f1": "v2"},
  85. }, {
  86. Emitter: "src1",
  87. Message: xsql.Message{"id1": 3, "f1": "v3"},
  88. },
  89. },
  90. },
  91. xsql.WindowTuples{
  92. Emitter: "src2",
  93. Tuples: []xsql.Tuple{
  94. {
  95. Emitter: "src2",
  96. Message: xsql.Message{"id2": 1, "f2": "w1"},
  97. }, {
  98. Emitter: "src2",
  99. Message: xsql.Message{"f2": "w2"},
  100. }, {
  101. Emitter: "src2",
  102. Message: xsql.Message{"id2": 4, "f2": "w3"},
  103. },
  104. },
  105. },
  106. },
  107. result: xsql.JoinTupleSets{
  108. xsql.JoinTuple{
  109. Tuples: []xsql.Tuple{
  110. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  111. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  112. },
  113. },
  114. xsql.JoinTuple{
  115. Tuples: []xsql.Tuple{
  116. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  117. },
  118. },
  119. xsql.JoinTuple{
  120. Tuples: []xsql.Tuple{
  121. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  122. },
  123. },
  124. },
  125. },
  126. {
  127. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  128. data: xsql.WindowTuplesSet{
  129. xsql.WindowTuples{
  130. Emitter: "src1",
  131. Tuples: []xsql.Tuple{
  132. {
  133. Emitter: "src1",
  134. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  135. }, {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)},
  141. },
  142. },
  143. },
  144. xsql.WindowTuples{
  145. Emitter: "src2",
  146. Tuples: []xsql.Tuple{
  147. {
  148. Emitter: "src2",
  149. Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)},
  150. }, {
  151. Emitter: "src2",
  152. Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)},
  153. }, {
  154. Emitter: "src2",
  155. Message: xsql.Message{"id2": 4, "f2": "w3", "ts": common.TimeFromUnixMilli(1568854545000)},
  156. },
  157. },
  158. },
  159. },
  160. result: xsql.JoinTupleSets{
  161. xsql.JoinTuple{
  162. Tuples: []xsql.Tuple{
  163. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  164. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  165. },
  166. },
  167. xsql.JoinTuple{
  168. Tuples: []xsql.Tuple{
  169. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  170. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  171. },
  172. },
  173. xsql.JoinTuple{
  174. Tuples: []xsql.Tuple{
  175. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)}},
  176. },
  177. },
  178. },
  179. },
  180. {
  181. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  182. data: xsql.WindowTuplesSet{
  183. xsql.WindowTuples{
  184. Emitter: "src1",
  185. Tuples: []xsql.Tuple{
  186. {
  187. Emitter: "src1",
  188. Message: xsql.Message{"id1": 1, "f1": "v1"},
  189. }, {
  190. Emitter: "src1",
  191. Message: xsql.Message{"id1": 2, "f1": "v2"},
  192. }, {
  193. Emitter: "src1",
  194. Message: xsql.Message{"id1": 3, "f1": "v3"},
  195. },
  196. },
  197. },
  198. xsql.WindowTuples{
  199. Emitter: "src2",
  200. Tuples: []xsql.Tuple{
  201. {
  202. Emitter: "src2",
  203. Message: xsql.Message{"id2": 4, "f2": "w1"},
  204. }, {
  205. Emitter: "src2",
  206. Message: xsql.Message{"id2": 5, "f2": "w2"},
  207. }, {
  208. Emitter: "src2",
  209. Message: xsql.Message{"id2": 6, "f2": "w3"},
  210. },
  211. },
  212. },
  213. },
  214. result: nil,
  215. },
  216. {
  217. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  218. data: xsql.WindowTuplesSet{
  219. xsql.WindowTuples{
  220. Emitter: "s1",
  221. Tuples: []xsql.Tuple{
  222. {
  223. Emitter: "s1",
  224. Message: xsql.Message{"id1": 1, "f1": "v1"},
  225. }, {
  226. Emitter: "s1",
  227. Message: xsql.Message{"id1": 2, "f1": "v2"},
  228. }, {
  229. Emitter: "s1",
  230. Message: xsql.Message{"id1": 3, "f1": "v3"},
  231. },
  232. },
  233. },
  234. xsql.WindowTuples{
  235. Emitter: "s2",
  236. Tuples: []xsql.Tuple{
  237. {
  238. Emitter: "s2",
  239. Message: xsql.Message{"id2": 1, "f2": "w1"},
  240. }, {
  241. Emitter: "s2",
  242. Message: xsql.Message{"id2": 2, "f2": "w2"},
  243. }, {
  244. Emitter: "s2",
  245. Message: xsql.Message{"id2": 4, "f2": "w3"},
  246. },
  247. },
  248. },
  249. },
  250. result: xsql.JoinTupleSets{
  251. xsql.JoinTuple{
  252. Tuples: []xsql.Tuple{
  253. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  254. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  255. },
  256. },
  257. xsql.JoinTuple{
  258. Tuples: []xsql.Tuple{
  259. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  260. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  261. },
  262. },
  263. xsql.JoinTuple{
  264. Tuples: []xsql.Tuple{
  265. {Emitter: "s1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  266. },
  267. },
  268. },
  269. },
  270. {
  271. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  272. data: xsql.WindowTuplesSet{
  273. xsql.WindowTuples{
  274. Emitter: "src1",
  275. Tuples: []xsql.Tuple{
  276. {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 1, "f1": "v1"},
  279. },
  280. },
  281. },
  282. xsql.WindowTuples{
  283. Emitter: "src2",
  284. Tuples: []xsql.Tuple{
  285. {
  286. Emitter: "src2",
  287. Message: xsql.Message{"id2": 1, "f2": "w1"},
  288. }, {
  289. Emitter: "src2",
  290. Message: xsql.Message{"id2": 1, "f2": "w2"},
  291. },
  292. },
  293. },
  294. },
  295. result: xsql.JoinTupleSets{
  296. xsql.JoinTuple{
  297. Tuples: []xsql.Tuple{
  298. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  299. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  300. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  301. },
  302. },
  303. },
  304. },
  305. {
  306. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  307. data: xsql.WindowTuplesSet{
  308. xsql.WindowTuples{
  309. Emitter: "src1",
  310. Tuples: []xsql.Tuple{
  311. {
  312. Emitter: "src1",
  313. Message: xsql.Message{"id1": 1, "f1": "v1"},
  314. }, {
  315. Emitter: "src1",
  316. Message: xsql.Message{"id1": 2, "f1": "v2"},
  317. }, {
  318. Emitter: "src1",
  319. Message: xsql.Message{"id1": 3, "f1": "v3"},
  320. },
  321. },
  322. },
  323. xsql.WindowTuples{
  324. Emitter: "src2",
  325. Tuples: []xsql.Tuple{},
  326. },
  327. },
  328. result: xsql.JoinTupleSets{
  329. xsql.JoinTuple{
  330. Tuples: []xsql.Tuple{
  331. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  332. },
  333. },
  334. xsql.JoinTuple{
  335. Tuples: []xsql.Tuple{
  336. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  337. },
  338. },
  339. xsql.JoinTuple{
  340. Tuples: []xsql.Tuple{
  341. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  342. },
  343. },
  344. },
  345. },
  346. {
  347. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  348. data: xsql.WindowTuplesSet{
  349. xsql.WindowTuples{
  350. Emitter: "src1",
  351. Tuples: []xsql.Tuple{
  352. {
  353. Emitter: "src1",
  354. Message: xsql.Message{"id1": 1, "f1": "v1"},
  355. }, {
  356. Emitter: "src1",
  357. Message: xsql.Message{"id1": 2, "f1": "v2"},
  358. }, {
  359. Emitter: "src1",
  360. Message: xsql.Message{"id1": 3, "f1": "v3"},
  361. },
  362. },
  363. },
  364. xsql.WindowTuples{
  365. Emitter: "src2",
  366. Tuples: nil,
  367. },
  368. },
  369. result: xsql.JoinTupleSets{
  370. xsql.JoinTuple{
  371. Tuples: []xsql.Tuple{
  372. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  373. },
  374. },
  375. xsql.JoinTuple{
  376. Tuples: []xsql.Tuple{
  377. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  378. },
  379. },
  380. xsql.JoinTuple{
  381. Tuples: []xsql.Tuple{
  382. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  383. },
  384. },
  385. },
  386. },
  387. {
  388. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  389. data: xsql.WindowTuplesSet{
  390. xsql.WindowTuples{
  391. Emitter: "src1",
  392. Tuples: []xsql.Tuple{},
  393. },
  394. xsql.WindowTuples{
  395. Emitter: "src2",
  396. Tuples: []xsql.Tuple{
  397. {
  398. Emitter: "src2",
  399. Message: xsql.Message{"id2": 1, "f2": "w1"},
  400. }, {
  401. Emitter: "src2",
  402. Message: xsql.Message{"id2": 1, "f2": "w2"},
  403. },
  404. },
  405. },
  406. },
  407. result: nil,
  408. },
  409. {
  410. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  411. data: xsql.WindowTuplesSet{
  412. xsql.WindowTuples{
  413. Emitter: "src1",
  414. Tuples: nil,
  415. },
  416. xsql.WindowTuples{
  417. Emitter: "src2",
  418. Tuples: []xsql.Tuple{
  419. {
  420. Emitter: "src2",
  421. Message: xsql.Message{"id2": 1, "f2": "w1"},
  422. }, {
  423. Emitter: "src2",
  424. Message: xsql.Message{"id2": 1, "f2": "w2"},
  425. },
  426. },
  427. },
  428. },
  429. result: nil,
  430. },
  431. {
  432. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  433. data: xsql.WindowTuplesSet{
  434. xsql.WindowTuples{
  435. Emitter: "src1",
  436. Tuples: []xsql.Tuple{
  437. {
  438. Emitter: "src1",
  439. Message: xsql.Message{"id1": 1, "f1": "v1"},
  440. }, {
  441. Emitter: "src1",
  442. Message: xsql.Message{"id1": 2, "f1": "v2"},
  443. }, {
  444. Emitter: "src1",
  445. Message: xsql.Message{"id1": 3, "f1": "v3"},
  446. },
  447. },
  448. },
  449. xsql.WindowTuples{
  450. Emitter: "src2",
  451. Tuples: []xsql.Tuple{
  452. {
  453. Emitter: "src2",
  454. Message: xsql.Message{"id2": 1, "f2": "w1"},
  455. }, {
  456. Emitter: "src2",
  457. Message: xsql.Message{"id2": 2, "f2": "w2"},
  458. }, {
  459. Emitter: "src2",
  460. Message: xsql.Message{"id2": 4, "f2": "w3"},
  461. },
  462. },
  463. },
  464. },
  465. result: xsql.JoinTupleSets{
  466. xsql.JoinTuple{
  467. Tuples: []xsql.Tuple{
  468. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  469. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  470. },
  471. },
  472. xsql.JoinTuple{
  473. Tuples: []xsql.Tuple{
  474. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  475. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  476. },
  477. },
  478. xsql.JoinTuple{
  479. Tuples: []xsql.Tuple{
  480. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  481. },
  482. },
  483. },
  484. },
  485. {
  486. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  487. data: xsql.WindowTuplesSet{
  488. xsql.WindowTuples{
  489. Emitter: "src1",
  490. Tuples: []xsql.Tuple{
  491. {
  492. Emitter: "src1",
  493. Message: xsql.Message{"id1": 1, "f1": "v1"},
  494. }, {
  495. Emitter: "src1",
  496. Message: xsql.Message{"id1": 2, "f1": "v2"},
  497. }, {
  498. Emitter: "src1",
  499. Message: xsql.Message{"id1": 3, "f1": "v3"},
  500. },
  501. },
  502. },
  503. xsql.WindowTuples{
  504. Emitter: "src2",
  505. Tuples: []xsql.Tuple{
  506. {
  507. Emitter: "src2",
  508. Message: xsql.Message{"id2": 1, "f2": "w1"},
  509. }, {
  510. Emitter: "src2",
  511. Message: xsql.Message{"id2": 2, "f2": "w2"},
  512. }, {
  513. Emitter: "src2",
  514. Message: xsql.Message{"id2": 4, "f2": "w3"},
  515. },
  516. },
  517. },
  518. },
  519. result: xsql.JoinTupleSets{
  520. xsql.JoinTuple{
  521. Tuples: []xsql.Tuple{
  522. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  523. },
  524. },
  525. xsql.JoinTuple{
  526. Tuples: []xsql.Tuple{
  527. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  528. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  529. },
  530. },
  531. xsql.JoinTuple{
  532. Tuples: []xsql.Tuple{
  533. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  534. },
  535. },
  536. },
  537. },
  538. {
  539. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  540. data: xsql.WindowTuplesSet{
  541. xsql.WindowTuples{
  542. Emitter: "src1",
  543. Tuples: []xsql.Tuple{
  544. {
  545. Emitter: "src1",
  546. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  547. }, {
  548. Emitter: "src1",
  549. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  550. }, {
  551. Emitter: "src1",
  552. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  553. },
  554. },
  555. },
  556. xsql.WindowTuples{
  557. Emitter: "src2",
  558. Tuples: []xsql.Tuple{
  559. {
  560. Emitter: "src2",
  561. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  562. }, {
  563. Emitter: "src2",
  564. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  565. }, {
  566. Emitter: "src2",
  567. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  568. },
  569. },
  570. },
  571. },
  572. result: xsql.JoinTupleSets{
  573. xsql.JoinTuple{
  574. Tuples: []xsql.Tuple{
  575. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  576. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  577. },
  578. },
  579. xsql.JoinTuple{
  580. Tuples: []xsql.Tuple{
  581. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  582. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  583. },
  584. },
  585. xsql.JoinTuple{
  586. Tuples: []xsql.Tuple{
  587. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)}},
  588. },
  589. },
  590. },
  591. },
  592. {
  593. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  594. data: xsql.WindowTuplesSet{
  595. xsql.WindowTuples{
  596. Emitter: "src1",
  597. Tuples: []xsql.Tuple{
  598. {
  599. Emitter: "src1",
  600. Message: xsql.Message{"id1": 1, "f1": "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},
  601. },
  602. },
  603. },
  604. xsql.WindowTuples{
  605. Emitter: "src2",
  606. Tuples: []xsql.Tuple{
  607. {
  608. Emitter: "src2",
  609. Message: xsql.Message{"id2": 1, "f2": "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001"},
  610. },
  611. },
  612. },
  613. },
  614. result: xsql.JoinTupleSets{
  615. xsql.JoinTuple{
  616. Tuples: []xsql.Tuple{
  617. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"}},
  618. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001"}},
  619. },
  620. },
  621. },
  622. },
  623. }
  624. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  625. contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  626. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  627. for i, tt := range tests {
  628. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  629. if err != nil {
  630. t.Errorf("statement parse error %s", err)
  631. break
  632. }
  633. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  634. t.Errorf("statement source is not a table")
  635. } else {
  636. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  637. result := pp.Apply(ctx, tt.data)
  638. if !reflect.DeepEqual(tt.result, result) {
  639. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  640. }
  641. }
  642. }
  643. }
  644. func TestInnerJoinPlan_Apply(t *testing.T) {
  645. var tests = []struct {
  646. sql string
  647. data xsql.WindowTuplesSet
  648. result interface{}
  649. }{
  650. {
  651. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  652. data: xsql.WindowTuplesSet{
  653. xsql.WindowTuples{
  654. Emitter: "src1",
  655. Tuples: []xsql.Tuple{
  656. {
  657. Emitter: "src1",
  658. Message: xsql.Message{"id1": 1, "f1": "v1"},
  659. }, {
  660. Emitter: "src1",
  661. Message: xsql.Message{"id1": 2, "f1": "v2"},
  662. }, {
  663. Emitter: "src1",
  664. Message: xsql.Message{"id1": 3, "f1": "v3"},
  665. },
  666. },
  667. },
  668. xsql.WindowTuples{
  669. Emitter: "src2",
  670. Tuples: []xsql.Tuple{
  671. {
  672. Emitter: "src2",
  673. Message: xsql.Message{"id2": 1, "f2": "w1"},
  674. }, {
  675. Emitter: "src2",
  676. Message: xsql.Message{"id2": 2, "f2": "w2"},
  677. }, {
  678. Emitter: "src2",
  679. Message: xsql.Message{"id2": 4, "f2": "w3"},
  680. },
  681. },
  682. },
  683. },
  684. result: xsql.JoinTupleSets{
  685. xsql.JoinTuple{
  686. Tuples: []xsql.Tuple{
  687. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  688. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  689. },
  690. },
  691. xsql.JoinTuple{
  692. Tuples: []xsql.Tuple{
  693. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  694. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  695. },
  696. },
  697. },
  698. },
  699. {
  700. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  701. data: xsql.WindowTuplesSet{
  702. xsql.WindowTuples{
  703. Emitter: "src1",
  704. Tuples: []xsql.Tuple{
  705. {
  706. Emitter: "src1",
  707. Message: xsql.Message{"id1": 1, "f1": "v1"},
  708. }, {
  709. Emitter: "src1",
  710. Message: xsql.Message{"id1": 2, "f1": "v2"},
  711. }, {
  712. Emitter: "src1",
  713. Message: xsql.Message{"id1": 3, "f1": "v3"},
  714. },
  715. },
  716. },
  717. xsql.WindowTuples{
  718. Emitter: "src2",
  719. Tuples: []xsql.Tuple{
  720. {
  721. Emitter: "src2",
  722. Message: xsql.Message{"id2": 1, "f2": "w1"},
  723. }, {
  724. Emitter: "src2",
  725. Message: xsql.Message{"f2": "w2"},
  726. }, {
  727. Emitter: "src2",
  728. Message: xsql.Message{"id2": 4, "f2": "w3"},
  729. },
  730. },
  731. },
  732. },
  733. result: xsql.JoinTupleSets{
  734. xsql.JoinTuple{
  735. Tuples: []xsql.Tuple{
  736. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  737. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  738. },
  739. },
  740. },
  741. },
  742. {
  743. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  744. data: xsql.WindowTuplesSet{
  745. xsql.WindowTuples{
  746. Emitter: "s1",
  747. Tuples: []xsql.Tuple{
  748. {
  749. Emitter: "s1",
  750. Message: xsql.Message{"id1": 1, "f1": "v1"},
  751. }, {
  752. Emitter: "s1",
  753. Message: xsql.Message{"id1": 2, "f1": "v2"},
  754. }, {
  755. Emitter: "s1",
  756. Message: xsql.Message{"id1": 3, "f1": "v3"},
  757. },
  758. },
  759. },
  760. xsql.WindowTuples{
  761. Emitter: "s2",
  762. Tuples: []xsql.Tuple{
  763. {
  764. Emitter: "s2",
  765. Message: xsql.Message{"id2": 1, "f2": "w1"},
  766. }, {
  767. Emitter: "s2",
  768. Message: xsql.Message{"id2": 2, "f2": "w2"},
  769. }, {
  770. Emitter: "s2",
  771. Message: xsql.Message{"id2": 4, "f2": "w3"},
  772. },
  773. },
  774. },
  775. },
  776. result: xsql.JoinTupleSets{
  777. xsql.JoinTuple{
  778. Tuples: []xsql.Tuple{
  779. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  780. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  781. },
  782. },
  783. xsql.JoinTuple{
  784. Tuples: []xsql.Tuple{
  785. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  786. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  787. },
  788. },
  789. },
  790. },
  791. {
  792. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  793. data: xsql.WindowTuplesSet{
  794. xsql.WindowTuples{
  795. Emitter: "src1",
  796. Tuples: []xsql.Tuple{
  797. {
  798. Emitter: "src1",
  799. Message: xsql.Message{"id1": 1, "f1": "v1"},
  800. },
  801. },
  802. },
  803. xsql.WindowTuples{
  804. Emitter: "src2",
  805. Tuples: []xsql.Tuple{
  806. {
  807. Emitter: "src2",
  808. Message: xsql.Message{"id2": 1, "f2": "w1"},
  809. }, {
  810. Emitter: "src2",
  811. Message: xsql.Message{"id2": 1, "f2": "w2"},
  812. },
  813. },
  814. },
  815. },
  816. result: xsql.JoinTupleSets{
  817. xsql.JoinTuple{
  818. Tuples: []xsql.Tuple{
  819. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  820. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  821. },
  822. },
  823. xsql.JoinTuple{
  824. Tuples: []xsql.Tuple{
  825. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  826. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  827. },
  828. },
  829. },
  830. },
  831. {
  832. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  833. data: xsql.WindowTuplesSet{
  834. xsql.WindowTuples{
  835. Emitter: "src1",
  836. Tuples: []xsql.Tuple{
  837. {
  838. Emitter: "src1",
  839. Message: xsql.Message{"id1": 1, "f1": "v1"},
  840. }, {
  841. Emitter: "src1",
  842. Message: xsql.Message{"id1": 2, "f1": "v2"},
  843. }, {
  844. Emitter: "src1",
  845. Message: xsql.Message{"id1": 3, "f1": "v3"},
  846. },
  847. },
  848. },
  849. xsql.WindowTuples{
  850. Emitter: "src2",
  851. Tuples: []xsql.Tuple{},
  852. },
  853. },
  854. result: nil,
  855. },
  856. {
  857. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  858. data: xsql.WindowTuplesSet{
  859. xsql.WindowTuples{
  860. Emitter: "src1",
  861. Tuples: []xsql.Tuple{
  862. {
  863. Emitter: "src1",
  864. Message: xsql.Message{"id1": 1, "f1": "v1"},
  865. }, {
  866. Emitter: "src1",
  867. Message: xsql.Message{"id1": 2, "f1": "v2"},
  868. }, {
  869. Emitter: "src1",
  870. Message: xsql.Message{"id1": 3, "f1": "v3"},
  871. },
  872. },
  873. },
  874. xsql.WindowTuples{
  875. Emitter: "src2",
  876. Tuples: nil,
  877. },
  878. },
  879. result: nil,
  880. },
  881. {
  882. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  883. data: xsql.WindowTuplesSet{
  884. xsql.WindowTuples{
  885. Emitter: "src1",
  886. Tuples: []xsql.Tuple{},
  887. },
  888. xsql.WindowTuples{
  889. Emitter: "src2",
  890. Tuples: []xsql.Tuple{
  891. {
  892. Emitter: "src2",
  893. Message: xsql.Message{"id2": 1, "f2": "w1"},
  894. }, {
  895. Emitter: "src2",
  896. Message: xsql.Message{"id2": 1, "f2": "w2"},
  897. },
  898. },
  899. },
  900. },
  901. result: nil,
  902. },
  903. {
  904. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  905. data: xsql.WindowTuplesSet{
  906. xsql.WindowTuples{
  907. Emitter: "src1",
  908. Tuples: nil,
  909. },
  910. xsql.WindowTuples{
  911. Emitter: "src2",
  912. Tuples: []xsql.Tuple{
  913. {
  914. Emitter: "src2",
  915. Message: xsql.Message{"id2": 1, "f2": "w1"},
  916. }, {
  917. Emitter: "src2",
  918. Message: xsql.Message{"id2": 1, "f2": "w2"},
  919. },
  920. },
  921. },
  922. },
  923. result: nil,
  924. },
  925. {
  926. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  927. data: xsql.WindowTuplesSet{
  928. xsql.WindowTuples{
  929. Emitter: "src1",
  930. Tuples: []xsql.Tuple{
  931. {
  932. Emitter: "src1",
  933. Message: xsql.Message{"id1": 1, "f1": "v1"},
  934. }, {
  935. Emitter: "src1",
  936. Message: xsql.Message{"id1": 2, "f1": "v2"},
  937. }, {
  938. Emitter: "src1",
  939. Message: xsql.Message{"id1": 3, "f1": "v3"},
  940. },
  941. },
  942. },
  943. xsql.WindowTuples{
  944. Emitter: "src2",
  945. Tuples: []xsql.Tuple{
  946. {
  947. Emitter: "src2",
  948. Message: xsql.Message{"id2": 1, "f2": "w1"},
  949. }, {
  950. Emitter: "src2",
  951. Message: xsql.Message{"id2": 2, "f2": "w2"},
  952. }, {
  953. Emitter: "src2",
  954. Message: xsql.Message{"id2": 4, "f2": "w3"},
  955. },
  956. },
  957. },
  958. },
  959. result: xsql.JoinTupleSets{
  960. xsql.JoinTuple{
  961. Tuples: []xsql.Tuple{
  962. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  963. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  964. },
  965. },
  966. xsql.JoinTuple{
  967. Tuples: []xsql.Tuple{
  968. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  969. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  970. },
  971. },
  972. },
  973. },
  974. {
  975. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  976. data: xsql.WindowTuplesSet{
  977. xsql.WindowTuples{
  978. Emitter: "src1",
  979. Tuples: []xsql.Tuple{
  980. {
  981. Emitter: "src1",
  982. Message: xsql.Message{"id1": 1, "f1": "v1"},
  983. }, {
  984. Emitter: "src1",
  985. Message: xsql.Message{"id1": 2, "f1": "v2"},
  986. }, {
  987. Emitter: "src1",
  988. Message: xsql.Message{"id1": 3, "f1": "v3"},
  989. },
  990. },
  991. },
  992. xsql.WindowTuples{
  993. Emitter: "src2",
  994. Tuples: []xsql.Tuple{
  995. {
  996. Emitter: "src2",
  997. Message: xsql.Message{"id2": 1, "f2": "w1"},
  998. }, {
  999. Emitter: "src2",
  1000. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1001. }, {
  1002. Emitter: "src2",
  1003. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1004. },
  1005. },
  1006. },
  1007. },
  1008. result: xsql.JoinTupleSets{
  1009. xsql.JoinTuple{
  1010. Tuples: []xsql.Tuple{
  1011. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1012. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1013. },
  1014. },
  1015. },
  1016. },
  1017. {
  1018. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  1019. data: xsql.WindowTuplesSet{
  1020. xsql.WindowTuples{
  1021. Emitter: "src1",
  1022. Tuples: []xsql.Tuple{
  1023. {
  1024. Emitter: "src1",
  1025. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  1026. }, {
  1027. Emitter: "src1",
  1028. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  1029. }, {
  1030. Emitter: "src1",
  1031. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  1032. },
  1033. },
  1034. },
  1035. xsql.WindowTuples{
  1036. Emitter: "src2",
  1037. Tuples: []xsql.Tuple{
  1038. {
  1039. Emitter: "src2",
  1040. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  1041. }, {
  1042. Emitter: "src2",
  1043. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  1044. }, {
  1045. Emitter: "src2",
  1046. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  1047. },
  1048. },
  1049. },
  1050. },
  1051. result: xsql.JoinTupleSets{
  1052. xsql.JoinTuple{
  1053. Tuples: []xsql.Tuple{
  1054. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  1055. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  1056. },
  1057. },
  1058. xsql.JoinTuple{
  1059. Tuples: []xsql.Tuple{
  1060. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  1061. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  1062. },
  1063. },
  1064. },
  1065. },
  1066. }
  1067. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1068. contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  1069. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1070. for i, tt := range tests {
  1071. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1072. if err != nil {
  1073. t.Errorf("statement parse error %s", err)
  1074. break
  1075. }
  1076. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1077. t.Errorf("statement source is not a table")
  1078. } else {
  1079. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1080. result := pp.Apply(ctx, tt.data)
  1081. if !reflect.DeepEqual(tt.result, result) {
  1082. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1083. }
  1084. }
  1085. }
  1086. }
  1087. func TestRightJoinPlan_Apply(t *testing.T) {
  1088. var tests = []struct {
  1089. sql string
  1090. data xsql.WindowTuplesSet
  1091. result interface{}
  1092. }{
  1093. {
  1094. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1095. data: xsql.WindowTuplesSet{
  1096. xsql.WindowTuples{
  1097. Emitter: "src1",
  1098. Tuples: []xsql.Tuple{
  1099. {
  1100. Emitter: "src1",
  1101. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1102. }, {
  1103. Emitter: "src1",
  1104. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1105. }, {
  1106. Emitter: "src1",
  1107. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1108. },
  1109. },
  1110. },
  1111. xsql.WindowTuples{
  1112. Emitter: "src2",
  1113. Tuples: []xsql.Tuple{
  1114. {
  1115. Emitter: "src2",
  1116. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1117. }, {
  1118. Emitter: "src2",
  1119. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1120. }, {
  1121. Emitter: "src2",
  1122. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1123. },
  1124. },
  1125. },
  1126. },
  1127. result: xsql.JoinTupleSets{
  1128. xsql.JoinTuple{
  1129. Tuples: []xsql.Tuple{
  1130. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1131. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1132. },
  1133. },
  1134. xsql.JoinTuple{
  1135. Tuples: []xsql.Tuple{
  1136. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1137. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1138. },
  1139. },
  1140. xsql.JoinTuple{
  1141. Tuples: []xsql.Tuple{
  1142. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1143. },
  1144. },
  1145. },
  1146. },
  1147. {
  1148. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1149. data: xsql.WindowTuplesSet{
  1150. xsql.WindowTuples{
  1151. Emitter: "src1",
  1152. Tuples: []xsql.Tuple{
  1153. {
  1154. Emitter: "src1",
  1155. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1156. }, {
  1157. Emitter: "src1",
  1158. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1159. }, {
  1160. Emitter: "src1",
  1161. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1162. },
  1163. },
  1164. },
  1165. xsql.WindowTuples{
  1166. Emitter: "src2",
  1167. Tuples: []xsql.Tuple{
  1168. {
  1169. Emitter: "src2",
  1170. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1171. }, {
  1172. Emitter: "src2",
  1173. Message: xsql.Message{"f2": "w2"},
  1174. }, {
  1175. Emitter: "src2",
  1176. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1177. },
  1178. },
  1179. },
  1180. },
  1181. result: xsql.JoinTupleSets{
  1182. xsql.JoinTuple{
  1183. Tuples: []xsql.Tuple{
  1184. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1185. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1186. },
  1187. },
  1188. xsql.JoinTuple{
  1189. Tuples: []xsql.Tuple{
  1190. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1191. },
  1192. },
  1193. xsql.JoinTuple{
  1194. Tuples: []xsql.Tuple{
  1195. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1196. },
  1197. },
  1198. },
  1199. },
  1200. {
  1201. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1202. data: xsql.WindowTuplesSet{
  1203. xsql.WindowTuples{
  1204. Emitter: "src1",
  1205. Tuples: []xsql.Tuple{
  1206. {
  1207. Emitter: "src1",
  1208. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1209. },
  1210. },
  1211. },
  1212. xsql.WindowTuples{
  1213. Emitter: "src2",
  1214. Tuples: []xsql.Tuple{
  1215. {
  1216. Emitter: "src2",
  1217. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1218. }, {
  1219. Emitter: "src2",
  1220. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1221. },
  1222. },
  1223. },
  1224. },
  1225. result: xsql.JoinTupleSets{
  1226. xsql.JoinTuple{
  1227. Tuples: []xsql.Tuple{
  1228. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1229. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1230. },
  1231. },
  1232. xsql.JoinTuple{
  1233. Tuples: []xsql.Tuple{
  1234. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1235. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1236. },
  1237. },
  1238. },
  1239. },
  1240. }
  1241. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1242. contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1243. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1244. for i, tt := range tests {
  1245. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1246. if err != nil {
  1247. t.Errorf("statement parse error %s", err)
  1248. break
  1249. }
  1250. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1251. t.Errorf("statement source is not a table")
  1252. } else {
  1253. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1254. result := pp.Apply(ctx, tt.data)
  1255. if !reflect.DeepEqual(tt.result, result) {
  1256. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1257. }
  1258. }
  1259. }
  1260. }
  1261. func TestFullJoinPlan_Apply(t *testing.T) {
  1262. var tests = []struct {
  1263. sql string
  1264. data xsql.WindowTuplesSet
  1265. result interface{}
  1266. }{
  1267. {
  1268. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1269. data: xsql.WindowTuplesSet{
  1270. xsql.WindowTuples{
  1271. Emitter: "src1",
  1272. Tuples: []xsql.Tuple{
  1273. {
  1274. Emitter: "src1",
  1275. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1276. }, {
  1277. Emitter: "src1",
  1278. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1279. }, {
  1280. Emitter: "src1",
  1281. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1282. },
  1283. },
  1284. },
  1285. xsql.WindowTuples{
  1286. Emitter: "src2",
  1287. Tuples: []xsql.Tuple{
  1288. {
  1289. Emitter: "src2",
  1290. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1291. }, {
  1292. Emitter: "src2",
  1293. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1294. }, {
  1295. Emitter: "src2",
  1296. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1297. }, {
  1298. Emitter: "src2",
  1299. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1300. },
  1301. },
  1302. },
  1303. },
  1304. result: xsql.JoinTupleSets{
  1305. xsql.JoinTuple{
  1306. Tuples: []xsql.Tuple{
  1307. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1308. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1309. },
  1310. },
  1311. xsql.JoinTuple{
  1312. Tuples: []xsql.Tuple{
  1313. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1314. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1315. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
  1316. },
  1317. },
  1318. xsql.JoinTuple{
  1319. Tuples: []xsql.Tuple{
  1320. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1321. },
  1322. },
  1323. xsql.JoinTuple{
  1324. Tuples: []xsql.Tuple{
  1325. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1326. },
  1327. },
  1328. },
  1329. },
  1330. {
  1331. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1332. data: xsql.WindowTuplesSet{
  1333. xsql.WindowTuples{
  1334. Emitter: "src1",
  1335. Tuples: []xsql.Tuple{
  1336. {
  1337. Emitter: "src1",
  1338. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1339. }, {
  1340. Emitter: "src1",
  1341. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1342. }, {
  1343. Emitter: "src1",
  1344. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1345. },
  1346. },
  1347. },
  1348. xsql.WindowTuples{
  1349. Emitter: "src2",
  1350. Tuples: []xsql.Tuple{
  1351. {
  1352. Emitter: "src2",
  1353. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1354. }, {
  1355. Emitter: "src2",
  1356. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1357. }, {
  1358. Emitter: "src2",
  1359. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1360. },
  1361. },
  1362. },
  1363. },
  1364. result: xsql.JoinTupleSets{
  1365. xsql.JoinTuple{
  1366. Tuples: []xsql.Tuple{
  1367. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1368. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1369. },
  1370. },
  1371. xsql.JoinTuple{
  1372. Tuples: []xsql.Tuple{
  1373. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1374. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1375. },
  1376. },
  1377. xsql.JoinTuple{
  1378. Tuples: []xsql.Tuple{
  1379. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1380. },
  1381. },
  1382. xsql.JoinTuple{
  1383. Tuples: []xsql.Tuple{
  1384. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1385. },
  1386. },
  1387. },
  1388. },
  1389. {
  1390. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1391. data: xsql.WindowTuplesSet{
  1392. xsql.WindowTuples{
  1393. Emitter: "src1",
  1394. Tuples: []xsql.Tuple{
  1395. {
  1396. Emitter: "src1",
  1397. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1398. }, {
  1399. Emitter: "src1",
  1400. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1401. }, {
  1402. Emitter: "src1",
  1403. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1404. },
  1405. },
  1406. },
  1407. xsql.WindowTuples{
  1408. Emitter: "src2",
  1409. Tuples: []xsql.Tuple{},
  1410. },
  1411. },
  1412. result: xsql.JoinTupleSets{
  1413. xsql.JoinTuple{
  1414. Tuples: []xsql.Tuple{
  1415. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1416. },
  1417. },
  1418. xsql.JoinTuple{
  1419. Tuples: []xsql.Tuple{
  1420. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1421. },
  1422. },
  1423. xsql.JoinTuple{
  1424. Tuples: []xsql.Tuple{
  1425. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1426. },
  1427. },
  1428. },
  1429. },
  1430. {
  1431. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1432. data: xsql.WindowTuplesSet{
  1433. xsql.WindowTuples{
  1434. Emitter: "src1",
  1435. Tuples: []xsql.Tuple{},
  1436. },
  1437. xsql.WindowTuples{
  1438. Emitter: "src2",
  1439. Tuples: []xsql.Tuple{
  1440. {
  1441. Emitter: "src2",
  1442. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1443. }, {
  1444. Emitter: "src2",
  1445. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1446. }, {
  1447. Emitter: "src2",
  1448. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1449. },
  1450. },
  1451. },
  1452. },
  1453. result: xsql.JoinTupleSets{
  1454. xsql.JoinTuple{
  1455. Tuples: []xsql.Tuple{
  1456. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1457. },
  1458. },
  1459. xsql.JoinTuple{
  1460. Tuples: []xsql.Tuple{
  1461. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1462. },
  1463. },
  1464. xsql.JoinTuple{
  1465. Tuples: []xsql.Tuple{
  1466. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1467. },
  1468. },
  1469. },
  1470. },
  1471. }
  1472. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1473. contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
  1474. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1475. for i, tt := range tests {
  1476. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1477. if err != nil {
  1478. t.Errorf("statement parse error %s", err)
  1479. break
  1480. }
  1481. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1482. t.Errorf("statement source is not a table")
  1483. } else {
  1484. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1485. result := pp.Apply(ctx, tt.data)
  1486. if !reflect.DeepEqual(tt.result, result) {
  1487. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1488. }
  1489. }
  1490. }
  1491. }
  1492. func TestCrossJoinPlan_Apply(t *testing.T) {
  1493. var tests = []struct {
  1494. sql string
  1495. data xsql.WindowTuplesSet
  1496. result interface{}
  1497. }{
  1498. {
  1499. sql: "SELECT id1 FROM src1 cross join src2",
  1500. data: xsql.WindowTuplesSet{
  1501. xsql.WindowTuples{
  1502. Emitter: "src1",
  1503. Tuples: []xsql.Tuple{
  1504. {
  1505. Emitter: "src1",
  1506. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1507. }, {
  1508. Emitter: "src1",
  1509. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1510. }, {
  1511. Emitter: "src1",
  1512. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1513. },
  1514. },
  1515. },
  1516. xsql.WindowTuples{
  1517. Emitter: "src2",
  1518. Tuples: []xsql.Tuple{
  1519. {
  1520. Emitter: "src2",
  1521. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1522. }, {
  1523. Emitter: "src2",
  1524. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1525. }, {
  1526. Emitter: "src2",
  1527. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1528. },
  1529. },
  1530. },
  1531. },
  1532. result: xsql.JoinTupleSets{
  1533. xsql.JoinTuple{
  1534. Tuples: []xsql.Tuple{
  1535. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1536. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1537. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1538. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1539. },
  1540. },
  1541. xsql.JoinTuple{
  1542. Tuples: []xsql.Tuple{
  1543. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1544. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1545. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1546. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1547. },
  1548. },
  1549. xsql.JoinTuple{
  1550. Tuples: []xsql.Tuple{
  1551. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1552. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1553. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1554. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1555. },
  1556. },
  1557. },
  1558. },
  1559. {
  1560. sql: "SELECT id1 FROM src1 cross join src2",
  1561. data: xsql.WindowTuplesSet{
  1562. xsql.WindowTuples{
  1563. Emitter: "src1",
  1564. Tuples: []xsql.Tuple{
  1565. {
  1566. Emitter: "src1",
  1567. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1568. },
  1569. },
  1570. },
  1571. xsql.WindowTuples{
  1572. Emitter: "src2",
  1573. Tuples: []xsql.Tuple{
  1574. {
  1575. Emitter: "src2",
  1576. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1577. }, {
  1578. Emitter: "src2",
  1579. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1580. },
  1581. },
  1582. },
  1583. },
  1584. result: xsql.JoinTupleSets{
  1585. xsql.JoinTuple{
  1586. Tuples: []xsql.Tuple{
  1587. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1588. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1589. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1590. },
  1591. },
  1592. },
  1593. },
  1594. }
  1595. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1596. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1597. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1598. for i, tt := range tests {
  1599. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1600. if err != nil {
  1601. t.Errorf("statement parse error %s", err)
  1602. break
  1603. }
  1604. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1605. t.Errorf("statement source is not a table")
  1606. } else {
  1607. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1608. result := pp.Apply(ctx, tt.data)
  1609. if !reflect.DeepEqual(tt.result, result) {
  1610. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1611. }
  1612. }
  1613. }
  1614. }
  1615. func TestCrossJoinPlanError(t *testing.T) {
  1616. var tests = []struct {
  1617. sql string
  1618. data interface{}
  1619. result interface{}
  1620. }{
  1621. {
  1622. sql: "SELECT id1 FROM src1 cross join src2",
  1623. data: errors.New("an error from upstream"),
  1624. result: errors.New("an error from upstream"),
  1625. }, {
  1626. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1627. data: xsql.WindowTuplesSet{
  1628. xsql.WindowTuples{
  1629. Emitter: "src1",
  1630. Tuples: []xsql.Tuple{
  1631. {
  1632. Emitter: "src1",
  1633. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1634. }, {
  1635. Emitter: "src1",
  1636. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1637. }, {
  1638. Emitter: "src1",
  1639. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1640. },
  1641. },
  1642. },
  1643. xsql.WindowTuples{
  1644. Emitter: "src2",
  1645. Tuples: []xsql.Tuple{
  1646. {
  1647. Emitter: "src2",
  1648. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1649. }, {
  1650. Emitter: "src2",
  1651. Message: xsql.Message{"id2": "3", "f2": "w2"},
  1652. }, {
  1653. Emitter: "src2",
  1654. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1655. }, {
  1656. Emitter: "src2",
  1657. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1658. },
  1659. },
  1660. },
  1661. },
  1662. result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
  1663. },
  1664. }
  1665. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1666. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1667. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1668. for i, tt := range tests {
  1669. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1670. if err != nil {
  1671. t.Errorf("statement parse error %s", err)
  1672. break
  1673. }
  1674. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1675. t.Errorf("statement source is not a table")
  1676. } else {
  1677. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1678. result := pp.Apply(ctx, tt.data)
  1679. if !reflect.DeepEqual(tt.result, result) {
  1680. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1681. }
  1682. }
  1683. }
  1684. }
  1685. func str2Map(s string) map[string]interface{} {
  1686. var input map[string]interface{}
  1687. if err := json.Unmarshal([]byte(s), &input); err != nil {
  1688. fmt.Printf("Failed to parse the JSON data.\n")
  1689. return nil
  1690. }
  1691. return input
  1692. }