having_test.go 15 KB


  1. package operator
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/internal/conf"
  6. "github.com/emqx/kuiper/internal/topo/context"
  7. "github.com/emqx/kuiper/internal/xsql"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestHavingPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
  20. data: xsql.WindowTuplesSet{
  21. Content: []xsql.WindowTuples{
  22. {
  23. Emitter: "src1",
  24. Tuples: []xsql.Tuple{
  25. {
  26. Emitter: "src1",
  27. Message: xsql.Message{"id1": 1, "f1": "v1"},
  28. }, {
  29. Emitter: "src1",
  30. Message: xsql.Message{"id1": 2, "f1": "v2"},
  31. }, {
  32. Emitter: "src1",
  33. Message: xsql.Message{"id1": 5, "f1": "v1"},
  34. },
  35. },
  36. },
  37. },
  38. WindowRange: &xsql.WindowRange{
  39. WindowStart: 1541152486013,
  40. WindowEnd: 1541152487013,
  41. },
  42. },
  43. result: xsql.WindowTuplesSet{
  44. Content: []xsql.WindowTuples{
  45. {
  46. Emitter: "src1",
  47. Tuples: []xsql.Tuple{
  48. {
  49. Emitter: "src1",
  50. Message: xsql.Message{"id1": 1, "f1": "v1"},
  51. }, {
  52. Emitter: "src1",
  53. Message: xsql.Message{"id1": 2, "f1": "v2"},
  54. }, {
  55. Emitter: "src1",
  56. Message: xsql.Message{"id1": 5, "f1": "v1"},
  57. },
  58. },
  59. },
  60. },
  61. WindowRange: &xsql.WindowRange{
  62. WindowStart: 1541152486013,
  63. WindowEnd: 1541152487013,
  64. },
  65. },
  66. },
  67. {
  68. sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
  69. data: xsql.WindowTuplesSet{
  70. Content: []xsql.WindowTuples{
  71. {
  72. Emitter: "src1",
  73. Tuples: []xsql.Tuple{
  74. {
  75. Emitter: "src1",
  76. Message: xsql.Message{"id1": 1, "f1": "v1"},
  77. },
  78. },
  79. },
  80. },
  81. },
  82. result: nil,
  83. },
  84. {
  85. sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
  86. data: xsql.WindowTuplesSet{
  87. Content: []xsql.WindowTuples{
  88. {
  89. Emitter: "src1",
  90. Tuples: []xsql.Tuple{
  91. {
  92. Emitter: "src1",
  93. Message: xsql.Message{"id1": 1, "f1": "v1"},
  94. },
  95. },
  96. },
  97. },
  98. },
  99. result: xsql.WindowTuplesSet{
  100. Content: []xsql.WindowTuples{
  101. {
  102. Emitter: "src1",
  103. Tuples: []xsql.Tuple{
  104. {
  105. Emitter: "src1",
  106. Message: xsql.Message{"id1": 1, "f1": "v1"},
  107. },
  108. },
  109. },
  110. },
  111. },
  112. },
  113. {
  114. sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
  115. data: xsql.WindowTuplesSet{
  116. Content: []xsql.WindowTuples{
  117. {
  118. Emitter: "src1",
  119. Tuples: []xsql.Tuple{
  120. {
  121. Emitter: "src1",
  122. Message: xsql.Message{"id1": 1, "f1": "v1"},
  123. },
  124. },
  125. },
  126. },
  127. },
  128. result: nil,
  129. },
  130. {
  131. sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
  132. data: xsql.WindowTuplesSet{
  133. Content: []xsql.WindowTuples{
  134. {
  135. Emitter: "src1",
  136. Tuples: []xsql.Tuple{
  137. {
  138. Emitter: "src1",
  139. Message: xsql.Message{"id1": 1, "f1": "v1"},
  140. },
  141. },
  142. },
  143. },
  144. },
  145. result: xsql.WindowTuplesSet{
  146. Content: []xsql.WindowTuples{
  147. {
  148. Emitter: "src1",
  149. Tuples: []xsql.Tuple{
  150. {
  151. Emitter: "src1",
  152. Message: xsql.Message{"id1": 1, "f1": "v1"},
  153. },
  154. },
  155. },
  156. },
  157. },
  158. }, {
  159. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  160. data: xsql.GroupedTuplesSet{
  161. {
  162. Content: []xsql.DataValuer{
  163. &xsql.Tuple{
  164. Emitter: "src1",
  165. Message: xsql.Message{"id1": 1, "f1": "v1"},
  166. },
  167. &xsql.Tuple{
  168. Emitter: "src1",
  169. Message: xsql.Message{"id1": 3, "f1": "v1"},
  170. },
  171. },
  172. WindowRange: &xsql.WindowRange{
  173. WindowStart: 1541152486013,
  174. WindowEnd: 1541152487013,
  175. },
  176. },
  177. {
  178. Content: []xsql.DataValuer{
  179. &xsql.Tuple{
  180. Emitter: "src1",
  181. Message: xsql.Message{"id1": 2, "f1": "v2"},
  182. },
  183. },
  184. WindowRange: &xsql.WindowRange{
  185. WindowStart: 1541152486055,
  186. WindowEnd: 1541152487055,
  187. },
  188. },
  189. },
  190. result: xsql.GroupedTuplesSet{
  191. {
  192. Content: []xsql.DataValuer{
  193. &xsql.Tuple{
  194. Emitter: "src1",
  195. Message: xsql.Message{"id1": 2, "f1": "v2"},
  196. },
  197. },
  198. WindowRange: &xsql.WindowRange{
  199. WindowStart: 1541152486055,
  200. WindowEnd: 1541152487055,
  201. },
  202. },
  203. },
  204. }, {
  205. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having a > 100",
  206. data: xsql.GroupedTuplesSet{
  207. {
  208. Content: []xsql.DataValuer{
  209. &xsql.JoinTuple{
  210. Tuples: []xsql.Tuple{
  211. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  212. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  213. },
  214. },
  215. &xsql.JoinTuple{
  216. Tuples: []xsql.Tuple{
  217. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  218. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  219. },
  220. },
  221. },
  222. WindowRange: &xsql.WindowRange{
  223. WindowStart: 1541152486013,
  224. WindowEnd: 1541152487013,
  225. },
  226. },
  227. {
  228. Content: []xsql.DataValuer{
  229. &xsql.JoinTuple{
  230. Tuples: []xsql.Tuple{
  231. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  232. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  233. },
  234. },
  235. &xsql.JoinTuple{
  236. Tuples: []xsql.Tuple{
  237. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  238. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  239. },
  240. },
  241. },
  242. WindowRange: &xsql.WindowRange{
  243. WindowStart: 1541152486013,
  244. WindowEnd: 1541152487013,
  245. },
  246. },
  247. },
  248. result: xsql.GroupedTuplesSet{
  249. {
  250. Content: []xsql.DataValuer{
  251. &xsql.JoinTuple{
  252. Tuples: []xsql.Tuple{
  253. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  254. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  255. },
  256. },
  257. &xsql.JoinTuple{
  258. Tuples: []xsql.Tuple{
  259. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  260. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  261. },
  262. },
  263. },
  264. WindowRange: &xsql.WindowRange{
  265. WindowStart: 1541152486013,
  266. WindowEnd: 1541152487013,
  267. },
  268. },
  269. },
  270. }, {
  271. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10) having a > 100",
  272. data: &xsql.JoinTupleSets{
  273. Content: []xsql.JoinTuple{
  274. {
  275. Tuples: []xsql.Tuple{
  276. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  277. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  278. },
  279. },
  280. {
  281. Tuples: []xsql.Tuple{
  282. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  283. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  284. },
  285. },
  286. {
  287. Tuples: []xsql.Tuple{
  288. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  289. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  290. },
  291. },
  292. },
  293. },
  294. result: &xsql.JoinTupleSets{
  295. Content: []xsql.JoinTuple{
  296. {
  297. Tuples: []xsql.Tuple{
  298. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  299. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  300. },
  301. },
  302. {
  303. Tuples: []xsql.Tuple{
  304. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  305. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  306. },
  307. },
  308. },
  309. },
  310. },
  311. }
  312. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  313. contextLogger := conf.Log.WithField("rule", "TestHavingPlan_Apply")
  314. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  315. for i, tt := range tests {
  316. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  317. if err != nil {
  318. t.Errorf("statement parse error %s", err)
  319. break
  320. }
  321. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  322. pp := &HavingOp{Condition: stmt.Having}
  323. result := pp.Apply(ctx, tt.data, fv, afv)
  324. if !reflect.DeepEqual(tt.result, result) {
  325. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  326. }
  327. }
  328. }
  329. func TestHavingPlanAlias_Apply(t *testing.T) {
  330. var tests = []struct {
  331. sql string
  332. data interface{}
  333. result interface{}
  334. }{
  335. {
  336. sql: `SELECT avg(id1) as a FROM src1 HAVING a > 1`,
  337. data: xsql.WindowTuplesSet{
  338. Content: []xsql.WindowTuples{
  339. {
  340. Emitter: "src1",
  341. Tuples: []xsql.Tuple{
  342. {
  343. Emitter: "src1",
  344. Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
  345. }, {
  346. Emitter: "src1",
  347. Message: xsql.Message{"id1": 2, "f1": "v2"},
  348. }, {
  349. Emitter: "src1",
  350. Message: xsql.Message{"id1": 5, "f1": "v1"},
  351. },
  352. },
  353. },
  354. },
  355. },
  356. result: xsql.WindowTuplesSet{
  357. Content: []xsql.WindowTuples{
  358. {
  359. Emitter: "src1",
  360. Tuples: []xsql.Tuple{
  361. {
  362. Emitter: "src1",
  363. Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
  364. }, {
  365. Emitter: "src1",
  366. Message: xsql.Message{"id1": 2, "f1": "v2"},
  367. }, {
  368. Emitter: "src1",
  369. Message: xsql.Message{"id1": 5, "f1": "v1"},
  370. },
  371. },
  372. },
  373. },
  374. },
  375. },
  376. {
  377. sql: `SELECT sum(id1) as s FROM src1 HAVING s > 1`,
  378. data: xsql.WindowTuplesSet{
  379. Content: []xsql.WindowTuples{
  380. {
  381. Emitter: "src1",
  382. Tuples: []xsql.Tuple{
  383. {
  384. Emitter: "src1",
  385. Message: xsql.Message{"id1": 1, "f1": "v1", "s": 1},
  386. },
  387. },
  388. },
  389. },
  390. },
  391. result: nil,
  392. }, {
  393. sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having c > 1",
  394. data: xsql.GroupedTuplesSet{
  395. {
  396. Content: []xsql.DataValuer{
  397. &xsql.Tuple{
  398. Emitter: "src1",
  399. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  400. },
  401. &xsql.Tuple{
  402. Emitter: "src1",
  403. Message: xsql.Message{"id1": 3, "f1": "v1"},
  404. },
  405. },
  406. },
  407. {
  408. Content: []xsql.DataValuer{
  409. &xsql.Tuple{
  410. Emitter: "src1",
  411. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  412. },
  413. },
  414. },
  415. },
  416. result: xsql.GroupedTuplesSet{
  417. {
  418. Content: []xsql.DataValuer{
  419. &xsql.Tuple{
  420. Emitter: "src1",
  421. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  422. },
  423. &xsql.Tuple{
  424. Emitter: "src1",
  425. Message: xsql.Message{"id1": 3, "f1": "v1"},
  426. },
  427. },
  428. },
  429. },
  430. }, {
  431. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having c > 1",
  432. data: xsql.GroupedTuplesSet{
  433. {
  434. Content: []xsql.DataValuer{
  435. &xsql.JoinTuple{
  436. Tuples: []xsql.Tuple{
  437. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
  438. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  439. },
  440. },
  441. &xsql.JoinTuple{
  442. Tuples: []xsql.Tuple{
  443. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  444. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  445. },
  446. },
  447. },
  448. },
  449. {
  450. Content: []xsql.DataValuer{
  451. &xsql.JoinTuple{
  452. Tuples: []xsql.Tuple{
  453. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 1}},
  454. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  455. },
  456. },
  457. },
  458. },
  459. },
  460. result: xsql.GroupedTuplesSet{
  461. {
  462. Content: []xsql.DataValuer{
  463. &xsql.JoinTuple{
  464. Tuples: []xsql.Tuple{
  465. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
  466. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  467. },
  468. },
  469. &xsql.JoinTuple{
  470. Tuples: []xsql.Tuple{
  471. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  472. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  473. },
  474. },
  475. },
  476. },
  477. },
  478. },
  479. }
  480. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  481. contextLogger := conf.Log.WithField("rule", "TestHavingPlan_Apply")
  482. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  483. for i, tt := range tests {
  484. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  485. if err != nil {
  486. t.Errorf("statement parse error %s", err)
  487. break
  488. }
  489. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  490. pp := &HavingOp{Condition: stmt.Having}
  491. result := pp.Apply(ctx, tt.data, fv, afv)
  492. if !reflect.DeepEqual(tt.result, result) {
  493. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  494. }
  495. }
  496. }
  497. func TestHavingPlanError(t *testing.T) {
  498. var tests = []struct {
  499. sql string
  500. data interface{}
  501. result interface{}
  502. }{
  503. {
  504. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  505. data: xsql.WindowTuplesSet{
  506. Content: []xsql.WindowTuples{
  507. {
  508. Emitter: "src1",
  509. Tuples: []xsql.Tuple{
  510. {
  511. Emitter: "src1",
  512. Message: xsql.Message{"id1": 1, "f1": "v1"},
  513. }, {
  514. Emitter: "src1",
  515. Message: xsql.Message{"id1": 2, "f1": "v2"},
  516. }, {
  517. Emitter: "src1",
  518. Message: xsql.Message{"id1": 5, "f1": "v1"},
  519. },
  520. },
  521. },
  522. },
  523. },
  524. result: errors.New("run Having error: invalid operation int64(2) > string(str)"),
  525. }, {
  526. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  527. data: errors.New("an error from upstream"),
  528. result: errors.New("an error from upstream"),
  529. }, {
  530. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
  531. data: xsql.GroupedTuplesSet{
  532. {
  533. Content: []xsql.DataValuer{
  534. &xsql.Tuple{
  535. Emitter: "src1",
  536. Message: xsql.Message{"id1": 1, "f1": 3},
  537. },
  538. &xsql.Tuple{
  539. Emitter: "src1",
  540. Message: xsql.Message{"id1": 3, "f1": 3},
  541. },
  542. },
  543. },
  544. {
  545. Content: []xsql.DataValuer{
  546. &xsql.Tuple{
  547. Emitter: "src1",
  548. Message: xsql.Message{"id1": 2, "f1": "v2"},
  549. },
  550. },
  551. },
  552. },
  553. result: errors.New("run Having error: invalid operation int64(3) = string(v2)"),
  554. },
  555. }
  556. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  557. contextLogger := conf.Log.WithField("rule", "TestHavingPlan_Apply")
  558. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  559. for i, tt := range tests {
  560. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  561. if err != nil {
  562. t.Errorf("statement parse error %s", err)
  563. break
  564. }
  565. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  566. pp := &HavingOp{Condition: stmt.Having}
  567. result := pp.Apply(ctx, tt.data, fv, afv)
  568. if !reflect.DeepEqual(tt.result, result) {
  569. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  570. }
  571. }
  572. }