join_test.go 53 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185
  1. package operator
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/internal/conf"
  7. "github.com/emqx/kuiper/internal/topo/context"
  8. "github.com/emqx/kuiper/internal/xsql"
  9. "github.com/emqx/kuiper/pkg/ast"
  10. "github.com/emqx/kuiper/pkg/cast"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. )
  15. func TestLeftJoinPlan_Apply(t *testing.T) {
  16. var tests = []struct {
  17. sql string
  18. data xsql.WindowTuplesSet
  19. result interface{}
  20. }{
  21. { //0
  22. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  23. data: xsql.WindowTuplesSet{
  24. Content: []xsql.WindowTuples{
  25. {
  26. Emitter: "src1",
  27. Tuples: []xsql.Tuple{
  28. {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 1, "f1": "v1"},
  31. }, {
  32. Emitter: "src1",
  33. Message: xsql.Message{"id1": 2, "f1": "v2"},
  34. }, {
  35. Emitter: "src1",
  36. Message: xsql.Message{"id1": 3, "f1": "v3"},
  37. },
  38. },
  39. },
  40. {
  41. Emitter: "src2",
  42. Tuples: []xsql.Tuple{
  43. {
  44. Emitter: "src2",
  45. Message: xsql.Message{"id2": 1, "f2": "w1"},
  46. }, {
  47. Emitter: "src2",
  48. Message: xsql.Message{"id2": 2, "f2": "w2"},
  49. }, {
  50. Emitter: "src2",
  51. Message: xsql.Message{"id2": 4, "f2": "w3"},
  52. },
  53. },
  54. },
  55. },
  56. WindowRange: &xsql.WindowRange{
  57. WindowStart: 1541152486013,
  58. WindowEnd: 1541152487013,
  59. },
  60. },
  61. result: &xsql.JoinTupleSets{
  62. Content: []xsql.JoinTuple{
  63. {
  64. Tuples: []xsql.Tuple{
  65. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  66. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  67. },
  68. },
  69. {
  70. Tuples: []xsql.Tuple{
  71. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  72. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  73. },
  74. },
  75. {
  76. Tuples: []xsql.Tuple{
  77. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  78. },
  79. },
  80. },
  81. WindowRange: &xsql.WindowRange{
  82. WindowStart: 1541152486013,
  83. WindowEnd: 1541152487013,
  84. },
  85. },
  86. },
  87. { // 1
  88. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  89. data: xsql.WindowTuplesSet{
  90. Content: []xsql.WindowTuples{
  91. {
  92. Emitter: "src1",
  93. Tuples: []xsql.Tuple{
  94. {
  95. Emitter: "src1",
  96. Message: xsql.Message{"id1": 1, "f1": "v1"},
  97. }, {
  98. Emitter: "src1",
  99. Message: xsql.Message{"id1": 2, "f1": "v2"},
  100. }, {
  101. Emitter: "src1",
  102. Message: xsql.Message{"id1": 3, "f1": "v3"},
  103. },
  104. },
  105. },
  106. {
  107. Emitter: "src2",
  108. Tuples: []xsql.Tuple{
  109. {
  110. Emitter: "src2",
  111. Message: xsql.Message{"id2": 1, "f2": "w1"},
  112. }, {
  113. Emitter: "src2",
  114. Message: xsql.Message{"f2": "w2"},
  115. }, {
  116. Emitter: "src2",
  117. Message: xsql.Message{"id2": 4, "f2": "w3"},
  118. },
  119. },
  120. },
  121. },
  122. },
  123. result: &xsql.JoinTupleSets{
  124. Content: []xsql.JoinTuple{
  125. {
  126. Tuples: []xsql.Tuple{
  127. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  128. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  129. },
  130. },
  131. {
  132. Tuples: []xsql.Tuple{
  133. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  134. },
  135. },
  136. {
  137. Tuples: []xsql.Tuple{
  138. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  139. },
  140. },
  141. },
  142. },
  143. },
  144. { // 2
  145. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  146. data: xsql.WindowTuplesSet{
  147. Content: []xsql.WindowTuples{
  148. {
  149. Emitter: "src1",
  150. Tuples: []xsql.Tuple{
  151. {
  152. Emitter: "src1",
  153. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  154. }, {
  155. Emitter: "src1",
  156. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  157. }, {
  158. Emitter: "src1",
  159. Message: xsql.Message{"id1": 3, "f1": "v3", "ts": cast.TimeFromUnixMilli(1568854535000)},
  160. },
  161. },
  162. },
  163. {
  164. Emitter: "src2",
  165. Tuples: []xsql.Tuple{
  166. {
  167. Emitter: "src2",
  168. Message: xsql.Message{"id2": 1, "f2": "w1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  169. }, {
  170. Emitter: "src2",
  171. Message: xsql.Message{"id2": 2, "f2": "w2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  172. }, {
  173. Emitter: "src2",
  174. Message: xsql.Message{"id2": 4, "f2": "w3", "ts": cast.TimeFromUnixMilli(1568854545000)},
  175. },
  176. },
  177. },
  178. },
  179. },
  180. result: &xsql.JoinTupleSets{
  181. Content: []xsql.JoinTuple{
  182. {
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  185. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  186. },
  187. },
  188. {
  189. Tuples: []xsql.Tuple{
  190. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)}},
  191. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2", "ts": cast.TimeFromUnixMilli(1568854525000)}},
  192. },
  193. },
  194. {
  195. Tuples: []xsql.Tuple{
  196. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3", "ts": cast.TimeFromUnixMilli(1568854535000)}},
  197. },
  198. },
  199. },
  200. },
  201. },
  202. { // 3
  203. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  204. data: xsql.WindowTuplesSet{
  205. Content: []xsql.WindowTuples{
  206. {
  207. Emitter: "src1",
  208. Tuples: []xsql.Tuple{
  209. {
  210. Emitter: "src1",
  211. Message: xsql.Message{"id1": 1, "f1": "v1"},
  212. }, {
  213. Emitter: "src1",
  214. Message: xsql.Message{"id1": 2, "f1": "v2"},
  215. }, {
  216. Emitter: "src1",
  217. Message: xsql.Message{"id1": 3, "f1": "v3"},
  218. },
  219. },
  220. },
  221. {
  222. Emitter: "src2",
  223. Tuples: []xsql.Tuple{
  224. {
  225. Emitter: "src2",
  226. Message: xsql.Message{"id2": 4, "f2": "w1"},
  227. }, {
  228. Emitter: "src2",
  229. Message: xsql.Message{"id2": 5, "f2": "w2"},
  230. }, {
  231. Emitter: "src2",
  232. Message: xsql.Message{"id2": 6, "f2": "w3"},
  233. },
  234. },
  235. },
  236. },
  237. },
  238. result: nil,
  239. },
  240. { // 4
  241. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  242. data: xsql.WindowTuplesSet{
  243. Content: []xsql.WindowTuples{
  244. {
  245. Emitter: "s1",
  246. Tuples: []xsql.Tuple{
  247. {
  248. Emitter: "s1",
  249. Message: xsql.Message{"id1": 1, "f1": "v1"},
  250. }, {
  251. Emitter: "s1",
  252. Message: xsql.Message{"id1": 2, "f1": "v2"},
  253. }, {
  254. Emitter: "s1",
  255. Message: xsql.Message{"id1": 3, "f1": "v3"},
  256. },
  257. },
  258. },
  259. {
  260. Emitter: "s2",
  261. Tuples: []xsql.Tuple{
  262. {
  263. Emitter: "s2",
  264. Message: xsql.Message{"id2": 1, "f2": "w1"},
  265. }, {
  266. Emitter: "s2",
  267. Message: xsql.Message{"id2": 2, "f2": "w2"},
  268. }, {
  269. Emitter: "s2",
  270. Message: xsql.Message{"id2": 4, "f2": "w3"},
  271. },
  272. },
  273. },
  274. },
  275. },
  276. result: &xsql.JoinTupleSets{
  277. Content: []xsql.JoinTuple{
  278. {
  279. Tuples: []xsql.Tuple{
  280. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  281. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  282. },
  283. },
  284. {
  285. Tuples: []xsql.Tuple{
  286. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  287. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  288. },
  289. },
  290. {
  291. Tuples: []xsql.Tuple{
  292. {Emitter: "s1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  293. },
  294. },
  295. },
  296. },
  297. },
  298. { // 5
  299. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  300. data: xsql.WindowTuplesSet{
  301. Content: []xsql.WindowTuples{
  302. {
  303. Emitter: "src1",
  304. Tuples: []xsql.Tuple{
  305. {
  306. Emitter: "src1",
  307. Message: xsql.Message{"id1": 1, "f1": "v1"},
  308. },
  309. },
  310. },
  311. {
  312. Emitter: "src2",
  313. Tuples: []xsql.Tuple{
  314. {
  315. Emitter: "src2",
  316. Message: xsql.Message{"id2": 1, "f2": "w1"},
  317. }, {
  318. Emitter: "src2",
  319. Message: xsql.Message{"id2": 1, "f2": "w2"},
  320. },
  321. },
  322. },
  323. },
  324. },
  325. result: &xsql.JoinTupleSets{
  326. Content: []xsql.JoinTuple{
  327. {
  328. Tuples: []xsql.Tuple{
  329. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  330. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  331. },
  332. },
  333. {
  334. Tuples: []xsql.Tuple{
  335. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}}, {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  336. },
  337. },
  338. },
  339. },
  340. },
  341. { // 6
  342. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  343. data: xsql.WindowTuplesSet{
  344. Content: []xsql.WindowTuples{
  345. {
  346. Emitter: "src1",
  347. Tuples: []xsql.Tuple{
  348. {
  349. Emitter: "src1",
  350. Message: xsql.Message{"id1": 1, "f1": "v1"},
  351. }, {
  352. Emitter: "src1",
  353. Message: xsql.Message{"id1": 2, "f1": "v2"},
  354. }, {
  355. Emitter: "src1",
  356. Message: xsql.Message{"id1": 3, "f1": "v3"},
  357. },
  358. },
  359. },
  360. {
  361. Emitter: "src2",
  362. Tuples: []xsql.Tuple{},
  363. },
  364. },
  365. },
  366. result: &xsql.JoinTupleSets{
  367. Content: []xsql.JoinTuple{
  368. {
  369. Tuples: []xsql.Tuple{
  370. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  371. },
  372. },
  373. {
  374. Tuples: []xsql.Tuple{
  375. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  376. },
  377. },
  378. {
  379. Tuples: []xsql.Tuple{
  380. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  381. },
  382. },
  383. },
  384. },
  385. },
  386. {
  387. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  388. data: xsql.WindowTuplesSet{
  389. Content: []xsql.WindowTuples{
  390. {
  391. Emitter: "src1",
  392. Tuples: []xsql.Tuple{
  393. {
  394. Emitter: "src1",
  395. Message: xsql.Message{"id1": 1, "f1": "v1"},
  396. }, {
  397. Emitter: "src1",
  398. Message: xsql.Message{"id1": 2, "f1": "v2"},
  399. }, {
  400. Emitter: "src1",
  401. Message: xsql.Message{"id1": 3, "f1": "v3"},
  402. },
  403. },
  404. },
  405. {
  406. Emitter: "src2",
  407. Tuples: nil,
  408. },
  409. },
  410. },
  411. result: &xsql.JoinTupleSets{
  412. Content: []xsql.JoinTuple{
  413. {
  414. Tuples: []xsql.Tuple{
  415. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  416. },
  417. },
  418. {
  419. Tuples: []xsql.Tuple{
  420. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  421. },
  422. },
  423. {
  424. Tuples: []xsql.Tuple{
  425. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  426. },
  427. },
  428. },
  429. },
  430. },
  431. {
  432. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  433. data: xsql.WindowTuplesSet{
  434. Content: []xsql.WindowTuples{
  435. {
  436. Emitter: "src1",
  437. Tuples: []xsql.Tuple{},
  438. },
  439. {
  440. Emitter: "src2",
  441. Tuples: []xsql.Tuple{
  442. {
  443. Emitter: "src2",
  444. Message: xsql.Message{"id2": 1, "f2": "w1"},
  445. }, {
  446. Emitter: "src2",
  447. Message: xsql.Message{"id2": 1, "f2": "w2"},
  448. },
  449. },
  450. },
  451. },
  452. },
  453. result: nil,
  454. },
  455. {
  456. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  457. data: xsql.WindowTuplesSet{
  458. Content: []xsql.WindowTuples{
  459. {
  460. Emitter: "src1",
  461. Tuples: nil,
  462. },
  463. {
  464. Emitter: "src2",
  465. Tuples: []xsql.Tuple{
  466. {
  467. Emitter: "src2",
  468. Message: xsql.Message{"id2": 1, "f2": "w1"},
  469. }, {
  470. Emitter: "src2",
  471. Message: xsql.Message{"id2": 1, "f2": "w2"},
  472. },
  473. },
  474. },
  475. },
  476. },
  477. result: nil,
  478. },
  479. {
  480. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  481. data: xsql.WindowTuplesSet{
  482. Content: []xsql.WindowTuples{
  483. {
  484. Emitter: "src1",
  485. Tuples: []xsql.Tuple{
  486. {
  487. Emitter: "src1",
  488. Message: xsql.Message{"id1": 1, "f1": "v1"},
  489. }, {
  490. Emitter: "src1",
  491. Message: xsql.Message{"id1": 2, "f1": "v2"},
  492. }, {
  493. Emitter: "src1",
  494. Message: xsql.Message{"id1": 3, "f1": "v3"},
  495. },
  496. },
  497. },
  498. {
  499. Emitter: "src2",
  500. Tuples: []xsql.Tuple{
  501. {
  502. Emitter: "src2",
  503. Message: xsql.Message{"id2": 1, "f2": "w1"},
  504. }, {
  505. Emitter: "src2",
  506. Message: xsql.Message{"id2": 2, "f2": "w2"},
  507. }, {
  508. Emitter: "src2",
  509. Message: xsql.Message{"id2": 4, "f2": "w3"},
  510. },
  511. },
  512. },
  513. },
  514. },
  515. result: &xsql.JoinTupleSets{
  516. Content: []xsql.JoinTuple{
  517. {
  518. Tuples: []xsql.Tuple{
  519. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  520. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  521. },
  522. },
  523. {
  524. Tuples: []xsql.Tuple{
  525. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  526. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  527. },
  528. },
  529. {
  530. Tuples: []xsql.Tuple{
  531. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  532. },
  533. },
  534. },
  535. },
  536. },
  537. {
  538. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  539. data: xsql.WindowTuplesSet{
  540. Content: []xsql.WindowTuples{
  541. {
  542. Emitter: "src1",
  543. Tuples: []xsql.Tuple{
  544. {
  545. Emitter: "src1",
  546. Message: xsql.Message{"id1": 1, "f1": "v1"},
  547. }, {
  548. Emitter: "src1",
  549. Message: xsql.Message{"id1": 2, "f1": "v2"},
  550. }, {
  551. Emitter: "src1",
  552. Message: xsql.Message{"id1": 3, "f1": "v3"},
  553. },
  554. },
  555. },
  556. {
  557. Emitter: "src2",
  558. Tuples: []xsql.Tuple{
  559. {
  560. Emitter: "src2",
  561. Message: xsql.Message{"id2": 1, "f2": "w1"},
  562. }, {
  563. Emitter: "src2",
  564. Message: xsql.Message{"id2": 2, "f2": "w2"},
  565. }, {
  566. Emitter: "src2",
  567. Message: xsql.Message{"id2": 4, "f2": "w3"},
  568. },
  569. },
  570. },
  571. },
  572. },
  573. result: &xsql.JoinTupleSets{
  574. Content: []xsql.JoinTuple{
  575. {
  576. Tuples: []xsql.Tuple{
  577. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  578. },
  579. },
  580. {
  581. Tuples: []xsql.Tuple{
  582. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  583. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  584. },
  585. },
  586. {
  587. Tuples: []xsql.Tuple{
  588. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  589. },
  590. },
  591. },
  592. },
  593. },
  594. {
  595. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  596. data: xsql.WindowTuplesSet{
  597. Content: []xsql.WindowTuples{
  598. {
  599. Emitter: "src1",
  600. Tuples: []xsql.Tuple{
  601. {
  602. Emitter: "src1",
  603. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  604. }, {
  605. Emitter: "src1",
  606. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  607. }, {
  608. Emitter: "src1",
  609. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  610. },
  611. },
  612. },
  613. {
  614. Emitter: "src2",
  615. Tuples: []xsql.Tuple{
  616. {
  617. Emitter: "src2",
  618. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  619. }, {
  620. Emitter: "src2",
  621. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  622. }, {
  623. Emitter: "src2",
  624. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  625. },
  626. },
  627. },
  628. },
  629. },
  630. result: &xsql.JoinTupleSets{
  631. Content: []xsql.JoinTuple{
  632. {
  633. Tuples: []xsql.Tuple{
  634. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  635. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  636. },
  637. },
  638. {
  639. Tuples: []xsql.Tuple{
  640. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  641. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  642. },
  643. },
  644. {
  645. Tuples: []xsql.Tuple{
  646. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)}},
  647. },
  648. },
  649. },
  650. },
  651. },
  652. {
  653. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  654. data: xsql.WindowTuplesSet{
  655. Content: []xsql.WindowTuples{
  656. {
  657. Emitter: "src1",
  658. Tuples: []xsql.Tuple{
  659. {
  660. Emitter: "src1",
  661. Message: xsql.Message{"id1": 1, "f1": "v1"},
  662. Metadata: xsql.Metadata{"topic": "devices/type1/device001"},
  663. },
  664. },
  665. },
  666. {
  667. Emitter: "src2",
  668. Tuples: []xsql.Tuple{
  669. {
  670. Emitter: "src2",
  671. Message: xsql.Message{"id2": 1, "f2": "w1"},
  672. Metadata: xsql.Metadata{"topic": "devices/type2/device001"},
  673. },
  674. },
  675. },
  676. },
  677. },
  678. result: &xsql.JoinTupleSets{
  679. Content: []xsql.JoinTuple{
  680. {
  681. Tuples: []xsql.Tuple{
  682. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  683. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  684. },
  685. },
  686. },
  687. },
  688. },
  689. {
  690. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  691. data: xsql.WindowTuplesSet{
  692. Content: []xsql.WindowTuples{
  693. {
  694. Emitter: "src1",
  695. Tuples: []xsql.Tuple{
  696. {
  697. Emitter: "src1",
  698. Message: xsql.Message{"id1": 1, "f1": "v1"},
  699. }, {
  700. Emitter: "src1",
  701. Message: xsql.Message{"id1": 1, "f1": "v2"},
  702. }, {
  703. Emitter: "src1",
  704. Message: xsql.Message{"id1": 3, "f1": "v3"},
  705. }, {
  706. Emitter: "src1",
  707. Message: xsql.Message{"id1": 3, "f1": "v4"},
  708. }, {
  709. Emitter: "src1",
  710. Message: xsql.Message{"id1": 4, "f1": "v5"},
  711. },
  712. },
  713. },
  714. {
  715. Emitter: "src2",
  716. Tuples: []xsql.Tuple{
  717. {
  718. Emitter: "src2",
  719. Message: xsql.Message{"id2": 1, "f2": "w1"},
  720. }, {
  721. Emitter: "src2",
  722. Message: xsql.Message{"id2": 3, "f2": "w2"},
  723. }, {
  724. Emitter: "src2",
  725. Message: xsql.Message{"id2": 3, "f2": "w3"},
  726. },
  727. },
  728. },
  729. },
  730. },
  731. result: &xsql.JoinTupleSets{
  732. Content: []xsql.JoinTuple{
  733. {
  734. Tuples: []xsql.Tuple{
  735. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  736. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  737. },
  738. },
  739. {
  740. Tuples: []xsql.Tuple{
  741. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v2"}},
  742. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  743. },
  744. },
  745. {
  746. Tuples: []xsql.Tuple{
  747. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  748. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
  749. },
  750. },
  751. {
  752. Tuples: []xsql.Tuple{
  753. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  754. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
  755. },
  756. },
  757. {
  758. Tuples: []xsql.Tuple{
  759. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
  760. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
  761. },
  762. },
  763. {
  764. Tuples: []xsql.Tuple{
  765. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
  766. {Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
  767. },
  768. },
  769. {
  770. Tuples: []xsql.Tuple{
  771. {Emitter: "src1", Message: xsql.Message{"id1": 4, "f1": "v5"}},
  772. },
  773. },
  774. },
  775. },
  776. },
  777. {
  778. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  779. data: xsql.WindowTuplesSet{
  780. Content: []xsql.WindowTuples{
  781. {
  782. Emitter: "src1",
  783. Tuples: []xsql.Tuple{
  784. {
  785. Emitter: "src1",
  786. Message: xsql.Message{"id1": 1, "f1": "v1"},
  787. }, {
  788. Emitter: "src1",
  789. Message: xsql.Message{"id1": 2, "f1": "v2"},
  790. }, {
  791. Emitter: "src1",
  792. Message: xsql.Message{"id1": 3, "f1": "v3"},
  793. },
  794. },
  795. },
  796. {
  797. Emitter: "src2",
  798. Tuples: []xsql.Tuple{
  799. {
  800. Emitter: "src2",
  801. Message: xsql.Message{"id2": 1, "f2": "w1"},
  802. }, {
  803. Emitter: "src2",
  804. Message: xsql.Message{"id2": 2, "f2": "w2"},
  805. }, {
  806. Emitter: "src2",
  807. Message: xsql.Message{"id2": 2, "f2": "w3"},
  808. },
  809. },
  810. },
  811. },
  812. },
  813. result: &xsql.JoinTupleSets{
  814. Content: []xsql.JoinTuple{
  815. {
  816. Tuples: []xsql.Tuple{
  817. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  818. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  819. },
  820. },
  821. {
  822. Tuples: []xsql.Tuple{
  823. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  824. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  825. },
  826. },
  827. {
  828. Tuples: []xsql.Tuple{
  829. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  830. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
  831. },
  832. },
  833. {
  834. Tuples: []xsql.Tuple{
  835. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  836. },
  837. },
  838. },
  839. },
  840. },
  841. }
  842. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  843. contextLogger := conf.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  844. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  845. for i, tt := range tests {
  846. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  847. if err != nil {
  848. t.Errorf("statement parse error %s", err)
  849. break
  850. }
  851. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  852. t.Errorf("statement source is not a table")
  853. } else {
  854. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  855. pp := &JoinOp{Joins: stmt.Joins, From: table}
  856. result := pp.Apply(ctx, tt.data, fv, afv)
  857. if !reflect.DeepEqual(tt.result, result) {
  858. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  859. }
  860. }
  861. }
  862. }
  863. func TestInnerJoinPlan_Apply(t *testing.T) {
  864. var tests = []struct {
  865. sql string
  866. data xsql.WindowTuplesSet
  867. result interface{}
  868. }{
  869. {
  870. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  871. data: xsql.WindowTuplesSet{
  872. Content: []xsql.WindowTuples{
  873. {
  874. Emitter: "src1",
  875. Tuples: []xsql.Tuple{
  876. {
  877. Emitter: "src1",
  878. Message: xsql.Message{"id1": 1, "f1": "v1"},
  879. }, {
  880. Emitter: "src1",
  881. Message: xsql.Message{"id1": 2, "f1": "v2"},
  882. }, {
  883. Emitter: "src1",
  884. Message: xsql.Message{"id1": 3, "f1": "v3"},
  885. },
  886. },
  887. },
  888. {
  889. Emitter: "src2",
  890. Tuples: []xsql.Tuple{
  891. {
  892. Emitter: "src2",
  893. Message: xsql.Message{"id2": 1, "f2": "w1"},
  894. }, {
  895. Emitter: "src2",
  896. Message: xsql.Message{"id2": 2, "f2": "w2"},
  897. }, {
  898. Emitter: "src2",
  899. Message: xsql.Message{"id2": 4, "f2": "w3"},
  900. },
  901. },
  902. },
  903. },
  904. },
  905. result: &xsql.JoinTupleSets{
  906. Content: []xsql.JoinTuple{
  907. {
  908. Tuples: []xsql.Tuple{
  909. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  910. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  911. },
  912. },
  913. {
  914. Tuples: []xsql.Tuple{
  915. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  916. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  917. },
  918. },
  919. },
  920. },
  921. },
  922. {
  923. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  924. data: xsql.WindowTuplesSet{
  925. Content: []xsql.WindowTuples{
  926. {
  927. Emitter: "src1",
  928. Tuples: []xsql.Tuple{
  929. {
  930. Emitter: "src1",
  931. Message: xsql.Message{"id1": 1, "f1": "v1"},
  932. }, {
  933. Emitter: "src1",
  934. Message: xsql.Message{"id1": 2, "f1": "v2"},
  935. }, {
  936. Emitter: "src1",
  937. Message: xsql.Message{"id1": 3, "f1": "v3"},
  938. },
  939. },
  940. },
  941. {
  942. Emitter: "src2",
  943. Tuples: []xsql.Tuple{
  944. {
  945. Emitter: "src2",
  946. Message: xsql.Message{"id2": 1, "f2": "w1"},
  947. }, {
  948. Emitter: "src2",
  949. Message: xsql.Message{"f2": "w2"},
  950. }, {
  951. Emitter: "src2",
  952. Message: xsql.Message{"id2": 4, "f2": "w3"},
  953. },
  954. },
  955. },
  956. },
  957. },
  958. result: &xsql.JoinTupleSets{
  959. Content: []xsql.JoinTuple{
  960. {
  961. Tuples: []xsql.Tuple{
  962. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  963. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  964. },
  965. },
  966. },
  967. },
  968. },
  969. {
  970. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  971. data: xsql.WindowTuplesSet{
  972. Content: []xsql.WindowTuples{
  973. {
  974. Emitter: "s1",
  975. Tuples: []xsql.Tuple{
  976. {
  977. Emitter: "s1",
  978. Message: xsql.Message{"id1": 1, "f1": "v1"},
  979. }, {
  980. Emitter: "s1",
  981. Message: xsql.Message{"id1": 2, "f1": "v2"},
  982. }, {
  983. Emitter: "s1",
  984. Message: xsql.Message{"id1": 3, "f1": "v3"},
  985. },
  986. },
  987. },
  988. {
  989. Emitter: "s2",
  990. Tuples: []xsql.Tuple{
  991. {
  992. Emitter: "s2",
  993. Message: xsql.Message{"id2": 1, "f2": "w1"},
  994. }, {
  995. Emitter: "s2",
  996. Message: xsql.Message{"id2": 2, "f2": "w2"},
  997. }, {
  998. Emitter: "s2",
  999. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1000. },
  1001. },
  1002. },
  1003. },
  1004. },
  1005. result: &xsql.JoinTupleSets{
  1006. Content: []xsql.JoinTuple{
  1007. {
  1008. Tuples: []xsql.Tuple{
  1009. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1010. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1011. },
  1012. },
  1013. {
  1014. Tuples: []xsql.Tuple{
  1015. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1016. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1017. },
  1018. },
  1019. },
  1020. },
  1021. },
  1022. {
  1023. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1024. data: xsql.WindowTuplesSet{
  1025. Content: []xsql.WindowTuples{
  1026. {
  1027. Emitter: "src1",
  1028. Tuples: []xsql.Tuple{
  1029. {
  1030. Emitter: "src1",
  1031. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1032. },
  1033. },
  1034. },
  1035. {
  1036. Emitter: "src2",
  1037. Tuples: []xsql.Tuple{
  1038. {
  1039. Emitter: "src2",
  1040. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1041. }, {
  1042. Emitter: "src2",
  1043. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1044. },
  1045. },
  1046. },
  1047. },
  1048. },
  1049. result: &xsql.JoinTupleSets{
  1050. Content: []xsql.JoinTuple{
  1051. {
  1052. Tuples: []xsql.Tuple{
  1053. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1054. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1055. },
  1056. },
  1057. {
  1058. Tuples: []xsql.Tuple{
  1059. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1060. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1061. },
  1062. },
  1063. },
  1064. },
  1065. },
  1066. {
  1067. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1068. data: xsql.WindowTuplesSet{
  1069. Content: []xsql.WindowTuples{
  1070. {
  1071. Emitter: "src1",
  1072. Tuples: []xsql.Tuple{
  1073. {
  1074. Emitter: "src1",
  1075. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1076. }, {
  1077. Emitter: "src1",
  1078. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1079. }, {
  1080. Emitter: "src1",
  1081. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1082. },
  1083. },
  1084. },
  1085. {
  1086. Emitter: "src2",
  1087. Tuples: []xsql.Tuple{},
  1088. },
  1089. },
  1090. },
  1091. result: nil,
  1092. },
  1093. {
  1094. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1095. data: xsql.WindowTuplesSet{
  1096. Content: []xsql.WindowTuples{
  1097. {
  1098. Emitter: "src1",
  1099. Tuples: []xsql.Tuple{
  1100. {
  1101. Emitter: "src1",
  1102. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1103. }, {
  1104. Emitter: "src1",
  1105. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1106. }, {
  1107. Emitter: "src1",
  1108. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1109. },
  1110. },
  1111. },
  1112. {
  1113. Emitter: "src2",
  1114. Tuples: nil,
  1115. },
  1116. },
  1117. },
  1118. result: nil,
  1119. },
  1120. {
  1121. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1122. data: xsql.WindowTuplesSet{
  1123. Content: []xsql.WindowTuples{
  1124. {
  1125. Emitter: "src1",
  1126. Tuples: []xsql.Tuple{},
  1127. },
  1128. {
  1129. Emitter: "src2",
  1130. Tuples: []xsql.Tuple{
  1131. {
  1132. Emitter: "src2",
  1133. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1134. }, {
  1135. Emitter: "src2",
  1136. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1137. },
  1138. },
  1139. },
  1140. },
  1141. },
  1142. result: nil,
  1143. },
  1144. {
  1145. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  1146. data: xsql.WindowTuplesSet{
  1147. Content: []xsql.WindowTuples{
  1148. {
  1149. Emitter: "src1",
  1150. Tuples: nil,
  1151. },
  1152. {
  1153. Emitter: "src2",
  1154. Tuples: []xsql.Tuple{
  1155. {
  1156. Emitter: "src2",
  1157. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1158. }, {
  1159. Emitter: "src2",
  1160. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1161. },
  1162. },
  1163. },
  1164. },
  1165. },
  1166. result: nil,
  1167. },
  1168. {
  1169. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  1170. data: xsql.WindowTuplesSet{
  1171. Content: []xsql.WindowTuples{
  1172. {
  1173. Emitter: "src1",
  1174. Tuples: []xsql.Tuple{
  1175. {
  1176. Emitter: "src1",
  1177. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1178. }, {
  1179. Emitter: "src1",
  1180. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1181. }, {
  1182. Emitter: "src1",
  1183. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1184. },
  1185. },
  1186. },
  1187. {
  1188. Emitter: "src2",
  1189. Tuples: []xsql.Tuple{
  1190. {
  1191. Emitter: "src2",
  1192. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1193. }, {
  1194. Emitter: "src2",
  1195. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1196. }, {
  1197. Emitter: "src2",
  1198. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1199. },
  1200. },
  1201. },
  1202. },
  1203. },
  1204. result: &xsql.JoinTupleSets{
  1205. Content: []xsql.JoinTuple{
  1206. {
  1207. Tuples: []xsql.Tuple{
  1208. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1209. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1210. },
  1211. },
  1212. {
  1213. Tuples: []xsql.Tuple{
  1214. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1215. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1216. },
  1217. },
  1218. },
  1219. },
  1220. },
  1221. {
  1222. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  1223. data: xsql.WindowTuplesSet{
  1224. Content: []xsql.WindowTuples{
  1225. {
  1226. Emitter: "src1",
  1227. Tuples: []xsql.Tuple{
  1228. {
  1229. Emitter: "src1",
  1230. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1231. }, {
  1232. Emitter: "src1",
  1233. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1234. }, {
  1235. Emitter: "src1",
  1236. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1237. },
  1238. },
  1239. },
  1240. {
  1241. Emitter: "src2",
  1242. Tuples: []xsql.Tuple{
  1243. {
  1244. Emitter: "src2",
  1245. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1246. }, {
  1247. Emitter: "src2",
  1248. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1249. }, {
  1250. Emitter: "src2",
  1251. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1252. },
  1253. },
  1254. },
  1255. },
  1256. },
  1257. result: &xsql.JoinTupleSets{
  1258. Content: []xsql.JoinTuple{
  1259. {
  1260. Tuples: []xsql.Tuple{
  1261. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1262. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1263. },
  1264. },
  1265. },
  1266. },
  1267. },
  1268. {
  1269. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  1270. data: xsql.WindowTuplesSet{
  1271. Content: []xsql.WindowTuples{
  1272. {
  1273. Emitter: "src1",
  1274. Tuples: []xsql.Tuple{
  1275. {
  1276. Emitter: "src1",
  1277. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  1278. }, {
  1279. Emitter: "src1",
  1280. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  1281. }, {
  1282. Emitter: "src1",
  1283. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  1284. },
  1285. },
  1286. },
  1287. {
  1288. Emitter: "src2",
  1289. Tuples: []xsql.Tuple{
  1290. {
  1291. Emitter: "src2",
  1292. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  1293. }, {
  1294. Emitter: "src2",
  1295. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  1296. }, {
  1297. Emitter: "src2",
  1298. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  1299. },
  1300. },
  1301. },
  1302. },
  1303. },
  1304. result: &xsql.JoinTupleSets{
  1305. Content: []xsql.JoinTuple{
  1306. {
  1307. Tuples: []xsql.Tuple{
  1308. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  1309. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  1310. },
  1311. },
  1312. {
  1313. Tuples: []xsql.Tuple{
  1314. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  1315. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  1316. },
  1317. },
  1318. },
  1319. },
  1320. },
  1321. {
  1322. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  1323. data: xsql.WindowTuplesSet{
  1324. Content: []xsql.WindowTuples{
  1325. {
  1326. Emitter: "s1",
  1327. Tuples: []xsql.Tuple{
  1328. {
  1329. Emitter: "s1",
  1330. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1331. }, {
  1332. Emitter: "s1",
  1333. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1334. }, {
  1335. Emitter: "s1",
  1336. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1337. },
  1338. },
  1339. },
  1340. {
  1341. Emitter: "s2",
  1342. Tuples: []xsql.Tuple{
  1343. {
  1344. Emitter: "s2",
  1345. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1346. }, {
  1347. Emitter: "s2",
  1348. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1349. }, {
  1350. Emitter: "s2",
  1351. Message: xsql.Message{"id2": 2, "f2": "w3"},
  1352. },
  1353. },
  1354. },
  1355. },
  1356. },
  1357. result: &xsql.JoinTupleSets{
  1358. Content: []xsql.JoinTuple{
  1359. {
  1360. Tuples: []xsql.Tuple{
  1361. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1362. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1363. },
  1364. },
  1365. {
  1366. Tuples: []xsql.Tuple{
  1367. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1368. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1369. },
  1370. },
  1371. {
  1372. Tuples: []xsql.Tuple{
  1373. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1374. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
  1375. },
  1376. },
  1377. },
  1378. },
  1379. },
  1380. }
  1381. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1382. contextLogger := conf.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  1383. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1384. for i, tt := range tests {
  1385. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1386. if err != nil {
  1387. t.Errorf("statement parse error %s", err)
  1388. break
  1389. }
  1390. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  1391. t.Errorf("statement source is not a table")
  1392. } else {
  1393. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1394. pp := &JoinOp{Joins: stmt.Joins, From: table}
  1395. result := pp.Apply(ctx, tt.data, fv, afv)
  1396. if !reflect.DeepEqual(tt.result, result) {
  1397. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1398. }
  1399. }
  1400. }
  1401. }
  1402. func TestRightJoinPlan_Apply(t *testing.T) {
  1403. var tests = []struct {
  1404. sql string
  1405. data xsql.WindowTuplesSet
  1406. result interface{}
  1407. }{
  1408. {
  1409. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1410. data: xsql.WindowTuplesSet{
  1411. Content: []xsql.WindowTuples{
  1412. {
  1413. Emitter: "src1",
  1414. Tuples: []xsql.Tuple{
  1415. {
  1416. Emitter: "src1",
  1417. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1418. }, {
  1419. Emitter: "src1",
  1420. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1421. }, {
  1422. Emitter: "src1",
  1423. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1424. },
  1425. },
  1426. },
  1427. {
  1428. Emitter: "src2",
  1429. Tuples: []xsql.Tuple{
  1430. {
  1431. Emitter: "src2",
  1432. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1433. }, {
  1434. Emitter: "src2",
  1435. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1436. }, {
  1437. Emitter: "src2",
  1438. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1439. },
  1440. },
  1441. },
  1442. },
  1443. },
  1444. result: &xsql.JoinTupleSets{
  1445. Content: []xsql.JoinTuple{
  1446. {
  1447. Tuples: []xsql.Tuple{
  1448. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1449. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1450. },
  1451. },
  1452. {
  1453. Tuples: []xsql.Tuple{
  1454. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1455. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1456. },
  1457. },
  1458. {
  1459. Tuples: []xsql.Tuple{
  1460. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1461. },
  1462. },
  1463. },
  1464. },
  1465. },
  1466. {
  1467. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1468. data: xsql.WindowTuplesSet{
  1469. Content: []xsql.WindowTuples{
  1470. {
  1471. Emitter: "src1",
  1472. Tuples: []xsql.Tuple{
  1473. {
  1474. Emitter: "src1",
  1475. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1476. }, {
  1477. Emitter: "src1",
  1478. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1479. }, {
  1480. Emitter: "src1",
  1481. Message: xsql.Message{"id1": 1, "f1": "v3"},
  1482. },
  1483. },
  1484. },
  1485. {
  1486. Emitter: "src2",
  1487. Tuples: []xsql.Tuple{
  1488. {
  1489. Emitter: "src2",
  1490. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1491. }, {
  1492. Emitter: "src2",
  1493. Message: xsql.Message{"f2": "w2"},
  1494. }, {
  1495. Emitter: "src2",
  1496. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1497. },
  1498. },
  1499. },
  1500. },
  1501. },
  1502. result: &xsql.JoinTupleSets{
  1503. Content: []xsql.JoinTuple{
  1504. {
  1505. Tuples: []xsql.Tuple{
  1506. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1507. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1508. },
  1509. },
  1510. {
  1511. Tuples: []xsql.Tuple{
  1512. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1513. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
  1514. },
  1515. },
  1516. {
  1517. Tuples: []xsql.Tuple{
  1518. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1519. },
  1520. },
  1521. {
  1522. Tuples: []xsql.Tuple{
  1523. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1524. },
  1525. },
  1526. },
  1527. },
  1528. },
  1529. {
  1530. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1531. data: xsql.WindowTuplesSet{
  1532. Content: []xsql.WindowTuples{
  1533. {
  1534. Emitter: "src1",
  1535. Tuples: []xsql.Tuple{
  1536. {
  1537. Emitter: "src1",
  1538. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1539. },
  1540. },
  1541. },
  1542. {
  1543. Emitter: "src2",
  1544. Tuples: []xsql.Tuple{
  1545. {
  1546. Emitter: "src2",
  1547. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1548. }, {
  1549. Emitter: "src2",
  1550. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1551. },
  1552. },
  1553. },
  1554. },
  1555. },
  1556. result: &xsql.JoinTupleSets{
  1557. Content: []xsql.JoinTuple{
  1558. {
  1559. Tuples: []xsql.Tuple{
  1560. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1561. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1562. },
  1563. },
  1564. {
  1565. Tuples: []xsql.Tuple{
  1566. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1567. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1568. },
  1569. },
  1570. },
  1571. },
  1572. },
  1573. }
  1574. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1575. contextLogger := conf.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1576. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1577. for i, tt := range tests {
  1578. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1579. if err != nil {
  1580. t.Errorf("statement parse error %s", err)
  1581. break
  1582. }
  1583. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  1584. t.Errorf("statement source is not a table")
  1585. } else {
  1586. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1587. pp := &JoinOp{Joins: stmt.Joins, From: table}
  1588. result := pp.Apply(ctx, tt.data, fv, afv)
  1589. if !reflect.DeepEqual(tt.result, result) {
  1590. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1591. }
  1592. }
  1593. }
  1594. }
  1595. func TestFullJoinPlan_Apply(t *testing.T) {
  1596. var tests = []struct {
  1597. sql string
  1598. data xsql.WindowTuplesSet
  1599. result interface{}
  1600. }{
  1601. {
  1602. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1603. data: xsql.WindowTuplesSet{
  1604. Content: []xsql.WindowTuples{
  1605. {
  1606. Emitter: "src1",
  1607. Tuples: []xsql.Tuple{
  1608. {
  1609. Emitter: "src1",
  1610. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1611. }, {
  1612. Emitter: "src1",
  1613. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1614. }, {
  1615. Emitter: "src1",
  1616. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1617. },
  1618. },
  1619. },
  1620. {
  1621. Emitter: "src2",
  1622. Tuples: []xsql.Tuple{
  1623. {
  1624. Emitter: "src2",
  1625. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1626. }, {
  1627. Emitter: "src2",
  1628. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1629. }, {
  1630. Emitter: "src2",
  1631. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1632. }, {
  1633. Emitter: "src2",
  1634. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1635. },
  1636. },
  1637. },
  1638. },
  1639. },
  1640. result: &xsql.JoinTupleSets{
  1641. Content: []xsql.JoinTuple{
  1642. {
  1643. Tuples: []xsql.Tuple{
  1644. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1645. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1646. },
  1647. },
  1648. {
  1649. Tuples: []xsql.Tuple{
  1650. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1651. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1652. },
  1653. },
  1654. {
  1655. Tuples: []xsql.Tuple{
  1656. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}}, {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
  1657. },
  1658. },
  1659. {
  1660. Tuples: []xsql.Tuple{
  1661. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1662. },
  1663. },
  1664. {
  1665. Tuples: []xsql.Tuple{
  1666. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1667. },
  1668. },
  1669. },
  1670. },
  1671. },
  1672. {
  1673. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1674. data: xsql.WindowTuplesSet{
  1675. Content: []xsql.WindowTuples{
  1676. {
  1677. Emitter: "src1",
  1678. Tuples: []xsql.Tuple{
  1679. {
  1680. Emitter: "src1",
  1681. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1682. }, {
  1683. Emitter: "src1",
  1684. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1685. }, {
  1686. Emitter: "src1",
  1687. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1688. },
  1689. },
  1690. },
  1691. {
  1692. Emitter: "src2",
  1693. Tuples: []xsql.Tuple{
  1694. {
  1695. Emitter: "src2",
  1696. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1697. }, {
  1698. Emitter: "src2",
  1699. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1700. }, {
  1701. Emitter: "src2",
  1702. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1703. },
  1704. },
  1705. },
  1706. },
  1707. },
  1708. result: &xsql.JoinTupleSets{
  1709. Content: []xsql.JoinTuple{
  1710. {
  1711. Tuples: []xsql.Tuple{
  1712. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1713. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1714. },
  1715. },
  1716. {
  1717. Tuples: []xsql.Tuple{
  1718. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1719. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1720. },
  1721. },
  1722. {
  1723. Tuples: []xsql.Tuple{
  1724. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1725. },
  1726. },
  1727. {
  1728. Tuples: []xsql.Tuple{
  1729. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1730. },
  1731. },
  1732. },
  1733. },
  1734. },
  1735. {
  1736. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1737. data: xsql.WindowTuplesSet{
  1738. Content: []xsql.WindowTuples{
  1739. {
  1740. Emitter: "src1",
  1741. Tuples: []xsql.Tuple{
  1742. {
  1743. Emitter: "src1",
  1744. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1745. }, {
  1746. Emitter: "src1",
  1747. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1748. }, {
  1749. Emitter: "src1",
  1750. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1751. },
  1752. },
  1753. },
  1754. {
  1755. Emitter: "src2",
  1756. Tuples: []xsql.Tuple{},
  1757. },
  1758. },
  1759. },
  1760. result: &xsql.JoinTupleSets{
  1761. Content: []xsql.JoinTuple{
  1762. {
  1763. Tuples: []xsql.Tuple{
  1764. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1765. },
  1766. },
  1767. {
  1768. Tuples: []xsql.Tuple{
  1769. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1770. },
  1771. },
  1772. {
  1773. Tuples: []xsql.Tuple{
  1774. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1775. },
  1776. },
  1777. },
  1778. },
  1779. },
  1780. {
  1781. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1782. data: xsql.WindowTuplesSet{
  1783. Content: []xsql.WindowTuples{
  1784. {
  1785. Emitter: "src1",
  1786. Tuples: []xsql.Tuple{},
  1787. },
  1788. {
  1789. Emitter: "src2",
  1790. Tuples: []xsql.Tuple{
  1791. {
  1792. Emitter: "src2",
  1793. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1794. }, {
  1795. Emitter: "src2",
  1796. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1797. }, {
  1798. Emitter: "src2",
  1799. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1800. },
  1801. },
  1802. },
  1803. },
  1804. },
  1805. result: &xsql.JoinTupleSets{
  1806. Content: []xsql.JoinTuple{
  1807. {
  1808. Tuples: []xsql.Tuple{
  1809. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1810. },
  1811. },
  1812. {
  1813. Tuples: []xsql.Tuple{
  1814. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1815. },
  1816. },
  1817. {
  1818. Tuples: []xsql.Tuple{
  1819. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1820. },
  1821. },
  1822. },
  1823. },
  1824. },
  1825. }
  1826. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1827. contextLogger := conf.Log.WithField("rule", "TestFullJoinPlan_Apply")
  1828. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1829. for i, tt := range tests {
  1830. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1831. if err != nil {
  1832. t.Errorf("statement parse error %s", err)
  1833. break
  1834. }
  1835. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  1836. t.Errorf("statement source is not a table")
  1837. } else {
  1838. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1839. pp := &JoinOp{Joins: stmt.Joins, From: table}
  1840. result := pp.Apply(ctx, tt.data, fv, afv)
  1841. if !reflect.DeepEqual(tt.result, result) {
  1842. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1843. }
  1844. }
  1845. }
  1846. }
  1847. func TestCrossJoinPlan_Apply(t *testing.T) {
  1848. var tests = []struct {
  1849. sql string
  1850. data xsql.WindowTuplesSet
  1851. result interface{}
  1852. }{
  1853. {
  1854. sql: "SELECT id1 FROM src1 cross join src2",
  1855. data: xsql.WindowTuplesSet{
  1856. Content: []xsql.WindowTuples{
  1857. {
  1858. Emitter: "src1",
  1859. Tuples: []xsql.Tuple{
  1860. {
  1861. Emitter: "src1",
  1862. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1863. }, {
  1864. Emitter: "src1",
  1865. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1866. }, {
  1867. Emitter: "src1",
  1868. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1869. },
  1870. },
  1871. },
  1872. {
  1873. Emitter: "src2",
  1874. Tuples: []xsql.Tuple{
  1875. {
  1876. Emitter: "src2",
  1877. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1878. }, {
  1879. Emitter: "src2",
  1880. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1881. }, {
  1882. Emitter: "src2",
  1883. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1884. },
  1885. },
  1886. },
  1887. },
  1888. },
  1889. result: &xsql.JoinTupleSets{
  1890. Content: []xsql.JoinTuple{
  1891. {
  1892. Tuples: []xsql.Tuple{
  1893. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1894. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1895. },
  1896. },
  1897. {
  1898. Tuples: []xsql.Tuple{
  1899. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1900. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1901. },
  1902. },
  1903. {
  1904. Tuples: []xsql.Tuple{
  1905. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1906. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1907. },
  1908. },
  1909. {
  1910. Tuples: []xsql.Tuple{
  1911. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1912. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1913. },
  1914. },
  1915. {
  1916. Tuples: []xsql.Tuple{
  1917. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1918. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1919. },
  1920. },
  1921. {
  1922. Tuples: []xsql.Tuple{
  1923. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1924. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1925. },
  1926. },
  1927. {
  1928. Tuples: []xsql.Tuple{
  1929. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1930. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1931. },
  1932. },
  1933. {
  1934. Tuples: []xsql.Tuple{
  1935. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1936. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1937. },
  1938. },
  1939. {
  1940. Tuples: []xsql.Tuple{
  1941. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1942. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1943. },
  1944. },
  1945. },
  1946. },
  1947. },
  1948. {
  1949. sql: "SELECT id1 FROM src1 cross join src2",
  1950. data: xsql.WindowTuplesSet{
  1951. Content: []xsql.WindowTuples{
  1952. {
  1953. Emitter: "src1",
  1954. Tuples: []xsql.Tuple{
  1955. {
  1956. Emitter: "src1",
  1957. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1958. },
  1959. },
  1960. },
  1961. {
  1962. Emitter: "src2",
  1963. Tuples: []xsql.Tuple{
  1964. {
  1965. Emitter: "src2",
  1966. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1967. }, {
  1968. Emitter: "src2",
  1969. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1970. },
  1971. },
  1972. },
  1973. },
  1974. },
  1975. result: &xsql.JoinTupleSets{
  1976. Content: []xsql.JoinTuple{
  1977. {
  1978. Tuples: []xsql.Tuple{
  1979. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1980. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1981. },
  1982. },
  1983. {
  1984. Tuples: []xsql.Tuple{
  1985. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}}, {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1986. },
  1987. },
  1988. },
  1989. },
  1990. },
  1991. }
  1992. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1993. contextLogger := conf.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1994. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1995. for i, tt := range tests {
  1996. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1997. if err != nil {
  1998. t.Errorf("statement parse error %s", err)
  1999. break
  2000. }
  2001. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  2002. t.Errorf("statement source is not a table")
  2003. } else {
  2004. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2005. pp := &JoinOp{Joins: stmt.Joins, From: table}
  2006. result := pp.Apply(ctx, tt.data, fv, afv)
  2007. if !reflect.DeepEqual(tt.result, result) {
  2008. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2009. }
  2010. }
  2011. }
  2012. }
  2013. func TestCrossJoinPlanError(t *testing.T) {
  2014. var tests = []struct {
  2015. sql string
  2016. data interface{}
  2017. result interface{}
  2018. }{
  2019. {
  2020. sql: "SELECT id1 FROM src1 cross join src2",
  2021. data: errors.New("an error from upstream"),
  2022. result: errors.New("an error from upstream"),
  2023. }, {
  2024. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  2025. data: xsql.WindowTuplesSet{
  2026. Content: []xsql.WindowTuples{
  2027. {
  2028. Emitter: "src1",
  2029. Tuples: []xsql.Tuple{
  2030. {
  2031. Emitter: "src1",
  2032. Message: xsql.Message{"id1": 1, "f1": "v1"},
  2033. }, {
  2034. Emitter: "src1",
  2035. Message: xsql.Message{"id1": 2, "f1": "v2"},
  2036. }, {
  2037. Emitter: "src1",
  2038. Message: xsql.Message{"id1": 3, "f1": "v3"},
  2039. },
  2040. },
  2041. },
  2042. {
  2043. Emitter: "src2",
  2044. Tuples: []xsql.Tuple{
  2045. {
  2046. Emitter: "src2",
  2047. Message: xsql.Message{"id2": 1, "f2": "w1"},
  2048. }, {
  2049. Emitter: "src2",
  2050. Message: xsql.Message{"id2": "3", "f2": "w2"},
  2051. }, {
  2052. Emitter: "src2",
  2053. Message: xsql.Message{"id2": 4, "f2": "w3"},
  2054. }, {
  2055. Emitter: "src2",
  2056. Message: xsql.Message{"id2": 2, "f2": "w4"},
  2057. },
  2058. },
  2059. },
  2060. },
  2061. },
  2062. result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
  2063. },
  2064. }
  2065. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2066. contextLogger := conf.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  2067. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2068. for i, tt := range tests {
  2069. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2070. if err != nil {
  2071. t.Errorf("statement parse error %s", err)
  2072. break
  2073. }
  2074. if table, ok := stmt.Sources[0].(*ast.Table); !ok {
  2075. t.Errorf("statement source is not a table")
  2076. } else {
  2077. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2078. pp := &JoinOp{Joins: stmt.Joins, From: table}
  2079. result := pp.Apply(ctx, tt.data, fv, afv)
  2080. if !reflect.DeepEqual(tt.result, result) {
  2081. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2082. }
  2083. }
  2084. }
  2085. }
  2086. func str2Map(s string) map[string]interface{} {
  2087. var input map[string]interface{}
  2088. if err := json.Unmarshal([]byte(s), &input); err != nil {
  2089. fmt.Printf("Failed to parse the JSON data.\n")
  2090. return nil
  2091. }
  2092. return input
  2093. }