join_test.go 40 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestLeftJoinPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data xsql.WindowTuplesSet
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  20. data: xsql.WindowTuplesSet{
  21. xsql.WindowTuples{
  22. Emitter:"src1",
  23. Tuples:[]xsql.Tuple{
  24. {
  25. Emitter: "src1",
  26. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  27. },{
  28. Emitter: "src1",
  29. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  30. },{
  31. Emitter: "src1",
  32. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  33. },
  34. },
  35. },
  36. xsql.WindowTuples{
  37. Emitter:"src2",
  38. Tuples:[]xsql.Tuple{
  39. {
  40. Emitter: "src2",
  41. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  42. },{
  43. Emitter: "src2",
  44. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  45. },{
  46. Emitter: "src2",
  47. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  48. },
  49. },
  50. },
  51. },
  52. result: xsql.JoinTupleSets{
  53. xsql.JoinTuple{
  54. Tuples: []xsql.Tuple{
  55. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  56. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  57. },
  58. },
  59. xsql.JoinTuple{
  60. Tuples: []xsql.Tuple{
  61. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  62. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  63. },
  64. },
  65. xsql.JoinTuple{
  66. Tuples: []xsql.Tuple{
  67. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  68. },
  69. },
  70. },
  71. },
  72. {
  73. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  74. data: xsql.WindowTuplesSet{
  75. xsql.WindowTuples{
  76. Emitter:"src1",
  77. Tuples:[]xsql.Tuple{
  78. {
  79. Emitter: "src1",
  80. Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  81. },{
  82. Emitter: "src1",
  83. Message: xsql.Message{ "id1" : 2, "f1" : "v2", "ts": common.TimeFromUnixMilli(1568854525000) },
  84. },{
  85. Emitter: "src1",
  86. Message: xsql.Message{ "id1" : 3, "f1" : "v3", "ts": common.TimeFromUnixMilli(1568854535000) },
  87. },
  88. },
  89. },
  90. xsql.WindowTuples{
  91. Emitter:"src2",
  92. Tuples:[]xsql.Tuple{
  93. {
  94. Emitter: "src2",
  95. Message: xsql.Message{ "id2" : 1, "f2" : "w1", "ts": common.TimeFromUnixMilli(1568854515000) },
  96. },{
  97. Emitter: "src2",
  98. Message: xsql.Message{ "id2" : 2, "f2" : "w2", "ts": common.TimeFromUnixMilli(1568854525000) },
  99. },{
  100. Emitter: "src2",
  101. Message: xsql.Message{ "id2" : 4, "f2" : "w3", "ts": common.TimeFromUnixMilli(1568854545000) },
  102. },
  103. },
  104. },
  105. },
  106. result: xsql.JoinTupleSets{
  107. xsql.JoinTuple{
  108. Tuples: []xsql.Tuple{
  109. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000) },},
  110. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1", "ts": common.TimeFromUnixMilli(1568854515000) },},
  111. },
  112. },
  113. xsql.JoinTuple{
  114. Tuples: []xsql.Tuple{
  115. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2", "ts": common.TimeFromUnixMilli(1568854525000) },},
  116. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2", "ts": common.TimeFromUnixMilli(1568854525000) },},
  117. },
  118. },
  119. xsql.JoinTuple{
  120. Tuples: []xsql.Tuple{
  121. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3", "ts": common.TimeFromUnixMilli(1568854535000) },},
  122. },
  123. },
  124. },
  125. },
  126. {
  127. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  128. data: xsql.WindowTuplesSet{
  129. xsql.WindowTuples{
  130. Emitter:"src1",
  131. Tuples:[]xsql.Tuple{
  132. {
  133. Emitter: "src1",
  134. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  135. },{
  136. Emitter: "src1",
  137. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  138. },{
  139. Emitter: "src1",
  140. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  141. },
  142. },
  143. },
  144. xsql.WindowTuples{
  145. Emitter:"src2",
  146. Tuples:[]xsql.Tuple{
  147. {
  148. Emitter: "src2",
  149. Message: xsql.Message{ "id2" : 4, "f2" : "w1" },
  150. },{
  151. Emitter: "src2",
  152. Message: xsql.Message{ "id2" : 5, "f2" : "w2" },
  153. },{
  154. Emitter: "src2",
  155. Message: xsql.Message{ "id2" : 6, "f2" : "w3" },
  156. },
  157. },
  158. },
  159. },
  160. result: nil,
  161. },
  162. {
  163. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  164. data: xsql.WindowTuplesSet{
  165. xsql.WindowTuples{
  166. Emitter:"s1",
  167. Tuples:[]xsql.Tuple{
  168. {
  169. Emitter: "s1",
  170. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  171. },{
  172. Emitter: "s1",
  173. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  174. },{
  175. Emitter: "s1",
  176. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  177. },
  178. },
  179. },
  180. xsql.WindowTuples{
  181. Emitter:"s2",
  182. Tuples:[]xsql.Tuple{
  183. {
  184. Emitter: "s2",
  185. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  186. },{
  187. Emitter: "s2",
  188. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  189. },{
  190. Emitter: "s2",
  191. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  192. },
  193. },
  194. },
  195. },
  196. result: xsql.JoinTupleSets{
  197. xsql.JoinTuple{
  198. Tuples: []xsql.Tuple{
  199. {Emitter: "s1", Message: xsql.Message{ "id1" : 1, "f1" : "v1"},},
  200. {Emitter: "s2", Message: xsql.Message{ "id2" : 1, "f2" : "w1"},},
  201. },
  202. },
  203. xsql.JoinTuple{
  204. Tuples: []xsql.Tuple{
  205. {Emitter: "s1", Message: xsql.Message{ "id1" : 2, "f1" : "v2"},},
  206. {Emitter: "s2", Message: xsql.Message{ "id2" : 2, "f2" : "w2"},},
  207. },
  208. },
  209. xsql.JoinTuple{
  210. Tuples: []xsql.Tuple{
  211. {Emitter: "s1", Message: xsql.Message{ "id1" : 3, "f1" : "v3"},},
  212. },
  213. },
  214. },
  215. },
  216. {
  217. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  218. data: xsql.WindowTuplesSet{
  219. xsql.WindowTuples{
  220. Emitter:"src1",
  221. Tuples:[]xsql.Tuple{
  222. {
  223. Emitter: "src1",
  224. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  225. },
  226. },
  227. },
  228. xsql.WindowTuples{
  229. Emitter:"src2",
  230. Tuples:[]xsql.Tuple{
  231. {
  232. Emitter: "src2",
  233. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  234. },{
  235. Emitter: "src2",
  236. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  237. },
  238. },
  239. },
  240. },
  241. result: xsql.JoinTupleSets{
  242. xsql.JoinTuple{
  243. Tuples: []xsql.Tuple{
  244. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  245. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  246. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w2" },},
  247. },
  248. },
  249. },
  250. },
  251. {
  252. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  253. data: xsql.WindowTuplesSet{
  254. xsql.WindowTuples{
  255. Emitter:"src1",
  256. Tuples:[]xsql.Tuple{
  257. {
  258. Emitter: "src1",
  259. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  260. },{
  261. Emitter: "src1",
  262. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  263. },{
  264. Emitter: "src1",
  265. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  266. },
  267. },
  268. },
  269. xsql.WindowTuples{
  270. Emitter:"src2",
  271. Tuples:[]xsql.Tuple{
  272. },
  273. },
  274. },
  275. result: xsql.JoinTupleSets{
  276. xsql.JoinTuple{
  277. Tuples: []xsql.Tuple{
  278. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  279. },
  280. },
  281. xsql.JoinTuple{
  282. Tuples: []xsql.Tuple{
  283. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  284. },
  285. },
  286. xsql.JoinTuple{
  287. Tuples: []xsql.Tuple{
  288. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  289. },
  290. },
  291. },
  292. },
  293. {
  294. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  295. data: xsql.WindowTuplesSet{
  296. xsql.WindowTuples{
  297. Emitter:"src1",
  298. Tuples:[]xsql.Tuple{
  299. {
  300. Emitter: "src1",
  301. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  302. },{
  303. Emitter: "src1",
  304. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  305. },{
  306. Emitter: "src1",
  307. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  308. },
  309. },
  310. },
  311. xsql.WindowTuples{
  312. Emitter: "src2",
  313. Tuples: nil,
  314. },
  315. },
  316. result: xsql.JoinTupleSets{
  317. xsql.JoinTuple{
  318. Tuples: []xsql.Tuple{
  319. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  320. },
  321. },
  322. xsql.JoinTuple{
  323. Tuples: []xsql.Tuple{
  324. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  325. },
  326. },
  327. xsql.JoinTuple{
  328. Tuples: []xsql.Tuple{
  329. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  330. },
  331. },
  332. },
  333. },
  334. {
  335. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  336. data: xsql.WindowTuplesSet{
  337. xsql.WindowTuples{
  338. Emitter:"src1",
  339. Tuples:[]xsql.Tuple{},
  340. },
  341. xsql.WindowTuples{
  342. Emitter:"src2",
  343. Tuples:[]xsql.Tuple{
  344. {
  345. Emitter: "src2",
  346. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  347. },{
  348. Emitter: "src2",
  349. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  350. },
  351. },
  352. },
  353. },
  354. result: nil,
  355. },
  356. {
  357. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  358. data: xsql.WindowTuplesSet{
  359. xsql.WindowTuples{
  360. Emitter:"src1",
  361. Tuples:nil,
  362. },
  363. xsql.WindowTuples{
  364. Emitter:"src2",
  365. Tuples:[]xsql.Tuple{
  366. {
  367. Emitter: "src2",
  368. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  369. },{
  370. Emitter: "src2",
  371. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  372. },
  373. },
  374. },
  375. },
  376. result: nil,
  377. },
  378. {
  379. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  380. data: xsql.WindowTuplesSet{
  381. xsql.WindowTuples{
  382. Emitter:"src1",
  383. Tuples:[]xsql.Tuple{
  384. {
  385. Emitter: "src1",
  386. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  387. },{
  388. Emitter: "src1",
  389. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  390. },{
  391. Emitter: "src1",
  392. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  393. },
  394. },
  395. },
  396. xsql.WindowTuples{
  397. Emitter:"src2",
  398. Tuples:[]xsql.Tuple{
  399. {
  400. Emitter: "src2",
  401. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  402. },{
  403. Emitter: "src2",
  404. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  405. },{
  406. Emitter: "src2",
  407. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  408. },
  409. },
  410. },
  411. },
  412. result: xsql.JoinTupleSets{
  413. xsql.JoinTuple{
  414. Tuples: []xsql.Tuple{
  415. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  416. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  417. },
  418. },
  419. xsql.JoinTuple{
  420. Tuples: []xsql.Tuple{
  421. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  422. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3" },},
  423. },
  424. },
  425. xsql.JoinTuple{
  426. Tuples: []xsql.Tuple{
  427. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  428. },
  429. },
  430. },
  431. },
  432. {
  433. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  434. data: xsql.WindowTuplesSet{
  435. xsql.WindowTuples{
  436. Emitter:"src1",
  437. Tuples:[]xsql.Tuple{
  438. {
  439. Emitter: "src1",
  440. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  441. },{
  442. Emitter: "src1",
  443. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  444. },{
  445. Emitter: "src1",
  446. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  447. },
  448. },
  449. },
  450. xsql.WindowTuples{
  451. Emitter:"src2",
  452. Tuples:[]xsql.Tuple{
  453. {
  454. Emitter: "src2",
  455. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  456. },{
  457. Emitter: "src2",
  458. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  459. },{
  460. Emitter: "src2",
  461. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  462. },
  463. },
  464. },
  465. },
  466. result: xsql.JoinTupleSets{
  467. xsql.JoinTuple{
  468. Tuples: []xsql.Tuple{
  469. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  470. },
  471. },
  472. xsql.JoinTuple{
  473. Tuples: []xsql.Tuple{
  474. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  475. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  476. },
  477. },
  478. xsql.JoinTuple{
  479. Tuples: []xsql.Tuple{
  480. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  481. },
  482. },
  483. },
  484. },
  485. {
  486. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  487. data: xsql.WindowTuplesSet{
  488. xsql.WindowTuples{
  489. Emitter:"src1",
  490. Tuples:[]xsql.Tuple{
  491. {
  492. Emitter: "src1",
  493. Message: xsql.Message{ "id1" : 1, "f1" : str2Map(`{"cid" : 1, "name" : "tom1"}`) },
  494. },{
  495. Emitter: "src1",
  496. Message: xsql.Message{ "id1" : 2, "f1" : str2Map(`{"cid" : 2, "name" : "mike1"}`) },
  497. },{
  498. Emitter: "src1",
  499. Message: xsql.Message{ "id1" : 3, "f1" : str2Map(`{"cid" : 3, "name" : "alice1"}`) },
  500. },
  501. },
  502. },
  503. xsql.WindowTuples{
  504. Emitter:"src2",
  505. Tuples:[]xsql.Tuple{
  506. {
  507. Emitter: "src2",
  508. Message: xsql.Message{ "id2" : 1, "f2" : str2Map(`{"cid" : 1, "name" : "tom2"}`) },
  509. },{
  510. Emitter: "src2",
  511. Message: xsql.Message{ "id2" : 2, "f2" : str2Map(`{"cid" : 2, "name" : "mike2"}`) },
  512. },{
  513. Emitter: "src2",
  514. Message: xsql.Message{ "id2" : 4, "f2" : str2Map(`{"cid" : 4, "name" : "alice2"}`) },
  515. },
  516. },
  517. },
  518. },
  519. result: xsql.JoinTupleSets{
  520. xsql.JoinTuple{
  521. Tuples: []xsql.Tuple{
  522. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : str2Map(`{"cid" : 1, "name" : "tom1"}`) },},
  523. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : str2Map(`{"cid" : 1, "name" : "tom2"}`) },},
  524. },
  525. },
  526. xsql.JoinTuple{
  527. Tuples: []xsql.Tuple{
  528. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : str2Map(`{"cid" : 2, "name" : "mike1"}`) },},
  529. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : str2Map(`{"cid" : 2, "name" : "mike2"}`) },},
  530. },
  531. },
  532. xsql.JoinTuple{
  533. Tuples: []xsql.Tuple{
  534. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : str2Map(`{"cid" : 3, "name" : "alice1"}`) },},
  535. },
  536. },
  537. },
  538. },
  539. {
  540. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  541. data: xsql.WindowTuplesSet{
  542. xsql.WindowTuples{
  543. Emitter:"src1",
  544. Tuples:[]xsql.Tuple{
  545. {
  546. Emitter: "src1",
  547. Message: xsql.Message{ "id1" : 1, "f1" : "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},
  548. },
  549. },
  550. },
  551. xsql.WindowTuples{
  552. Emitter:"src2",
  553. Tuples:[]xsql.Tuple{
  554. {
  555. Emitter: "src2",
  556. Message: xsql.Message{ "id2" : 1, "f2" : "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001" },
  557. },
  558. },
  559. },
  560. },
  561. result: xsql.JoinTupleSets{
  562. xsql.JoinTuple{
  563. Tuples: []xsql.Tuple{
  564. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" , xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},},
  565. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001" },},
  566. },
  567. },
  568. },
  569. },
  570. }
  571. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  572. contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  573. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  574. for i, tt := range tests {
  575. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  576. if err != nil {
  577. t.Errorf("statement parse error %s", err)
  578. break
  579. }
  580. if table, ok := stmt.Sources[0].(*xsql.Table); !ok{
  581. t.Errorf("statement source is not a table")
  582. }else{
  583. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  584. result := pp.Apply(ctx, tt.data)
  585. if !reflect.DeepEqual(tt.result, result) {
  586. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  587. }
  588. }
  589. }
  590. }
  591. func TestInnerJoinPlan_Apply(t *testing.T) {
  592. var tests = []struct {
  593. sql string
  594. data xsql.WindowTuplesSet
  595. result interface{}
  596. }{
  597. {
  598. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  599. data: xsql.WindowTuplesSet{
  600. xsql.WindowTuples{
  601. Emitter:"src1",
  602. Tuples:[]xsql.Tuple{
  603. {
  604. Emitter: "src1",
  605. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  606. },{
  607. Emitter: "src1",
  608. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  609. },{
  610. Emitter: "src1",
  611. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  612. },
  613. },
  614. },
  615. xsql.WindowTuples{
  616. Emitter:"src2",
  617. Tuples:[]xsql.Tuple{
  618. {
  619. Emitter: "src2",
  620. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  621. },{
  622. Emitter: "src2",
  623. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  624. },{
  625. Emitter: "src2",
  626. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  627. },
  628. },
  629. },
  630. },
  631. result: xsql.JoinTupleSets{
  632. xsql.JoinTuple{
  633. Tuples: []xsql.Tuple{
  634. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  635. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  636. },
  637. },
  638. xsql.JoinTuple{
  639. Tuples: []xsql.Tuple{
  640. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  641. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  642. },
  643. },
  644. },
  645. },
  646. {
  647. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  648. data: xsql.WindowTuplesSet{
  649. xsql.WindowTuples{
  650. Emitter:"s1",
  651. Tuples:[]xsql.Tuple{
  652. {
  653. Emitter: "s1",
  654. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  655. },{
  656. Emitter: "s1",
  657. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  658. },{
  659. Emitter: "s1",
  660. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  661. },
  662. },
  663. },
  664. xsql.WindowTuples{
  665. Emitter:"s2",
  666. Tuples:[]xsql.Tuple{
  667. {
  668. Emitter: "s2",
  669. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  670. },{
  671. Emitter: "s2",
  672. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  673. },{
  674. Emitter: "s2",
  675. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  676. },
  677. },
  678. },
  679. },
  680. result: xsql.JoinTupleSets{
  681. xsql.JoinTuple{
  682. Tuples: []xsql.Tuple{
  683. {Emitter: "s1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  684. {Emitter: "s2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  685. },
  686. },
  687. xsql.JoinTuple{
  688. Tuples: []xsql.Tuple{
  689. {Emitter: "s1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  690. {Emitter: "s2", Message: xsql.Message{ "id2" : 2, "f2" : "w2"},},
  691. },
  692. },
  693. },
  694. },
  695. {
  696. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  697. data: xsql.WindowTuplesSet{
  698. xsql.WindowTuples{
  699. Emitter:"src1",
  700. Tuples:[]xsql.Tuple{
  701. {
  702. Emitter: "src1",
  703. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  704. },
  705. },
  706. },
  707. xsql.WindowTuples{
  708. Emitter:"src2",
  709. Tuples:[]xsql.Tuple{
  710. {
  711. Emitter: "src2",
  712. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  713. },{
  714. Emitter: "src2",
  715. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  716. },
  717. },
  718. },
  719. },
  720. result: xsql.JoinTupleSets{
  721. xsql.JoinTuple{
  722. Tuples: []xsql.Tuple{
  723. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1"},},
  724. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1"},},
  725. },
  726. },
  727. xsql.JoinTuple{
  728. Tuples: []xsql.Tuple{
  729. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1"},},
  730. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w2"},},
  731. },
  732. },
  733. },
  734. },
  735. {
  736. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  737. data: xsql.WindowTuplesSet{
  738. xsql.WindowTuples{
  739. Emitter:"src1",
  740. Tuples:[]xsql.Tuple{
  741. {
  742. Emitter: "src1",
  743. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  744. },{
  745. Emitter: "src1",
  746. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  747. },{
  748. Emitter: "src1",
  749. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  750. },
  751. },
  752. },
  753. xsql.WindowTuples{
  754. Emitter:"src2",
  755. Tuples:[]xsql.Tuple{
  756. },
  757. },
  758. },
  759. result: nil,
  760. },
  761. {
  762. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  763. data: xsql.WindowTuplesSet{
  764. xsql.WindowTuples{
  765. Emitter:"src1",
  766. Tuples:[]xsql.Tuple{
  767. {
  768. Emitter: "src1",
  769. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  770. },{
  771. Emitter: "src1",
  772. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  773. },{
  774. Emitter: "src1",
  775. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  776. },
  777. },
  778. },
  779. xsql.WindowTuples{
  780. Emitter: "src2",
  781. Tuples: nil,
  782. },
  783. },
  784. result: nil,
  785. },
  786. {
  787. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  788. data: xsql.WindowTuplesSet{
  789. xsql.WindowTuples{
  790. Emitter:"src1",
  791. Tuples:[]xsql.Tuple{},
  792. },
  793. xsql.WindowTuples{
  794. Emitter:"src2",
  795. Tuples:[]xsql.Tuple{
  796. {
  797. Emitter: "src2",
  798. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  799. },{
  800. Emitter: "src2",
  801. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  802. },
  803. },
  804. },
  805. },
  806. result: nil,
  807. },
  808. {
  809. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  810. data: xsql.WindowTuplesSet{
  811. xsql.WindowTuples{
  812. Emitter:"src1",
  813. Tuples:nil,
  814. },
  815. xsql.WindowTuples{
  816. Emitter:"src2",
  817. Tuples:[]xsql.Tuple{
  818. {
  819. Emitter: "src2",
  820. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  821. },{
  822. Emitter: "src2",
  823. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  824. },
  825. },
  826. },
  827. },
  828. result: nil,
  829. },
  830. {
  831. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  832. data: xsql.WindowTuplesSet{
  833. xsql.WindowTuples{
  834. Emitter:"src1",
  835. Tuples:[]xsql.Tuple{
  836. {
  837. Emitter: "src1",
  838. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  839. },{
  840. Emitter: "src1",
  841. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  842. },{
  843. Emitter: "src1",
  844. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  845. },
  846. },
  847. },
  848. xsql.WindowTuples{
  849. Emitter:"src2",
  850. Tuples:[]xsql.Tuple{
  851. {
  852. Emitter: "src2",
  853. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  854. },{
  855. Emitter: "src2",
  856. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  857. },{
  858. Emitter: "src2",
  859. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  860. },
  861. },
  862. },
  863. },
  864. result: xsql.JoinTupleSets{
  865. xsql.JoinTuple{
  866. Tuples: []xsql.Tuple{
  867. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1"}},
  868. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2"}},
  869. },
  870. },
  871. xsql.JoinTuple{
  872. Tuples: []xsql.Tuple{
  873. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2"}},
  874. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3"}},
  875. },
  876. },
  877. },
  878. },
  879. {
  880. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  881. data: xsql.WindowTuplesSet{
  882. xsql.WindowTuples{
  883. Emitter:"src1",
  884. Tuples:[]xsql.Tuple{
  885. {
  886. Emitter: "src1",
  887. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  888. },{
  889. Emitter: "src1",
  890. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  891. },{
  892. Emitter: "src1",
  893. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  894. },
  895. },
  896. },
  897. xsql.WindowTuples{
  898. Emitter:"src2",
  899. Tuples:[]xsql.Tuple{
  900. {
  901. Emitter: "src2",
  902. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  903. },{
  904. Emitter: "src2",
  905. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  906. },{
  907. Emitter: "src2",
  908. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  909. },
  910. },
  911. },
  912. },
  913. result: xsql.JoinTupleSets{
  914. xsql.JoinTuple{
  915. Tuples: []xsql.Tuple{
  916. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  917. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  918. },
  919. },
  920. },
  921. },
  922. {
  923. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  924. data: xsql.WindowTuplesSet{
  925. xsql.WindowTuples{
  926. Emitter:"src1",
  927. Tuples:[]xsql.Tuple{
  928. {
  929. Emitter: "src1",
  930. Message: xsql.Message{ "id1" : 1, "f1" : str2Map(`{"cid" : 1, "name" : "tom1"}`) },
  931. },{
  932. Emitter: "src1",
  933. Message: xsql.Message{ "id1" : 2, "f1" : str2Map(`{"cid" : 2, "name" : "mike1"}`) },
  934. },{
  935. Emitter: "src1",
  936. Message: xsql.Message{ "id1" : 3, "f1" : str2Map(`{"cid" : 3, "name" : "alice1"}`) },
  937. },
  938. },
  939. },
  940. xsql.WindowTuples{
  941. Emitter:"src2",
  942. Tuples:[]xsql.Tuple{
  943. {
  944. Emitter: "src2",
  945. Message: xsql.Message{ "id2" : 1, "f2" : str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  946. },{
  947. Emitter: "src2",
  948. Message: xsql.Message{ "id2" : 2, "f2" : str2Map(`{"cid" : 2, "name" : "mike2"}`) },
  949. },{
  950. Emitter: "src2",
  951. Message: xsql.Message{ "id2" : 4, "f2" : str2Map(`{"cid" : 4, "name" : "alice2"}`) },
  952. },
  953. },
  954. },
  955. },
  956. result: xsql.JoinTupleSets{
  957. xsql.JoinTuple{
  958. Tuples: []xsql.Tuple{
  959. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : str2Map(`{"cid" : 1, "name" : "tom1"}`) },},
  960. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : str2Map(`{"cid" : 1, "name" : "tom2"}`) },},
  961. },
  962. },
  963. xsql.JoinTuple{
  964. Tuples: []xsql.Tuple{
  965. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : str2Map(`{"cid" : 2, "name" : "mike1"}`) },},
  966. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : str2Map(`{"cid" : 2, "name" : "mike2"}`) },},
  967. },
  968. },
  969. },
  970. },
  971. }
  972. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  973. contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  974. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  975. for i, tt := range tests {
  976. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  977. if err != nil {
  978. t.Errorf("statement parse error %s", err)
  979. break
  980. }
  981. if table, ok := stmt.Sources[0].(*xsql.Table); !ok{
  982. t.Errorf("statement source is not a table")
  983. }else{
  984. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  985. result := pp.Apply(ctx, tt.data)
  986. if !reflect.DeepEqual(tt.result, result) {
  987. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  988. }
  989. }
  990. }
  991. }
  992. func TestRightJoinPlan_Apply(t *testing.T) {
  993. var tests = []struct {
  994. sql string
  995. data xsql.WindowTuplesSet
  996. result interface{}
  997. }{
  998. {
  999. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1000. data: xsql.WindowTuplesSet{
  1001. xsql.WindowTuples{
  1002. Emitter: "src1",
  1003. Tuples:[]xsql.Tuple{
  1004. {
  1005. Emitter: "src1",
  1006. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1007. },{
  1008. Emitter: "src1",
  1009. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  1010. },{
  1011. Emitter: "src1",
  1012. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  1013. },
  1014. },
  1015. },
  1016. xsql.WindowTuples{
  1017. Emitter: "src2",
  1018. Tuples:[]xsql.Tuple{
  1019. {
  1020. Emitter: "src2",
  1021. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1022. },{
  1023. Emitter: "src2",
  1024. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  1025. },{
  1026. Emitter: "src2",
  1027. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  1028. },
  1029. },
  1030. },
  1031. },
  1032. result: xsql.JoinTupleSets{
  1033. xsql.JoinTuple{
  1034. Tuples: []xsql.Tuple{
  1035. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1036. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1037. },
  1038. },
  1039. xsql.JoinTuple{
  1040. Tuples: []xsql.Tuple{
  1041. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1042. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  1043. },
  1044. },
  1045. xsql.JoinTuple{
  1046. Tuples: []xsql.Tuple{
  1047. {Emitter: "src2", Message: xsql.Message{ "id2": 4, "f2": "w3" },},
  1048. },
  1049. },
  1050. },
  1051. },
  1052. {
  1053. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1054. data: xsql.WindowTuplesSet{
  1055. xsql.WindowTuples{
  1056. Emitter:"src1",
  1057. Tuples:[]xsql.Tuple{
  1058. {
  1059. Emitter: "src1",
  1060. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1061. },
  1062. },
  1063. },
  1064. xsql.WindowTuples{
  1065. Emitter:"src2",
  1066. Tuples:[]xsql.Tuple{
  1067. {
  1068. Emitter: "src2",
  1069. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1070. },{
  1071. Emitter: "src2",
  1072. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  1073. },
  1074. },
  1075. },
  1076. },
  1077. result: xsql.JoinTupleSets{
  1078. xsql.JoinTuple{
  1079. Tuples: []xsql.Tuple{
  1080. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1081. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1082. },
  1083. },
  1084. xsql.JoinTuple{
  1085. Tuples: []xsql.Tuple{
  1086. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w2" },},
  1087. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1088. },
  1089. },
  1090. },
  1091. },
  1092. }
  1093. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1094. contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1095. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1096. for i, tt := range tests {
  1097. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1098. if err != nil {
  1099. t.Errorf("statement parse error %s", err)
  1100. break
  1101. }
  1102. if table, ok := stmt.Sources[0].(*xsql.Table); !ok{
  1103. t.Errorf("statement source is not a table")
  1104. }else{
  1105. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1106. result := pp.Apply(ctx, tt.data)
  1107. if !reflect.DeepEqual(tt.result, result) {
  1108. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1109. }
  1110. }
  1111. }
  1112. }
  1113. func TestFullJoinPlan_Apply(t *testing.T) {
  1114. var tests = []struct {
  1115. sql string
  1116. data xsql.WindowTuplesSet
  1117. result interface{}
  1118. }{
  1119. {
  1120. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1121. data: xsql.WindowTuplesSet{
  1122. xsql.WindowTuples{
  1123. Emitter: "src1",
  1124. Tuples:[]xsql.Tuple{
  1125. {
  1126. Emitter: "src1",
  1127. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1128. },{
  1129. Emitter: "src1",
  1130. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  1131. },{
  1132. Emitter: "src1",
  1133. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  1134. },
  1135. },
  1136. },
  1137. xsql.WindowTuples{
  1138. Emitter: "src2",
  1139. Tuples:[]xsql.Tuple{
  1140. {
  1141. Emitter: "src2",
  1142. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1143. },{
  1144. Emitter: "src2",
  1145. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  1146. },{
  1147. Emitter: "src2",
  1148. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  1149. },{
  1150. Emitter: "src2",
  1151. Message: xsql.Message{ "id2" : 2, "f2" : "w4" },
  1152. },
  1153. },
  1154. },
  1155. },
  1156. result: xsql.JoinTupleSets{
  1157. xsql.JoinTuple{
  1158. Tuples: []xsql.Tuple{
  1159. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1160. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1161. },
  1162. },
  1163. xsql.JoinTuple{
  1164. Tuples: []xsql.Tuple{
  1165. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  1166. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1167. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w4" },},
  1168. },
  1169. },
  1170. xsql.JoinTuple{
  1171. Tuples: []xsql.Tuple{
  1172. {Emitter: "src1", Message: xsql.Message{ "id1": 3, "f1": "v3" },},
  1173. },
  1174. },
  1175. xsql.JoinTuple{
  1176. Tuples: []xsql.Tuple{
  1177. {Emitter: "src2", Message: xsql.Message{ "id2": 4, "f2": "w3" },},
  1178. },
  1179. },
  1180. },
  1181. },
  1182. {
  1183. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1184. data: xsql.WindowTuplesSet{
  1185. xsql.WindowTuples{
  1186. Emitter: "src1",
  1187. Tuples:[]xsql.Tuple{
  1188. {
  1189. Emitter: "src1",
  1190. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1191. },{
  1192. Emitter: "src1",
  1193. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  1194. },{
  1195. Emitter: "src1",
  1196. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  1197. },
  1198. },
  1199. },
  1200. xsql.WindowTuples{
  1201. Emitter: "src2",
  1202. Tuples:[]xsql.Tuple{
  1203. {
  1204. Emitter: "src2",
  1205. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1206. },{
  1207. Emitter: "src2",
  1208. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  1209. },{
  1210. Emitter: "src2",
  1211. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  1212. },
  1213. },
  1214. },
  1215. },
  1216. result: xsql.JoinTupleSets{
  1217. xsql.JoinTuple{
  1218. Tuples: []xsql.Tuple{
  1219. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1220. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1221. },
  1222. },
  1223. xsql.JoinTuple{
  1224. Tuples: []xsql.Tuple{
  1225. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  1226. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1227. },
  1228. },
  1229. xsql.JoinTuple{
  1230. Tuples: []xsql.Tuple{
  1231. {Emitter: "src1", Message: xsql.Message{ "id1": 3, "f1": "v3" },},
  1232. },
  1233. },
  1234. xsql.JoinTuple{
  1235. Tuples: []xsql.Tuple{
  1236. {Emitter: "src2", Message: xsql.Message{ "id2": 4, "f2": "w3" },},
  1237. },
  1238. },
  1239. },
  1240. },
  1241. {
  1242. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1243. data: xsql.WindowTuplesSet{
  1244. xsql.WindowTuples{
  1245. Emitter: "src1",
  1246. Tuples:[]xsql.Tuple{
  1247. {
  1248. Emitter: "src1",
  1249. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1250. },{
  1251. Emitter: "src1",
  1252. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  1253. },{
  1254. Emitter: "src1",
  1255. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  1256. },
  1257. },
  1258. },
  1259. xsql.WindowTuples{
  1260. Emitter: "src2",
  1261. Tuples: []xsql.Tuple{ },
  1262. },
  1263. },
  1264. result: xsql.JoinTupleSets{
  1265. xsql.JoinTuple{
  1266. Tuples: []xsql.Tuple{
  1267. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1268. },
  1269. },
  1270. xsql.JoinTuple{
  1271. Tuples: []xsql.Tuple{
  1272. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  1273. },
  1274. },
  1275. xsql.JoinTuple{
  1276. Tuples: []xsql.Tuple{
  1277. {Emitter: "src1", Message: xsql.Message{ "id1": 3, "f1": "v3" },},
  1278. },
  1279. },
  1280. },
  1281. },
  1282. {
  1283. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1284. data: xsql.WindowTuplesSet{
  1285. xsql.WindowTuples{
  1286. Emitter: "src1",
  1287. Tuples: []xsql.Tuple{
  1288. },
  1289. },
  1290. xsql.WindowTuples{
  1291. Emitter: "src2",
  1292. Tuples:[]xsql.Tuple{
  1293. {
  1294. Emitter: "src2",
  1295. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1296. },{
  1297. Emitter: "src2",
  1298. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  1299. },{
  1300. Emitter: "src2",
  1301. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  1302. },
  1303. },
  1304. },
  1305. },
  1306. result: xsql.JoinTupleSets{
  1307. xsql.JoinTuple{
  1308. Tuples: []xsql.Tuple{
  1309. {Emitter: "src2", Message: xsql.Message{ "id2": 1, "f2": "w1" },},
  1310. },
  1311. },
  1312. xsql.JoinTuple{
  1313. Tuples: []xsql.Tuple{
  1314. {Emitter: "src2", Message: xsql.Message{ "id2": 2, "f2": "w2" },},
  1315. },
  1316. },
  1317. xsql.JoinTuple{
  1318. Tuples: []xsql.Tuple{
  1319. {Emitter: "src2", Message: xsql.Message{ "id2": 4, "f2": "w3" },},
  1320. },
  1321. },
  1322. },
  1323. },
  1324. }
  1325. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1326. contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
  1327. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1328. for i, tt := range tests {
  1329. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1330. if err != nil {
  1331. t.Errorf("statement parse error %s", err)
  1332. break
  1333. }
  1334. if table, ok := stmt.Sources[0].(*xsql.Table); !ok{
  1335. t.Errorf("statement source is not a table")
  1336. }else{
  1337. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1338. result := pp.Apply(ctx, tt.data)
  1339. if !reflect.DeepEqual(tt.result, result) {
  1340. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1341. }
  1342. }
  1343. }
  1344. }
  1345. func TestCrossJoinPlan_Apply(t *testing.T) {
  1346. var tests = []struct {
  1347. sql string
  1348. data xsql.WindowTuplesSet
  1349. result interface{}
  1350. }{
  1351. {
  1352. sql: "SELECT id1 FROM src1 cross join src2",
  1353. data: xsql.WindowTuplesSet{
  1354. xsql.WindowTuples{
  1355. Emitter: "src1",
  1356. Tuples:[]xsql.Tuple{
  1357. {
  1358. Emitter: "src1",
  1359. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1360. },{
  1361. Emitter: "src1",
  1362. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  1363. },{
  1364. Emitter: "src1",
  1365. Message: xsql.Message{ "id1" : 3, "f1" : "v3", },
  1366. },
  1367. },
  1368. },
  1369. xsql.WindowTuples{
  1370. Emitter: "src2",
  1371. Tuples:[]xsql.Tuple{
  1372. {
  1373. Emitter: "src2",
  1374. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1375. },{
  1376. Emitter: "src2",
  1377. Message: xsql.Message{ "id2" : 2, "f2" : "w2" },
  1378. },{
  1379. Emitter: "src2",
  1380. Message: xsql.Message{ "id2" : 4, "f2" : "w3" },
  1381. },
  1382. },
  1383. },
  1384. },
  1385. result: xsql.JoinTupleSets{
  1386. xsql.JoinTuple{
  1387. Tuples: []xsql.Tuple{
  1388. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1389. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1390. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1391. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3" },},
  1392. },
  1393. },
  1394. xsql.JoinTuple{
  1395. Tuples: []xsql.Tuple{
  1396. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2" },},
  1397. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1398. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1399. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3" },},
  1400. },
  1401. },
  1402. xsql.JoinTuple{
  1403. Tuples: []xsql.Tuple{
  1404. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v3" },},
  1405. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1406. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2" },},
  1407. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3" },},
  1408. },
  1409. },
  1410. },
  1411. },
  1412. {
  1413. sql: "SELECT id1 FROM src1 cross join src2",
  1414. data: xsql.WindowTuplesSet{
  1415. xsql.WindowTuples{
  1416. Emitter:"src1",
  1417. Tuples:[]xsql.Tuple{
  1418. {
  1419. Emitter: "src1",
  1420. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  1421. },
  1422. },
  1423. },
  1424. xsql.WindowTuples{
  1425. Emitter:"src2",
  1426. Tuples:[]xsql.Tuple{
  1427. {
  1428. Emitter: "src2",
  1429. Message: xsql.Message{ "id2" : 1, "f2" : "w1" },
  1430. },{
  1431. Emitter: "src2",
  1432. Message: xsql.Message{ "id2" : 1, "f2" : "w2" },
  1433. },
  1434. },
  1435. },
  1436. },
  1437. result: xsql.JoinTupleSets{
  1438. xsql.JoinTuple{
  1439. Tuples: []xsql.Tuple{
  1440. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
  1441. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
  1442. {Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w2" },},
  1443. },
  1444. },
  1445. },
  1446. },
  1447. }
  1448. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1449. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1450. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1451. for i, tt := range tests {
  1452. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1453. if err != nil {
  1454. t.Errorf("statement parse error %s", err)
  1455. break
  1456. }
  1457. if table, ok := stmt.Sources[0].(*xsql.Table); !ok{
  1458. t.Errorf("statement source is not a table")
  1459. }else{
  1460. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1461. result := pp.Apply(ctx, tt.data)
  1462. if !reflect.DeepEqual(tt.result, result) {
  1463. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1464. }
  1465. }
  1466. }
  1467. }
  1468. func str2Map(s string) map[string]interface{} {
  1469. var input map[string]interface{}
  1470. if err := json.Unmarshal([]byte(s), &input); err != nil {
  1471. fmt.Printf("Failed to parse the JSON data.\n")
  1472. return nil
  1473. }
  1474. return input
  1475. }