order_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. package operator
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/internal/conf"
  6. "github.com/emqx/kuiper/internal/topo/context"
  7. "github.com/emqx/kuiper/internal/xsql"
  8. "github.com/emqx/kuiper/pkg/cast"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestOrderPlan_Apply(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data interface{}
  17. result interface{}
  18. }{
  19. {
  20. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20 ORDER BY abc",
  21. data: &xsql.Tuple{
  22. Emitter: "tbl",
  23. Message: xsql.Message{
  24. "abc": int64(6),
  25. },
  26. },
  27. result: &xsql.Tuple{
  28. Emitter: "tbl",
  29. Message: xsql.Message{
  30. "abc": int64(6),
  31. },
  32. },
  33. },
  34. {
  35. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  36. data: &xsql.Tuple{
  37. Emitter: "tbl",
  38. Message: xsql.Message{
  39. "abc": int64(34),
  40. "def": "hello",
  41. },
  42. },
  43. result: &xsql.Tuple{
  44. Emitter: "tbl",
  45. Message: xsql.Message{
  46. "abc": int64(34),
  47. "def": "hello",
  48. },
  49. },
  50. },
  51. {
  52. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  53. data: xsql.WindowTuplesSet{
  54. Content: []xsql.WindowTuples{
  55. {
  56. Emitter: "src1",
  57. Tuples: []xsql.Tuple{
  58. {
  59. Emitter: "src1",
  60. Message: xsql.Message{"id1": 1, "f1": "v1"},
  61. }, {
  62. Emitter: "src1",
  63. Message: xsql.Message{"id1": 2, "f1": "v2"},
  64. }, {
  65. Emitter: "src1",
  66. Message: xsql.Message{"id1": 3, "f1": "v1"},
  67. },
  68. },
  69. },
  70. },
  71. },
  72. result: xsql.WindowTuplesSet{
  73. Content: []xsql.WindowTuples{
  74. {
  75. Emitter: "src1",
  76. Tuples: []xsql.Tuple{
  77. {
  78. Emitter: "src1",
  79. Message: xsql.Message{"id1": 3, "f1": "v1"},
  80. }, {
  81. Emitter: "src1",
  82. Message: xsql.Message{"id1": 2, "f1": "v2"},
  83. }, {
  84. Emitter: "src1",
  85. Message: xsql.Message{"id1": 1, "f1": "v1"},
  86. },
  87. },
  88. },
  89. },
  90. },
  91. },
  92. {
  93. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  94. data: xsql.WindowTuplesSet{
  95. Content: []xsql.WindowTuples{
  96. {
  97. Emitter: "src1",
  98. Tuples: []xsql.Tuple{
  99. {
  100. Emitter: "src1",
  101. Message: xsql.Message{"id1": 1, "f1": "v1"},
  102. }, {
  103. Emitter: "src1",
  104. Message: xsql.Message{"f1": "v2"},
  105. }, {
  106. Emitter: "src1",
  107. Message: xsql.Message{"id1": 3, "f1": "v1"},
  108. },
  109. },
  110. },
  111. },
  112. WindowRange: &xsql.WindowRange{
  113. WindowStart: 1541152486013,
  114. WindowEnd: 1541152487013,
  115. },
  116. },
  117. result: xsql.WindowTuplesSet{
  118. Content: []xsql.WindowTuples{
  119. {
  120. Emitter: "src1",
  121. Tuples: []xsql.Tuple{
  122. {
  123. Emitter: "src1",
  124. Message: xsql.Message{"id1": 3, "f1": "v1"},
  125. }, {
  126. Emitter: "src1",
  127. Message: xsql.Message{"id1": 1, "f1": "v1"},
  128. }, {
  129. Emitter: "src1",
  130. Message: xsql.Message{"f1": "v2"},
  131. },
  132. },
  133. },
  134. },
  135. WindowRange: &xsql.WindowRange{
  136. WindowStart: 1541152486013,
  137. WindowEnd: 1541152487013,
  138. },
  139. },
  140. },
  141. {
  142. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  143. data: xsql.WindowTuplesSet{
  144. Content: []xsql.WindowTuples{
  145. {
  146. Emitter: "src1",
  147. Tuples: []xsql.Tuple{
  148. {
  149. Emitter: "src1",
  150. Message: xsql.Message{"id1": 1, "f1": "v1"},
  151. }, {
  152. Emitter: "src1",
  153. Message: xsql.Message{"id1": "2string", "f1": "v2"},
  154. }, {
  155. Emitter: "src1",
  156. Message: xsql.Message{"id1": 3, "f1": "v1"},
  157. },
  158. },
  159. },
  160. },
  161. },
  162. result: errors.New("run Order By error: incompatible types for comparison: int and string"),
  163. },
  164. {
  165. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY f1, id1 DESC",
  166. data: xsql.WindowTuplesSet{
  167. Content: []xsql.WindowTuples{
  168. {
  169. Emitter: "src1",
  170. Tuples: []xsql.Tuple{
  171. {
  172. Emitter: "src1",
  173. Message: xsql.Message{"id1": 1, "f1": "v1"},
  174. }, {
  175. Emitter: "src1",
  176. Message: xsql.Message{"id1": 2, "f1": "v2"},
  177. }, {
  178. Emitter: "src1",
  179. Message: xsql.Message{"id1": 3, "f1": "v1"},
  180. },
  181. },
  182. },
  183. },
  184. },
  185. result: xsql.WindowTuplesSet{
  186. Content: []xsql.WindowTuples{
  187. {
  188. Emitter: "src1",
  189. Tuples: []xsql.Tuple{
  190. {
  191. Emitter: "src1",
  192. Message: xsql.Message{"id1": 3, "f1": "v1"},
  193. }, {
  194. Emitter: "src1",
  195. Message: xsql.Message{"id1": 1, "f1": "v1"},
  196. }, {
  197. Emitter: "src1",
  198. Message: xsql.Message{"id1": 2, "f1": "v2"},
  199. },
  200. },
  201. },
  202. },
  203. },
  204. },
  205. {
  206. sql: "SELECT * FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY ts DESC",
  207. data: xsql.WindowTuplesSet{
  208. Content: []xsql.WindowTuples{
  209. {
  210. Emitter: "src1",
  211. Tuples: []xsql.Tuple{
  212. {
  213. Emitter: "src1",
  214. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  215. }, {
  216. Emitter: "src1",
  217. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  218. }, {
  219. Emitter: "src1",
  220. Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854535000)},
  221. },
  222. },
  223. },
  224. },
  225. },
  226. result: xsql.WindowTuplesSet{
  227. Content: []xsql.WindowTuples{
  228. {
  229. Emitter: "src1",
  230. Tuples: []xsql.Tuple{
  231. {
  232. Emitter: "src1",
  233. Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854535000)},
  234. }, {
  235. Emitter: "src1",
  236. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  237. }, {
  238. Emitter: "src1",
  239. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  240. },
  241. },
  242. },
  243. },
  244. },
  245. },
  246. {
  247. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src1.id1 desc",
  248. data: &xsql.JoinTupleSets{
  249. Content: []xsql.JoinTuple{
  250. {
  251. Tuples: []xsql.Tuple{
  252. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  253. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  254. },
  255. },
  256. {
  257. Tuples: []xsql.Tuple{
  258. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  259. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  260. },
  261. },
  262. {
  263. Tuples: []xsql.Tuple{
  264. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  265. },
  266. },
  267. },
  268. },
  269. result: &xsql.JoinTupleSets{
  270. Content: []xsql.JoinTuple{
  271. {
  272. Tuples: []xsql.Tuple{
  273. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  274. },
  275. },
  276. {
  277. Tuples: []xsql.Tuple{
  278. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  279. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  280. },
  281. },
  282. {
  283. Tuples: []xsql.Tuple{
  284. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  285. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  286. },
  287. },
  288. },
  289. },
  290. },
  291. {
  292. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2",
  293. data: &xsql.JoinTupleSets{
  294. Content: []xsql.JoinTuple{
  295. {
  296. Tuples: []xsql.Tuple{
  297. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  298. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  299. },
  300. },
  301. {
  302. Tuples: []xsql.Tuple{
  303. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  304. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  305. },
  306. },
  307. {
  308. Tuples: []xsql.Tuple{
  309. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  310. },
  311. },
  312. },
  313. },
  314. result: &xsql.JoinTupleSets{
  315. Content: []xsql.JoinTuple{
  316. {
  317. Tuples: []xsql.Tuple{
  318. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  319. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  320. },
  321. },
  322. {
  323. Tuples: []xsql.Tuple{
  324. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  325. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  326. },
  327. },
  328. {
  329. Tuples: []xsql.Tuple{
  330. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  331. },
  332. },
  333. },
  334. },
  335. },
  336. {
  337. sql: "SELECT abc FROM tbl group by abc ORDER BY def",
  338. data: xsql.GroupedTuplesSet{
  339. {
  340. Content: []xsql.DataValuer{
  341. &xsql.Tuple{
  342. Emitter: "tbl",
  343. Message: xsql.Message{
  344. "abc": int64(6),
  345. "def": "hello",
  346. },
  347. },
  348. },
  349. },
  350. },
  351. result: xsql.GroupedTuplesSet{
  352. {
  353. Content: []xsql.DataValuer{
  354. &xsql.Tuple{
  355. Emitter: "tbl",
  356. Message: xsql.Message{
  357. "abc": int64(6),
  358. "def": "hello",
  359. },
  360. },
  361. },
  362. },
  363. },
  364. },
  365. {
  366. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY id1 desc",
  367. data: xsql.GroupedTuplesSet{
  368. {
  369. Content: []xsql.DataValuer{
  370. &xsql.Tuple{
  371. Emitter: "src1",
  372. Message: xsql.Message{"id1": 1, "f1": "v1"},
  373. },
  374. &xsql.Tuple{
  375. Emitter: "src1",
  376. Message: xsql.Message{"id1": 3, "f1": "v1"},
  377. },
  378. },
  379. },
  380. {
  381. Content: []xsql.DataValuer{
  382. &xsql.Tuple{
  383. Emitter: "src1",
  384. Message: xsql.Message{"id1": 2, "f1": "v2"},
  385. },
  386. },
  387. },
  388. },
  389. result: xsql.GroupedTuplesSet{
  390. {
  391. Content: []xsql.DataValuer{
  392. &xsql.Tuple{
  393. Emitter: "src1",
  394. Message: xsql.Message{"id1": 2, "f1": "v2"},
  395. },
  396. },
  397. },
  398. {
  399. Content: []xsql.DataValuer{
  400. &xsql.Tuple{
  401. Emitter: "src1",
  402. Message: xsql.Message{"id1": 1, "f1": "v1"},
  403. },
  404. &xsql.Tuple{
  405. Emitter: "src1",
  406. Message: xsql.Message{"id1": 3, "f1": "v1"},
  407. },
  408. },
  409. },
  410. },
  411. },
  412. {
  413. sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY c",
  414. data: xsql.GroupedTuplesSet{
  415. {
  416. Content: []xsql.DataValuer{
  417. &xsql.Tuple{
  418. Emitter: "src1",
  419. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  420. },
  421. &xsql.Tuple{
  422. Emitter: "src1",
  423. Message: xsql.Message{"id1": 3, "f1": "v1"},
  424. },
  425. },
  426. },
  427. {
  428. Content: []xsql.DataValuer{
  429. &xsql.Tuple{
  430. Emitter: "src1",
  431. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  432. },
  433. },
  434. },
  435. },
  436. result: xsql.GroupedTuplesSet{
  437. {
  438. Content: []xsql.DataValuer{
  439. &xsql.Tuple{
  440. Emitter: "src1",
  441. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  442. },
  443. },
  444. },
  445. {
  446. Content: []xsql.DataValuer{
  447. &xsql.Tuple{
  448. Emitter: "src1",
  449. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  450. },
  451. &xsql.Tuple{
  452. Emitter: "src1",
  453. Message: xsql.Message{"id1": 3, "f1": "v1"},
  454. },
  455. },
  456. },
  457. },
  458. },
  459. {
  460. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
  461. data: xsql.GroupedTuplesSet{
  462. {
  463. Content: []xsql.DataValuer{
  464. &xsql.JoinTuple{
  465. Tuples: []xsql.Tuple{
  466. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  467. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  468. },
  469. },
  470. },
  471. WindowRange: &xsql.WindowRange{
  472. WindowStart: 1541152486013,
  473. WindowEnd: 1541152487013,
  474. },
  475. },
  476. {
  477. Content: []xsql.DataValuer{
  478. &xsql.JoinTuple{
  479. Tuples: []xsql.Tuple{
  480. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  481. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  482. },
  483. },
  484. },
  485. WindowRange: &xsql.WindowRange{
  486. WindowStart: 1541152486013,
  487. WindowEnd: 1541152487013,
  488. },
  489. },
  490. {
  491. Content: []xsql.DataValuer{
  492. &xsql.JoinTuple{
  493. Tuples: []xsql.Tuple{
  494. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  495. },
  496. },
  497. },
  498. WindowRange: &xsql.WindowRange{
  499. WindowStart: 1541152486013,
  500. WindowEnd: 1541152487013,
  501. },
  502. },
  503. },
  504. result: xsql.GroupedTuplesSet{
  505. {
  506. Content: []xsql.DataValuer{
  507. &xsql.JoinTuple{
  508. Tuples: []xsql.Tuple{
  509. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  510. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  511. },
  512. },
  513. },
  514. WindowRange: &xsql.WindowRange{
  515. WindowStart: 1541152486013,
  516. WindowEnd: 1541152487013,
  517. },
  518. },
  519. {
  520. Content: []xsql.DataValuer{
  521. &xsql.JoinTuple{
  522. Tuples: []xsql.Tuple{
  523. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  524. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  525. },
  526. },
  527. },
  528. WindowRange: &xsql.WindowRange{
  529. WindowStart: 1541152486013,
  530. WindowEnd: 1541152487013,
  531. },
  532. },
  533. {
  534. Content: []xsql.DataValuer{
  535. &xsql.JoinTuple{
  536. Tuples: []xsql.Tuple{
  537. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  538. },
  539. },
  540. },
  541. WindowRange: &xsql.WindowRange{
  542. WindowStart: 1541152486013,
  543. WindowEnd: 1541152487013,
  544. },
  545. },
  546. },
  547. },
  548. }
  549. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  550. contextLogger := conf.Log.WithField("rule", "TestOrderPlan_Apply")
  551. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  552. for i, tt := range tests {
  553. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  554. if err != nil {
  555. t.Errorf("statement parse error %s", err)
  556. break
  557. }
  558. pp := &OrderOp{SortFields: stmt.SortFields}
  559. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  560. result := pp.Apply(ctx, tt.data, fv, afv)
  561. if !reflect.DeepEqual(tt.result, result) {
  562. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  563. }
  564. }
  565. }