aggregate_test.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package plans
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "fmt"
  6. "reflect"
  7. "strings"
  8. "testing"
  9. )
  10. func TestAggregatePlan_Apply(t *testing.T) {
  11. var tests = []struct {
  12. sql string
  13. data interface{}
  14. result xsql.GroupedTuplesSet
  15. }{
  16. {
  17. sql: "SELECT abc FROM tbl group by abc",
  18. data: &xsql.Tuple{
  19. Emitter: "tbl",
  20. Message: xsql.Message{
  21. "abc" : int64(6),
  22. "def" : "hello",
  23. },
  24. },
  25. result: xsql.GroupedTuplesSet{
  26. {
  27. &xsql.Tuple{
  28. Emitter: "tbl",
  29. Message: xsql.Message{
  30. "abc" : int64(6),
  31. "def" : "hello",
  32. },
  33. },
  34. },
  35. },
  36. },
  37. {
  38. sql: "SELECT abc FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  39. data: xsql.WindowTuplesSet{
  40. xsql.WindowTuples{
  41. Emitter:"src1",
  42. Tuples:[]xsql.Tuple{
  43. {
  44. Emitter: "src1",
  45. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  46. },{
  47. Emitter: "src1",
  48. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  49. },{
  50. Emitter: "src1",
  51. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  52. },
  53. },
  54. },
  55. },
  56. result: xsql.GroupedTuplesSet{
  57. {
  58. &xsql.Tuple{
  59. Emitter: "src1",
  60. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  61. },
  62. &xsql.Tuple{
  63. Emitter: "src1",
  64. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  65. },
  66. },
  67. {
  68. &xsql.Tuple{
  69. Emitter: "src1",
  70. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  71. },
  72. },
  73. },
  74. },
  75. {
  76. sql: "SELECT abc FROM src1 GROUP BY id1, TUMBLINGWINDOW(ss, 10), f1",
  77. data: xsql.WindowTuplesSet{
  78. xsql.WindowTuples{
  79. Emitter:"src1",
  80. Tuples:[]xsql.Tuple{
  81. {
  82. Emitter: "src1",
  83. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  84. },{
  85. Emitter: "src1",
  86. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  87. },{
  88. Emitter: "src1",
  89. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  90. },
  91. },
  92. },
  93. },
  94. result: xsql.GroupedTuplesSet{
  95. {
  96. &xsql.Tuple{
  97. Emitter: "src1",
  98. Message: xsql.Message{ "id1" : 1, "f1" : "v1", },
  99. },
  100. },
  101. {
  102. &xsql.Tuple{
  103. Emitter: "src1",
  104. Message: xsql.Message{ "id1" : 2, "f1" : "v2", },
  105. },
  106. },
  107. {
  108. &xsql.Tuple{
  109. Emitter: "src1",
  110. Message: xsql.Message{ "id1" : 3, "f1" : "v1", },
  111. },
  112. },
  113. },
  114. },
  115. {
  116. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  117. data: xsql.JoinTupleSets{
  118. xsql.JoinTuple{
  119. Tuples: []xsql.Tuple{
  120. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  121. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  122. },
  123. },
  124. xsql.JoinTuple{
  125. Tuples: []xsql.Tuple{
  126. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  127. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  128. },
  129. },
  130. xsql.JoinTuple{
  131. Tuples: []xsql.Tuple{
  132. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  133. },
  134. },
  135. },
  136. result: xsql.GroupedTuplesSet{
  137. {
  138. &xsql.JoinTuple{
  139. Tuples: []xsql.Tuple{
  140. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  141. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  142. },
  143. },
  144. },
  145. {
  146. &xsql.JoinTuple{
  147. Tuples: []xsql.Tuple{
  148. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  149. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  150. },
  151. },
  152. },
  153. {
  154. &xsql.JoinTuple{
  155. Tuples: []xsql.Tuple{
  156. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  157. },
  158. },
  159. },
  160. },
  161. },
  162. {
  163. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY TUMBLINGWINDOW(ss, 10), src1.f1",
  164. data: xsql.JoinTupleSets{
  165. xsql.JoinTuple{
  166. Tuples: []xsql.Tuple{
  167. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  168. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  169. },
  170. },
  171. xsql.JoinTuple{
  172. Tuples: []xsql.Tuple{
  173. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  174. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  175. },
  176. },
  177. xsql.JoinTuple{
  178. Tuples: []xsql.Tuple{
  179. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  180. },
  181. },
  182. },
  183. result: xsql.GroupedTuplesSet{
  184. {
  185. &xsql.JoinTuple{
  186. Tuples: []xsql.Tuple{
  187. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  188. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  189. },
  190. },
  191. &xsql.JoinTuple{
  192. Tuples: []xsql.Tuple{
  193. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  194. },
  195. },
  196. },
  197. {
  198. &xsql.JoinTuple{
  199. Tuples: []xsql.Tuple{
  200. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  201. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  202. },
  203. },
  204. },
  205. },
  206. },
  207. {
  208. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY TUMBLINGWINDOW(ss, 10), src1.ts",
  209. data: xsql.JoinTupleSets{
  210. xsql.JoinTuple{
  211. Tuples: []xsql.Tuple{
  212. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000),},},
  213. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  214. },
  215. },
  216. xsql.JoinTuple{
  217. Tuples: []xsql.Tuple{
  218. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2","ts": common.TimeFromUnixMilli(1568854573431),},},
  219. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  220. },
  221. },
  222. xsql.JoinTuple{
  223. Tuples: []xsql.Tuple{
  224. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1","ts": common.TimeFromUnixMilli(1568854515000),},},
  225. },
  226. },
  227. },
  228. result: xsql.GroupedTuplesSet{
  229. {
  230. &xsql.JoinTuple{
  231. Tuples: []xsql.Tuple{
  232. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000),},},
  233. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  234. },
  235. },
  236. &xsql.JoinTuple{
  237. Tuples: []xsql.Tuple{
  238. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1", "ts": common.TimeFromUnixMilli(1568854515000),},},
  239. },
  240. },
  241. },
  242. {
  243. &xsql.JoinTuple{
  244. Tuples: []xsql.Tuple{
  245. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2","ts": common.TimeFromUnixMilli(1568854573431),},},
  246. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  247. },
  248. },
  249. },
  250. },
  251. },
  252. }
  253. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  254. for i, tt := range tests {
  255. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  256. if err != nil {
  257. t.Errorf("statement parse error %s", err)
  258. break
  259. }
  260. pp := &AggregatePlan{Dimensions:stmt.Dimensions.GetGroups()}
  261. result := pp.Apply(nil, tt.data)
  262. gr, ok := result.(xsql.GroupedTuplesSet)
  263. if !ok {
  264. t.Errorf("result is not GroupedTuplesSet")
  265. }
  266. if len(tt.result) != len(gr) {
  267. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, gr)
  268. }
  269. for _, r := range tt.result{
  270. matched := false
  271. for _, gre := range gr{
  272. if reflect.DeepEqual(r, gre){
  273. matched = true
  274. }
  275. }
  276. if !matched{
  277. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  278. }
  279. }
  280. }
  281. }