join_test.go 61 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. func TestLeftJoinPlan_Apply(t *testing.T) {
  29. var tests = []struct {
  30. sql string
  31. data xsql.WindowTuplesSet
  32. result interface{}
  33. }{
  34. { //0
  35. sql: "SELECT id1 FROM src1 left join src2 on id1 = id2",
  36. data: xsql.WindowTuplesSet{
  37. Content: []xsql.WindowTuples{
  38. {
  39. Emitter: "src1",
  40. Tuples: []xsql.Tuple{
  41. {
  42. Emitter: "src1",
  43. Message: xsql.Message{"id1": 1, "f1": "v1"},
  44. }, {
  45. Emitter: "src1",
  46. Message: xsql.Message{"id1": 2, "f1": "v2"},
  47. }, {
  48. Emitter: "src1",
  49. Message: xsql.Message{"id1": 3, "f1": "v3"},
  50. },
  51. },
  52. },
  53. {
  54. Emitter: "src2",
  55. Tuples: []xsql.Tuple{
  56. {
  57. Emitter: "src2",
  58. Message: xsql.Message{"id2": 1, "f2": "w1"},
  59. }, {
  60. Emitter: "src2",
  61. Message: xsql.Message{"id2": 2, "f2": "w2"},
  62. }, {
  63. Emitter: "src2",
  64. Message: xsql.Message{"id2": 4, "f2": "w3"},
  65. },
  66. },
  67. },
  68. },
  69. WindowRange: &xsql.WindowRange{
  70. WindowStart: 1541152486013,
  71. WindowEnd: 1541152487013,
  72. },
  73. },
  74. result: &xsql.JoinTupleSets{
  75. Content: []xsql.JoinTuple{
  76. {
  77. Tuples: []xsql.Tuple{
  78. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  79. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  80. },
  81. },
  82. {
  83. Tuples: []xsql.Tuple{
  84. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  85. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  86. },
  87. },
  88. {
  89. Tuples: []xsql.Tuple{
  90. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  91. },
  92. },
  93. },
  94. WindowRange: &xsql.WindowRange{
  95. WindowStart: 1541152486013,
  96. WindowEnd: 1541152487013,
  97. },
  98. },
  99. },
  100. { // 1
  101. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  102. data: xsql.WindowTuplesSet{
  103. Content: []xsql.WindowTuples{
  104. {
  105. Emitter: "src1",
  106. Tuples: []xsql.Tuple{
  107. {
  108. Emitter: "src1",
  109. Message: xsql.Message{"id1": 1, "f1": "v1"},
  110. }, {
  111. Emitter: "src1",
  112. Message: xsql.Message{"id1": 2, "f1": "v2"},
  113. }, {
  114. Emitter: "src1",
  115. Message: xsql.Message{"id1": 3, "f1": "v3"},
  116. },
  117. },
  118. },
  119. {
  120. Emitter: "src2",
  121. Tuples: []xsql.Tuple{
  122. {
  123. Emitter: "src2",
  124. Message: xsql.Message{"id2": 1, "f2": "w1"},
  125. }, {
  126. Emitter: "src2",
  127. Message: xsql.Message{"f2": "w2"},
  128. }, {
  129. Emitter: "src2",
  130. Message: xsql.Message{"id2": 4, "f2": "w3"},
  131. },
  132. },
  133. },
  134. },
  135. },
  136. result: &xsql.JoinTupleSets{
  137. Content: []xsql.JoinTuple{
  138. {
  139. Tuples: []xsql.Tuple{
  140. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  141. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  142. },
  143. },
  144. {
  145. Tuples: []xsql.Tuple{
  146. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  147. },
  148. },
  149. {
  150. Tuples: []xsql.Tuple{
  151. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  152. },
  153. },
  154. },
  155. },
  156. },
  157. { // 2
  158. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  159. data: xsql.WindowTuplesSet{
  160. Content: []xsql.WindowTuples{
  161. {
  162. Emitter: "src1",
  163. Tuples: []xsql.Tuple{
  164. {
  165. Emitter: "src1",
  166. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  167. }, {
  168. Emitter: "src1",
  169. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  170. }, {
  171. Emitter: "src1",
  172. Message: xsql.Message{"id1": 3, "f1": "v3", "ts": cast.TimeFromUnixMilli(1568854535000)},
  173. },
  174. },
  175. },
  176. {
  177. Emitter: "src2",
  178. Tuples: []xsql.Tuple{
  179. {
  180. Emitter: "src2",
  181. Message: xsql.Message{"id2": 1, "f2": "w1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  182. }, {
  183. Emitter: "src2",
  184. Message: xsql.Message{"id2": 2, "f2": "w2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  185. }, {
  186. Emitter: "src2",
  187. Message: xsql.Message{"id2": 4, "f2": "w3", "ts": cast.TimeFromUnixMilli(1568854545000)},
  188. },
  189. },
  190. },
  191. },
  192. },
  193. result: &xsql.JoinTupleSets{
  194. Content: []xsql.JoinTuple{
  195. {
  196. Tuples: []xsql.Tuple{
  197. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  198. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  199. },
  200. },
  201. {
  202. Tuples: []xsql.Tuple{
  203. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)}},
  204. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2", "ts": cast.TimeFromUnixMilli(1568854525000)}},
  205. },
  206. },
  207. {
  208. Tuples: []xsql.Tuple{
  209. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3", "ts": cast.TimeFromUnixMilli(1568854535000)}},
  210. },
  211. },
  212. },
  213. },
  214. },
  215. { // 3
  216. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  217. data: xsql.WindowTuplesSet{
  218. Content: []xsql.WindowTuples{
  219. {
  220. Emitter: "src1",
  221. Tuples: []xsql.Tuple{
  222. {
  223. Emitter: "src1",
  224. Message: xsql.Message{"id1": 1, "f1": "v1"},
  225. }, {
  226. Emitter: "src1",
  227. Message: xsql.Message{"id1": 2, "f1": "v2"},
  228. }, {
  229. Emitter: "src1",
  230. Message: xsql.Message{"id1": 3, "f1": "v3"},
  231. },
  232. },
  233. },
  234. {
  235. Emitter: "src2",
  236. Tuples: []xsql.Tuple{
  237. {
  238. Emitter: "src2",
  239. Message: xsql.Message{"id2": 4, "f2": "w1"},
  240. }, {
  241. Emitter: "src2",
  242. Message: xsql.Message{"id2": 5, "f2": "w2"},
  243. }, {
  244. Emitter: "src2",
  245. Message: xsql.Message{"id2": 6, "f2": "w3"},
  246. },
  247. },
  248. },
  249. },
  250. },
  251. result: nil,
  252. },
  253. { // 4
  254. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  255. data: xsql.WindowTuplesSet{
  256. Content: []xsql.WindowTuples{
  257. {
  258. Emitter: "s1",
  259. Tuples: []xsql.Tuple{
  260. {
  261. Emitter: "s1",
  262. Message: xsql.Message{"id1": 1, "f1": "v1"},
  263. }, {
  264. Emitter: "s1",
  265. Message: xsql.Message{"id1": 2, "f1": "v2"},
  266. }, {
  267. Emitter: "s1",
  268. Message: xsql.Message{"id1": 3, "f1": "v3"},
  269. },
  270. },
  271. },
  272. {
  273. Emitter: "s2",
  274. Tuples: []xsql.Tuple{
  275. {
  276. Emitter: "s2",
  277. Message: xsql.Message{"id2": 1, "f2": "w1"},
  278. }, {
  279. Emitter: "s2",
  280. Message: xsql.Message{"id2": 2, "f2": "w2"},
  281. }, {
  282. Emitter: "s2",
  283. Message: xsql.Message{"id2": 4, "f2": "w3"},
  284. },
  285. },
  286. },
  287. },
  288. },
  289. result: &xsql.JoinTupleSets{
  290. Content: []xsql.JoinTuple{
  291. {
  292. Tuples: []xsql.Tuple{
  293. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  294. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  295. },
  296. },
  297. {
  298. Tuples: []xsql.Tuple{
  299. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  300. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  301. },
  302. },
  303. {
  304. Tuples: []xsql.Tuple{
  305. {Emitter: "s1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  306. },
  307. },
  308. },
  309. },
  310. },
  311. { // 5
  312. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  313. data: xsql.WindowTuplesSet{
  314. Content: []xsql.WindowTuples{
  315. {
  316. Emitter: "src1",
  317. Tuples: []xsql.Tuple{
  318. {
  319. Emitter: "src1",
  320. Message: xsql.Message{"id1": 1, "f1": "v1"},
  321. },
  322. },
  323. },
  324. {
  325. Emitter: "src2",
  326. Tuples: []xsql.Tuple{
  327. {
  328. Emitter: "src2",
  329. Message: xsql.Message{"id2": 1, "f2": "w1"},
  330. }, {
  331. Emitter: "src2",
  332. Message: xsql.Message{"id2": 1, "f2": "w2"},
  333. },
  334. },
  335. },
  336. },
  337. },
  338. result: &xsql.JoinTupleSets{
  339. Content: []xsql.JoinTuple{
  340. {
  341. Tuples: []xsql.Tuple{
  342. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  343. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  344. },
  345. },
  346. {
  347. Tuples: []xsql.Tuple{
  348. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}}, {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  349. },
  350. },
  351. },
  352. },
  353. },
  354. { // 6
  355. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  356. data: xsql.WindowTuplesSet{
  357. Content: []xsql.WindowTuples{
  358. {
  359. Emitter: "src1",
  360. Tuples: []xsql.Tuple{
  361. {
  362. Emitter: "src1",
  363. Message: xsql.Message{"id1": 1, "f1": "v1"},
  364. }, {
  365. Emitter: "src1",
  366. Message: xsql.Message{"id1": 2, "f1": "v2"},
  367. }, {
  368. Emitter: "src1",
  369. Message: xsql.Message{"id1": 3, "f1": "v3"},
  370. },
  371. },
  372. },
  373. {
  374. Emitter: "src2",
  375. Tuples: []xsql.Tuple{},
  376. },
  377. },
  378. },
  379. result: &xsql.JoinTupleSets{
  380. Content: []xsql.JoinTuple{
  381. {
  382. Tuples: []xsql.Tuple{
  383. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  384. },
  385. },
  386. {
  387. Tuples: []xsql.Tuple{
  388. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  389. },
  390. },
  391. {
  392. Tuples: []xsql.Tuple{
  393. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  394. },
  395. },
  396. },
  397. },
  398. },
  399. {
  400. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  401. data: xsql.WindowTuplesSet{
  402. Content: []xsql.WindowTuples{
  403. {
  404. Emitter: "src1",
  405. Tuples: []xsql.Tuple{
  406. {
  407. Emitter: "src1",
  408. Message: xsql.Message{"id1": 1, "f1": "v1"},
  409. }, {
  410. Emitter: "src1",
  411. Message: xsql.Message{"id1": 2, "f1": "v2"},
  412. }, {
  413. Emitter: "src1",
  414. Message: xsql.Message{"id1": 3, "f1": "v3"},
  415. },
  416. },
  417. },
  418. {
  419. Emitter: "src2",
  420. Tuples: nil,
  421. },
  422. },
  423. },
  424. result: &xsql.JoinTupleSets{
  425. Content: []xsql.JoinTuple{
  426. {
  427. Tuples: []xsql.Tuple{
  428. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  429. },
  430. },
  431. {
  432. Tuples: []xsql.Tuple{
  433. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  434. },
  435. },
  436. {
  437. Tuples: []xsql.Tuple{
  438. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  439. },
  440. },
  441. },
  442. },
  443. },
  444. {
  445. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  446. data: xsql.WindowTuplesSet{
  447. Content: []xsql.WindowTuples{
  448. {
  449. Emitter: "src1",
  450. Tuples: []xsql.Tuple{},
  451. },
  452. {
  453. Emitter: "src2",
  454. Tuples: []xsql.Tuple{
  455. {
  456. Emitter: "src2",
  457. Message: xsql.Message{"id2": 1, "f2": "w1"},
  458. }, {
  459. Emitter: "src2",
  460. Message: xsql.Message{"id2": 1, "f2": "w2"},
  461. },
  462. },
  463. },
  464. },
  465. },
  466. result: nil,
  467. },
  468. {
  469. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  470. data: xsql.WindowTuplesSet{
  471. Content: []xsql.WindowTuples{
  472. {
  473. Emitter: "src1",
  474. Tuples: nil,
  475. },
  476. {
  477. Emitter: "src2",
  478. Tuples: []xsql.Tuple{
  479. {
  480. Emitter: "src2",
  481. Message: xsql.Message{"id2": 1, "f2": "w1"},
  482. }, {
  483. Emitter: "src2",
  484. Message: xsql.Message{"id2": 1, "f2": "w2"},
  485. },
  486. },
  487. },
  488. },
  489. },
  490. result: nil,
  491. },
  492. { // 10 select id1 FROM src1 left join src2 on null = null
  493. sql: "SELECT id1 FROM src1 left join src2 on src1.id2 = src2.id1",
  494. data: xsql.WindowTuplesSet{
  495. Content: []xsql.WindowTuples{
  496. {
  497. Emitter: "src1",
  498. Tuples: []xsql.Tuple{
  499. {
  500. Emitter: "src1",
  501. Message: xsql.Message{"id1": 1, "f2": "w1"},
  502. },
  503. },
  504. },
  505. {
  506. Emitter: "src2",
  507. Tuples: []xsql.Tuple{
  508. {
  509. Emitter: "src2",
  510. Message: xsql.Message{"id2": 2, "f2": "w1"},
  511. }, {
  512. Emitter: "src2",
  513. Message: xsql.Message{"id2": 2, "f2": "w2"},
  514. },
  515. },
  516. },
  517. },
  518. },
  519. result: &xsql.JoinTupleSets{
  520. Content: []xsql.JoinTuple{
  521. {
  522. Tuples: []xsql.Tuple{
  523. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f2": "w1"}},
  524. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w1"}},
  525. },
  526. },
  527. {
  528. Tuples: []xsql.Tuple{
  529. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f2": "w1"}},
  530. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  531. },
  532. },
  533. },
  534. },
  535. },
  536. {
  537. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  538. data: xsql.WindowTuplesSet{
  539. Content: []xsql.WindowTuples{
  540. {
  541. Emitter: "src1",
  542. Tuples: []xsql.Tuple{
  543. {
  544. Emitter: "src1",
  545. Message: xsql.Message{"id1": 1, "f1": "v1"},
  546. }, {
  547. Emitter: "src1",
  548. Message: xsql.Message{"id1": 2, "f1": "v2"},
  549. }, {
  550. Emitter: "src1",
  551. Message: xsql.Message{"id1": 3, "f1": "v3"},
  552. },
  553. },
  554. },
  555. {
  556. Emitter: "src2",
  557. Tuples: []xsql.Tuple{
  558. {
  559. Emitter: "src2",
  560. Message: xsql.Message{"id2": 1, "f2": "w1"},
  561. }, {
  562. Emitter: "src2",
  563. Message: xsql.Message{"id2": 2, "f2": "w2"},
  564. }, {
  565. Emitter: "src2",
  566. Message: xsql.Message{"id2": 4, "f2": "w3"},
  567. },
  568. },
  569. },
  570. },
  571. },
  572. result: &xsql.JoinTupleSets{
  573. Content: []xsql.JoinTuple{
  574. {
  575. Tuples: []xsql.Tuple{
  576. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  577. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  578. },
  579. },
  580. {
  581. Tuples: []xsql.Tuple{
  582. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  583. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  584. },
  585. },
  586. {
  587. Tuples: []xsql.Tuple{
  588. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  589. },
  590. },
  591. },
  592. },
  593. },
  594. {
  595. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  596. data: xsql.WindowTuplesSet{
  597. Content: []xsql.WindowTuples{
  598. {
  599. Emitter: "src1",
  600. Tuples: []xsql.Tuple{
  601. {
  602. Emitter: "src1",
  603. Message: xsql.Message{"id1": 1, "f1": "v1"},
  604. }, {
  605. Emitter: "src1",
  606. Message: xsql.Message{"id1": 2, "f1": "v2"},
  607. }, {
  608. Emitter: "src1",
  609. Message: xsql.Message{"id1": 3, "f1": "v3"},
  610. },
  611. },
  612. },
  613. {
  614. Emitter: "src2",
  615. Tuples: []xsql.Tuple{
  616. {
  617. Emitter: "src2",
  618. Message: xsql.Message{"id2": 1, "f2": "w1"},
  619. }, {
  620. Emitter: "src2",
  621. Message: xsql.Message{"id2": 2, "f2": "w2"},
  622. }, {
  623. Emitter: "src2",
  624. Message: xsql.Message{"id2": 4, "f2": "w3"},
  625. },
  626. },
  627. },
  628. },
  629. },
  630. result: &xsql.JoinTupleSets{
  631. Content: []xsql.JoinTuple{
  632. {
  633. Tuples: []xsql.Tuple{
  634. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  635. },
  636. },
  637. {
  638. Tuples: []xsql.Tuple{
  639. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  640. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  641. },
  642. },
  643. {
  644. Tuples: []xsql.Tuple{
  645. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  646. },
  647. },
  648. },
  649. },
  650. },
  651. {
  652. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  653. data: xsql.WindowTuplesSet{
  654. Content: []xsql.WindowTuples{
  655. {
  656. Emitter: "src1",
  657. Tuples: []xsql.Tuple{
  658. {
  659. Emitter: "src1",
  660. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  661. }, {
  662. Emitter: "src1",
  663. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  664. }, {
  665. Emitter: "src1",
  666. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  667. },
  668. },
  669. },
  670. {
  671. Emitter: "src2",
  672. Tuples: []xsql.Tuple{
  673. {
  674. Emitter: "src2",
  675. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  676. }, {
  677. Emitter: "src2",
  678. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  679. }, {
  680. Emitter: "src2",
  681. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  682. },
  683. },
  684. },
  685. },
  686. },
  687. result: &xsql.JoinTupleSets{
  688. Content: []xsql.JoinTuple{
  689. {
  690. Tuples: []xsql.Tuple{
  691. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  692. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  693. },
  694. },
  695. {
  696. Tuples: []xsql.Tuple{
  697. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  698. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  699. },
  700. },
  701. {
  702. Tuples: []xsql.Tuple{
  703. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)}},
  704. },
  705. },
  706. },
  707. },
  708. },
  709. {
  710. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  711. data: xsql.WindowTuplesSet{
  712. Content: []xsql.WindowTuples{
  713. {
  714. Emitter: "src1",
  715. Tuples: []xsql.Tuple{
  716. {
  717. Emitter: "src1",
  718. Message: xsql.Message{"id1": 1, "f1": "v1"},
  719. Metadata: xsql.Metadata{"topic": "devices/type1/device001"},
  720. },
  721. },
  722. },
  723. {
  724. Emitter: "src2",
  725. Tuples: []xsql.Tuple{
  726. {
  727. Emitter: "src2",
  728. Message: xsql.Message{"id2": 1, "f2": "w1"},
  729. Metadata: xsql.Metadata{"topic": "devices/type2/device001"},
  730. },
  731. },
  732. },
  733. },
  734. },
  735. result: &xsql.JoinTupleSets{
  736. Content: []xsql.JoinTuple{
  737. {
  738. Tuples: []xsql.Tuple{
  739. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  740. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  741. },
  742. },
  743. },
  744. },
  745. },
  746. {
  747. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  748. data: xsql.WindowTuplesSet{
  749. Content: []xsql.WindowTuples{
  750. {
  751. Emitter: "src1",
  752. Tuples: []xsql.Tuple{
  753. {
  754. Emitter: "src1",
  755. Message: xsql.Message{"id1": 1, "f1": "v1"},
  756. }, {
  757. Emitter: "src1",
  758. Message: xsql.Message{"id1": 1, "f1": "v2"},
  759. }, {
  760. Emitter: "src1",
  761. Message: xsql.Message{"id1": 3, "f1": "v3"},
  762. }, {
  763. Emitter: "src1",
  764. Message: xsql.Message{"id1": 3, "f1": "v4"},
  765. }, {
  766. Emitter: "src1",
  767. Message: xsql.Message{"id1": 4, "f1": "v5"},
  768. },
  769. },
  770. },
  771. {
  772. Emitter: "src2",
  773. Tuples: []xsql.Tuple{
  774. {
  775. Emitter: "src2",
  776. Message: xsql.Message{"id2": 1, "f2": "w1"},
  777. }, {
  778. Emitter: "src2",
  779. Message: xsql.Message{"id2": 3, "f2": "w2"},
  780. }, {
  781. Emitter: "src2",
  782. Message: xsql.Message{"id2": 3, "f2": "w3"},
  783. },
  784. },
  785. },
  786. },
  787. },
  788. result: &xsql.JoinTupleSets{
  789. Content: []xsql.JoinTuple{
  790. {
  791. Tuples: []xsql.Tuple{
  792. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  793. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  794. },
  795. },
  796. {
  797. Tuples: []xsql.Tuple{
  798. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v2"}},
  799. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  800. },
  801. },
  802. {
  803. Tuples: []xsql.Tuple{
  804. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  805. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
  806. },
  807. },
  808. {
  809. Tuples: []xsql.Tuple{
  810. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  811. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
  812. },
  813. },
  814. {
  815. Tuples: []xsql.Tuple{
  816. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
  817. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
  818. },
  819. },
  820. {
  821. Tuples: []xsql.Tuple{
  822. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
  823. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
  824. },
  825. },
  826. {
  827. Tuples: []xsql.Tuple{
  828. {Emitter: "src1", Message: xsql.Message{"id1": 4, "f1": "v5"}},
  829. },
  830. },
  831. },
  832. },
  833. },
  834. {
  835. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  836. data: xsql.WindowTuplesSet{
  837. Content: []xsql.WindowTuples{
  838. {
  839. Emitter: "src1",
  840. Tuples: []xsql.Tuple{
  841. {
  842. Emitter: "src1",
  843. Message: xsql.Message{"id1": 1, "f1": "v1"},
  844. }, {
  845. Emitter: "src1",
  846. Message: xsql.Message{"id1": 2, "f1": "v2"},
  847. }, {
  848. Emitter: "src1",
  849. Message: xsql.Message{"id1": 3, "f1": "v3"},
  850. },
  851. },
  852. },
  853. {
  854. Emitter: "src2",
  855. Tuples: []xsql.Tuple{
  856. {
  857. Emitter: "src2",
  858. Message: xsql.Message{"id2": 1, "f2": "w1"},
  859. }, {
  860. Emitter: "src2",
  861. Message: xsql.Message{"id2": 2, "f2": "w2"},
  862. }, {
  863. Emitter: "src2",
  864. Message: xsql.Message{"id2": 2, "f2": "w3"},
  865. },
  866. },
  867. },
  868. },
  869. },
  870. result: &xsql.JoinTupleSets{
  871. Content: []xsql.JoinTuple{
  872. {
  873. Tuples: []xsql.Tuple{
  874. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  875. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  876. },
  877. },
  878. {
  879. Tuples: []xsql.Tuple{
  880. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  881. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  882. },
  883. },
  884. {
  885. Tuples: []xsql.Tuple{
  886. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  887. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
  888. },
  889. },
  890. {
  891. Tuples: []xsql.Tuple{
  892. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  893. },
  894. },
  895. },
  896. },
  897. },
  898. }
  899. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  900. contextLogger := conf.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  901. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  902. for i, tt := range tests {
  903. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  904. if err != nil {
  905. t.Errorf("statement parse error %s", err)
  906. break
  907. }
  908. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  909. t.Errorf("statement source is not a table")
  910. } else {
  911. fv, afv := xsql.NewFunctionValuersForOp(nil)
  912. pp := &JoinOp{Joins: stmt.Joins, From: table}
  913. result := pp.Apply(ctx, tt.data, fv, afv)
  914. if !reflect.DeepEqual(tt.result, result) {
  915. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  916. }
  917. }
  918. }
  919. }
  920. func TestInnerJoinPlan_Apply(t *testing.T) {
  921. var tests = []struct {
  922. sql string
  923. data xsql.WindowTuplesSet
  924. result interface{}
  925. }{
  926. {
  927. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  928. data: xsql.WindowTuplesSet{
  929. Content: []xsql.WindowTuples{
  930. {
  931. Emitter: "src1",
  932. Tuples: []xsql.Tuple{
  933. {
  934. Emitter: "src1",
  935. Message: xsql.Message{"id1": 1, "f1": "v1"},
  936. }, {
  937. Emitter: "src1",
  938. Message: xsql.Message{"id1": 2, "f1": "v2"},
  939. }, {
  940. Emitter: "src1",
  941. Message: xsql.Message{"id1": 3, "f1": "v3"},
  942. },
  943. },
  944. },
  945. {
  946. Emitter: "src2",
  947. Tuples: []xsql.Tuple{
  948. {
  949. Emitter: "src2",
  950. Message: xsql.Message{"id2": 1, "f2": "w1"},
  951. }, {
  952. Emitter: "src2",
  953. Message: xsql.Message{"id2": 2, "f2": "w2"},
  954. }, {
  955. Emitter: "src2",
  956. Message: xsql.Message{"id2": 4, "f2": "w3"},
  957. },
  958. },
  959. },
  960. },
  961. },
  962. result: &xsql.JoinTupleSets{
  963. Content: []xsql.JoinTuple{
  964. {
  965. Tuples: []xsql.Tuple{
  966. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  967. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  968. },
  969. },
  970. {
  971. Tuples: []xsql.Tuple{
  972. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  973. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  974. },
  975. },
  976. },
  977. },
  978. },
  979. {
  980. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  981. data: xsql.WindowTuplesSet{
  982. Content: []xsql.WindowTuples{
  983. {
  984. Emitter: "src1",
  985. Tuples: []xsql.Tuple{
  986. {
  987. Emitter: "src1",
  988. Message: xsql.Message{"id1": 1, "f1": "v1"},
  989. }, {
  990. Emitter: "src1",
  991. Message: xsql.Message{"id1": 2, "f1": "v2"},
  992. }, {
  993. Emitter: "src1",
  994. Message: xsql.Message{"id1": 3, "f1": "v3"},
  995. },
  996. },
  997. },
  998. {
  999. Emitter: "src2",
  1000. Tuples: []xsql.Tuple{
  1001. {
  1002. Emitter: "src2",
  1003. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1004. }, {
  1005. Emitter: "src2",
  1006. Message: xsql.Message{"f2": "w2"},
  1007. }, {
  1008. Emitter: "src2",
  1009. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1010. },
  1011. },
  1012. },
  1013. },
  1014. },
  1015. result: &xsql.JoinTupleSets{
  1016. Content: []xsql.JoinTuple{
  1017. {
  1018. Tuples: []xsql.Tuple{
  1019. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1020. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1021. },
  1022. },
  1023. },
  1024. },
  1025. },
  1026. {
  1027. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  1028. data: xsql.WindowTuplesSet{
  1029. Content: []xsql.WindowTuples{
  1030. {
  1031. Emitter: "s1",
  1032. Tuples: []xsql.Tuple{
  1033. {
  1034. Emitter: "s1",
  1035. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1036. }, {
  1037. Emitter: "s1",
  1038. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1039. }, {
  1040. Emitter: "s1",
  1041. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1042. },
  1043. },
  1044. },
  1045. {
  1046. Emitter: "s2",
  1047. Tuples: []xsql.Tuple{
  1048. {
  1049. Emitter: "s2",
  1050. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1051. }, {
  1052. Emitter: "s2",
  1053. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1054. }, {
  1055. Emitter: "s2",
  1056. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1057. },
  1058. },
  1059. },
  1060. },
  1061. },
  1062. result: &xsql.JoinTupleSets{
  1063. Content: []xsql.JoinTuple{
  1064. {
  1065. Tuples: []xsql.Tuple{
  1066. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1067. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1068. },
  1069. },
  1070. {
  1071. Tuples: []xsql.Tuple{
  1072. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1073. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1074. },
  1075. },
  1076. },
  1077. },
  1078. },
  1079. {
  1080. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1081. data: xsql.WindowTuplesSet{
  1082. Content: []xsql.WindowTuples{
  1083. {
  1084. Emitter: "src1",
  1085. Tuples: []xsql.Tuple{
  1086. {
  1087. Emitter: "src1",
  1088. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1089. },
  1090. },
  1091. },
  1092. {
  1093. Emitter: "src2",
  1094. Tuples: []xsql.Tuple{
  1095. {
  1096. Emitter: "src2",
  1097. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1098. }, {
  1099. Emitter: "src2",
  1100. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1101. },
  1102. },
  1103. },
  1104. },
  1105. },
  1106. result: &xsql.JoinTupleSets{
  1107. Content: []xsql.JoinTuple{
  1108. {
  1109. Tuples: []xsql.Tuple{
  1110. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1111. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1112. },
  1113. },
  1114. {
  1115. Tuples: []xsql.Tuple{
  1116. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1117. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1118. },
  1119. },
  1120. },
  1121. },
  1122. },
  1123. {
  1124. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1125. data: xsql.WindowTuplesSet{
  1126. Content: []xsql.WindowTuples{
  1127. {
  1128. Emitter: "src1",
  1129. Tuples: []xsql.Tuple{
  1130. {
  1131. Emitter: "src1",
  1132. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1133. },
  1134. {
  1135. Emitter: "src1",
  1136. Message: xsql.Message{"id1": 1, "f1": "v2"},
  1137. },
  1138. },
  1139. },
  1140. {
  1141. Emitter: "src2",
  1142. Tuples: []xsql.Tuple{
  1143. {
  1144. Emitter: "src2",
  1145. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1146. }, {
  1147. Emitter: "src2",
  1148. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1149. },
  1150. },
  1151. },
  1152. },
  1153. },
  1154. result: &xsql.JoinTupleSets{
  1155. Content: []xsql.JoinTuple{
  1156. {
  1157. Tuples: []xsql.Tuple{
  1158. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1159. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1160. },
  1161. },
  1162. {
  1163. Tuples: []xsql.Tuple{
  1164. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1165. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1166. },
  1167. },
  1168. {
  1169. Tuples: []xsql.Tuple{
  1170. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v2"}},
  1171. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1172. },
  1173. },
  1174. {
  1175. Tuples: []xsql.Tuple{
  1176. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v2"}},
  1177. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1178. },
  1179. },
  1180. },
  1181. },
  1182. },
  1183. {
  1184. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1185. data: xsql.WindowTuplesSet{
  1186. Content: []xsql.WindowTuples{
  1187. {
  1188. Emitter: "src1",
  1189. Tuples: []xsql.Tuple{
  1190. {
  1191. Emitter: "src1",
  1192. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1193. }, {
  1194. Emitter: "src1",
  1195. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1196. }, {
  1197. Emitter: "src1",
  1198. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1199. },
  1200. },
  1201. },
  1202. {
  1203. Emitter: "src2",
  1204. Tuples: []xsql.Tuple{},
  1205. },
  1206. },
  1207. },
  1208. result: nil,
  1209. },
  1210. {
  1211. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1212. data: xsql.WindowTuplesSet{
  1213. Content: []xsql.WindowTuples{
  1214. {
  1215. Emitter: "src1",
  1216. Tuples: []xsql.Tuple{
  1217. {
  1218. Emitter: "src1",
  1219. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1220. }, {
  1221. Emitter: "src1",
  1222. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1223. }, {
  1224. Emitter: "src1",
  1225. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1226. },
  1227. },
  1228. },
  1229. {
  1230. Emitter: "src2",
  1231. Tuples: nil,
  1232. },
  1233. },
  1234. },
  1235. result: nil,
  1236. },
  1237. {
  1238. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1239. data: xsql.WindowTuplesSet{
  1240. Content: []xsql.WindowTuples{
  1241. {
  1242. Emitter: "src1",
  1243. Tuples: []xsql.Tuple{},
  1244. },
  1245. {
  1246. Emitter: "src2",
  1247. Tuples: []xsql.Tuple{
  1248. {
  1249. Emitter: "src2",
  1250. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1251. }, {
  1252. Emitter: "src2",
  1253. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1254. },
  1255. },
  1256. },
  1257. },
  1258. },
  1259. result: nil,
  1260. },
  1261. {
  1262. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1263. data: xsql.WindowTuplesSet{
  1264. Content: []xsql.WindowTuples{
  1265. {
  1266. Emitter: "src1",
  1267. Tuples: nil,
  1268. },
  1269. {
  1270. Emitter: "src2",
  1271. Tuples: []xsql.Tuple{
  1272. {
  1273. Emitter: "src2",
  1274. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1275. }, {
  1276. Emitter: "src2",
  1277. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1278. },
  1279. },
  1280. },
  1281. },
  1282. },
  1283. result: nil,
  1284. },
  1285. {
  1286. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  1287. data: xsql.WindowTuplesSet{
  1288. Content: []xsql.WindowTuples{
  1289. {
  1290. Emitter: "src1",
  1291. Tuples: []xsql.Tuple{
  1292. {
  1293. Emitter: "src1",
  1294. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1295. }, {
  1296. Emitter: "src1",
  1297. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1298. }, {
  1299. Emitter: "src1",
  1300. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1301. },
  1302. },
  1303. },
  1304. {
  1305. Emitter: "src2",
  1306. Tuples: []xsql.Tuple{
  1307. {
  1308. Emitter: "src2",
  1309. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1310. }, {
  1311. Emitter: "src2",
  1312. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1313. }, {
  1314. Emitter: "src2",
  1315. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1316. },
  1317. },
  1318. },
  1319. },
  1320. },
  1321. result: &xsql.JoinTupleSets{
  1322. Content: []xsql.JoinTuple{
  1323. {
  1324. Tuples: []xsql.Tuple{
  1325. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1326. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1327. },
  1328. },
  1329. {
  1330. Tuples: []xsql.Tuple{
  1331. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1332. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1333. },
  1334. },
  1335. },
  1336. },
  1337. },
  1338. {
  1339. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  1340. data: xsql.WindowTuplesSet{
  1341. Content: []xsql.WindowTuples{
  1342. {
  1343. Emitter: "src1",
  1344. Tuples: []xsql.Tuple{
  1345. {
  1346. Emitter: "src1",
  1347. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1348. }, {
  1349. Emitter: "src1",
  1350. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1351. }, {
  1352. Emitter: "src1",
  1353. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1354. },
  1355. },
  1356. },
  1357. {
  1358. Emitter: "src2",
  1359. Tuples: []xsql.Tuple{
  1360. {
  1361. Emitter: "src2",
  1362. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1363. }, {
  1364. Emitter: "src2",
  1365. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1366. }, {
  1367. Emitter: "src2",
  1368. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1369. },
  1370. },
  1371. },
  1372. },
  1373. },
  1374. result: &xsql.JoinTupleSets{
  1375. Content: []xsql.JoinTuple{
  1376. {
  1377. Tuples: []xsql.Tuple{
  1378. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1379. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1380. },
  1381. },
  1382. },
  1383. },
  1384. },
  1385. {
  1386. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  1387. data: xsql.WindowTuplesSet{
  1388. Content: []xsql.WindowTuples{
  1389. {
  1390. Emitter: "src1",
  1391. Tuples: []xsql.Tuple{
  1392. {
  1393. Emitter: "src1",
  1394. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  1395. }, {
  1396. Emitter: "src1",
  1397. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  1398. }, {
  1399. Emitter: "src1",
  1400. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  1401. },
  1402. },
  1403. },
  1404. {
  1405. Emitter: "src2",
  1406. Tuples: []xsql.Tuple{
  1407. {
  1408. Emitter: "src2",
  1409. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  1410. }, {
  1411. Emitter: "src2",
  1412. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  1413. }, {
  1414. Emitter: "src2",
  1415. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  1416. },
  1417. },
  1418. },
  1419. },
  1420. },
  1421. result: &xsql.JoinTupleSets{
  1422. Content: []xsql.JoinTuple{
  1423. {
  1424. Tuples: []xsql.Tuple{
  1425. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  1426. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  1427. },
  1428. },
  1429. {
  1430. Tuples: []xsql.Tuple{
  1431. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  1432. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  1433. },
  1434. },
  1435. },
  1436. },
  1437. },
  1438. {
  1439. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  1440. data: xsql.WindowTuplesSet{
  1441. Content: []xsql.WindowTuples{
  1442. {
  1443. Emitter: "s1",
  1444. Tuples: []xsql.Tuple{
  1445. {
  1446. Emitter: "s1",
  1447. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1448. }, {
  1449. Emitter: "s1",
  1450. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1451. }, {
  1452. Emitter: "s1",
  1453. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1454. },
  1455. },
  1456. },
  1457. {
  1458. Emitter: "s2",
  1459. Tuples: []xsql.Tuple{
  1460. {
  1461. Emitter: "s2",
  1462. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1463. }, {
  1464. Emitter: "s2",
  1465. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1466. }, {
  1467. Emitter: "s2",
  1468. Message: xsql.Message{"id2": 2, "f2": "w3"},
  1469. },
  1470. },
  1471. },
  1472. },
  1473. },
  1474. result: &xsql.JoinTupleSets{
  1475. Content: []xsql.JoinTuple{
  1476. {
  1477. Tuples: []xsql.Tuple{
  1478. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1479. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1480. },
  1481. },
  1482. {
  1483. Tuples: []xsql.Tuple{
  1484. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1485. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1486. },
  1487. },
  1488. {
  1489. Tuples: []xsql.Tuple{
  1490. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1491. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
  1492. },
  1493. },
  1494. },
  1495. },
  1496. },
  1497. }
  1498. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1499. contextLogger := conf.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  1500. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1501. for i, tt := range tests {
  1502. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1503. if err != nil {
  1504. t.Errorf("statement parse error %s", err)
  1505. break
  1506. }
  1507. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  1508. t.Errorf("statement source is not a table")
  1509. } else {
  1510. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1511. pp := &JoinOp{Joins: stmt.Joins, From: table}
  1512. result := pp.Apply(ctx, tt.data, fv, afv)
  1513. if !reflect.DeepEqual(tt.result, result) {
  1514. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1515. }
  1516. }
  1517. }
  1518. }
  1519. func TestRightJoinPlan_Apply(t *testing.T) {
  1520. var tests = []struct {
  1521. sql string
  1522. data xsql.WindowTuplesSet
  1523. result interface{}
  1524. }{
  1525. {
  1526. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1527. data: xsql.WindowTuplesSet{
  1528. Content: []xsql.WindowTuples{
  1529. {
  1530. Emitter: "src1",
  1531. Tuples: []xsql.Tuple{
  1532. {
  1533. Emitter: "src1",
  1534. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1535. }, {
  1536. Emitter: "src1",
  1537. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1538. }, {
  1539. Emitter: "src1",
  1540. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1541. },
  1542. },
  1543. },
  1544. {
  1545. Emitter: "src2",
  1546. Tuples: []xsql.Tuple{
  1547. {
  1548. Emitter: "src2",
  1549. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1550. }, {
  1551. Emitter: "src2",
  1552. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1553. }, {
  1554. Emitter: "src2",
  1555. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1556. },
  1557. },
  1558. },
  1559. },
  1560. },
  1561. result: &xsql.JoinTupleSets{
  1562. Content: []xsql.JoinTuple{
  1563. {
  1564. Tuples: []xsql.Tuple{
  1565. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1566. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1567. },
  1568. },
  1569. {
  1570. Tuples: []xsql.Tuple{
  1571. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1572. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1573. },
  1574. },
  1575. {
  1576. Tuples: []xsql.Tuple{
  1577. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1578. },
  1579. },
  1580. },
  1581. },
  1582. },
  1583. {
  1584. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1585. data: xsql.WindowTuplesSet{
  1586. Content: []xsql.WindowTuples{
  1587. {
  1588. Emitter: "src1",
  1589. Tuples: []xsql.Tuple{
  1590. {
  1591. Emitter: "src1",
  1592. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1593. }, {
  1594. Emitter: "src1",
  1595. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1596. }, {
  1597. Emitter: "src1",
  1598. Message: xsql.Message{"id1": 1, "f1": "v3"},
  1599. },
  1600. },
  1601. },
  1602. {
  1603. Emitter: "src2",
  1604. Tuples: []xsql.Tuple{
  1605. {
  1606. Emitter: "src2",
  1607. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1608. }, {
  1609. Emitter: "src2",
  1610. Message: xsql.Message{"f2": "w2"},
  1611. }, {
  1612. Emitter: "src2",
  1613. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1614. },
  1615. },
  1616. },
  1617. },
  1618. },
  1619. result: &xsql.JoinTupleSets{
  1620. Content: []xsql.JoinTuple{
  1621. {
  1622. Tuples: []xsql.Tuple{
  1623. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1624. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1625. },
  1626. },
  1627. {
  1628. Tuples: []xsql.Tuple{
  1629. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1630. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  1631. },
  1632. },
  1633. {
  1634. Tuples: []xsql.Tuple{
  1635. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1636. },
  1637. },
  1638. {
  1639. Tuples: []xsql.Tuple{
  1640. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1641. },
  1642. },
  1643. },
  1644. },
  1645. },
  1646. {
  1647. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1648. data: xsql.WindowTuplesSet{
  1649. Content: []xsql.WindowTuples{
  1650. {
  1651. Emitter: "src1",
  1652. Tuples: []xsql.Tuple{
  1653. {
  1654. Emitter: "src1",
  1655. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1656. },
  1657. },
  1658. },
  1659. {
  1660. Emitter: "src2",
  1661. Tuples: []xsql.Tuple{
  1662. {
  1663. Emitter: "src2",
  1664. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1665. }, {
  1666. Emitter: "src2",
  1667. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1668. },
  1669. },
  1670. },
  1671. },
  1672. },
  1673. result: &xsql.JoinTupleSets{
  1674. Content: []xsql.JoinTuple{
  1675. {
  1676. Tuples: []xsql.Tuple{
  1677. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1678. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1679. },
  1680. },
  1681. {
  1682. Tuples: []xsql.Tuple{
  1683. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1684. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1685. },
  1686. },
  1687. },
  1688. },
  1689. },
  1690. {
  1691. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1692. data: xsql.WindowTuplesSet{
  1693. Content: []xsql.WindowTuples{
  1694. {
  1695. Emitter: "src1",
  1696. Tuples: []xsql.Tuple{
  1697. {
  1698. Emitter: "src1",
  1699. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1700. }, {
  1701. Emitter: "src1",
  1702. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1703. }, {
  1704. Emitter: "src1",
  1705. Message: xsql.Message{"id1": 1, "f1": "v3"},
  1706. },
  1707. },
  1708. },
  1709. {
  1710. Emitter: "src2",
  1711. Tuples: []xsql.Tuple{},
  1712. },
  1713. },
  1714. },
  1715. result: nil,
  1716. },
  1717. {
  1718. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1719. data: xsql.WindowTuplesSet{
  1720. Content: []xsql.WindowTuples{
  1721. {
  1722. Emitter: "src1",
  1723. Tuples: []xsql.Tuple{},
  1724. },
  1725. {
  1726. Emitter: "src2",
  1727. Tuples: []xsql.Tuple{
  1728. {
  1729. Emitter: "src2",
  1730. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1731. }, {
  1732. Emitter: "src2",
  1733. Message: xsql.Message{"f2": "w2"},
  1734. }, {
  1735. Emitter: "src2",
  1736. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1737. },
  1738. },
  1739. },
  1740. },
  1741. },
  1742. result: &xsql.JoinTupleSets{
  1743. Content: []xsql.JoinTuple{
  1744. {
  1745. Tuples: []xsql.Tuple{
  1746. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1747. },
  1748. },
  1749. {
  1750. Tuples: []xsql.Tuple{
  1751. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1752. },
  1753. },
  1754. {
  1755. Tuples: []xsql.Tuple{
  1756. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1757. },
  1758. },
  1759. },
  1760. },
  1761. },
  1762. }
  1763. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1764. contextLogger := conf.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1765. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1766. for i, tt := range tests {
  1767. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1768. if err != nil {
  1769. t.Errorf("statement parse error %s", err)
  1770. break
  1771. }
  1772. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  1773. t.Errorf("statement source is not a table")
  1774. } else {
  1775. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1776. pp := &JoinOp{Joins: stmt.Joins, From: table}
  1777. result := pp.Apply(ctx, tt.data, fv, afv)
  1778. if !reflect.DeepEqual(tt.result, result) {
  1779. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1780. }
  1781. }
  1782. }
  1783. }
  1784. func TestFullJoinPlan_Apply(t *testing.T) {
  1785. var tests = []struct {
  1786. sql string
  1787. data xsql.WindowTuplesSet
  1788. result interface{}
  1789. }{
  1790. {
  1791. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1792. data: xsql.WindowTuplesSet{
  1793. Content: []xsql.WindowTuples{
  1794. {
  1795. Emitter: "src1",
  1796. Tuples: []xsql.Tuple{
  1797. {
  1798. Emitter: "src1",
  1799. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1800. }, {
  1801. Emitter: "src1",
  1802. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1803. }, {
  1804. Emitter: "src1",
  1805. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1806. },
  1807. },
  1808. },
  1809. {
  1810. Emitter: "src2",
  1811. Tuples: []xsql.Tuple{
  1812. {
  1813. Emitter: "src2",
  1814. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1815. }, {
  1816. Emitter: "src2",
  1817. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1818. }, {
  1819. Emitter: "src2",
  1820. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1821. }, {
  1822. Emitter: "src2",
  1823. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1824. },
  1825. },
  1826. },
  1827. },
  1828. },
  1829. result: &xsql.JoinTupleSets{
  1830. Content: []xsql.JoinTuple{
  1831. {
  1832. Tuples: []xsql.Tuple{
  1833. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1834. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1835. },
  1836. },
  1837. {
  1838. Tuples: []xsql.Tuple{
  1839. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1840. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1841. },
  1842. },
  1843. {
  1844. Tuples: []xsql.Tuple{
  1845. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}}, {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
  1846. },
  1847. },
  1848. {
  1849. Tuples: []xsql.Tuple{
  1850. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1851. },
  1852. },
  1853. {
  1854. Tuples: []xsql.Tuple{
  1855. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1856. },
  1857. },
  1858. },
  1859. },
  1860. },
  1861. {
  1862. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1863. data: xsql.WindowTuplesSet{
  1864. Content: []xsql.WindowTuples{
  1865. {
  1866. Emitter: "src1",
  1867. Tuples: []xsql.Tuple{
  1868. {
  1869. Emitter: "src1",
  1870. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1871. }, {
  1872. Emitter: "src1",
  1873. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1874. }, {
  1875. Emitter: "src1",
  1876. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1877. },
  1878. },
  1879. },
  1880. {
  1881. Emitter: "src2",
  1882. Tuples: []xsql.Tuple{
  1883. {
  1884. Emitter: "src2",
  1885. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1886. }, {
  1887. Emitter: "src2",
  1888. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1889. }, {
  1890. Emitter: "src2",
  1891. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1892. },
  1893. },
  1894. },
  1895. },
  1896. },
  1897. result: &xsql.JoinTupleSets{
  1898. Content: []xsql.JoinTuple{
  1899. {
  1900. Tuples: []xsql.Tuple{
  1901. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1902. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1903. },
  1904. },
  1905. {
  1906. Tuples: []xsql.Tuple{
  1907. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1908. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1909. },
  1910. },
  1911. {
  1912. Tuples: []xsql.Tuple{
  1913. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1914. },
  1915. },
  1916. {
  1917. Tuples: []xsql.Tuple{
  1918. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1919. },
  1920. },
  1921. },
  1922. },
  1923. },
  1924. {
  1925. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1926. data: xsql.WindowTuplesSet{
  1927. Content: []xsql.WindowTuples{
  1928. {
  1929. Emitter: "src1",
  1930. Tuples: []xsql.Tuple{
  1931. {
  1932. Emitter: "src1",
  1933. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1934. }, {
  1935. Emitter: "src1",
  1936. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1937. }, {
  1938. Emitter: "src1",
  1939. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1940. },
  1941. },
  1942. },
  1943. {
  1944. Emitter: "src2",
  1945. Tuples: []xsql.Tuple{},
  1946. },
  1947. },
  1948. },
  1949. result: &xsql.JoinTupleSets{
  1950. Content: []xsql.JoinTuple{
  1951. {
  1952. Tuples: []xsql.Tuple{
  1953. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1954. },
  1955. },
  1956. {
  1957. Tuples: []xsql.Tuple{
  1958. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1959. },
  1960. },
  1961. {
  1962. Tuples: []xsql.Tuple{
  1963. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1964. },
  1965. },
  1966. },
  1967. },
  1968. },
  1969. {
  1970. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1971. data: xsql.WindowTuplesSet{
  1972. Content: []xsql.WindowTuples{
  1973. {
  1974. Emitter: "src1",
  1975. Tuples: []xsql.Tuple{
  1976. {
  1977. Emitter: "src1",
  1978. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1979. }, {
  1980. Emitter: "src1",
  1981. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1982. }, {
  1983. Emitter: "src1",
  1984. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1985. },
  1986. },
  1987. },
  1988. {
  1989. Emitter: "src2",
  1990. Tuples: []xsql.Tuple{
  1991. {
  1992. Emitter: "src2",
  1993. Message: xsql.Message{"id2": 4, "f2": "w1"},
  1994. }, {
  1995. Emitter: "src2",
  1996. Message: xsql.Message{"id2": 5, "f2": "w2"},
  1997. }, {
  1998. Emitter: "src2",
  1999. Message: xsql.Message{"id2": 6, "f2": "w3"},
  2000. },
  2001. },
  2002. },
  2003. },
  2004. },
  2005. result: &xsql.JoinTupleSets{
  2006. Content: []xsql.JoinTuple{
  2007. {
  2008. Tuples: []xsql.Tuple{
  2009. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  2010. },
  2011. },
  2012. {
  2013. Tuples: []xsql.Tuple{
  2014. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  2015. },
  2016. },
  2017. {
  2018. Tuples: []xsql.Tuple{
  2019. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  2020. },
  2021. },
  2022. {
  2023. Tuples: []xsql.Tuple{
  2024. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w1"}},
  2025. },
  2026. },
  2027. {
  2028. Tuples: []xsql.Tuple{
  2029. {Emitter: "src2", Message: xsql.Message{"id2": 5, "f2": "w2"}},
  2030. },
  2031. },
  2032. {
  2033. Tuples: []xsql.Tuple{
  2034. {Emitter: "src2", Message: xsql.Message{"id2": 6, "f2": "w3"}},
  2035. },
  2036. },
  2037. },
  2038. },
  2039. },
  2040. {
  2041. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  2042. data: xsql.WindowTuplesSet{
  2043. Content: []xsql.WindowTuples{
  2044. {
  2045. Emitter: "src1",
  2046. Tuples: []xsql.Tuple{},
  2047. },
  2048. {
  2049. Emitter: "src2",
  2050. Tuples: []xsql.Tuple{
  2051. {
  2052. Emitter: "src2",
  2053. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2054. }, {
  2055. Emitter: "src2",
  2056. Message: xsql.Message{"id2": 2, "f2": "w2"},
  2057. }, {
  2058. Emitter: "src2",
  2059. Message: xsql.Message{"id2": 4, "f2": "w3"},
  2060. },
  2061. },
  2062. },
  2063. },
  2064. },
  2065. result: &xsql.JoinTupleSets{
  2066. Content: []xsql.JoinTuple{
  2067. {
  2068. Tuples: []xsql.Tuple{
  2069. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  2070. },
  2071. },
  2072. {
  2073. Tuples: []xsql.Tuple{
  2074. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  2075. },
  2076. },
  2077. {
  2078. Tuples: []xsql.Tuple{
  2079. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  2080. },
  2081. },
  2082. },
  2083. },
  2084. },
  2085. }
  2086. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2087. contextLogger := conf.Log.WithField("rule", "TestFullJoinPlan_Apply")
  2088. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2089. for i, tt := range tests {
  2090. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2091. if err != nil {
  2092. t.Errorf("statement parse error %s", err)
  2093. break
  2094. }
  2095. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  2096. t.Errorf("statement source is not a table")
  2097. } else {
  2098. fv, afv := xsql.NewFunctionValuersForOp(nil)
  2099. pp := &JoinOp{Joins: stmt.Joins, From: table}
  2100. result := pp.Apply(ctx, tt.data, fv, afv)
  2101. if !reflect.DeepEqual(tt.result, result) {
  2102. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2103. }
  2104. }
  2105. }
  2106. }
  2107. func TestCrossJoinPlan_Apply(t *testing.T) {
  2108. var tests = []struct {
  2109. sql string
  2110. data xsql.WindowTuplesSet
  2111. result interface{}
  2112. }{
  2113. {
  2114. sql: "SELECT id1 FROM src1 cross join src2",
  2115. data: xsql.WindowTuplesSet{
  2116. Content: []xsql.WindowTuples{
  2117. {
  2118. Emitter: "src1",
  2119. Tuples: []xsql.Tuple{
  2120. {
  2121. Emitter: "src1",
  2122. Message: xsql.Message{"id1": 1, "f1": "v1"},
  2123. }, {
  2124. Emitter: "src1",
  2125. Message: xsql.Message{"id1": 2, "f1": "v2"},
  2126. }, {
  2127. Emitter: "src1",
  2128. Message: xsql.Message{"id1": 3, "f1": "v3"},
  2129. },
  2130. },
  2131. },
  2132. {
  2133. Emitter: "src2",
  2134. Tuples: []xsql.Tuple{
  2135. {
  2136. Emitter: "src2",
  2137. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2138. }, {
  2139. Emitter: "src2",
  2140. Message: xsql.Message{"id2": 2, "f2": "w2"},
  2141. }, {
  2142. Emitter: "src2",
  2143. Message: xsql.Message{"id2": 4, "f2": "w3"},
  2144. },
  2145. },
  2146. },
  2147. },
  2148. },
  2149. result: &xsql.JoinTupleSets{
  2150. Content: []xsql.JoinTuple{
  2151. {
  2152. Tuples: []xsql.Tuple{
  2153. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  2154. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  2155. },
  2156. },
  2157. {
  2158. Tuples: []xsql.Tuple{
  2159. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  2160. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  2161. },
  2162. },
  2163. {
  2164. Tuples: []xsql.Tuple{
  2165. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  2166. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  2167. },
  2168. },
  2169. {
  2170. Tuples: []xsql.Tuple{
  2171. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  2172. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  2173. },
  2174. },
  2175. {
  2176. Tuples: []xsql.Tuple{
  2177. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  2178. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  2179. },
  2180. },
  2181. {
  2182. Tuples: []xsql.Tuple{
  2183. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  2184. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  2185. },
  2186. },
  2187. {
  2188. Tuples: []xsql.Tuple{
  2189. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  2190. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  2191. },
  2192. },
  2193. {
  2194. Tuples: []xsql.Tuple{
  2195. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  2196. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  2197. },
  2198. },
  2199. {
  2200. Tuples: []xsql.Tuple{
  2201. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  2202. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  2203. },
  2204. },
  2205. },
  2206. },
  2207. },
  2208. {
  2209. sql: "SELECT id1 FROM src1 cross join src2",
  2210. data: xsql.WindowTuplesSet{
  2211. Content: []xsql.WindowTuples{
  2212. {
  2213. Emitter: "src1",
  2214. Tuples: []xsql.Tuple{
  2215. {
  2216. Emitter: "src1",
  2217. Message: xsql.Message{"id1": 1, "f1": "v1"},
  2218. },
  2219. },
  2220. },
  2221. {
  2222. Emitter: "src2",
  2223. Tuples: []xsql.Tuple{
  2224. {
  2225. Emitter: "src2",
  2226. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2227. }, {
  2228. Emitter: "src2",
  2229. Message: xsql.Message{"id2": 1, "f2": "w2"},
  2230. },
  2231. },
  2232. },
  2233. },
  2234. },
  2235. result: &xsql.JoinTupleSets{
  2236. Content: []xsql.JoinTuple{
  2237. {
  2238. Tuples: []xsql.Tuple{
  2239. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  2240. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  2241. },
  2242. },
  2243. {
  2244. Tuples: []xsql.Tuple{
  2245. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}}, {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  2246. },
  2247. },
  2248. },
  2249. },
  2250. },
  2251. {
  2252. sql: "SELECT id1 FROM src1 cross join src2",
  2253. data: xsql.WindowTuplesSet{
  2254. Content: []xsql.WindowTuples{
  2255. {
  2256. Emitter: "src1",
  2257. Tuples: []xsql.Tuple{},
  2258. },
  2259. {
  2260. Emitter: "src2",
  2261. Tuples: []xsql.Tuple{
  2262. {
  2263. Emitter: "src2",
  2264. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2265. }, {
  2266. Emitter: "src2",
  2267. Message: xsql.Message{"id2": 1, "f2": "w2"},
  2268. },
  2269. },
  2270. },
  2271. },
  2272. },
  2273. result: nil,
  2274. },
  2275. {
  2276. sql: "SELECT id1 FROM src2 cross join src1",
  2277. data: xsql.WindowTuplesSet{
  2278. Content: []xsql.WindowTuples{
  2279. {
  2280. Emitter: "src1",
  2281. Tuples: []xsql.Tuple{},
  2282. },
  2283. {
  2284. Emitter: "src2",
  2285. Tuples: []xsql.Tuple{
  2286. {
  2287. Emitter: "src2",
  2288. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2289. }, {
  2290. Emitter: "src2",
  2291. Message: xsql.Message{"id2": 1, "f2": "w2"},
  2292. },
  2293. },
  2294. },
  2295. },
  2296. },
  2297. result: nil,
  2298. },
  2299. }
  2300. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2301. contextLogger := conf.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  2302. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2303. for i, tt := range tests {
  2304. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2305. if err != nil {
  2306. t.Errorf("statement parse error %s", err)
  2307. break
  2308. }
  2309. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  2310. t.Errorf("statement source is not a table")
  2311. } else {
  2312. fv, afv := xsql.NewFunctionValuersForOp(nil)
  2313. pp := &JoinOp{Joins: stmt.Joins, From: table}
  2314. result := pp.Apply(ctx, tt.data, fv, afv)
  2315. if !reflect.DeepEqual(tt.result, result) {
  2316. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2317. }
  2318. }
  2319. }
  2320. }
  2321. func TestCrossJoinPlanError(t *testing.T) {
  2322. var tests = []struct {
  2323. sql string
  2324. data interface{}
  2325. result interface{}
  2326. }{
  2327. {
  2328. sql: "SELECT id1 FROM src1 cross join src2",
  2329. data: errors.New("an error from upstream"),
  2330. result: errors.New("an error from upstream"),
  2331. }, {
  2332. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  2333. data: xsql.WindowTuplesSet{
  2334. Content: []xsql.WindowTuples{
  2335. {
  2336. Emitter: "src1",
  2337. Tuples: []xsql.Tuple{
  2338. {
  2339. Emitter: "src1",
  2340. Message: xsql.Message{"id1": 1, "f1": "v1"},
  2341. }, {
  2342. Emitter: "src1",
  2343. Message: xsql.Message{"id1": 2, "f1": "v2"},
  2344. }, {
  2345. Emitter: "src1",
  2346. Message: xsql.Message{"id1": 3, "f1": "v3"},
  2347. },
  2348. },
  2349. },
  2350. {
  2351. Emitter: "src2",
  2352. Tuples: []xsql.Tuple{
  2353. {
  2354. Emitter: "src2",
  2355. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2356. }, {
  2357. Emitter: "src2",
  2358. Message: xsql.Message{"id2": "3", "f2": "w2"},
  2359. }, {
  2360. Emitter: "src2",
  2361. Message: xsql.Message{"id2": 4, "f2": "w3"},
  2362. }, {
  2363. Emitter: "src2",
  2364. Message: xsql.Message{"id2": 2, "f2": "w4"},
  2365. },
  2366. },
  2367. },
  2368. },
  2369. },
  2370. result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
  2371. },
  2372. }
  2373. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2374. contextLogger := conf.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  2375. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2376. for i, tt := range tests {
  2377. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2378. if err != nil {
  2379. t.Errorf("statement parse error %s", err)
  2380. break
  2381. }
  2382. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  2383. t.Errorf("statement source is not a table")
  2384. } else {
  2385. fv, afv := xsql.NewFunctionValuersForOp(nil)
  2386. pp := &JoinOp{Joins: stmt.Joins, From: table}
  2387. result := pp.Apply(ctx, tt.data, fv, afv)
  2388. if !reflect.DeepEqual(tt.result, result) {
  2389. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2390. }
  2391. }
  2392. }
  2393. }
  2394. func str2Map(s string) map[string]interface{} {
  2395. var input map[string]interface{}
  2396. if err := json.Unmarshal([]byte(s), &input); err != nil {
  2397. fmt.Printf("Failed to parse the JSON data.\n")
  2398. return nil
  2399. }
  2400. return input
  2401. }