join_test.go 45 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestLeftJoinPlan_Apply(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data xsql.WindowTuplesSet
  17. result interface{}
  18. }{
  19. {
  20. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  21. data: xsql.WindowTuplesSet{
  22. xsql.WindowTuples{
  23. Emitter: "src1",
  24. Tuples: []xsql.Tuple{
  25. {
  26. Emitter: "src1",
  27. Message: xsql.Message{"id1": 1, "f1": "v1"},
  28. }, {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 2, "f1": "v2"},
  31. }, {
  32. Emitter: "src1",
  33. Message: xsql.Message{"id1": 3, "f1": "v3"},
  34. },
  35. },
  36. },
  37. xsql.WindowTuples{
  38. Emitter: "src2",
  39. Tuples: []xsql.Tuple{
  40. {
  41. Emitter: "src2",
  42. Message: xsql.Message{"id2": 1, "f2": "w1"},
  43. }, {
  44. Emitter: "src2",
  45. Message: xsql.Message{"id2": 2, "f2": "w2"},
  46. }, {
  47. Emitter: "src2",
  48. Message: xsql.Message{"id2": 4, "f2": "w3"},
  49. },
  50. },
  51. },
  52. },
  53. result: xsql.JoinTupleSets{
  54. xsql.JoinTuple{
  55. Tuples: []xsql.Tuple{
  56. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  57. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  58. },
  59. },
  60. xsql.JoinTuple{
  61. Tuples: []xsql.Tuple{
  62. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  63. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  64. },
  65. },
  66. xsql.JoinTuple{
  67. Tuples: []xsql.Tuple{
  68. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  69. },
  70. },
  71. },
  72. },
  73. {
  74. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  75. data: xsql.WindowTuplesSet{
  76. xsql.WindowTuples{
  77. Emitter: "src1",
  78. Tuples: []xsql.Tuple{
  79. {
  80. Emitter: "src1",
  81. Message: xsql.Message{"id1": 1, "f1": "v1"},
  82. }, {
  83. Emitter: "src1",
  84. Message: xsql.Message{"id1": 2, "f1": "v2"},
  85. }, {
  86. Emitter: "src1",
  87. Message: xsql.Message{"id1": 3, "f1": "v3"},
  88. },
  89. },
  90. },
  91. xsql.WindowTuples{
  92. Emitter: "src2",
  93. Tuples: []xsql.Tuple{
  94. {
  95. Emitter: "src2",
  96. Message: xsql.Message{"id2": 1, "f2": "w1"},
  97. }, {
  98. Emitter: "src2",
  99. Message: xsql.Message{"f2": "w2"},
  100. }, {
  101. Emitter: "src2",
  102. Message: xsql.Message{"id2": 4, "f2": "w3"},
  103. },
  104. },
  105. },
  106. },
  107. result: xsql.JoinTupleSets{
  108. xsql.JoinTuple{
  109. Tuples: []xsql.Tuple{
  110. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  111. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  112. },
  113. },
  114. xsql.JoinTuple{
  115. Tuples: []xsql.Tuple{
  116. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  117. },
  118. },
  119. xsql.JoinTuple{
  120. Tuples: []xsql.Tuple{
  121. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  122. },
  123. },
  124. },
  125. },
  126. {
  127. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  128. data: xsql.WindowTuplesSet{
  129. xsql.WindowTuples{
  130. Emitter: "src1",
  131. Tuples: []xsql.Tuple{
  132. {
  133. Emitter: "src1",
  134. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  135. }, {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)},
  141. },
  142. },
  143. },
  144. xsql.WindowTuples{
  145. Emitter: "src2",
  146. Tuples: []xsql.Tuple{
  147. {
  148. Emitter: "src2",
  149. Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)},
  150. }, {
  151. Emitter: "src2",
  152. Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)},
  153. }, {
  154. Emitter: "src2",
  155. Message: xsql.Message{"id2": 4, "f2": "w3", "ts": common.TimeFromUnixMilli(1568854545000)},
  156. },
  157. },
  158. },
  159. },
  160. result: xsql.JoinTupleSets{
  161. xsql.JoinTuple{
  162. Tuples: []xsql.Tuple{
  163. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  164. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  165. },
  166. },
  167. xsql.JoinTuple{
  168. Tuples: []xsql.Tuple{
  169. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  170. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  171. },
  172. },
  173. xsql.JoinTuple{
  174. Tuples: []xsql.Tuple{
  175. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)}},
  176. },
  177. },
  178. },
  179. },
  180. {
  181. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  182. data: xsql.WindowTuplesSet{
  183. xsql.WindowTuples{
  184. Emitter: "src1",
  185. Tuples: []xsql.Tuple{
  186. {
  187. Emitter: "src1",
  188. Message: xsql.Message{"id1": 1, "f1": "v1"},
  189. }, {
  190. Emitter: "src1",
  191. Message: xsql.Message{"id1": 2, "f1": "v2"},
  192. }, {
  193. Emitter: "src1",
  194. Message: xsql.Message{"id1": 3, "f1": "v3"},
  195. },
  196. },
  197. },
  198. xsql.WindowTuples{
  199. Emitter: "src2",
  200. Tuples: []xsql.Tuple{
  201. {
  202. Emitter: "src2",
  203. Message: xsql.Message{"id2": 4, "f2": "w1"},
  204. }, {
  205. Emitter: "src2",
  206. Message: xsql.Message{"id2": 5, "f2": "w2"},
  207. }, {
  208. Emitter: "src2",
  209. Message: xsql.Message{"id2": 6, "f2": "w3"},
  210. },
  211. },
  212. },
  213. },
  214. result: nil,
  215. },
  216. {
  217. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  218. data: xsql.WindowTuplesSet{
  219. xsql.WindowTuples{
  220. Emitter: "s1",
  221. Tuples: []xsql.Tuple{
  222. {
  223. Emitter: "s1",
  224. Message: xsql.Message{"id1": 1, "f1": "v1"},
  225. }, {
  226. Emitter: "s1",
  227. Message: xsql.Message{"id1": 2, "f1": "v2"},
  228. }, {
  229. Emitter: "s1",
  230. Message: xsql.Message{"id1": 3, "f1": "v3"},
  231. },
  232. },
  233. },
  234. xsql.WindowTuples{
  235. Emitter: "s2",
  236. Tuples: []xsql.Tuple{
  237. {
  238. Emitter: "s2",
  239. Message: xsql.Message{"id2": 1, "f2": "w1"},
  240. }, {
  241. Emitter: "s2",
  242. Message: xsql.Message{"id2": 2, "f2": "w2"},
  243. }, {
  244. Emitter: "s2",
  245. Message: xsql.Message{"id2": 4, "f2": "w3"},
  246. },
  247. },
  248. },
  249. },
  250. result: xsql.JoinTupleSets{
  251. xsql.JoinTuple{
  252. Tuples: []xsql.Tuple{
  253. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  254. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  255. },
  256. },
  257. xsql.JoinTuple{
  258. Tuples: []xsql.Tuple{
  259. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  260. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  261. },
  262. },
  263. xsql.JoinTuple{
  264. Tuples: []xsql.Tuple{
  265. {Emitter: "s1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  266. },
  267. },
  268. },
  269. },
  270. {
  271. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  272. data: xsql.WindowTuplesSet{
  273. xsql.WindowTuples{
  274. Emitter: "src1",
  275. Tuples: []xsql.Tuple{
  276. {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 1, "f1": "v1"},
  279. },
  280. },
  281. },
  282. xsql.WindowTuples{
  283. Emitter: "src2",
  284. Tuples: []xsql.Tuple{
  285. {
  286. Emitter: "src2",
  287. Message: xsql.Message{"id2": 1, "f2": "w1"},
  288. }, {
  289. Emitter: "src2",
  290. Message: xsql.Message{"id2": 1, "f2": "w2"},
  291. },
  292. },
  293. },
  294. },
  295. result: xsql.JoinTupleSets{
  296. xsql.JoinTuple{
  297. Tuples: []xsql.Tuple{
  298. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  299. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  300. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  301. },
  302. },
  303. },
  304. },
  305. {
  306. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  307. data: xsql.WindowTuplesSet{
  308. xsql.WindowTuples{
  309. Emitter: "src1",
  310. Tuples: []xsql.Tuple{
  311. {
  312. Emitter: "src1",
  313. Message: xsql.Message{"id1": 1, "f1": "v1"},
  314. }, {
  315. Emitter: "src1",
  316. Message: xsql.Message{"id1": 2, "f1": "v2"},
  317. }, {
  318. Emitter: "src1",
  319. Message: xsql.Message{"id1": 3, "f1": "v3"},
  320. },
  321. },
  322. },
  323. xsql.WindowTuples{
  324. Emitter: "src2",
  325. Tuples: []xsql.Tuple{},
  326. },
  327. },
  328. result: xsql.JoinTupleSets{
  329. xsql.JoinTuple{
  330. Tuples: []xsql.Tuple{
  331. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  332. },
  333. },
  334. xsql.JoinTuple{
  335. Tuples: []xsql.Tuple{
  336. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  337. },
  338. },
  339. xsql.JoinTuple{
  340. Tuples: []xsql.Tuple{
  341. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  342. },
  343. },
  344. },
  345. },
  346. {
  347. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  348. data: xsql.WindowTuplesSet{
  349. xsql.WindowTuples{
  350. Emitter: "src1",
  351. Tuples: []xsql.Tuple{
  352. {
  353. Emitter: "src1",
  354. Message: xsql.Message{"id1": 1, "f1": "v1"},
  355. }, {
  356. Emitter: "src1",
  357. Message: xsql.Message{"id1": 2, "f1": "v2"},
  358. }, {
  359. Emitter: "src1",
  360. Message: xsql.Message{"id1": 3, "f1": "v3"},
  361. },
  362. },
  363. },
  364. xsql.WindowTuples{
  365. Emitter: "src2",
  366. Tuples: nil,
  367. },
  368. },
  369. result: xsql.JoinTupleSets{
  370. xsql.JoinTuple{
  371. Tuples: []xsql.Tuple{
  372. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  373. },
  374. },
  375. xsql.JoinTuple{
  376. Tuples: []xsql.Tuple{
  377. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  378. },
  379. },
  380. xsql.JoinTuple{
  381. Tuples: []xsql.Tuple{
  382. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  383. },
  384. },
  385. },
  386. },
  387. {
  388. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  389. data: xsql.WindowTuplesSet{
  390. xsql.WindowTuples{
  391. Emitter: "src1",
  392. Tuples: []xsql.Tuple{},
  393. },
  394. xsql.WindowTuples{
  395. Emitter: "src2",
  396. Tuples: []xsql.Tuple{
  397. {
  398. Emitter: "src2",
  399. Message: xsql.Message{"id2": 1, "f2": "w1"},
  400. }, {
  401. Emitter: "src2",
  402. Message: xsql.Message{"id2": 1, "f2": "w2"},
  403. },
  404. },
  405. },
  406. },
  407. result: nil,
  408. },
  409. {
  410. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  411. data: xsql.WindowTuplesSet{
  412. xsql.WindowTuples{
  413. Emitter: "src1",
  414. Tuples: nil,
  415. },
  416. xsql.WindowTuples{
  417. Emitter: "src2",
  418. Tuples: []xsql.Tuple{
  419. {
  420. Emitter: "src2",
  421. Message: xsql.Message{"id2": 1, "f2": "w1"},
  422. }, {
  423. Emitter: "src2",
  424. Message: xsql.Message{"id2": 1, "f2": "w2"},
  425. },
  426. },
  427. },
  428. },
  429. result: nil,
  430. },
  431. {
  432. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  433. data: xsql.WindowTuplesSet{
  434. xsql.WindowTuples{
  435. Emitter: "src1",
  436. Tuples: []xsql.Tuple{
  437. {
  438. Emitter: "src1",
  439. Message: xsql.Message{"id1": 1, "f1": "v1"},
  440. }, {
  441. Emitter: "src1",
  442. Message: xsql.Message{"id1": 2, "f1": "v2"},
  443. }, {
  444. Emitter: "src1",
  445. Message: xsql.Message{"id1": 3, "f1": "v3"},
  446. },
  447. },
  448. },
  449. xsql.WindowTuples{
  450. Emitter: "src2",
  451. Tuples: []xsql.Tuple{
  452. {
  453. Emitter: "src2",
  454. Message: xsql.Message{"id2": 1, "f2": "w1"},
  455. }, {
  456. Emitter: "src2",
  457. Message: xsql.Message{"id2": 2, "f2": "w2"},
  458. }, {
  459. Emitter: "src2",
  460. Message: xsql.Message{"id2": 4, "f2": "w3"},
  461. },
  462. },
  463. },
  464. },
  465. result: xsql.JoinTupleSets{
  466. xsql.JoinTuple{
  467. Tuples: []xsql.Tuple{
  468. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  469. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  470. },
  471. },
  472. xsql.JoinTuple{
  473. Tuples: []xsql.Tuple{
  474. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  475. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  476. },
  477. },
  478. xsql.JoinTuple{
  479. Tuples: []xsql.Tuple{
  480. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  481. },
  482. },
  483. },
  484. },
  485. {
  486. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  487. data: xsql.WindowTuplesSet{
  488. xsql.WindowTuples{
  489. Emitter: "src1",
  490. Tuples: []xsql.Tuple{
  491. {
  492. Emitter: "src1",
  493. Message: xsql.Message{"id1": 1, "f1": "v1"},
  494. }, {
  495. Emitter: "src1",
  496. Message: xsql.Message{"id1": 2, "f1": "v2"},
  497. }, {
  498. Emitter: "src1",
  499. Message: xsql.Message{"id1": 3, "f1": "v3"},
  500. },
  501. },
  502. },
  503. xsql.WindowTuples{
  504. Emitter: "src2",
  505. Tuples: []xsql.Tuple{
  506. {
  507. Emitter: "src2",
  508. Message: xsql.Message{"id2": 1, "f2": "w1"},
  509. }, {
  510. Emitter: "src2",
  511. Message: xsql.Message{"id2": 2, "f2": "w2"},
  512. }, {
  513. Emitter: "src2",
  514. Message: xsql.Message{"id2": 4, "f2": "w3"},
  515. },
  516. },
  517. },
  518. },
  519. result: xsql.JoinTupleSets{
  520. xsql.JoinTuple{
  521. Tuples: []xsql.Tuple{
  522. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  523. },
  524. },
  525. xsql.JoinTuple{
  526. Tuples: []xsql.Tuple{
  527. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  528. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  529. },
  530. },
  531. xsql.JoinTuple{
  532. Tuples: []xsql.Tuple{
  533. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  534. },
  535. },
  536. },
  537. },
  538. {
  539. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  540. data: xsql.WindowTuplesSet{
  541. xsql.WindowTuples{
  542. Emitter: "src1",
  543. Tuples: []xsql.Tuple{
  544. {
  545. Emitter: "src1",
  546. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  547. }, {
  548. Emitter: "src1",
  549. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  550. }, {
  551. Emitter: "src1",
  552. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  553. },
  554. },
  555. },
  556. xsql.WindowTuples{
  557. Emitter: "src2",
  558. Tuples: []xsql.Tuple{
  559. {
  560. Emitter: "src2",
  561. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  562. }, {
  563. Emitter: "src2",
  564. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  565. }, {
  566. Emitter: "src2",
  567. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  568. },
  569. },
  570. },
  571. },
  572. result: xsql.JoinTupleSets{
  573. xsql.JoinTuple{
  574. Tuples: []xsql.Tuple{
  575. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  576. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  577. },
  578. },
  579. xsql.JoinTuple{
  580. Tuples: []xsql.Tuple{
  581. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  582. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  583. },
  584. },
  585. xsql.JoinTuple{
  586. Tuples: []xsql.Tuple{
  587. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)}},
  588. },
  589. },
  590. },
  591. },
  592. {
  593. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  594. data: xsql.WindowTuplesSet{
  595. xsql.WindowTuples{
  596. Emitter: "src1",
  597. Tuples: []xsql.Tuple{
  598. {
  599. Emitter: "src1",
  600. Message: xsql.Message{"id1": 1, "f1": "v1"},
  601. Metadata: xsql.Metadata{"topic": "devices/type1/device001"},
  602. },
  603. },
  604. },
  605. xsql.WindowTuples{
  606. Emitter: "src2",
  607. Tuples: []xsql.Tuple{
  608. {
  609. Emitter: "src2",
  610. Message: xsql.Message{"id2": 1, "f2": "w1"},
  611. Metadata: xsql.Metadata{"topic": "devices/type2/device001"},
  612. },
  613. },
  614. },
  615. },
  616. result: xsql.JoinTupleSets{
  617. xsql.JoinTuple{
  618. Tuples: []xsql.Tuple{
  619. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  620. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  621. },
  622. },
  623. },
  624. },
  625. }
  626. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  627. contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  628. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  629. for i, tt := range tests {
  630. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  631. if err != nil {
  632. t.Errorf("statement parse error %s", err)
  633. break
  634. }
  635. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  636. t.Errorf("statement source is not a table")
  637. } else {
  638. fv, afv := xsql.NewAggregateFunctionValuers()
  639. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  640. result := pp.Apply(ctx, tt.data, fv, afv)
  641. if !reflect.DeepEqual(tt.result, result) {
  642. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  643. }
  644. }
  645. }
  646. }
  647. func TestInnerJoinPlan_Apply(t *testing.T) {
  648. var tests = []struct {
  649. sql string
  650. data xsql.WindowTuplesSet
  651. result interface{}
  652. }{
  653. {
  654. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  655. data: xsql.WindowTuplesSet{
  656. xsql.WindowTuples{
  657. Emitter: "src1",
  658. Tuples: []xsql.Tuple{
  659. {
  660. Emitter: "src1",
  661. Message: xsql.Message{"id1": 1, "f1": "v1"},
  662. }, {
  663. Emitter: "src1",
  664. Message: xsql.Message{"id1": 2, "f1": "v2"},
  665. }, {
  666. Emitter: "src1",
  667. Message: xsql.Message{"id1": 3, "f1": "v3"},
  668. },
  669. },
  670. },
  671. xsql.WindowTuples{
  672. Emitter: "src2",
  673. Tuples: []xsql.Tuple{
  674. {
  675. Emitter: "src2",
  676. Message: xsql.Message{"id2": 1, "f2": "w1"},
  677. }, {
  678. Emitter: "src2",
  679. Message: xsql.Message{"id2": 2, "f2": "w2"},
  680. }, {
  681. Emitter: "src2",
  682. Message: xsql.Message{"id2": 4, "f2": "w3"},
  683. },
  684. },
  685. },
  686. },
  687. result: xsql.JoinTupleSets{
  688. xsql.JoinTuple{
  689. Tuples: []xsql.Tuple{
  690. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  691. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  692. },
  693. },
  694. xsql.JoinTuple{
  695. Tuples: []xsql.Tuple{
  696. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  697. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  698. },
  699. },
  700. },
  701. },
  702. {
  703. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  704. data: xsql.WindowTuplesSet{
  705. xsql.WindowTuples{
  706. Emitter: "src1",
  707. Tuples: []xsql.Tuple{
  708. {
  709. Emitter: "src1",
  710. Message: xsql.Message{"id1": 1, "f1": "v1"},
  711. }, {
  712. Emitter: "src1",
  713. Message: xsql.Message{"id1": 2, "f1": "v2"},
  714. }, {
  715. Emitter: "src1",
  716. Message: xsql.Message{"id1": 3, "f1": "v3"},
  717. },
  718. },
  719. },
  720. xsql.WindowTuples{
  721. Emitter: "src2",
  722. Tuples: []xsql.Tuple{
  723. {
  724. Emitter: "src2",
  725. Message: xsql.Message{"id2": 1, "f2": "w1"},
  726. }, {
  727. Emitter: "src2",
  728. Message: xsql.Message{"f2": "w2"},
  729. }, {
  730. Emitter: "src2",
  731. Message: xsql.Message{"id2": 4, "f2": "w3"},
  732. },
  733. },
  734. },
  735. },
  736. result: xsql.JoinTupleSets{
  737. xsql.JoinTuple{
  738. Tuples: []xsql.Tuple{
  739. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  740. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  741. },
  742. },
  743. },
  744. },
  745. {
  746. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  747. data: xsql.WindowTuplesSet{
  748. xsql.WindowTuples{
  749. Emitter: "s1",
  750. Tuples: []xsql.Tuple{
  751. {
  752. Emitter: "s1",
  753. Message: xsql.Message{"id1": 1, "f1": "v1"},
  754. }, {
  755. Emitter: "s1",
  756. Message: xsql.Message{"id1": 2, "f1": "v2"},
  757. }, {
  758. Emitter: "s1",
  759. Message: xsql.Message{"id1": 3, "f1": "v3"},
  760. },
  761. },
  762. },
  763. xsql.WindowTuples{
  764. Emitter: "s2",
  765. Tuples: []xsql.Tuple{
  766. {
  767. Emitter: "s2",
  768. Message: xsql.Message{"id2": 1, "f2": "w1"},
  769. }, {
  770. Emitter: "s2",
  771. Message: xsql.Message{"id2": 2, "f2": "w2"},
  772. }, {
  773. Emitter: "s2",
  774. Message: xsql.Message{"id2": 4, "f2": "w3"},
  775. },
  776. },
  777. },
  778. },
  779. result: xsql.JoinTupleSets{
  780. xsql.JoinTuple{
  781. Tuples: []xsql.Tuple{
  782. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  783. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  784. },
  785. },
  786. xsql.JoinTuple{
  787. Tuples: []xsql.Tuple{
  788. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  789. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  790. },
  791. },
  792. },
  793. },
  794. {
  795. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  796. data: xsql.WindowTuplesSet{
  797. xsql.WindowTuples{
  798. Emitter: "src1",
  799. Tuples: []xsql.Tuple{
  800. {
  801. Emitter: "src1",
  802. Message: xsql.Message{"id1": 1, "f1": "v1"},
  803. },
  804. },
  805. },
  806. xsql.WindowTuples{
  807. Emitter: "src2",
  808. Tuples: []xsql.Tuple{
  809. {
  810. Emitter: "src2",
  811. Message: xsql.Message{"id2": 1, "f2": "w1"},
  812. }, {
  813. Emitter: "src2",
  814. Message: xsql.Message{"id2": 1, "f2": "w2"},
  815. },
  816. },
  817. },
  818. },
  819. result: xsql.JoinTupleSets{
  820. xsql.JoinTuple{
  821. Tuples: []xsql.Tuple{
  822. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  823. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  824. },
  825. },
  826. xsql.JoinTuple{
  827. Tuples: []xsql.Tuple{
  828. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  829. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  830. },
  831. },
  832. },
  833. },
  834. {
  835. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  836. data: xsql.WindowTuplesSet{
  837. xsql.WindowTuples{
  838. Emitter: "src1",
  839. Tuples: []xsql.Tuple{
  840. {
  841. Emitter: "src1",
  842. Message: xsql.Message{"id1": 1, "f1": "v1"},
  843. }, {
  844. Emitter: "src1",
  845. Message: xsql.Message{"id1": 2, "f1": "v2"},
  846. }, {
  847. Emitter: "src1",
  848. Message: xsql.Message{"id1": 3, "f1": "v3"},
  849. },
  850. },
  851. },
  852. xsql.WindowTuples{
  853. Emitter: "src2",
  854. Tuples: []xsql.Tuple{},
  855. },
  856. },
  857. result: nil,
  858. },
  859. {
  860. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  861. data: xsql.WindowTuplesSet{
  862. xsql.WindowTuples{
  863. Emitter: "src1",
  864. Tuples: []xsql.Tuple{
  865. {
  866. Emitter: "src1",
  867. Message: xsql.Message{"id1": 1, "f1": "v1"},
  868. }, {
  869. Emitter: "src1",
  870. Message: xsql.Message{"id1": 2, "f1": "v2"},
  871. }, {
  872. Emitter: "src1",
  873. Message: xsql.Message{"id1": 3, "f1": "v3"},
  874. },
  875. },
  876. },
  877. xsql.WindowTuples{
  878. Emitter: "src2",
  879. Tuples: nil,
  880. },
  881. },
  882. result: nil,
  883. },
  884. {
  885. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  886. data: xsql.WindowTuplesSet{
  887. xsql.WindowTuples{
  888. Emitter: "src1",
  889. Tuples: []xsql.Tuple{},
  890. },
  891. xsql.WindowTuples{
  892. Emitter: "src2",
  893. Tuples: []xsql.Tuple{
  894. {
  895. Emitter: "src2",
  896. Message: xsql.Message{"id2": 1, "f2": "w1"},
  897. }, {
  898. Emitter: "src2",
  899. Message: xsql.Message{"id2": 1, "f2": "w2"},
  900. },
  901. },
  902. },
  903. },
  904. result: nil,
  905. },
  906. {
  907. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  908. data: xsql.WindowTuplesSet{
  909. xsql.WindowTuples{
  910. Emitter: "src1",
  911. Tuples: nil,
  912. },
  913. xsql.WindowTuples{
  914. Emitter: "src2",
  915. Tuples: []xsql.Tuple{
  916. {
  917. Emitter: "src2",
  918. Message: xsql.Message{"id2": 1, "f2": "w1"},
  919. }, {
  920. Emitter: "src2",
  921. Message: xsql.Message{"id2": 1, "f2": "w2"},
  922. },
  923. },
  924. },
  925. },
  926. result: nil,
  927. },
  928. {
  929. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  930. data: xsql.WindowTuplesSet{
  931. xsql.WindowTuples{
  932. Emitter: "src1",
  933. Tuples: []xsql.Tuple{
  934. {
  935. Emitter: "src1",
  936. Message: xsql.Message{"id1": 1, "f1": "v1"},
  937. }, {
  938. Emitter: "src1",
  939. Message: xsql.Message{"id1": 2, "f1": "v2"},
  940. }, {
  941. Emitter: "src1",
  942. Message: xsql.Message{"id1": 3, "f1": "v3"},
  943. },
  944. },
  945. },
  946. xsql.WindowTuples{
  947. Emitter: "src2",
  948. Tuples: []xsql.Tuple{
  949. {
  950. Emitter: "src2",
  951. Message: xsql.Message{"id2": 1, "f2": "w1"},
  952. }, {
  953. Emitter: "src2",
  954. Message: xsql.Message{"id2": 2, "f2": "w2"},
  955. }, {
  956. Emitter: "src2",
  957. Message: xsql.Message{"id2": 4, "f2": "w3"},
  958. },
  959. },
  960. },
  961. },
  962. result: xsql.JoinTupleSets{
  963. xsql.JoinTuple{
  964. Tuples: []xsql.Tuple{
  965. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  966. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  967. },
  968. },
  969. xsql.JoinTuple{
  970. Tuples: []xsql.Tuple{
  971. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  972. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  973. },
  974. },
  975. },
  976. },
  977. {
  978. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  979. data: xsql.WindowTuplesSet{
  980. xsql.WindowTuples{
  981. Emitter: "src1",
  982. Tuples: []xsql.Tuple{
  983. {
  984. Emitter: "src1",
  985. Message: xsql.Message{"id1": 1, "f1": "v1"},
  986. }, {
  987. Emitter: "src1",
  988. Message: xsql.Message{"id1": 2, "f1": "v2"},
  989. }, {
  990. Emitter: "src1",
  991. Message: xsql.Message{"id1": 3, "f1": "v3"},
  992. },
  993. },
  994. },
  995. xsql.WindowTuples{
  996. Emitter: "src2",
  997. Tuples: []xsql.Tuple{
  998. {
  999. Emitter: "src2",
  1000. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1001. }, {
  1002. Emitter: "src2",
  1003. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1004. }, {
  1005. Emitter: "src2",
  1006. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1007. },
  1008. },
  1009. },
  1010. },
  1011. result: xsql.JoinTupleSets{
  1012. xsql.JoinTuple{
  1013. Tuples: []xsql.Tuple{
  1014. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1015. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1016. },
  1017. },
  1018. },
  1019. },
  1020. {
  1021. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  1022. data: xsql.WindowTuplesSet{
  1023. xsql.WindowTuples{
  1024. Emitter: "src1",
  1025. Tuples: []xsql.Tuple{
  1026. {
  1027. Emitter: "src1",
  1028. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  1029. }, {
  1030. Emitter: "src1",
  1031. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  1032. }, {
  1033. Emitter: "src1",
  1034. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  1035. },
  1036. },
  1037. },
  1038. xsql.WindowTuples{
  1039. Emitter: "src2",
  1040. Tuples: []xsql.Tuple{
  1041. {
  1042. Emitter: "src2",
  1043. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  1044. }, {
  1045. Emitter: "src2",
  1046. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  1047. }, {
  1048. Emitter: "src2",
  1049. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  1050. },
  1051. },
  1052. },
  1053. },
  1054. result: xsql.JoinTupleSets{
  1055. xsql.JoinTuple{
  1056. Tuples: []xsql.Tuple{
  1057. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  1058. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  1059. },
  1060. },
  1061. xsql.JoinTuple{
  1062. Tuples: []xsql.Tuple{
  1063. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  1064. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  1065. },
  1066. },
  1067. },
  1068. },
  1069. }
  1070. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1071. contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  1072. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1073. for i, tt := range tests {
  1074. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1075. if err != nil {
  1076. t.Errorf("statement parse error %s", err)
  1077. break
  1078. }
  1079. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1080. t.Errorf("statement source is not a table")
  1081. } else {
  1082. fv, afv := xsql.NewAggregateFunctionValuers()
  1083. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1084. result := pp.Apply(ctx, tt.data, fv, afv)
  1085. if !reflect.DeepEqual(tt.result, result) {
  1086. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1087. }
  1088. }
  1089. }
  1090. }
  1091. func TestRightJoinPlan_Apply(t *testing.T) {
  1092. var tests = []struct {
  1093. sql string
  1094. data xsql.WindowTuplesSet
  1095. result interface{}
  1096. }{
  1097. {
  1098. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1099. data: xsql.WindowTuplesSet{
  1100. xsql.WindowTuples{
  1101. Emitter: "src1",
  1102. Tuples: []xsql.Tuple{
  1103. {
  1104. Emitter: "src1",
  1105. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1106. }, {
  1107. Emitter: "src1",
  1108. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1109. }, {
  1110. Emitter: "src1",
  1111. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1112. },
  1113. },
  1114. },
  1115. xsql.WindowTuples{
  1116. Emitter: "src2",
  1117. Tuples: []xsql.Tuple{
  1118. {
  1119. Emitter: "src2",
  1120. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1121. }, {
  1122. Emitter: "src2",
  1123. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1124. }, {
  1125. Emitter: "src2",
  1126. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1127. },
  1128. },
  1129. },
  1130. },
  1131. result: xsql.JoinTupleSets{
  1132. xsql.JoinTuple{
  1133. Tuples: []xsql.Tuple{
  1134. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1135. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1136. },
  1137. },
  1138. xsql.JoinTuple{
  1139. Tuples: []xsql.Tuple{
  1140. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1141. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1142. },
  1143. },
  1144. xsql.JoinTuple{
  1145. Tuples: []xsql.Tuple{
  1146. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1147. },
  1148. },
  1149. },
  1150. },
  1151. {
  1152. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1153. data: xsql.WindowTuplesSet{
  1154. xsql.WindowTuples{
  1155. Emitter: "src1",
  1156. Tuples: []xsql.Tuple{
  1157. {
  1158. Emitter: "src1",
  1159. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1160. }, {
  1161. Emitter: "src1",
  1162. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1163. }, {
  1164. Emitter: "src1",
  1165. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1166. },
  1167. },
  1168. },
  1169. xsql.WindowTuples{
  1170. Emitter: "src2",
  1171. Tuples: []xsql.Tuple{
  1172. {
  1173. Emitter: "src2",
  1174. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1175. }, {
  1176. Emitter: "src2",
  1177. Message: xsql.Message{"f2": "w2"},
  1178. }, {
  1179. Emitter: "src2",
  1180. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1181. },
  1182. },
  1183. },
  1184. },
  1185. result: xsql.JoinTupleSets{
  1186. xsql.JoinTuple{
  1187. Tuples: []xsql.Tuple{
  1188. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1189. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1190. },
  1191. },
  1192. xsql.JoinTuple{
  1193. Tuples: []xsql.Tuple{
  1194. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1195. },
  1196. },
  1197. xsql.JoinTuple{
  1198. Tuples: []xsql.Tuple{
  1199. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1200. },
  1201. },
  1202. },
  1203. },
  1204. {
  1205. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1206. data: xsql.WindowTuplesSet{
  1207. xsql.WindowTuples{
  1208. Emitter: "src1",
  1209. Tuples: []xsql.Tuple{
  1210. {
  1211. Emitter: "src1",
  1212. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1213. },
  1214. },
  1215. },
  1216. xsql.WindowTuples{
  1217. Emitter: "src2",
  1218. Tuples: []xsql.Tuple{
  1219. {
  1220. Emitter: "src2",
  1221. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1222. }, {
  1223. Emitter: "src2",
  1224. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1225. },
  1226. },
  1227. },
  1228. },
  1229. result: xsql.JoinTupleSets{
  1230. xsql.JoinTuple{
  1231. Tuples: []xsql.Tuple{
  1232. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1233. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1234. },
  1235. },
  1236. xsql.JoinTuple{
  1237. Tuples: []xsql.Tuple{
  1238. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1239. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1240. },
  1241. },
  1242. },
  1243. },
  1244. }
  1245. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1246. contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1247. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1248. for i, tt := range tests {
  1249. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1250. if err != nil {
  1251. t.Errorf("statement parse error %s", err)
  1252. break
  1253. }
  1254. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1255. t.Errorf("statement source is not a table")
  1256. } else {
  1257. fv, afv := xsql.NewAggregateFunctionValuers()
  1258. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1259. result := pp.Apply(ctx, tt.data, fv, afv)
  1260. if !reflect.DeepEqual(tt.result, result) {
  1261. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1262. }
  1263. }
  1264. }
  1265. }
  1266. func TestFullJoinPlan_Apply(t *testing.T) {
  1267. var tests = []struct {
  1268. sql string
  1269. data xsql.WindowTuplesSet
  1270. result interface{}
  1271. }{
  1272. {
  1273. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1274. data: xsql.WindowTuplesSet{
  1275. xsql.WindowTuples{
  1276. Emitter: "src1",
  1277. Tuples: []xsql.Tuple{
  1278. {
  1279. Emitter: "src1",
  1280. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1281. }, {
  1282. Emitter: "src1",
  1283. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1284. }, {
  1285. Emitter: "src1",
  1286. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1287. },
  1288. },
  1289. },
  1290. xsql.WindowTuples{
  1291. Emitter: "src2",
  1292. Tuples: []xsql.Tuple{
  1293. {
  1294. Emitter: "src2",
  1295. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1296. }, {
  1297. Emitter: "src2",
  1298. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1299. }, {
  1300. Emitter: "src2",
  1301. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1302. }, {
  1303. Emitter: "src2",
  1304. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1305. },
  1306. },
  1307. },
  1308. },
  1309. result: xsql.JoinTupleSets{
  1310. xsql.JoinTuple{
  1311. Tuples: []xsql.Tuple{
  1312. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1313. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1314. },
  1315. },
  1316. xsql.JoinTuple{
  1317. Tuples: []xsql.Tuple{
  1318. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1319. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1320. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
  1321. },
  1322. },
  1323. xsql.JoinTuple{
  1324. Tuples: []xsql.Tuple{
  1325. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1326. },
  1327. },
  1328. xsql.JoinTuple{
  1329. Tuples: []xsql.Tuple{
  1330. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1331. },
  1332. },
  1333. },
  1334. },
  1335. {
  1336. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1337. data: xsql.WindowTuplesSet{
  1338. xsql.WindowTuples{
  1339. Emitter: "src1",
  1340. Tuples: []xsql.Tuple{
  1341. {
  1342. Emitter: "src1",
  1343. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1344. }, {
  1345. Emitter: "src1",
  1346. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1347. }, {
  1348. Emitter: "src1",
  1349. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1350. },
  1351. },
  1352. },
  1353. xsql.WindowTuples{
  1354. Emitter: "src2",
  1355. Tuples: []xsql.Tuple{
  1356. {
  1357. Emitter: "src2",
  1358. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1359. }, {
  1360. Emitter: "src2",
  1361. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1362. }, {
  1363. Emitter: "src2",
  1364. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1365. },
  1366. },
  1367. },
  1368. },
  1369. result: xsql.JoinTupleSets{
  1370. xsql.JoinTuple{
  1371. Tuples: []xsql.Tuple{
  1372. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1373. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1374. },
  1375. },
  1376. xsql.JoinTuple{
  1377. Tuples: []xsql.Tuple{
  1378. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1379. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1380. },
  1381. },
  1382. xsql.JoinTuple{
  1383. Tuples: []xsql.Tuple{
  1384. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1385. },
  1386. },
  1387. xsql.JoinTuple{
  1388. Tuples: []xsql.Tuple{
  1389. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1390. },
  1391. },
  1392. },
  1393. },
  1394. {
  1395. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1396. data: xsql.WindowTuplesSet{
  1397. xsql.WindowTuples{
  1398. Emitter: "src1",
  1399. Tuples: []xsql.Tuple{
  1400. {
  1401. Emitter: "src1",
  1402. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1403. }, {
  1404. Emitter: "src1",
  1405. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1406. }, {
  1407. Emitter: "src1",
  1408. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1409. },
  1410. },
  1411. },
  1412. xsql.WindowTuples{
  1413. Emitter: "src2",
  1414. Tuples: []xsql.Tuple{},
  1415. },
  1416. },
  1417. result: xsql.JoinTupleSets{
  1418. xsql.JoinTuple{
  1419. Tuples: []xsql.Tuple{
  1420. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1421. },
  1422. },
  1423. xsql.JoinTuple{
  1424. Tuples: []xsql.Tuple{
  1425. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1426. },
  1427. },
  1428. xsql.JoinTuple{
  1429. Tuples: []xsql.Tuple{
  1430. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1431. },
  1432. },
  1433. },
  1434. },
  1435. {
  1436. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1437. data: xsql.WindowTuplesSet{
  1438. xsql.WindowTuples{
  1439. Emitter: "src1",
  1440. Tuples: []xsql.Tuple{},
  1441. },
  1442. xsql.WindowTuples{
  1443. Emitter: "src2",
  1444. Tuples: []xsql.Tuple{
  1445. {
  1446. Emitter: "src2",
  1447. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1448. }, {
  1449. Emitter: "src2",
  1450. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1451. }, {
  1452. Emitter: "src2",
  1453. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1454. },
  1455. },
  1456. },
  1457. },
  1458. result: xsql.JoinTupleSets{
  1459. xsql.JoinTuple{
  1460. Tuples: []xsql.Tuple{
  1461. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1462. },
  1463. },
  1464. xsql.JoinTuple{
  1465. Tuples: []xsql.Tuple{
  1466. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1467. },
  1468. },
  1469. xsql.JoinTuple{
  1470. Tuples: []xsql.Tuple{
  1471. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1472. },
  1473. },
  1474. },
  1475. },
  1476. }
  1477. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1478. contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
  1479. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1480. for i, tt := range tests {
  1481. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1482. if err != nil {
  1483. t.Errorf("statement parse error %s", err)
  1484. break
  1485. }
  1486. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1487. t.Errorf("statement source is not a table")
  1488. } else {
  1489. fv, afv := xsql.NewAggregateFunctionValuers()
  1490. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1491. result := pp.Apply(ctx, tt.data, fv, afv)
  1492. if !reflect.DeepEqual(tt.result, result) {
  1493. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1494. }
  1495. }
  1496. }
  1497. }
  1498. func TestCrossJoinPlan_Apply(t *testing.T) {
  1499. var tests = []struct {
  1500. sql string
  1501. data xsql.WindowTuplesSet
  1502. result interface{}
  1503. }{
  1504. {
  1505. sql: "SELECT id1 FROM src1 cross join src2",
  1506. data: xsql.WindowTuplesSet{
  1507. xsql.WindowTuples{
  1508. Emitter: "src1",
  1509. Tuples: []xsql.Tuple{
  1510. {
  1511. Emitter: "src1",
  1512. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1513. }, {
  1514. Emitter: "src1",
  1515. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1516. }, {
  1517. Emitter: "src1",
  1518. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1519. },
  1520. },
  1521. },
  1522. xsql.WindowTuples{
  1523. Emitter: "src2",
  1524. Tuples: []xsql.Tuple{
  1525. {
  1526. Emitter: "src2",
  1527. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1528. }, {
  1529. Emitter: "src2",
  1530. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1531. }, {
  1532. Emitter: "src2",
  1533. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1534. },
  1535. },
  1536. },
  1537. },
  1538. result: xsql.JoinTupleSets{
  1539. xsql.JoinTuple{
  1540. Tuples: []xsql.Tuple{
  1541. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1542. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1543. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1544. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1545. },
  1546. },
  1547. xsql.JoinTuple{
  1548. Tuples: []xsql.Tuple{
  1549. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1550. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1551. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1552. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1553. },
  1554. },
  1555. xsql.JoinTuple{
  1556. Tuples: []xsql.Tuple{
  1557. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1558. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1559. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1560. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1561. },
  1562. },
  1563. },
  1564. },
  1565. {
  1566. sql: "SELECT id1 FROM src1 cross join src2",
  1567. data: xsql.WindowTuplesSet{
  1568. xsql.WindowTuples{
  1569. Emitter: "src1",
  1570. Tuples: []xsql.Tuple{
  1571. {
  1572. Emitter: "src1",
  1573. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1574. },
  1575. },
  1576. },
  1577. xsql.WindowTuples{
  1578. Emitter: "src2",
  1579. Tuples: []xsql.Tuple{
  1580. {
  1581. Emitter: "src2",
  1582. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1583. }, {
  1584. Emitter: "src2",
  1585. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1586. },
  1587. },
  1588. },
  1589. },
  1590. result: xsql.JoinTupleSets{
  1591. xsql.JoinTuple{
  1592. Tuples: []xsql.Tuple{
  1593. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1594. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1595. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1596. },
  1597. },
  1598. },
  1599. },
  1600. }
  1601. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1602. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1603. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1604. for i, tt := range tests {
  1605. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1606. if err != nil {
  1607. t.Errorf("statement parse error %s", err)
  1608. break
  1609. }
  1610. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1611. t.Errorf("statement source is not a table")
  1612. } else {
  1613. fv, afv := xsql.NewAggregateFunctionValuers()
  1614. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1615. result := pp.Apply(ctx, tt.data, fv, afv)
  1616. if !reflect.DeepEqual(tt.result, result) {
  1617. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1618. }
  1619. }
  1620. }
  1621. }
  1622. func TestCrossJoinPlanError(t *testing.T) {
  1623. var tests = []struct {
  1624. sql string
  1625. data interface{}
  1626. result interface{}
  1627. }{
  1628. {
  1629. sql: "SELECT id1 FROM src1 cross join src2",
  1630. data: errors.New("an error from upstream"),
  1631. result: errors.New("an error from upstream"),
  1632. }, {
  1633. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1634. data: xsql.WindowTuplesSet{
  1635. xsql.WindowTuples{
  1636. Emitter: "src1",
  1637. Tuples: []xsql.Tuple{
  1638. {
  1639. Emitter: "src1",
  1640. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1641. }, {
  1642. Emitter: "src1",
  1643. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1644. }, {
  1645. Emitter: "src1",
  1646. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1647. },
  1648. },
  1649. },
  1650. xsql.WindowTuples{
  1651. Emitter: "src2",
  1652. Tuples: []xsql.Tuple{
  1653. {
  1654. Emitter: "src2",
  1655. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1656. }, {
  1657. Emitter: "src2",
  1658. Message: xsql.Message{"id2": "3", "f2": "w2"},
  1659. }, {
  1660. Emitter: "src2",
  1661. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1662. }, {
  1663. Emitter: "src2",
  1664. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1665. },
  1666. },
  1667. },
  1668. },
  1669. result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
  1670. },
  1671. }
  1672. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1673. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1674. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1675. for i, tt := range tests {
  1676. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1677. if err != nil {
  1678. t.Errorf("statement parse error %s", err)
  1679. break
  1680. }
  1681. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1682. t.Errorf("statement source is not a table")
  1683. } else {
  1684. fv, afv := xsql.NewAggregateFunctionValuers()
  1685. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1686. result := pp.Apply(ctx, tt.data, fv, afv)
  1687. if !reflect.DeepEqual(tt.result, result) {
  1688. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1689. }
  1690. }
  1691. }
  1692. }
  1693. func str2Map(s string) map[string]interface{} {
  1694. var input map[string]interface{}
  1695. if err := json.Unmarshal([]byte(s), &input); err != nil {
  1696. fmt.Printf("Failed to parse the JSON data.\n")
  1697. return nil
  1698. }
  1699. return input
  1700. }