aggregate_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package operator
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/topo/context"
  7. "github.com/lf-edge/ekuiper/internal/xsql"
  8. "github.com/lf-edge/ekuiper/pkg/cast"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestAggregatePlan_Apply(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data interface{}
  17. result xsql.GroupedTuplesSet
  18. }{
  19. {
  20. sql: "SELECT abc FROM tbl group by abc",
  21. data: &xsql.Tuple{
  22. Emitter: "tbl",
  23. Message: xsql.Message{
  24. "abc": int64(6),
  25. "def": "hello",
  26. },
  27. },
  28. result: xsql.GroupedTuplesSet{
  29. {
  30. Content: []xsql.DataValuer{&xsql.Tuple{
  31. Emitter: "tbl",
  32. Message: xsql.Message{
  33. "abc": int64(6),
  34. "def": "hello",
  35. },
  36. },
  37. },
  38. },
  39. },
  40. },
  41. {
  42. sql: "SELECT abc FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  43. data: xsql.WindowTuplesSet{
  44. Content: []xsql.WindowTuples{{
  45. Emitter: "src1",
  46. Tuples: []xsql.Tuple{
  47. {
  48. Emitter: "src1",
  49. Message: xsql.Message{"id1": 1, "f1": "v1"},
  50. }, {
  51. Emitter: "src1",
  52. Message: xsql.Message{"id1": 2, "f1": "v2"},
  53. }, {
  54. Emitter: "src1",
  55. Message: xsql.Message{"id1": 3, "f1": "v1"},
  56. },
  57. },
  58. },
  59. },
  60. WindowRange: &xsql.WindowRange{
  61. WindowStart: 1541152486013,
  62. WindowEnd: 1541152487013,
  63. },
  64. },
  65. result: xsql.GroupedTuplesSet{
  66. {
  67. Content: []xsql.DataValuer{
  68. &xsql.Tuple{
  69. Emitter: "src1",
  70. Message: xsql.Message{"id1": 1, "f1": "v1"},
  71. },
  72. &xsql.Tuple{
  73. Emitter: "src1",
  74. Message: xsql.Message{"id1": 3, "f1": "v1"},
  75. },
  76. },
  77. WindowRange: &xsql.WindowRange{
  78. WindowStart: 1541152486013,
  79. WindowEnd: 1541152487013,
  80. },
  81. },
  82. {
  83. Content: []xsql.DataValuer{
  84. &xsql.Tuple{
  85. Emitter: "src1",
  86. Message: xsql.Message{"id1": 2, "f1": "v2"},
  87. },
  88. },
  89. WindowRange: &xsql.WindowRange{
  90. WindowStart: 1541152486013,
  91. WindowEnd: 1541152487013,
  92. },
  93. },
  94. },
  95. },
  96. {
  97. sql: "SELECT abc FROM src1 GROUP BY id1, TUMBLINGWINDOW(ss, 10), f1",
  98. data: xsql.WindowTuplesSet{
  99. Content: []xsql.WindowTuples{{
  100. Emitter: "src1",
  101. Tuples: []xsql.Tuple{
  102. {
  103. Emitter: "src1",
  104. Message: xsql.Message{"id1": 1, "f1": "v1"},
  105. }, {
  106. Emitter: "src1",
  107. Message: xsql.Message{"id1": 2, "f1": "v2"},
  108. }, {
  109. Emitter: "src1",
  110. Message: xsql.Message{"id1": 3, "f1": "v1"},
  111. },
  112. },
  113. },
  114. },
  115. },
  116. result: xsql.GroupedTuplesSet{
  117. {
  118. Content: []xsql.DataValuer{
  119. &xsql.Tuple{
  120. Emitter: "src1",
  121. Message: xsql.Message{"id1": 1, "f1": "v1"},
  122. },
  123. },
  124. },
  125. {
  126. Content: []xsql.DataValuer{
  127. &xsql.Tuple{
  128. Emitter: "src1",
  129. Message: xsql.Message{"id1": 2, "f1": "v2"},
  130. },
  131. },
  132. },
  133. {
  134. Content: []xsql.DataValuer{
  135. &xsql.Tuple{
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 3, "f1": "v1"},
  138. },
  139. },
  140. },
  141. },
  142. },
  143. {
  144. sql: "SELECT abc FROM src1 GROUP BY meta(topic), TUMBLINGWINDOW(ss, 10)",
  145. data: xsql.WindowTuplesSet{
  146. Content: []xsql.WindowTuples{{
  147. Emitter: "src1",
  148. Tuples: []xsql.Tuple{
  149. {
  150. Emitter: "src1",
  151. Message: xsql.Message{"id1": 1, "f1": "v1"},
  152. Metadata: xsql.Metadata{"topic": "topic1"},
  153. }, {
  154. Emitter: "src1",
  155. Message: xsql.Message{"id1": 2, "f1": "v2"},
  156. Metadata: xsql.Metadata{"topic": "topic2"},
  157. }, {
  158. Emitter: "src1",
  159. Message: xsql.Message{"id1": 3, "f1": "v1"},
  160. Metadata: xsql.Metadata{"topic": "topic1"},
  161. },
  162. },
  163. },
  164. },
  165. },
  166. result: xsql.GroupedTuplesSet{
  167. {
  168. Content: []xsql.DataValuer{
  169. &xsql.Tuple{
  170. Emitter: "src1",
  171. Message: xsql.Message{"id1": 1, "f1": "v1"},
  172. Metadata: xsql.Metadata{"topic": "topic1"},
  173. },
  174. &xsql.Tuple{
  175. Emitter: "src1",
  176. Message: xsql.Message{"id1": 3, "f1": "v1"},
  177. Metadata: xsql.Metadata{"topic": "topic1"},
  178. },
  179. },
  180. },
  181. {
  182. Content: []xsql.DataValuer{
  183. &xsql.Tuple{
  184. Emitter: "src1",
  185. Message: xsql.Message{"id1": 2, "f1": "v2"},
  186. Metadata: xsql.Metadata{"topic": "topic2"},
  187. },
  188. },
  189. },
  190. },
  191. },
  192. {
  193. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  194. data: &xsql.JoinTupleSets{
  195. Content: []xsql.JoinTuple{
  196. {
  197. Tuples: []xsql.Tuple{
  198. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  199. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  200. },
  201. },
  202. {
  203. Tuples: []xsql.Tuple{
  204. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  205. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  206. },
  207. },
  208. {
  209. Tuples: []xsql.Tuple{
  210. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  211. },
  212. },
  213. },
  214. WindowRange: &xsql.WindowRange{
  215. WindowStart: 1541152486013,
  216. WindowEnd: 1541152487013,
  217. },
  218. },
  219. result: xsql.GroupedTuplesSet{
  220. {
  221. Content: []xsql.DataValuer{
  222. &xsql.JoinTuple{
  223. Tuples: []xsql.Tuple{
  224. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  225. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  226. },
  227. },
  228. },
  229. WindowRange: &xsql.WindowRange{
  230. WindowStart: 1541152486013,
  231. WindowEnd: 1541152487013,
  232. },
  233. },
  234. {
  235. Content: []xsql.DataValuer{
  236. &xsql.JoinTuple{
  237. Tuples: []xsql.Tuple{
  238. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  239. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  240. },
  241. },
  242. },
  243. WindowRange: &xsql.WindowRange{
  244. WindowStart: 1541152486013,
  245. WindowEnd: 1541152487013,
  246. },
  247. },
  248. {
  249. Content: []xsql.DataValuer{
  250. &xsql.JoinTuple{
  251. Tuples: []xsql.Tuple{
  252. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  253. },
  254. },
  255. },
  256. WindowRange: &xsql.WindowRange{
  257. WindowStart: 1541152486013,
  258. WindowEnd: 1541152487013,
  259. },
  260. },
  261. },
  262. },
  263. {
  264. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY TUMBLINGWINDOW(ss, 10), src1.f1",
  265. data: &xsql.JoinTupleSets{
  266. Content: []xsql.JoinTuple{
  267. {
  268. Tuples: []xsql.Tuple{
  269. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  270. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  271. },
  272. },
  273. {
  274. Tuples: []xsql.Tuple{
  275. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  276. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  277. },
  278. },
  279. {
  280. Tuples: []xsql.Tuple{
  281. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  282. },
  283. },
  284. },
  285. },
  286. result: xsql.GroupedTuplesSet{
  287. {
  288. Content: []xsql.DataValuer{
  289. &xsql.JoinTuple{
  290. Tuples: []xsql.Tuple{
  291. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  292. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  293. },
  294. },
  295. &xsql.JoinTuple{
  296. Tuples: []xsql.Tuple{
  297. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  298. },
  299. },
  300. },
  301. },
  302. {
  303. Content: []xsql.DataValuer{
  304. &xsql.JoinTuple{
  305. Tuples: []xsql.Tuple{
  306. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  307. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  308. },
  309. },
  310. },
  311. },
  312. },
  313. },
  314. {
  315. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY TUMBLINGWINDOW(ss, 10), src1.ts",
  316. data: &xsql.JoinTupleSets{
  317. Content: []xsql.JoinTuple{
  318. {
  319. Tuples: []xsql.Tuple{
  320. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  321. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  322. },
  323. },
  324. {
  325. Tuples: []xsql.Tuple{
  326. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854573431)}},
  327. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  328. },
  329. },
  330. {
  331. Tuples: []xsql.Tuple{
  332. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  333. },
  334. },
  335. },
  336. },
  337. result: xsql.GroupedTuplesSet{
  338. {
  339. Content: []xsql.DataValuer{
  340. &xsql.JoinTuple{
  341. Tuples: []xsql.Tuple{
  342. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  343. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  344. },
  345. },
  346. &xsql.JoinTuple{
  347. Tuples: []xsql.Tuple{
  348. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)}},
  349. },
  350. },
  351. },
  352. },
  353. {
  354. Content: []xsql.DataValuer{
  355. &xsql.JoinTuple{
  356. Tuples: []xsql.Tuple{
  357. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854573431)}},
  358. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  359. },
  360. },
  361. },
  362. },
  363. },
  364. },
  365. {
  366. sql: "SELECT abc FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), CASE WHEN id1 > 1 THEN \"others\" ELSE \"one\" END",
  367. data: xsql.WindowTuplesSet{
  368. Content: []xsql.WindowTuples{{
  369. Emitter: "src1",
  370. Tuples: []xsql.Tuple{
  371. {
  372. Emitter: "src1",
  373. Message: xsql.Message{"id1": 1, "f1": "v1"},
  374. }, {
  375. Emitter: "src1",
  376. Message: xsql.Message{"id1": 2, "f1": "v2"},
  377. }, {
  378. Emitter: "src1",
  379. Message: xsql.Message{"id1": 3, "f1": "v1"},
  380. },
  381. },
  382. },
  383. },
  384. },
  385. result: xsql.GroupedTuplesSet{
  386. {
  387. Content: []xsql.DataValuer{
  388. &xsql.Tuple{
  389. Emitter: "src1",
  390. Message: xsql.Message{"id1": 1, "f1": "v1"},
  391. },
  392. },
  393. },
  394. {
  395. Content: []xsql.DataValuer{
  396. &xsql.Tuple{
  397. Emitter: "src1",
  398. Message: xsql.Message{"id1": 2, "f1": "v2"},
  399. },
  400. &xsql.Tuple{
  401. Emitter: "src1",
  402. Message: xsql.Message{"id1": 3, "f1": "v1"},
  403. },
  404. },
  405. },
  406. },
  407. },
  408. }
  409. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  410. contextLogger := conf.Log.WithField("rule", "TestFilterPlan_Apply")
  411. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  412. for i, tt := range tests {
  413. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  414. if err != nil {
  415. t.Errorf("statement parse error %s", err)
  416. break
  417. }
  418. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  419. pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups()}
  420. result := pp.Apply(ctx, tt.data, fv, afv)
  421. gr, ok := result.(xsql.GroupedTuplesSet)
  422. if !ok {
  423. t.Errorf("result is not GroupedTuplesSet")
  424. }
  425. if len(tt.result) != len(gr) {
  426. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, gr)
  427. }
  428. for _, r := range tt.result {
  429. matched := false
  430. for _, gre := range gr {
  431. if reflect.DeepEqual(r, gre) {
  432. matched = true
  433. }
  434. }
  435. if !matched {
  436. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, gr)
  437. }
  438. }
  439. }
  440. }
  441. func TestAggregatePlanError(t *testing.T) {
  442. tests := []struct {
  443. sql string
  444. data interface{}
  445. result error
  446. }{
  447. {
  448. sql: "SELECT abc FROM tbl group by abc",
  449. data: errors.New("an error from upstream"),
  450. result: errors.New("an error from upstream"),
  451. },
  452. {
  453. sql: "SELECT abc FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 * 2",
  454. data: xsql.WindowTuplesSet{
  455. Content: []xsql.WindowTuples{
  456. {
  457. Emitter: "src1",
  458. Tuples: []xsql.Tuple{
  459. {
  460. Emitter: "src1",
  461. Message: xsql.Message{"id1": 1, "f1": "v1"},
  462. }, {
  463. Emitter: "src1",
  464. Message: xsql.Message{"id1": 2, "f1": "v2"},
  465. }, {
  466. Emitter: "src1",
  467. Message: xsql.Message{"id1": 3, "f1": "v1"},
  468. },
  469. },
  470. },
  471. },
  472. },
  473. result: errors.New("run Group By error: invalid operation string(v1) * int64(2)"),
  474. },
  475. }
  476. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  477. contextLogger := conf.Log.WithField("rule", "TestFilterPlanError")
  478. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  479. for i, tt := range tests {
  480. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  481. if err != nil {
  482. t.Errorf("statement parse error %s", err)
  483. break
  484. }
  485. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  486. pp := &AggregateOp{Dimensions: stmt.Dimensions.GetGroups()}
  487. result := pp.Apply(ctx, tt.data, fv, afv)
  488. if !reflect.DeepEqual(tt.result, result) {
  489. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  490. }
  491. }
  492. }