having_test.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestHavingPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
  20. data: xsql.WindowTuplesSet{
  21. xsql.WindowTuples{
  22. Emitter: "src1",
  23. Tuples: []xsql.Tuple{
  24. {
  25. Emitter: "src1",
  26. Message: xsql.Message{"id1": 1, "f1": "v1"},
  27. }, {
  28. Emitter: "src1",
  29. Message: xsql.Message{"id1": 2, "f1": "v2"},
  30. }, {
  31. Emitter: "src1",
  32. Message: xsql.Message{"id1": 5, "f1": "v1"},
  33. },
  34. },
  35. },
  36. },
  37. result: xsql.WindowTuplesSet{
  38. xsql.WindowTuples{
  39. Emitter: "src1",
  40. Tuples: []xsql.Tuple{
  41. {
  42. Emitter: "src1",
  43. Message: xsql.Message{"id1": 1, "f1": "v1"},
  44. }, {
  45. Emitter: "src1",
  46. Message: xsql.Message{"id1": 2, "f1": "v2"},
  47. }, {
  48. Emitter: "src1",
  49. Message: xsql.Message{"id1": 5, "f1": "v1"},
  50. },
  51. },
  52. },
  53. },
  54. },
  55. {
  56. sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
  57. data: xsql.WindowTuplesSet{
  58. xsql.WindowTuples{
  59. Emitter: "src1",
  60. Tuples: []xsql.Tuple{
  61. {
  62. Emitter: "src1",
  63. Message: xsql.Message{"id1": 1, "f1": "v1"},
  64. },
  65. },
  66. },
  67. },
  68. result: nil,
  69. },
  70. {
  71. sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
  72. data: xsql.WindowTuplesSet{
  73. xsql.WindowTuples{
  74. Emitter: "src1",
  75. Tuples: []xsql.Tuple{
  76. {
  77. Emitter: "src1",
  78. Message: xsql.Message{"id1": 1, "f1": "v1"},
  79. },
  80. },
  81. },
  82. },
  83. result: xsql.WindowTuplesSet{
  84. xsql.WindowTuples{
  85. Emitter: "src1",
  86. Tuples: []xsql.Tuple{
  87. {
  88. Emitter: "src1",
  89. Message: xsql.Message{"id1": 1, "f1": "v1"},
  90. },
  91. },
  92. },
  93. },
  94. },
  95. {
  96. sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
  97. data: xsql.WindowTuplesSet{
  98. xsql.WindowTuples{
  99. Emitter: "src1",
  100. Tuples: []xsql.Tuple{
  101. {
  102. Emitter: "src1",
  103. Message: xsql.Message{"id1": 1, "f1": "v1"},
  104. },
  105. },
  106. },
  107. },
  108. result: nil,
  109. },
  110. {
  111. sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
  112. data: xsql.WindowTuplesSet{
  113. xsql.WindowTuples{
  114. Emitter: "src1",
  115. Tuples: []xsql.Tuple{
  116. {
  117. Emitter: "src1",
  118. Message: xsql.Message{"id1": 1, "f1": "v1"},
  119. },
  120. },
  121. },
  122. },
  123. result: xsql.WindowTuplesSet{
  124. xsql.WindowTuples{
  125. Emitter: "src1",
  126. Tuples: []xsql.Tuple{
  127. {
  128. Emitter: "src1",
  129. Message: xsql.Message{"id1": 1, "f1": "v1"},
  130. },
  131. },
  132. },
  133. },
  134. }, {
  135. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  136. data: xsql.GroupedTuplesSet{
  137. {
  138. &xsql.Tuple{
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 1, "f1": "v1"},
  141. },
  142. &xsql.Tuple{
  143. Emitter: "src1",
  144. Message: xsql.Message{"id1": 3, "f1": "v1"},
  145. },
  146. },
  147. {
  148. &xsql.Tuple{
  149. Emitter: "src1",
  150. Message: xsql.Message{"id1": 2, "f1": "v2"},
  151. },
  152. },
  153. },
  154. result: xsql.GroupedTuplesSet{
  155. {
  156. &xsql.Tuple{
  157. Emitter: "src1",
  158. Message: xsql.Message{"id1": 2, "f1": "v2"},
  159. },
  160. },
  161. },
  162. }, {
  163. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having a > 100",
  164. data: xsql.GroupedTuplesSet{
  165. {
  166. &xsql.JoinTuple{
  167. Tuples: []xsql.Tuple{
  168. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  169. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  170. },
  171. },
  172. &xsql.JoinTuple{
  173. Tuples: []xsql.Tuple{
  174. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  175. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  176. },
  177. },
  178. },
  179. {
  180. &xsql.JoinTuple{
  181. Tuples: []xsql.Tuple{
  182. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  183. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  184. },
  185. },
  186. &xsql.JoinTuple{
  187. Tuples: []xsql.Tuple{
  188. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  189. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  190. },
  191. },
  192. },
  193. },
  194. result: xsql.GroupedTuplesSet{
  195. {
  196. &xsql.JoinTuple{
  197. Tuples: []xsql.Tuple{
  198. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  199. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  200. },
  201. },
  202. &xsql.JoinTuple{
  203. Tuples: []xsql.Tuple{
  204. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  205. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  206. },
  207. },
  208. },
  209. },
  210. }, {
  211. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10) having a > 100",
  212. data: xsql.JoinTupleSets{
  213. xsql.JoinTuple{
  214. Tuples: []xsql.Tuple{
  215. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  216. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  217. },
  218. },
  219. xsql.JoinTuple{
  220. Tuples: []xsql.Tuple{
  221. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  222. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  223. },
  224. },
  225. xsql.JoinTuple{
  226. Tuples: []xsql.Tuple{
  227. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  228. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  229. },
  230. },
  231. },
  232. result: xsql.JoinTupleSets{
  233. xsql.JoinTuple{
  234. Tuples: []xsql.Tuple{
  235. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  236. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  237. },
  238. },
  239. xsql.JoinTuple{
  240. Tuples: []xsql.Tuple{
  241. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  242. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  243. },
  244. },
  245. },
  246. },
  247. }
  248. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  249. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  250. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  251. for i, tt := range tests {
  252. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  253. if err != nil {
  254. t.Errorf("statement parse error %s", err)
  255. break
  256. }
  257. pp := &HavingPlan{Condition: stmt.Having}
  258. result := pp.Apply(ctx, tt.data)
  259. if !reflect.DeepEqual(tt.result, result) {
  260. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  261. }
  262. }
  263. }
  264. func TestHavingPlanError(t *testing.T) {
  265. var tests = []struct {
  266. sql string
  267. data interface{}
  268. result interface{}
  269. }{
  270. {
  271. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  272. data: xsql.WindowTuplesSet{
  273. xsql.WindowTuples{
  274. Emitter: "src1",
  275. Tuples: []xsql.Tuple{
  276. {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 1, "f1": "v1"},
  279. }, {
  280. Emitter: "src1",
  281. Message: xsql.Message{"id1": 2, "f1": "v2"},
  282. }, {
  283. Emitter: "src1",
  284. Message: xsql.Message{"id1": 5, "f1": "v1"},
  285. },
  286. },
  287. },
  288. },
  289. result: errors.New("run Having error: invalid operation int64(2) > string(str)"),
  290. }, {
  291. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  292. data: errors.New("an error from upstream"),
  293. result: errors.New("an error from upstream"),
  294. }, {
  295. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  296. data: xsql.GroupedTuplesSet{
  297. {
  298. &xsql.Tuple{
  299. Emitter: "src1",
  300. Message: xsql.Message{"id1": 1, "f1": 3},
  301. },
  302. &xsql.Tuple{
  303. Emitter: "src1",
  304. Message: xsql.Message{"id1": 3, "f1": 3},
  305. },
  306. },
  307. {
  308. &xsql.Tuple{
  309. Emitter: "src1",
  310. Message: xsql.Message{"id1": 2, "f1": "v2"},
  311. },
  312. },
  313. },
  314. result: errors.New("run Having error: invalid operation int64(3) = string(v2)"),
  315. },
  316. }
  317. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  318. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  319. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  320. for i, tt := range tests {
  321. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  322. if err != nil {
  323. t.Errorf("statement parse error %s", err)
  324. break
  325. }
  326. pp := &HavingPlan{Condition: stmt.Having}
  327. result := pp.Apply(ctx, tt.data)
  328. if !reflect.DeepEqual(tt.result, result) {
  329. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  330. }
  331. }
  332. }