order_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "reflect"
  23. "strings"
  24. "testing"
  25. )
  26. func TestOrderPlan_Apply(t *testing.T) {
  27. var tests = []struct {
  28. sql string
  29. data interface{}
  30. result interface{}
  31. }{
  32. {
  33. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20 ORDER BY abc",
  34. data: &xsql.Tuple{
  35. Emitter: "tbl",
  36. Message: xsql.Message{
  37. "abc": int64(6),
  38. },
  39. },
  40. result: &xsql.Tuple{
  41. Emitter: "tbl",
  42. Message: xsql.Message{
  43. "abc": int64(6),
  44. },
  45. },
  46. },
  47. {
  48. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  49. data: &xsql.Tuple{
  50. Emitter: "tbl",
  51. Message: xsql.Message{
  52. "abc": int64(34),
  53. "def": "hello",
  54. },
  55. },
  56. result: &xsql.Tuple{
  57. Emitter: "tbl",
  58. Message: xsql.Message{
  59. "abc": int64(34),
  60. "def": "hello",
  61. },
  62. },
  63. },
  64. {
  65. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  66. data: xsql.WindowTuplesSet{
  67. Content: []xsql.WindowTuples{
  68. {
  69. Emitter: "src1",
  70. Tuples: []xsql.Tuple{
  71. {
  72. Emitter: "src1",
  73. Message: xsql.Message{"id1": 1, "f1": "v1"},
  74. }, {
  75. Emitter: "src1",
  76. Message: xsql.Message{"id1": 2, "f1": "v2"},
  77. }, {
  78. Emitter: "src1",
  79. Message: xsql.Message{"id1": 3, "f1": "v1"},
  80. },
  81. },
  82. },
  83. },
  84. },
  85. result: xsql.WindowTuplesSet{
  86. Content: []xsql.WindowTuples{
  87. {
  88. Emitter: "src1",
  89. Tuples: []xsql.Tuple{
  90. {
  91. Emitter: "src1",
  92. Message: xsql.Message{"id1": 3, "f1": "v1"},
  93. }, {
  94. Emitter: "src1",
  95. Message: xsql.Message{"id1": 2, "f1": "v2"},
  96. }, {
  97. Emitter: "src1",
  98. Message: xsql.Message{"id1": 1, "f1": "v1"},
  99. },
  100. },
  101. },
  102. },
  103. },
  104. },
  105. {
  106. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  107. data: xsql.WindowTuplesSet{
  108. Content: []xsql.WindowTuples{
  109. {
  110. Emitter: "src1",
  111. Tuples: []xsql.Tuple{
  112. {
  113. Emitter: "src1",
  114. Message: xsql.Message{"id1": 1, "f1": "v1"},
  115. }, {
  116. Emitter: "src1",
  117. Message: xsql.Message{"f1": "v2"},
  118. }, {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1": 3, "f1": "v1"},
  121. },
  122. },
  123. },
  124. },
  125. WindowRange: &xsql.WindowRange{
  126. WindowStart: 1541152486013,
  127. WindowEnd: 1541152487013,
  128. },
  129. },
  130. result: xsql.WindowTuplesSet{
  131. Content: []xsql.WindowTuples{
  132. {
  133. Emitter: "src1",
  134. Tuples: []xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 3, "f1": "v1"},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 1, "f1": "v1"},
  141. }, {
  142. Emitter: "src1",
  143. Message: xsql.Message{"f1": "v2"},
  144. },
  145. },
  146. },
  147. },
  148. WindowRange: &xsql.WindowRange{
  149. WindowStart: 1541152486013,
  150. WindowEnd: 1541152487013,
  151. },
  152. },
  153. },
  154. {
  155. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY id1 DESC",
  156. data: xsql.WindowTuplesSet{
  157. Content: []xsql.WindowTuples{
  158. {
  159. Emitter: "src1",
  160. Tuples: []xsql.Tuple{
  161. {
  162. Emitter: "src1",
  163. Message: xsql.Message{"id1": 1, "f1": "v1"},
  164. }, {
  165. Emitter: "src1",
  166. Message: xsql.Message{"id1": "2string", "f1": "v2"},
  167. }, {
  168. Emitter: "src1",
  169. Message: xsql.Message{"id1": 3, "f1": "v1"},
  170. },
  171. },
  172. },
  173. },
  174. },
  175. result: errors.New("run Order By error: incompatible types for comparison: int and string"),
  176. },
  177. {
  178. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY f1, id1 DESC",
  179. data: xsql.WindowTuplesSet{
  180. Content: []xsql.WindowTuples{
  181. {
  182. Emitter: "src1",
  183. Tuples: []xsql.Tuple{
  184. {
  185. Emitter: "src1",
  186. Message: xsql.Message{"id1": 1, "f1": "v1"},
  187. }, {
  188. Emitter: "src1",
  189. Message: xsql.Message{"id1": 2, "f1": "v2"},
  190. }, {
  191. Emitter: "src1",
  192. Message: xsql.Message{"id1": 3, "f1": "v1"},
  193. },
  194. },
  195. },
  196. },
  197. },
  198. result: xsql.WindowTuplesSet{
  199. Content: []xsql.WindowTuples{
  200. {
  201. Emitter: "src1",
  202. Tuples: []xsql.Tuple{
  203. {
  204. Emitter: "src1",
  205. Message: xsql.Message{"id1": 3, "f1": "v1"},
  206. }, {
  207. Emitter: "src1",
  208. Message: xsql.Message{"id1": 1, "f1": "v1"},
  209. }, {
  210. Emitter: "src1",
  211. Message: xsql.Message{"id1": 2, "f1": "v2"},
  212. },
  213. },
  214. },
  215. },
  216. },
  217. },
  218. {
  219. sql: "SELECT * FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY ts DESC",
  220. data: xsql.WindowTuplesSet{
  221. Content: []xsql.WindowTuples{
  222. {
  223. Emitter: "src1",
  224. Tuples: []xsql.Tuple{
  225. {
  226. Emitter: "src1",
  227. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  228. }, {
  229. Emitter: "src1",
  230. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  231. }, {
  232. Emitter: "src1",
  233. Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854535000)},
  234. },
  235. },
  236. },
  237. },
  238. },
  239. result: xsql.WindowTuplesSet{
  240. Content: []xsql.WindowTuples{
  241. {
  242. Emitter: "src1",
  243. Tuples: []xsql.Tuple{
  244. {
  245. Emitter: "src1",
  246. Message: xsql.Message{"id1": 3, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854535000)},
  247. }, {
  248. Emitter: "src1",
  249. Message: xsql.Message{"id1": 2, "f1": "v2", "ts": cast.TimeFromUnixMilli(1568854525000)},
  250. }, {
  251. Emitter: "src1",
  252. Message: xsql.Message{"id1": 1, "f1": "v1", "ts": cast.TimeFromUnixMilli(1568854515000)},
  253. },
  254. },
  255. },
  256. },
  257. },
  258. },
  259. {
  260. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src1.id1 desc",
  261. data: &xsql.JoinTupleSets{
  262. Content: []xsql.JoinTuple{
  263. {
  264. Tuples: []xsql.Tuple{
  265. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  266. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  267. },
  268. },
  269. {
  270. Tuples: []xsql.Tuple{
  271. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  272. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  273. },
  274. },
  275. {
  276. Tuples: []xsql.Tuple{
  277. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  278. },
  279. },
  280. },
  281. },
  282. result: &xsql.JoinTupleSets{
  283. Content: []xsql.JoinTuple{
  284. {
  285. Tuples: []xsql.Tuple{
  286. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  287. },
  288. },
  289. {
  290. Tuples: []xsql.Tuple{
  291. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  292. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  293. },
  294. },
  295. {
  296. Tuples: []xsql.Tuple{
  297. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  298. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  299. },
  300. },
  301. },
  302. },
  303. },
  304. {
  305. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2",
  306. data: &xsql.JoinTupleSets{
  307. Content: []xsql.JoinTuple{
  308. {
  309. Tuples: []xsql.Tuple{
  310. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  311. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  312. },
  313. },
  314. {
  315. Tuples: []xsql.Tuple{
  316. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  317. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  318. },
  319. },
  320. {
  321. Tuples: []xsql.Tuple{
  322. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  323. },
  324. },
  325. },
  326. },
  327. result: &xsql.JoinTupleSets{
  328. Content: []xsql.JoinTuple{
  329. {
  330. Tuples: []xsql.Tuple{
  331. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  332. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  333. },
  334. },
  335. {
  336. Tuples: []xsql.Tuple{
  337. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  338. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  339. },
  340. },
  341. {
  342. Tuples: []xsql.Tuple{
  343. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  344. },
  345. },
  346. },
  347. },
  348. },
  349. {
  350. sql: "SELECT abc FROM tbl group by abc ORDER BY def",
  351. data: xsql.GroupedTuplesSet{
  352. {
  353. Content: []xsql.DataValuer{
  354. &xsql.Tuple{
  355. Emitter: "tbl",
  356. Message: xsql.Message{
  357. "abc": int64(6),
  358. "def": "hello",
  359. },
  360. },
  361. },
  362. },
  363. },
  364. result: xsql.GroupedTuplesSet{
  365. {
  366. Content: []xsql.DataValuer{
  367. &xsql.Tuple{
  368. Emitter: "tbl",
  369. Message: xsql.Message{
  370. "abc": int64(6),
  371. "def": "hello",
  372. },
  373. },
  374. },
  375. },
  376. },
  377. },
  378. {
  379. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY id1 desc",
  380. data: xsql.GroupedTuplesSet{
  381. {
  382. Content: []xsql.DataValuer{
  383. &xsql.Tuple{
  384. Emitter: "src1",
  385. Message: xsql.Message{"id1": 1, "f1": "v1"},
  386. },
  387. &xsql.Tuple{
  388. Emitter: "src1",
  389. Message: xsql.Message{"id1": 3, "f1": "v1"},
  390. },
  391. },
  392. },
  393. {
  394. Content: []xsql.DataValuer{
  395. &xsql.Tuple{
  396. Emitter: "src1",
  397. Message: xsql.Message{"id1": 2, "f1": "v2"},
  398. },
  399. },
  400. },
  401. },
  402. result: xsql.GroupedTuplesSet{
  403. {
  404. Content: []xsql.DataValuer{
  405. &xsql.Tuple{
  406. Emitter: "src1",
  407. Message: xsql.Message{"id1": 2, "f1": "v2"},
  408. },
  409. },
  410. },
  411. {
  412. Content: []xsql.DataValuer{
  413. &xsql.Tuple{
  414. Emitter: "src1",
  415. Message: xsql.Message{"id1": 1, "f1": "v1"},
  416. },
  417. &xsql.Tuple{
  418. Emitter: "src1",
  419. Message: xsql.Message{"id1": 3, "f1": "v1"},
  420. },
  421. },
  422. },
  423. },
  424. },
  425. {
  426. sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY c",
  427. data: xsql.GroupedTuplesSet{
  428. {
  429. Content: []xsql.DataValuer{
  430. &xsql.Tuple{
  431. Emitter: "src1",
  432. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  433. },
  434. &xsql.Tuple{
  435. Emitter: "src1",
  436. Message: xsql.Message{"id1": 3, "f1": "v1"},
  437. },
  438. },
  439. },
  440. {
  441. Content: []xsql.DataValuer{
  442. &xsql.Tuple{
  443. Emitter: "src1",
  444. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  445. },
  446. },
  447. },
  448. },
  449. result: xsql.GroupedTuplesSet{
  450. {
  451. Content: []xsql.DataValuer{
  452. &xsql.Tuple{
  453. Emitter: "src1",
  454. Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
  455. },
  456. },
  457. },
  458. {
  459. Content: []xsql.DataValuer{
  460. &xsql.Tuple{
  461. Emitter: "src1",
  462. Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
  463. },
  464. &xsql.Tuple{
  465. Emitter: "src1",
  466. Message: xsql.Message{"id1": 3, "f1": "v1"},
  467. },
  468. },
  469. },
  470. },
  471. },
  472. {
  473. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
  474. data: xsql.GroupedTuplesSet{
  475. {
  476. Content: []xsql.DataValuer{
  477. &xsql.JoinTuple{
  478. Tuples: []xsql.Tuple{
  479. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  480. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  481. },
  482. },
  483. },
  484. WindowRange: &xsql.WindowRange{
  485. WindowStart: 1541152486013,
  486. WindowEnd: 1541152487013,
  487. },
  488. },
  489. {
  490. Content: []xsql.DataValuer{
  491. &xsql.JoinTuple{
  492. Tuples: []xsql.Tuple{
  493. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  494. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  495. },
  496. },
  497. },
  498. WindowRange: &xsql.WindowRange{
  499. WindowStart: 1541152486013,
  500. WindowEnd: 1541152487013,
  501. },
  502. },
  503. {
  504. Content: []xsql.DataValuer{
  505. &xsql.JoinTuple{
  506. Tuples: []xsql.Tuple{
  507. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  508. },
  509. },
  510. },
  511. WindowRange: &xsql.WindowRange{
  512. WindowStart: 1541152486013,
  513. WindowEnd: 1541152487013,
  514. },
  515. },
  516. },
  517. result: xsql.GroupedTuplesSet{
  518. {
  519. Content: []xsql.DataValuer{
  520. &xsql.JoinTuple{
  521. Tuples: []xsql.Tuple{
  522. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  523. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  524. },
  525. },
  526. },
  527. WindowRange: &xsql.WindowRange{
  528. WindowStart: 1541152486013,
  529. WindowEnd: 1541152487013,
  530. },
  531. },
  532. {
  533. Content: []xsql.DataValuer{
  534. &xsql.JoinTuple{
  535. Tuples: []xsql.Tuple{
  536. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  537. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  538. },
  539. },
  540. },
  541. WindowRange: &xsql.WindowRange{
  542. WindowStart: 1541152486013,
  543. WindowEnd: 1541152487013,
  544. },
  545. },
  546. {
  547. Content: []xsql.DataValuer{
  548. &xsql.JoinTuple{
  549. Tuples: []xsql.Tuple{
  550. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  551. },
  552. },
  553. },
  554. WindowRange: &xsql.WindowRange{
  555. WindowStart: 1541152486013,
  556. WindowEnd: 1541152487013,
  557. },
  558. },
  559. },
  560. },
  561. }
  562. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  563. contextLogger := conf.Log.WithField("rule", "TestOrderPlan_Apply")
  564. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  565. for i, tt := range tests {
  566. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  567. if err != nil {
  568. t.Errorf("statement parse error %s", err)
  569. break
  570. }
  571. pp := &OrderOp{SortFields: stmt.SortFields}
  572. fv, afv := xsql.NewFunctionValuersForOp(nil)
  573. result := pp.Apply(ctx, tt.data, fv, afv)
  574. if !reflect.DeepEqual(tt.result, result) {
  575. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  576. }
  577. }
  578. }