// having_test.go (13 KB)

  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestHavingPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
  20. data: xsql.WindowTuplesSet{
  21. xsql.WindowTuples{
  22. Emitter: "src1",
  23. Tuples: []xsql.Tuple{
  24. {
  25. Emitter: "src1",
  26. Message: xsql.Message{"id1": 1, "f1": "v1"},
  27. }, {
  28. Emitter: "src1",
  29. Message: xsql.Message{"id1": 2, "f1": "v2"},
  30. }, {
  31. Emitter: "src1",
  32. Message: xsql.Message{"id1": 5, "f1": "v1"},
  33. },
  34. },
  35. },
  36. },
  37. result: xsql.WindowTuplesSet{
  38. xsql.WindowTuples{
  39. Emitter: "src1",
  40. Tuples: []xsql.Tuple{
  41. {
  42. Emitter: "src1",
  43. Message: xsql.Message{"id1": 1, "f1": "v1"},
  44. }, {
  45. Emitter: "src1",
  46. Message: xsql.Message{"id1": 2, "f1": "v2"},
  47. }, {
  48. Emitter: "src1",
  49. Message: xsql.Message{"id1": 5, "f1": "v1"},
  50. },
  51. },
  52. },
  53. },
  54. },
  55. {
  56. sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
  57. data: xsql.WindowTuplesSet{
  58. xsql.WindowTuples{
  59. Emitter: "src1",
  60. Tuples: []xsql.Tuple{
  61. {
  62. Emitter: "src1",
  63. Message: xsql.Message{"id1": 1, "f1": "v1"},
  64. },
  65. },
  66. },
  67. },
  68. result: nil,
  69. },
  70. {
  71. sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
  72. data: xsql.WindowTuplesSet{
  73. xsql.WindowTuples{
  74. Emitter: "src1",
  75. Tuples: []xsql.Tuple{
  76. {
  77. Emitter: "src1",
  78. Message: xsql.Message{"id1": 1, "f1": "v1"},
  79. },
  80. },
  81. },
  82. },
  83. result: xsql.WindowTuplesSet{
  84. xsql.WindowTuples{
  85. Emitter: "src1",
  86. Tuples: []xsql.Tuple{
  87. {
  88. Emitter: "src1",
  89. Message: xsql.Message{"id1": 1, "f1": "v1"},
  90. },
  91. },
  92. },
  93. },
  94. },
  95. {
  96. sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
  97. data: xsql.WindowTuplesSet{
  98. xsql.WindowTuples{
  99. Emitter: "src1",
  100. Tuples: []xsql.Tuple{
  101. {
  102. Emitter: "src1",
  103. Message: xsql.Message{"id1": 1, "f1": "v1"},
  104. },
  105. },
  106. },
  107. },
  108. result: nil,
  109. },
  110. {
  111. sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
  112. data: xsql.WindowTuplesSet{
  113. xsql.WindowTuples{
  114. Emitter: "src1",
  115. Tuples: []xsql.Tuple{
  116. {
  117. Emitter: "src1",
  118. Message: xsql.Message{"id1": 1, "f1": "v1"},
  119. },
  120. },
  121. },
  122. },
  123. result: xsql.WindowTuplesSet{
  124. xsql.WindowTuples{
  125. Emitter: "src1",
  126. Tuples: []xsql.Tuple{
  127. {
  128. Emitter: "src1",
  129. Message: xsql.Message{"id1": 1, "f1": "v1"},
  130. },
  131. },
  132. },
  133. },
  134. }, {
  135. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  136. data: xsql.GroupedTuplesSet{
  137. {
  138. &xsql.Tuple{
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 1, "f1": "v1"},
  141. },
  142. &xsql.Tuple{
  143. Emitter: "src1",
  144. Message: xsql.Message{"id1": 3, "f1": "v1"},
  145. },
  146. },
  147. {
  148. &xsql.Tuple{
  149. Emitter: "src1",
  150. Message: xsql.Message{"id1": 2, "f1": "v2"},
  151. },
  152. },
  153. },
  154. result: xsql.GroupedTuplesSet{
  155. {
  156. &xsql.Tuple{
  157. Emitter: "src1",
  158. Message: xsql.Message{"id1": 2, "f1": "v2"},
  159. },
  160. },
  161. },
  162. }, {
  163. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having a > 100",
  164. data: xsql.GroupedTuplesSet{
  165. {
  166. &xsql.JoinTuple{
  167. Tuples: []xsql.Tuple{
  168. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  169. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  170. },
  171. },
  172. &xsql.JoinTuple{
  173. Tuples: []xsql.Tuple{
  174. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  175. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  176. },
  177. },
  178. },
  179. {
  180. &xsql.JoinTuple{
  181. Tuples: []xsql.Tuple{
  182. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  183. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  184. },
  185. },
  186. &xsql.JoinTuple{
  187. Tuples: []xsql.Tuple{
  188. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  189. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  190. },
  191. },
  192. },
  193. },
  194. result: xsql.GroupedTuplesSet{
  195. {
  196. &xsql.JoinTuple{
  197. Tuples: []xsql.Tuple{
  198. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  199. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  200. },
  201. },
  202. &xsql.JoinTuple{
  203. Tuples: []xsql.Tuple{
  204. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  205. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  206. },
  207. },
  208. },
  209. },
  210. }, {
  211. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10) having a > 100",
  212. data: xsql.JoinTupleSets{
  213. xsql.JoinTuple{
  214. Tuples: []xsql.Tuple{
  215. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  216. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  217. },
  218. },
  219. xsql.JoinTuple{
  220. Tuples: []xsql.Tuple{
  221. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  222. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  223. },
  224. },
  225. xsql.JoinTuple{
  226. Tuples: []xsql.Tuple{
  227. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  228. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  229. },
  230. },
  231. },
  232. result: xsql.JoinTupleSets{
  233. xsql.JoinTuple{
  234. Tuples: []xsql.Tuple{
  235. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  236. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  237. },
  238. },
  239. xsql.JoinTuple{
  240. Tuples: []xsql.Tuple{
  241. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  242. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  243. },
  244. },
  245. },
  246. },
  247. }
  248. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  249. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  250. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  251. for i, tt := range tests {
  252. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  253. if err != nil {
  254. t.Errorf("statement parse error %s", err)
  255. break
  256. }
  257. fv, afv := xsql.NewFunctionValuersForOp(nil)
  258. pp := &HavingPlan{Condition: stmt.Having}
  259. result := pp.Apply(ctx, tt.data, fv, afv)
  260. if !reflect.DeepEqual(tt.result, result) {
  261. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  262. }
  263. }
  264. }
  265. func TestHavingPlanAlias_Apply(t *testing.T) {
  266. var tests = []struct {
  267. sql string
  268. data interface{}
  269. result interface{}
  270. }{
  271. {
  272. sql: `SELECT avg(id1) as a FROM src1 HAVING a > 1`,
  273. data: xsql.WindowTuplesSet{
  274. xsql.WindowTuples{
  275. Emitter: "src1",
  276. Tuples: []xsql.Tuple{
  277. {
  278. Emitter: "src1",
  279. Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
  280. }, {
  281. Emitter: "src1",
  282. Message: xsql.Message{"id1": 2, "f1": "v2"},
  283. }, {
  284. Emitter: "src1",
  285. Message: xsql.Message{"id1": 5, "f1": "v1"},
  286. },
  287. },
  288. },
  289. },
  290. result: xsql.WindowTuplesSet{
  291. xsql.WindowTuples{
  292. Emitter: "src1",
  293. Tuples: []xsql.Tuple{
  294. {
  295. Emitter: "src1",
  296. Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
  297. }, {
  298. Emitter: "src1",
  299. Message: xsql.Message{"id1": 2, "f1": "v2"},
  300. }, {
  301. Emitter: "src1",
  302. Message: xsql.Message{"id1": 5, "f1": "v1"},
  303. },
  304. },
  305. },
  306. },
  307. },
  308. {
  309. sql: `SELECT sum(id1) as s FROM src1 HAVING s > 1`,
  310. data: xsql.WindowTuplesSet{
  311. xsql.WindowTuples{
  312. Emitter: "src1",
  313. Tuples: []xsql.Tuple{
  314. {
  315. Emitter: "src1",
  316. Message: xsql.Message{"id1": 1, "f1": "v1", "s": 1},
  317. },
  318. },
  319. },
  320. },
  321. result: nil,
  322. }, {
  323. sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having c > 1",
  324. data: xsql.GroupedTuplesSet{
  325. {
  326. &xsql.Tuple{
  327. Emitter: "src1",
  328. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  329. },
  330. &xsql.Tuple{
  331. Emitter: "src1",
  332. Message: xsql.Message{"id1": 3, "f1": "v1"},
  333. },
  334. },
  335. {
  336. &xsql.Tuple{
  337. Emitter: "src1",
  338. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  339. },
  340. },
  341. },
  342. result: xsql.GroupedTuplesSet{
  343. {
  344. &xsql.Tuple{
  345. Emitter: "src1",
  346. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  347. },
  348. &xsql.Tuple{
  349. Emitter: "src1",
  350. Message: xsql.Message{"id1": 3, "f1": "v1"},
  351. },
  352. },
  353. },
  354. }, {
  355. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having c > 1",
  356. data: xsql.GroupedTuplesSet{
  357. {
  358. &xsql.JoinTuple{
  359. Tuples: []xsql.Tuple{
  360. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
  361. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  362. },
  363. },
  364. &xsql.JoinTuple{
  365. Tuples: []xsql.Tuple{
  366. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  367. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  368. },
  369. },
  370. },
  371. {
  372. &xsql.JoinTuple{
  373. Tuples: []xsql.Tuple{
  374. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 1}},
  375. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  376. },
  377. },
  378. },
  379. },
  380. result: xsql.GroupedTuplesSet{
  381. {
  382. &xsql.JoinTuple{
  383. Tuples: []xsql.Tuple{
  384. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
  385. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  386. },
  387. },
  388. &xsql.JoinTuple{
  389. Tuples: []xsql.Tuple{
  390. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  391. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  392. },
  393. },
  394. },
  395. },
  396. },
  397. }
  398. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  399. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  400. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  401. for i, tt := range tests {
  402. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  403. if err != nil {
  404. t.Errorf("statement parse error %s", err)
  405. break
  406. }
  407. fv, afv := xsql.NewFunctionValuersForOp(nil)
  408. pp := &HavingPlan{Condition: stmt.Having}
  409. result := pp.Apply(ctx, tt.data, fv, afv)
  410. if !reflect.DeepEqual(tt.result, result) {
  411. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  412. }
  413. }
  414. }
  415. func TestHavingPlanError(t *testing.T) {
  416. var tests = []struct {
  417. sql string
  418. data interface{}
  419. result interface{}
  420. }{
  421. {
  422. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  423. data: xsql.WindowTuplesSet{
  424. xsql.WindowTuples{
  425. Emitter: "src1",
  426. Tuples: []xsql.Tuple{
  427. {
  428. Emitter: "src1",
  429. Message: xsql.Message{"id1": 1, "f1": "v1"},
  430. }, {
  431. Emitter: "src1",
  432. Message: xsql.Message{"id1": 2, "f1": "v2"},
  433. }, {
  434. Emitter: "src1",
  435. Message: xsql.Message{"id1": 5, "f1": "v1"},
  436. },
  437. },
  438. },
  439. },
  440. result: errors.New("run Having error: invalid operation int64(2) > string(str)"),
  441. }, {
  442. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  443. data: errors.New("an error from upstream"),
  444. result: errors.New("an error from upstream"),
  445. }, {
  446. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  447. data: xsql.GroupedTuplesSet{
  448. {
  449. &xsql.Tuple{
  450. Emitter: "src1",
  451. Message: xsql.Message{"id1": 1, "f1": 3},
  452. },
  453. &xsql.Tuple{
  454. Emitter: "src1",
  455. Message: xsql.Message{"id1": 3, "f1": 3},
  456. },
  457. },
  458. {
  459. &xsql.Tuple{
  460. Emitter: "src1",
  461. Message: xsql.Message{"id1": 2, "f1": "v2"},
  462. },
  463. },
  464. },
  465. result: errors.New("run Having error: invalid operation int64(3) = string(v2)"),
  466. },
  467. }
  468. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  469. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  470. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  471. for i, tt := range tests {
  472. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  473. if err != nil {
  474. t.Errorf("statement parse error %s", err)
  475. break
  476. }
  477. fv, afv := xsql.NewFunctionValuersForOp(nil)
  478. pp := &HavingPlan{Condition: stmt.Having}
  479. result := pp.Apply(ctx, tt.data, fv, afv)
  480. if !reflect.DeepEqual(tt.result, result) {
  481. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  482. }
  483. }
  484. }