order_test.go 9.8 KB


  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/contexts"
  7. "reflect"
  8. "strings"
  9. "testing"
  10. )
  11. func TestOrderPlan_Apply(t *testing.T) {
  12. var tests = []struct {
  13. sql string
  14. data interface{}
  15. result interface{}
  16. }{
  17. {
  18. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20 ORDER BY abc",
  19. data: &xsql.Tuple{
  20. Emitter: "tbl",
  21. Message: xsql.Message{
  22. "abc" : int64(6),
  23. },
  24. },
  25. result: &xsql.Tuple{
  26. Emitter: "tbl",
  27. Message: xsql.Message{
  28. "abc" : int64(6),
  29. },
  30. },
  31. },
  32. {
  33. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  34. data: &xsql.Tuple{
  35. Emitter: "tbl",
  36. Message: xsql.Message{
  37. "abc" : int64(34),
  38. "def" : "hello",
  39. },
  40. },
  41. result: &xsql.Tuple{
  42. Emitter: "tbl",
  43. Message: xsql.Message{
  44. "abc" : int64(34),
  45. "def" : "hello",
  46. },
  47. },
  48. },
  49. {
  50. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  51. data: xsql.WindowTuplesSet{
  52. xsql.WindowTuples{
  53. Emitter:"src1",
  54. Tuples:[]xsql.Tuple{
  55. {
  56. Emitter: "src1",
  57. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  58. },{
  59. Emitter: "src1",
  60. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  61. },{
  62. Emitter: "src1",
  63. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  64. },
  65. },
  66. },
  67. },
  68. result: xsql.WindowTuplesSet{
  69. xsql.WindowTuples{
  70. Emitter:"src1",
  71. Tuples:[]xsql.Tuple{
  72. {
  73. Emitter: "src1",
  74. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  75. },{
  76. Emitter: "src1",
  77. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  78. },{
  79. Emitter: "src1",
  80. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  81. },
  82. },
  83. },
  84. },
  85. },
  86. {
  87. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY f1, id1 DESC",
  88. data: xsql.WindowTuplesSet{
  89. xsql.WindowTuples{
  90. Emitter:"src1",
  91. Tuples:[]xsql.Tuple{
  92. {
  93. Emitter: "src1",
  94. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  95. },{
  96. Emitter: "src1",
  97. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  98. },{
  99. Emitter: "src1",
  100. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  101. },
  102. },
  103. },
  104. },
  105. result: xsql.WindowTuplesSet{
  106. xsql.WindowTuples{
  107. Emitter:"src1",
  108. Tuples:[]xsql.Tuple{
  109. {
  110. Emitter: "src1",
  111. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  112. },{
  113. Emitter: "src1",
  114. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  115. },{
  116. Emitter: "src1",
  117. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  118. },
  119. },
  120. },
  121. },
  122. },
  123. {
  124. sql: "SELECT * FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY ts DESC",
  125. data: xsql.WindowTuplesSet{
  126. xsql.WindowTuples{
  127. Emitter:"src1",
  128. Tuples:[]xsql.Tuple{
  129. {
  130. Emitter: "src1",
  131. Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  132. },{
  133. Emitter: "src1",
  134. Message: xsql.Message{ "id1" : 2, "f1" : "v2", "ts": common.TimeFromUnixMilli(1568854525000)},
  135. },{
  136. Emitter: "src1",
  137. Message: xsql.Message{ "id1" : 3, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854535000)},
  138. },
  139. },
  140. },
  141. },
  142. result: xsql.WindowTuplesSet{
  143. xsql.WindowTuples{
  144. Emitter:"src1",
  145. Tuples:[]xsql.Tuple{
  146. {
  147. Emitter: "src1",
  148. Message: xsql.Message{ "id1" : 3, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854535000)},
  149. },{
  150. Emitter: "src1",
  151. Message: xsql.Message{ "id1" : 2, "f1" : "v2", "ts": common.TimeFromUnixMilli(1568854525000)},
  152. },{
  153. Emitter: "src1",
  154. Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000)},
  155. },
  156. },
  157. },
  158. },
  159. },
  160. {
  161. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src1.id1 desc",
  162. data: xsql.JoinTupleSets{
  163. xsql.JoinTuple{
  164. Tuples: []xsql.Tuple{
  165. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  166. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  167. },
  168. },
  169. xsql.JoinTuple{
  170. Tuples: []xsql.Tuple{
  171. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  172. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  173. },
  174. },
  175. xsql.JoinTuple{
  176. Tuples: []xsql.Tuple{
  177. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  178. },
  179. },
  180. },
  181. result: xsql.JoinTupleSets{
  182. xsql.JoinTuple{
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  185. },
  186. },
  187. xsql.JoinTuple{
  188. Tuples: []xsql.Tuple{
  189. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  190. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  191. },
  192. },
  193. xsql.JoinTuple{
  194. Tuples: []xsql.Tuple{
  195. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  196. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  197. },
  198. },
  199. },
  200. },
  201. {
  202. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2",
  203. data: xsql.JoinTupleSets{
  204. xsql.JoinTuple{
  205. Tuples: []xsql.Tuple{
  206. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  207. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  208. },
  209. },
  210. xsql.JoinTuple{
  211. Tuples: []xsql.Tuple{
  212. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  213. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  214. },
  215. },
  216. xsql.JoinTuple{
  217. Tuples: []xsql.Tuple{
  218. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  219. },
  220. },
  221. },
  222. result: xsql.JoinTupleSets{
  223. xsql.JoinTuple{
  224. Tuples: []xsql.Tuple{
  225. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  226. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  227. },
  228. },
  229. xsql.JoinTuple{
  230. Tuples: []xsql.Tuple{
  231. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  232. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  233. },
  234. },
  235. xsql.JoinTuple{
  236. Tuples: []xsql.Tuple{
  237. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  238. },
  239. },
  240. },
  241. },
  242. {
  243. sql: "SELECT abc FROM tbl group by abc ORDER BY def",
  244. data: xsql.GroupedTuplesSet{
  245. {
  246. &xsql.Tuple{
  247. Emitter: "tbl",
  248. Message: xsql.Message{
  249. "abc" : int64(6),
  250. "def" : "hello",
  251. },
  252. },
  253. },
  254. },
  255. result:xsql.GroupedTuplesSet{
  256. {
  257. &xsql.Tuple{
  258. Emitter: "tbl",
  259. Message: xsql.Message{
  260. "abc" : int64(6),
  261. "def" : "hello",
  262. },
  263. },
  264. },
  265. },
  266. },
  267. {
  268. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY id1 desc",
  269. data: xsql.GroupedTuplesSet{
  270. {
  271. &xsql.Tuple{
  272. Emitter: "src1",
  273. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  274. },
  275. &xsql.Tuple{
  276. Emitter: "src1",
  277. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  278. },
  279. },
  280. {
  281. &xsql.Tuple{
  282. Emitter: "src1",
  283. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  284. },
  285. },
  286. },
  287. result: xsql.GroupedTuplesSet{
  288. {
  289. &xsql.Tuple{
  290. Emitter: "src1",
  291. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  292. },
  293. },
  294. {
  295. &xsql.Tuple{
  296. Emitter: "src1",
  297. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  298. },
  299. &xsql.Tuple{
  300. Emitter: "src1",
  301. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  302. },
  303. },
  304. },
  305. },
  306. {
  307. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
  308. data: xsql.GroupedTuplesSet{
  309. {
  310. &xsql.JoinTuple{
  311. Tuples: []xsql.Tuple{
  312. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  313. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  314. },
  315. },
  316. },
  317. {
  318. &xsql.JoinTuple{
  319. Tuples: []xsql.Tuple{
  320. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  321. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  322. },
  323. },
  324. },
  325. {
  326. &xsql.JoinTuple{
  327. Tuples: []xsql.Tuple{
  328. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  329. },
  330. },
  331. },
  332. },
  333. result: xsql.GroupedTuplesSet{
  334. {
  335. &xsql.JoinTuple{
  336. Tuples: []xsql.Tuple{
  337. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  338. },
  339. },
  340. },
  341. {
  342. &xsql.JoinTuple{
  343. Tuples: []xsql.Tuple{
  344. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  345. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  346. },
  347. },
  348. },
  349. {
  350. &xsql.JoinTuple{
  351. Tuples: []xsql.Tuple{
  352. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  353. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  354. },
  355. },
  356. },
  357. },
  358. },
  359. }
  360. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  361. contextLogger := common.Log.WithField("rule", "TestOrderPlan_Apply")
  362. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  363. for i, tt := range tests {
  364. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  365. if err != nil {
  366. t.Errorf("statement parse error %s", err)
  367. break
  368. }
  369. pp := &OrderPlan{SortFields:stmt.SortFields}
  370. result := pp.Apply(ctx, tt.data)
  371. if !reflect.DeepEqual(tt.result, result) {
  372. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  373. }
  374. }
  375. }