// join_test.go (44 KB)
  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestLeftJoinPlan_Apply(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data xsql.WindowTuplesSet
  17. result interface{}
  18. }{
  19. {
  20. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  21. data: xsql.WindowTuplesSet{
  22. xsql.WindowTuples{
  23. Emitter: "src1",
  24. Tuples: []xsql.Tuple{
  25. {
  26. Emitter: "src1",
  27. Message: xsql.Message{"id1": 1, "f1": "v1"},
  28. }, {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 2, "f1": "v2"},
  31. }, {
  32. Emitter: "src1",
  33. Message: xsql.Message{"id1": 3, "f1": "v3"},
  34. },
  35. },
  36. },
  37. xsql.WindowTuples{
  38. Emitter: "src2",
  39. Tuples: []xsql.Tuple{
  40. {
  41. Emitter: "src2",
  42. Message: xsql.Message{"id2": 1, "f2": "w1"},
  43. }, {
  44. Emitter: "src2",
  45. Message: xsql.Message{"id2": 2, "f2": "w2"},
  46. }, {
  47. Emitter: "src2",
  48. Message: xsql.Message{"id2": 4, "f2": "w3"},
  49. },
  50. },
  51. },
  52. },
  53. result: xsql.JoinTupleSets{
  54. xsql.JoinTuple{
  55. Tuples: []xsql.Tuple{
  56. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  57. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  58. },
  59. },
  60. xsql.JoinTuple{
  61. Tuples: []xsql.Tuple{
  62. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  63. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  64. },
  65. },
  66. xsql.JoinTuple{
  67. Tuples: []xsql.Tuple{
  68. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  69. },
  70. },
  71. },
  72. },
  73. {
  74. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  75. data: xsql.WindowTuplesSet{
  76. xsql.WindowTuples{
  77. Emitter: "src1",
  78. Tuples: []xsql.Tuple{
  79. {
  80. Emitter: "src1",
  81. Message: xsql.Message{"id1": 1, "f1": "v1"},
  82. }, {
  83. Emitter: "src1",
  84. Message: xsql.Message{"id1": 2, "f1": "v2"},
  85. }, {
  86. Emitter: "src1",
  87. Message: xsql.Message{"id1": 3, "f1": "v3"},
  88. },
  89. },
  90. },
  91. xsql.WindowTuples{
  92. Emitter: "src2",
  93. Tuples: []xsql.Tuple{
  94. {
  95. Emitter: "src2",
  96. Message: xsql.Message{"id2": 1, "f2": "w1"},
  97. }, {
  98. Emitter: "src2",
  99. Message: xsql.Message{"f2": "w2"},
  100. }, {
  101. Emitter: "src2",
  102. Message: xsql.Message{"id2": 4, "f2": "w3"},
  103. },
  104. },
  105. },
  106. },
  107. result: xsql.JoinTupleSets{
  108. xsql.JoinTuple{
  109. Tuples: []xsql.Tuple{
  110. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  111. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  112. },
  113. },
  114. xsql.JoinTuple{
  115. Tuples: []xsql.Tuple{
  116. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  117. },
  118. },
  119. xsql.JoinTuple{
  120. Tuples: []xsql.Tuple{
  121. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  122. },
  123. },
  124. },
  125. },
  126. {
  127. sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
  128. data: xsql.WindowTuplesSet{
  129. xsql.WindowTuples{
  130. Emitter: "src1",
  131. Tuples: []xsql.Tuple{
  132. {
  133. Emitter: "src1",
  134. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  135. }, {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)},
  141. },
  142. },
  143. },
  144. xsql.WindowTuples{
  145. Emitter: "src2",
  146. Tuples: []xsql.Tuple{
  147. {
  148. Emitter: "src2",
  149. Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)},
  150. }, {
  151. Emitter: "src2",
  152. Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)},
  153. }, {
  154. Emitter: "src2",
  155. Message: xsql.Message{"id2": 4, "f2": "w3", "ts": common.TimeFromUnixMilli(1568854545000)},
  156. },
  157. },
  158. },
  159. },
  160. result: xsql.JoinTupleSets{
  161. xsql.JoinTuple{
  162. Tuples: []xsql.Tuple{
  163. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  164. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", "ts": common.TimeFromUnixMilli(1568854515000)}},
  165. },
  166. },
  167. xsql.JoinTuple{
  168. Tuples: []xsql.Tuple{
  169. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  170. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2", "ts": common.TimeFromUnixMilli(1568854525000)}},
  171. },
  172. },
  173. xsql.JoinTuple{
  174. Tuples: []xsql.Tuple{
  175. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3", "ts": common.TimeFromUnixMilli(1568854535000)}},
  176. },
  177. },
  178. },
  179. },
  180. {
  181. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  182. data: xsql.WindowTuplesSet{
  183. xsql.WindowTuples{
  184. Emitter: "src1",
  185. Tuples: []xsql.Tuple{
  186. {
  187. Emitter: "src1",
  188. Message: xsql.Message{"id1": 1, "f1": "v1"},
  189. }, {
  190. Emitter: "src1",
  191. Message: xsql.Message{"id1": 2, "f1": "v2"},
  192. }, {
  193. Emitter: "src1",
  194. Message: xsql.Message{"id1": 3, "f1": "v3"},
  195. },
  196. },
  197. },
  198. xsql.WindowTuples{
  199. Emitter: "src2",
  200. Tuples: []xsql.Tuple{
  201. {
  202. Emitter: "src2",
  203. Message: xsql.Message{"id2": 4, "f2": "w1"},
  204. }, {
  205. Emitter: "src2",
  206. Message: xsql.Message{"id2": 5, "f2": "w2"},
  207. }, {
  208. Emitter: "src2",
  209. Message: xsql.Message{"id2": 6, "f2": "w3"},
  210. },
  211. },
  212. },
  213. },
  214. result: nil,
  215. },
  216. {
  217. sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
  218. data: xsql.WindowTuplesSet{
  219. xsql.WindowTuples{
  220. Emitter: "s1",
  221. Tuples: []xsql.Tuple{
  222. {
  223. Emitter: "s1",
  224. Message: xsql.Message{"id1": 1, "f1": "v1"},
  225. }, {
  226. Emitter: "s1",
  227. Message: xsql.Message{"id1": 2, "f1": "v2"},
  228. }, {
  229. Emitter: "s1",
  230. Message: xsql.Message{"id1": 3, "f1": "v3"},
  231. },
  232. },
  233. },
  234. xsql.WindowTuples{
  235. Emitter: "s2",
  236. Tuples: []xsql.Tuple{
  237. {
  238. Emitter: "s2",
  239. Message: xsql.Message{"id2": 1, "f2": "w1"},
  240. }, {
  241. Emitter: "s2",
  242. Message: xsql.Message{"id2": 2, "f2": "w2"},
  243. }, {
  244. Emitter: "s2",
  245. Message: xsql.Message{"id2": 4, "f2": "w3"},
  246. },
  247. },
  248. },
  249. },
  250. result: xsql.JoinTupleSets{
  251. xsql.JoinTuple{
  252. Tuples: []xsql.Tuple{
  253. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  254. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  255. },
  256. },
  257. xsql.JoinTuple{
  258. Tuples: []xsql.Tuple{
  259. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  260. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  261. },
  262. },
  263. xsql.JoinTuple{
  264. Tuples: []xsql.Tuple{
  265. {Emitter: "s1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  266. },
  267. },
  268. },
  269. },
  270. {
  271. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  272. data: xsql.WindowTuplesSet{
  273. xsql.WindowTuples{
  274. Emitter: "src1",
  275. Tuples: []xsql.Tuple{
  276. {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 1, "f1": "v1"},
  279. },
  280. },
  281. },
  282. xsql.WindowTuples{
  283. Emitter: "src2",
  284. Tuples: []xsql.Tuple{
  285. {
  286. Emitter: "src2",
  287. Message: xsql.Message{"id2": 1, "f2": "w1"},
  288. }, {
  289. Emitter: "src2",
  290. Message: xsql.Message{"id2": 1, "f2": "w2"},
  291. },
  292. },
  293. },
  294. },
  295. result: xsql.JoinTupleSets{
  296. xsql.JoinTuple{
  297. Tuples: []xsql.Tuple{
  298. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  299. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  300. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  301. },
  302. },
  303. },
  304. },
  305. {
  306. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  307. data: xsql.WindowTuplesSet{
  308. xsql.WindowTuples{
  309. Emitter: "src1",
  310. Tuples: []xsql.Tuple{
  311. {
  312. Emitter: "src1",
  313. Message: xsql.Message{"id1": 1, "f1": "v1"},
  314. }, {
  315. Emitter: "src1",
  316. Message: xsql.Message{"id1": 2, "f1": "v2"},
  317. }, {
  318. Emitter: "src1",
  319. Message: xsql.Message{"id1": 3, "f1": "v3"},
  320. },
  321. },
  322. },
  323. xsql.WindowTuples{
  324. Emitter: "src2",
  325. Tuples: []xsql.Tuple{},
  326. },
  327. },
  328. result: xsql.JoinTupleSets{
  329. xsql.JoinTuple{
  330. Tuples: []xsql.Tuple{
  331. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  332. },
  333. },
  334. xsql.JoinTuple{
  335. Tuples: []xsql.Tuple{
  336. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  337. },
  338. },
  339. xsql.JoinTuple{
  340. Tuples: []xsql.Tuple{
  341. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  342. },
  343. },
  344. },
  345. },
  346. {
  347. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  348. data: xsql.WindowTuplesSet{
  349. xsql.WindowTuples{
  350. Emitter: "src1",
  351. Tuples: []xsql.Tuple{
  352. {
  353. Emitter: "src1",
  354. Message: xsql.Message{"id1": 1, "f1": "v1"},
  355. }, {
  356. Emitter: "src1",
  357. Message: xsql.Message{"id1": 2, "f1": "v2"},
  358. }, {
  359. Emitter: "src1",
  360. Message: xsql.Message{"id1": 3, "f1": "v3"},
  361. },
  362. },
  363. },
  364. xsql.WindowTuples{
  365. Emitter: "src2",
  366. Tuples: nil,
  367. },
  368. },
  369. result: xsql.JoinTupleSets{
  370. xsql.JoinTuple{
  371. Tuples: []xsql.Tuple{
  372. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  373. },
  374. },
  375. xsql.JoinTuple{
  376. Tuples: []xsql.Tuple{
  377. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  378. },
  379. },
  380. xsql.JoinTuple{
  381. Tuples: []xsql.Tuple{
  382. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  383. },
  384. },
  385. },
  386. },
  387. {
  388. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  389. data: xsql.WindowTuplesSet{
  390. xsql.WindowTuples{
  391. Emitter: "src1",
  392. Tuples: []xsql.Tuple{},
  393. },
  394. xsql.WindowTuples{
  395. Emitter: "src2",
  396. Tuples: []xsql.Tuple{
  397. {
  398. Emitter: "src2",
  399. Message: xsql.Message{"id2": 1, "f2": "w1"},
  400. }, {
  401. Emitter: "src2",
  402. Message: xsql.Message{"id2": 1, "f2": "w2"},
  403. },
  404. },
  405. },
  406. },
  407. result: nil,
  408. },
  409. {
  410. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
  411. data: xsql.WindowTuplesSet{
  412. xsql.WindowTuples{
  413. Emitter: "src1",
  414. Tuples: nil,
  415. },
  416. xsql.WindowTuples{
  417. Emitter: "src2",
  418. Tuples: []xsql.Tuple{
  419. {
  420. Emitter: "src2",
  421. Message: xsql.Message{"id2": 1, "f2": "w1"},
  422. }, {
  423. Emitter: "src2",
  424. Message: xsql.Message{"id2": 1, "f2": "w2"},
  425. },
  426. },
  427. },
  428. },
  429. result: nil,
  430. },
  431. {
  432. sql: "SELECT id1 FROM src1 left join src2 on src1.id1*2 = src2.id2",
  433. data: xsql.WindowTuplesSet{
  434. xsql.WindowTuples{
  435. Emitter: "src1",
  436. Tuples: []xsql.Tuple{
  437. {
  438. Emitter: "src1",
  439. Message: xsql.Message{"id1": 1, "f1": "v1"},
  440. }, {
  441. Emitter: "src1",
  442. Message: xsql.Message{"id1": 2, "f1": "v2"},
  443. }, {
  444. Emitter: "src1",
  445. Message: xsql.Message{"id1": 3, "f1": "v3"},
  446. },
  447. },
  448. },
  449. xsql.WindowTuples{
  450. Emitter: "src2",
  451. Tuples: []xsql.Tuple{
  452. {
  453. Emitter: "src2",
  454. Message: xsql.Message{"id2": 1, "f2": "w1"},
  455. }, {
  456. Emitter: "src2",
  457. Message: xsql.Message{"id2": 2, "f2": "w2"},
  458. }, {
  459. Emitter: "src2",
  460. Message: xsql.Message{"id2": 4, "f2": "w3"},
  461. },
  462. },
  463. },
  464. },
  465. result: xsql.JoinTupleSets{
  466. xsql.JoinTuple{
  467. Tuples: []xsql.Tuple{
  468. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  469. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  470. },
  471. },
  472. xsql.JoinTuple{
  473. Tuples: []xsql.Tuple{
  474. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  475. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  476. },
  477. },
  478. xsql.JoinTuple{
  479. Tuples: []xsql.Tuple{
  480. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  481. },
  482. },
  483. },
  484. },
  485. {
  486. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2*2",
  487. data: xsql.WindowTuplesSet{
  488. xsql.WindowTuples{
  489. Emitter: "src1",
  490. Tuples: []xsql.Tuple{
  491. {
  492. Emitter: "src1",
  493. Message: xsql.Message{"id1": 1, "f1": "v1"},
  494. }, {
  495. Emitter: "src1",
  496. Message: xsql.Message{"id1": 2, "f1": "v2"},
  497. }, {
  498. Emitter: "src1",
  499. Message: xsql.Message{"id1": 3, "f1": "v3"},
  500. },
  501. },
  502. },
  503. xsql.WindowTuples{
  504. Emitter: "src2",
  505. Tuples: []xsql.Tuple{
  506. {
  507. Emitter: "src2",
  508. Message: xsql.Message{"id2": 1, "f2": "w1"},
  509. }, {
  510. Emitter: "src2",
  511. Message: xsql.Message{"id2": 2, "f2": "w2"},
  512. }, {
  513. Emitter: "src2",
  514. Message: xsql.Message{"id2": 4, "f2": "w3"},
  515. },
  516. },
  517. },
  518. },
  519. result: xsql.JoinTupleSets{
  520. xsql.JoinTuple{
  521. Tuples: []xsql.Tuple{
  522. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  523. },
  524. },
  525. xsql.JoinTuple{
  526. Tuples: []xsql.Tuple{
  527. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  528. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  529. },
  530. },
  531. xsql.JoinTuple{
  532. Tuples: []xsql.Tuple{
  533. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  534. },
  535. },
  536. },
  537. },
  538. {
  539. sql: "SELECT id1 FROM src1 left join src2 on src1.f1->cid = src2.f2->cid",
  540. data: xsql.WindowTuplesSet{
  541. xsql.WindowTuples{
  542. Emitter: "src1",
  543. Tuples: []xsql.Tuple{
  544. {
  545. Emitter: "src1",
  546. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  547. }, {
  548. Emitter: "src1",
  549. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  550. }, {
  551. Emitter: "src1",
  552. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  553. },
  554. },
  555. },
  556. xsql.WindowTuples{
  557. Emitter: "src2",
  558. Tuples: []xsql.Tuple{
  559. {
  560. Emitter: "src2",
  561. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  562. }, {
  563. Emitter: "src2",
  564. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  565. }, {
  566. Emitter: "src2",
  567. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  568. },
  569. },
  570. },
  571. },
  572. result: xsql.JoinTupleSets{
  573. xsql.JoinTuple{
  574. Tuples: []xsql.Tuple{
  575. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  576. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  577. },
  578. },
  579. xsql.JoinTuple{
  580. Tuples: []xsql.Tuple{
  581. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  582. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  583. },
  584. },
  585. xsql.JoinTuple{
  586. Tuples: []xsql.Tuple{
  587. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)}},
  588. },
  589. },
  590. },
  591. },
  592. {
  593. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
  594. data: xsql.WindowTuplesSet{
  595. xsql.WindowTuples{
  596. Emitter: "src1",
  597. Tuples: []xsql.Tuple{
  598. {
  599. Emitter: "src1",
  600. Message: xsql.Message{"id1": 1, "f1": "v1"},
  601. Metadata: xsql.Metadata{"topic": "devices/type1/device001"},
  602. },
  603. },
  604. },
  605. xsql.WindowTuples{
  606. Emitter: "src2",
  607. Tuples: []xsql.Tuple{
  608. {
  609. Emitter: "src2",
  610. Message: xsql.Message{"id2": 1, "f2": "w1"},
  611. Metadata: xsql.Metadata{"topic": "devices/type2/device001"},
  612. },
  613. },
  614. },
  615. },
  616. result: xsql.JoinTupleSets{
  617. xsql.JoinTuple{
  618. Tuples: []xsql.Tuple{
  619. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  620. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  621. },
  622. },
  623. },
  624. },
  625. }
  626. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  627. contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
  628. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  629. for i, tt := range tests {
  630. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  631. if err != nil {
  632. t.Errorf("statement parse error %s", err)
  633. break
  634. }
  635. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  636. t.Errorf("statement source is not a table")
  637. } else {
  638. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  639. result := pp.Apply(ctx, tt.data)
  640. if !reflect.DeepEqual(tt.result, result) {
  641. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  642. }
  643. }
  644. }
  645. }
  646. func TestInnerJoinPlan_Apply(t *testing.T) {
  647. var tests = []struct {
  648. sql string
  649. data xsql.WindowTuplesSet
  650. result interface{}
  651. }{
  652. {
  653. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  654. data: xsql.WindowTuplesSet{
  655. xsql.WindowTuples{
  656. Emitter: "src1",
  657. Tuples: []xsql.Tuple{
  658. {
  659. Emitter: "src1",
  660. Message: xsql.Message{"id1": 1, "f1": "v1"},
  661. }, {
  662. Emitter: "src1",
  663. Message: xsql.Message{"id1": 2, "f1": "v2"},
  664. }, {
  665. Emitter: "src1",
  666. Message: xsql.Message{"id1": 3, "f1": "v3"},
  667. },
  668. },
  669. },
  670. xsql.WindowTuples{
  671. Emitter: "src2",
  672. Tuples: []xsql.Tuple{
  673. {
  674. Emitter: "src2",
  675. Message: xsql.Message{"id2": 1, "f2": "w1"},
  676. }, {
  677. Emitter: "src2",
  678. Message: xsql.Message{"id2": 2, "f2": "w2"},
  679. }, {
  680. Emitter: "src2",
  681. Message: xsql.Message{"id2": 4, "f2": "w3"},
  682. },
  683. },
  684. },
  685. },
  686. result: xsql.JoinTupleSets{
  687. xsql.JoinTuple{
  688. Tuples: []xsql.Tuple{
  689. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  690. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  691. },
  692. },
  693. xsql.JoinTuple{
  694. Tuples: []xsql.Tuple{
  695. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  696. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  697. },
  698. },
  699. },
  700. },
  701. {
  702. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  703. data: xsql.WindowTuplesSet{
  704. xsql.WindowTuples{
  705. Emitter: "src1",
  706. Tuples: []xsql.Tuple{
  707. {
  708. Emitter: "src1",
  709. Message: xsql.Message{"id1": 1, "f1": "v1"},
  710. }, {
  711. Emitter: "src1",
  712. Message: xsql.Message{"id1": 2, "f1": "v2"},
  713. }, {
  714. Emitter: "src1",
  715. Message: xsql.Message{"id1": 3, "f1": "v3"},
  716. },
  717. },
  718. },
  719. xsql.WindowTuples{
  720. Emitter: "src2",
  721. Tuples: []xsql.Tuple{
  722. {
  723. Emitter: "src2",
  724. Message: xsql.Message{"id2": 1, "f2": "w1"},
  725. }, {
  726. Emitter: "src2",
  727. Message: xsql.Message{"f2": "w2"},
  728. }, {
  729. Emitter: "src2",
  730. Message: xsql.Message{"id2": 4, "f2": "w3"},
  731. },
  732. },
  733. },
  734. },
  735. result: xsql.JoinTupleSets{
  736. xsql.JoinTuple{
  737. Tuples: []xsql.Tuple{
  738. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  739. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  740. },
  741. },
  742. },
  743. },
  744. {
  745. sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
  746. data: xsql.WindowTuplesSet{
  747. xsql.WindowTuples{
  748. Emitter: "s1",
  749. Tuples: []xsql.Tuple{
  750. {
  751. Emitter: "s1",
  752. Message: xsql.Message{"id1": 1, "f1": "v1"},
  753. }, {
  754. Emitter: "s1",
  755. Message: xsql.Message{"id1": 2, "f1": "v2"},
  756. }, {
  757. Emitter: "s1",
  758. Message: xsql.Message{"id1": 3, "f1": "v3"},
  759. },
  760. },
  761. },
  762. xsql.WindowTuples{
  763. Emitter: "s2",
  764. Tuples: []xsql.Tuple{
  765. {
  766. Emitter: "s2",
  767. Message: xsql.Message{"id2": 1, "f2": "w1"},
  768. }, {
  769. Emitter: "s2",
  770. Message: xsql.Message{"id2": 2, "f2": "w2"},
  771. }, {
  772. Emitter: "s2",
  773. Message: xsql.Message{"id2": 4, "f2": "w3"},
  774. },
  775. },
  776. },
  777. },
  778. result: xsql.JoinTupleSets{
  779. xsql.JoinTuple{
  780. Tuples: []xsql.Tuple{
  781. {Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  782. {Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  783. },
  784. },
  785. xsql.JoinTuple{
  786. Tuples: []xsql.Tuple{
  787. {Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  788. {Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  789. },
  790. },
  791. },
  792. },
  793. {
  794. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  795. data: xsql.WindowTuplesSet{
  796. xsql.WindowTuples{
  797. Emitter: "src1",
  798. Tuples: []xsql.Tuple{
  799. {
  800. Emitter: "src1",
  801. Message: xsql.Message{"id1": 1, "f1": "v1"},
  802. },
  803. },
  804. },
  805. xsql.WindowTuples{
  806. Emitter: "src2",
  807. Tuples: []xsql.Tuple{
  808. {
  809. Emitter: "src2",
  810. Message: xsql.Message{"id2": 1, "f2": "w1"},
  811. }, {
  812. Emitter: "src2",
  813. Message: xsql.Message{"id2": 1, "f2": "w2"},
  814. },
  815. },
  816. },
  817. },
  818. result: xsql.JoinTupleSets{
  819. xsql.JoinTuple{
  820. Tuples: []xsql.Tuple{
  821. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  822. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  823. },
  824. },
  825. xsql.JoinTuple{
  826. Tuples: []xsql.Tuple{
  827. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  828. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  829. },
  830. },
  831. },
  832. },
  833. {
  834. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  835. data: xsql.WindowTuplesSet{
  836. xsql.WindowTuples{
  837. Emitter: "src1",
  838. Tuples: []xsql.Tuple{
  839. {
  840. Emitter: "src1",
  841. Message: xsql.Message{"id1": 1, "f1": "v1"},
  842. }, {
  843. Emitter: "src1",
  844. Message: xsql.Message{"id1": 2, "f1": "v2"},
  845. }, {
  846. Emitter: "src1",
  847. Message: xsql.Message{"id1": 3, "f1": "v3"},
  848. },
  849. },
  850. },
  851. xsql.WindowTuples{
  852. Emitter: "src2",
  853. Tuples: []xsql.Tuple{},
  854. },
  855. },
  856. result: nil,
  857. },
  858. {
  859. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  860. data: xsql.WindowTuplesSet{
  861. xsql.WindowTuples{
  862. Emitter: "src1",
  863. Tuples: []xsql.Tuple{
  864. {
  865. Emitter: "src1",
  866. Message: xsql.Message{"id1": 1, "f1": "v1"},
  867. }, {
  868. Emitter: "src1",
  869. Message: xsql.Message{"id1": 2, "f1": "v2"},
  870. }, {
  871. Emitter: "src1",
  872. Message: xsql.Message{"id1": 3, "f1": "v3"},
  873. },
  874. },
  875. },
  876. xsql.WindowTuples{
  877. Emitter: "src2",
  878. Tuples: nil,
  879. },
  880. },
  881. result: nil,
  882. },
  883. {
  884. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  885. data: xsql.WindowTuplesSet{
  886. xsql.WindowTuples{
  887. Emitter: "src1",
  888. Tuples: []xsql.Tuple{},
  889. },
  890. xsql.WindowTuples{
  891. Emitter: "src2",
  892. Tuples: []xsql.Tuple{
  893. {
  894. Emitter: "src2",
  895. Message: xsql.Message{"id2": 1, "f2": "w1"},
  896. }, {
  897. Emitter: "src2",
  898. Message: xsql.Message{"id2": 1, "f2": "w2"},
  899. },
  900. },
  901. },
  902. },
  903. result: nil,
  904. },
  905. {
  906. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
  907. data: xsql.WindowTuplesSet{
  908. xsql.WindowTuples{
  909. Emitter: "src1",
  910. Tuples: nil,
  911. },
  912. xsql.WindowTuples{
  913. Emitter: "src2",
  914. Tuples: []xsql.Tuple{
  915. {
  916. Emitter: "src2",
  917. Message: xsql.Message{"id2": 1, "f2": "w1"},
  918. }, {
  919. Emitter: "src2",
  920. Message: xsql.Message{"id2": 1, "f2": "w2"},
  921. },
  922. },
  923. },
  924. },
  925. result: nil,
  926. },
  927. {
  928. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1*2 = src2.id2",
  929. data: xsql.WindowTuplesSet{
  930. xsql.WindowTuples{
  931. Emitter: "src1",
  932. Tuples: []xsql.Tuple{
  933. {
  934. Emitter: "src1",
  935. Message: xsql.Message{"id1": 1, "f1": "v1"},
  936. }, {
  937. Emitter: "src1",
  938. Message: xsql.Message{"id1": 2, "f1": "v2"},
  939. }, {
  940. Emitter: "src1",
  941. Message: xsql.Message{"id1": 3, "f1": "v3"},
  942. },
  943. },
  944. },
  945. xsql.WindowTuples{
  946. Emitter: "src2",
  947. Tuples: []xsql.Tuple{
  948. {
  949. Emitter: "src2",
  950. Message: xsql.Message{"id2": 1, "f2": "w1"},
  951. }, {
  952. Emitter: "src2",
  953. Message: xsql.Message{"id2": 2, "f2": "w2"},
  954. }, {
  955. Emitter: "src2",
  956. Message: xsql.Message{"id2": 4, "f2": "w3"},
  957. },
  958. },
  959. },
  960. },
  961. result: xsql.JoinTupleSets{
  962. xsql.JoinTuple{
  963. Tuples: []xsql.Tuple{
  964. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  965. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  966. },
  967. },
  968. xsql.JoinTuple{
  969. Tuples: []xsql.Tuple{
  970. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  971. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  972. },
  973. },
  974. },
  975. },
  976. {
  977. sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2*2",
  978. data: xsql.WindowTuplesSet{
  979. xsql.WindowTuples{
  980. Emitter: "src1",
  981. Tuples: []xsql.Tuple{
  982. {
  983. Emitter: "src1",
  984. Message: xsql.Message{"id1": 1, "f1": "v1"},
  985. }, {
  986. Emitter: "src1",
  987. Message: xsql.Message{"id1": 2, "f1": "v2"},
  988. }, {
  989. Emitter: "src1",
  990. Message: xsql.Message{"id1": 3, "f1": "v3"},
  991. },
  992. },
  993. },
  994. xsql.WindowTuples{
  995. Emitter: "src2",
  996. Tuples: []xsql.Tuple{
  997. {
  998. Emitter: "src2",
  999. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1000. }, {
  1001. Emitter: "src2",
  1002. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1003. }, {
  1004. Emitter: "src2",
  1005. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1006. },
  1007. },
  1008. },
  1009. },
  1010. result: xsql.JoinTupleSets{
  1011. xsql.JoinTuple{
  1012. Tuples: []xsql.Tuple{
  1013. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1014. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1015. },
  1016. },
  1017. },
  1018. },
  1019. {
  1020. sql: "SELECT id1 FROM src1 inner join src2 on src1.f1->cid = src2.f2->cid",
  1021. data: xsql.WindowTuplesSet{
  1022. xsql.WindowTuples{
  1023. Emitter: "src1",
  1024. Tuples: []xsql.Tuple{
  1025. {
  1026. Emitter: "src1",
  1027. Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)},
  1028. }, {
  1029. Emitter: "src1",
  1030. Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)},
  1031. }, {
  1032. Emitter: "src1",
  1033. Message: xsql.Message{"id1": 3, "f1": str2Map(`{"cid" : 3, "name" : "alice1"}`)},
  1034. },
  1035. },
  1036. },
  1037. xsql.WindowTuples{
  1038. Emitter: "src2",
  1039. Tuples: []xsql.Tuple{
  1040. {
  1041. Emitter: "src2",
  1042. Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)},
  1043. }, {
  1044. Emitter: "src2",
  1045. Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)},
  1046. }, {
  1047. Emitter: "src2",
  1048. Message: xsql.Message{"id2": 4, "f2": str2Map(`{"cid" : 4, "name" : "alice2"}`)},
  1049. },
  1050. },
  1051. },
  1052. },
  1053. result: xsql.JoinTupleSets{
  1054. xsql.JoinTuple{
  1055. Tuples: []xsql.Tuple{
  1056. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": str2Map(`{"cid" : 1, "name" : "tom1"}`)}},
  1057. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": str2Map(`{"cid" : 1, "name" : "tom2"}`)}},
  1058. },
  1059. },
  1060. xsql.JoinTuple{
  1061. Tuples: []xsql.Tuple{
  1062. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": str2Map(`{"cid" : 2, "name" : "mike1"}`)}},
  1063. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": str2Map(`{"cid" : 2, "name" : "mike2"}`)}},
  1064. },
  1065. },
  1066. },
  1067. },
  1068. }
  1069. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1070. contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
  1071. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1072. for i, tt := range tests {
  1073. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1074. if err != nil {
  1075. t.Errorf("statement parse error %s", err)
  1076. break
  1077. }
  1078. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1079. t.Errorf("statement source is not a table")
  1080. } else {
  1081. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1082. result := pp.Apply(ctx, tt.data)
  1083. if !reflect.DeepEqual(tt.result, result) {
  1084. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1085. }
  1086. }
  1087. }
  1088. }
  1089. func TestRightJoinPlan_Apply(t *testing.T) {
  1090. var tests = []struct {
  1091. sql string
  1092. data xsql.WindowTuplesSet
  1093. result interface{}
  1094. }{
  1095. {
  1096. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1097. data: xsql.WindowTuplesSet{
  1098. xsql.WindowTuples{
  1099. Emitter: "src1",
  1100. Tuples: []xsql.Tuple{
  1101. {
  1102. Emitter: "src1",
  1103. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1104. }, {
  1105. Emitter: "src1",
  1106. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1107. }, {
  1108. Emitter: "src1",
  1109. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1110. },
  1111. },
  1112. },
  1113. xsql.WindowTuples{
  1114. Emitter: "src2",
  1115. Tuples: []xsql.Tuple{
  1116. {
  1117. Emitter: "src2",
  1118. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1119. }, {
  1120. Emitter: "src2",
  1121. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1122. }, {
  1123. Emitter: "src2",
  1124. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1125. },
  1126. },
  1127. },
  1128. },
  1129. result: xsql.JoinTupleSets{
  1130. xsql.JoinTuple{
  1131. Tuples: []xsql.Tuple{
  1132. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1133. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1134. },
  1135. },
  1136. xsql.JoinTuple{
  1137. Tuples: []xsql.Tuple{
  1138. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1139. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1140. },
  1141. },
  1142. xsql.JoinTuple{
  1143. Tuples: []xsql.Tuple{
  1144. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1145. },
  1146. },
  1147. },
  1148. },
  1149. {
  1150. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1151. data: xsql.WindowTuplesSet{
  1152. xsql.WindowTuples{
  1153. Emitter: "src1",
  1154. Tuples: []xsql.Tuple{
  1155. {
  1156. Emitter: "src1",
  1157. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1158. }, {
  1159. Emitter: "src1",
  1160. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1161. }, {
  1162. Emitter: "src1",
  1163. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1164. },
  1165. },
  1166. },
  1167. xsql.WindowTuples{
  1168. Emitter: "src2",
  1169. Tuples: []xsql.Tuple{
  1170. {
  1171. Emitter: "src2",
  1172. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1173. }, {
  1174. Emitter: "src2",
  1175. Message: xsql.Message{"f2": "w2"},
  1176. }, {
  1177. Emitter: "src2",
  1178. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1179. },
  1180. },
  1181. },
  1182. },
  1183. result: xsql.JoinTupleSets{
  1184. xsql.JoinTuple{
  1185. Tuples: []xsql.Tuple{
  1186. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1187. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1188. },
  1189. },
  1190. xsql.JoinTuple{
  1191. Tuples: []xsql.Tuple{
  1192. {Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
  1193. },
  1194. },
  1195. xsql.JoinTuple{
  1196. Tuples: []xsql.Tuple{
  1197. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1198. },
  1199. },
  1200. },
  1201. },
  1202. {
  1203. sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
  1204. data: xsql.WindowTuplesSet{
  1205. xsql.WindowTuples{
  1206. Emitter: "src1",
  1207. Tuples: []xsql.Tuple{
  1208. {
  1209. Emitter: "src1",
  1210. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1211. },
  1212. },
  1213. },
  1214. xsql.WindowTuples{
  1215. Emitter: "src2",
  1216. Tuples: []xsql.Tuple{
  1217. {
  1218. Emitter: "src2",
  1219. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1220. }, {
  1221. Emitter: "src2",
  1222. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1223. },
  1224. },
  1225. },
  1226. },
  1227. result: xsql.JoinTupleSets{
  1228. xsql.JoinTuple{
  1229. Tuples: []xsql.Tuple{
  1230. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1231. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1232. },
  1233. },
  1234. xsql.JoinTuple{
  1235. Tuples: []xsql.Tuple{
  1236. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1237. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1238. },
  1239. },
  1240. },
  1241. },
  1242. }
  1243. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1244. contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
  1245. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1246. for i, tt := range tests {
  1247. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1248. if err != nil {
  1249. t.Errorf("statement parse error %s", err)
  1250. break
  1251. }
  1252. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1253. t.Errorf("statement source is not a table")
  1254. } else {
  1255. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1256. result := pp.Apply(ctx, tt.data)
  1257. if !reflect.DeepEqual(tt.result, result) {
  1258. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1259. }
  1260. }
  1261. }
  1262. }
  1263. func TestFullJoinPlan_Apply(t *testing.T) {
  1264. var tests = []struct {
  1265. sql string
  1266. data xsql.WindowTuplesSet
  1267. result interface{}
  1268. }{
  1269. {
  1270. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1271. data: xsql.WindowTuplesSet{
  1272. xsql.WindowTuples{
  1273. Emitter: "src1",
  1274. Tuples: []xsql.Tuple{
  1275. {
  1276. Emitter: "src1",
  1277. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1278. }, {
  1279. Emitter: "src1",
  1280. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1281. }, {
  1282. Emitter: "src1",
  1283. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1284. },
  1285. },
  1286. },
  1287. xsql.WindowTuples{
  1288. Emitter: "src2",
  1289. Tuples: []xsql.Tuple{
  1290. {
  1291. Emitter: "src2",
  1292. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1293. }, {
  1294. Emitter: "src2",
  1295. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1296. }, {
  1297. Emitter: "src2",
  1298. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1299. }, {
  1300. Emitter: "src2",
  1301. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1302. },
  1303. },
  1304. },
  1305. },
  1306. result: xsql.JoinTupleSets{
  1307. xsql.JoinTuple{
  1308. Tuples: []xsql.Tuple{
  1309. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1310. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1311. },
  1312. },
  1313. xsql.JoinTuple{
  1314. Tuples: []xsql.Tuple{
  1315. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1316. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1317. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
  1318. },
  1319. },
  1320. xsql.JoinTuple{
  1321. Tuples: []xsql.Tuple{
  1322. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1323. },
  1324. },
  1325. xsql.JoinTuple{
  1326. Tuples: []xsql.Tuple{
  1327. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1328. },
  1329. },
  1330. },
  1331. },
  1332. {
  1333. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1334. data: xsql.WindowTuplesSet{
  1335. xsql.WindowTuples{
  1336. Emitter: "src1",
  1337. Tuples: []xsql.Tuple{
  1338. {
  1339. Emitter: "src1",
  1340. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1341. }, {
  1342. Emitter: "src1",
  1343. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1344. }, {
  1345. Emitter: "src1",
  1346. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1347. },
  1348. },
  1349. },
  1350. xsql.WindowTuples{
  1351. Emitter: "src2",
  1352. Tuples: []xsql.Tuple{
  1353. {
  1354. Emitter: "src2",
  1355. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1356. }, {
  1357. Emitter: "src2",
  1358. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1359. }, {
  1360. Emitter: "src2",
  1361. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1362. },
  1363. },
  1364. },
  1365. },
  1366. result: xsql.JoinTupleSets{
  1367. xsql.JoinTuple{
  1368. Tuples: []xsql.Tuple{
  1369. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1370. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1371. },
  1372. },
  1373. xsql.JoinTuple{
  1374. Tuples: []xsql.Tuple{
  1375. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1376. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1377. },
  1378. },
  1379. xsql.JoinTuple{
  1380. Tuples: []xsql.Tuple{
  1381. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1382. },
  1383. },
  1384. xsql.JoinTuple{
  1385. Tuples: []xsql.Tuple{
  1386. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1387. },
  1388. },
  1389. },
  1390. },
  1391. {
  1392. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1393. data: xsql.WindowTuplesSet{
  1394. xsql.WindowTuples{
  1395. Emitter: "src1",
  1396. Tuples: []xsql.Tuple{
  1397. {
  1398. Emitter: "src1",
  1399. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1400. }, {
  1401. Emitter: "src1",
  1402. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1403. }, {
  1404. Emitter: "src1",
  1405. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1406. },
  1407. },
  1408. },
  1409. xsql.WindowTuples{
  1410. Emitter: "src2",
  1411. Tuples: []xsql.Tuple{},
  1412. },
  1413. },
  1414. result: xsql.JoinTupleSets{
  1415. xsql.JoinTuple{
  1416. Tuples: []xsql.Tuple{
  1417. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1418. },
  1419. },
  1420. xsql.JoinTuple{
  1421. Tuples: []xsql.Tuple{
  1422. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1423. },
  1424. },
  1425. xsql.JoinTuple{
  1426. Tuples: []xsql.Tuple{
  1427. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1428. },
  1429. },
  1430. },
  1431. },
  1432. {
  1433. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1434. data: xsql.WindowTuplesSet{
  1435. xsql.WindowTuples{
  1436. Emitter: "src1",
  1437. Tuples: []xsql.Tuple{},
  1438. },
  1439. xsql.WindowTuples{
  1440. Emitter: "src2",
  1441. Tuples: []xsql.Tuple{
  1442. {
  1443. Emitter: "src2",
  1444. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1445. }, {
  1446. Emitter: "src2",
  1447. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1448. }, {
  1449. Emitter: "src2",
  1450. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1451. },
  1452. },
  1453. },
  1454. },
  1455. result: xsql.JoinTupleSets{
  1456. xsql.JoinTuple{
  1457. Tuples: []xsql.Tuple{
  1458. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1459. },
  1460. },
  1461. xsql.JoinTuple{
  1462. Tuples: []xsql.Tuple{
  1463. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1464. },
  1465. },
  1466. xsql.JoinTuple{
  1467. Tuples: []xsql.Tuple{
  1468. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1469. },
  1470. },
  1471. },
  1472. },
  1473. }
  1474. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1475. contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
  1476. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1477. for i, tt := range tests {
  1478. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1479. if err != nil {
  1480. t.Errorf("statement parse error %s", err)
  1481. break
  1482. }
  1483. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1484. t.Errorf("statement source is not a table")
  1485. } else {
  1486. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1487. result := pp.Apply(ctx, tt.data)
  1488. if !reflect.DeepEqual(tt.result, result) {
  1489. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1490. }
  1491. }
  1492. }
  1493. }
  1494. func TestCrossJoinPlan_Apply(t *testing.T) {
  1495. var tests = []struct {
  1496. sql string
  1497. data xsql.WindowTuplesSet
  1498. result interface{}
  1499. }{
  1500. {
  1501. sql: "SELECT id1 FROM src1 cross join src2",
  1502. data: xsql.WindowTuplesSet{
  1503. xsql.WindowTuples{
  1504. Emitter: "src1",
  1505. Tuples: []xsql.Tuple{
  1506. {
  1507. Emitter: "src1",
  1508. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1509. }, {
  1510. Emitter: "src1",
  1511. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1512. }, {
  1513. Emitter: "src1",
  1514. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1515. },
  1516. },
  1517. },
  1518. xsql.WindowTuples{
  1519. Emitter: "src2",
  1520. Tuples: []xsql.Tuple{
  1521. {
  1522. Emitter: "src2",
  1523. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1524. }, {
  1525. Emitter: "src2",
  1526. Message: xsql.Message{"id2": 2, "f2": "w2"},
  1527. }, {
  1528. Emitter: "src2",
  1529. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1530. },
  1531. },
  1532. },
  1533. },
  1534. result: xsql.JoinTupleSets{
  1535. xsql.JoinTuple{
  1536. Tuples: []xsql.Tuple{
  1537. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1538. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1539. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1540. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1541. },
  1542. },
  1543. xsql.JoinTuple{
  1544. Tuples: []xsql.Tuple{
  1545. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1546. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1547. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1548. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1549. },
  1550. },
  1551. xsql.JoinTuple{
  1552. Tuples: []xsql.Tuple{
  1553. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
  1554. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1555. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1556. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1557. },
  1558. },
  1559. },
  1560. },
  1561. {
  1562. sql: "SELECT id1 FROM src1 cross join src2",
  1563. data: xsql.WindowTuplesSet{
  1564. xsql.WindowTuples{
  1565. Emitter: "src1",
  1566. Tuples: []xsql.Tuple{
  1567. {
  1568. Emitter: "src1",
  1569. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1570. },
  1571. },
  1572. },
  1573. xsql.WindowTuples{
  1574. Emitter: "src2",
  1575. Tuples: []xsql.Tuple{
  1576. {
  1577. Emitter: "src2",
  1578. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1579. }, {
  1580. Emitter: "src2",
  1581. Message: xsql.Message{"id2": 1, "f2": "w2"},
  1582. },
  1583. },
  1584. },
  1585. },
  1586. result: xsql.JoinTupleSets{
  1587. xsql.JoinTuple{
  1588. Tuples: []xsql.Tuple{
  1589. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1590. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
  1591. {Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
  1592. },
  1593. },
  1594. },
  1595. },
  1596. }
  1597. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1598. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1599. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1600. for i, tt := range tests {
  1601. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1602. if err != nil {
  1603. t.Errorf("statement parse error %s", err)
  1604. break
  1605. }
  1606. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1607. t.Errorf("statement source is not a table")
  1608. } else {
  1609. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1610. result := pp.Apply(ctx, tt.data)
  1611. if !reflect.DeepEqual(tt.result, result) {
  1612. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1613. }
  1614. }
  1615. }
  1616. }
  1617. func TestCrossJoinPlanError(t *testing.T) {
  1618. var tests = []struct {
  1619. sql string
  1620. data interface{}
  1621. result interface{}
  1622. }{
  1623. {
  1624. sql: "SELECT id1 FROM src1 cross join src2",
  1625. data: errors.New("an error from upstream"),
  1626. result: errors.New("an error from upstream"),
  1627. }, {
  1628. sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
  1629. data: xsql.WindowTuplesSet{
  1630. xsql.WindowTuples{
  1631. Emitter: "src1",
  1632. Tuples: []xsql.Tuple{
  1633. {
  1634. Emitter: "src1",
  1635. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1636. }, {
  1637. Emitter: "src1",
  1638. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1639. }, {
  1640. Emitter: "src1",
  1641. Message: xsql.Message{"id1": 3, "f1": "v3"},
  1642. },
  1643. },
  1644. },
  1645. xsql.WindowTuples{
  1646. Emitter: "src2",
  1647. Tuples: []xsql.Tuple{
  1648. {
  1649. Emitter: "src2",
  1650. Message: xsql.Message{"id2": 1, "f2": "w1"},
  1651. }, {
  1652. Emitter: "src2",
  1653. Message: xsql.Message{"id2": "3", "f2": "w2"},
  1654. }, {
  1655. Emitter: "src2",
  1656. Message: xsql.Message{"id2": 4, "f2": "w3"},
  1657. }, {
  1658. Emitter: "src2",
  1659. Message: xsql.Message{"id2": 2, "f2": "w4"},
  1660. },
  1661. },
  1662. },
  1663. },
  1664. result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
  1665. },
  1666. }
  1667. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1668. contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
  1669. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1670. for i, tt := range tests {
  1671. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1672. if err != nil {
  1673. t.Errorf("statement parse error %s", err)
  1674. break
  1675. }
  1676. if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
  1677. t.Errorf("statement source is not a table")
  1678. } else {
  1679. pp := &JoinPlan{Joins: stmt.Joins, From: table}
  1680. result := pp.Apply(ctx, tt.data)
  1681. if !reflect.DeepEqual(tt.result, result) {
  1682. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1683. }
  1684. }
  1685. }
  1686. }
  1687. func str2Map(s string) map[string]interface{} {
  1688. var input map[string]interface{}
  1689. if err := json.Unmarshal([]byte(s), &input); err != nil {
  1690. fmt.Printf("Failed to parse the JSON data.\n")
  1691. return nil
  1692. }
  1693. return input
  1694. }