watermark_op_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "testing"
  17. "time"
  18. "github.com/stretchr/testify/assert"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. )
  25. func TestSingleStreamWatermark(t *testing.T) {
  26. tests := []struct {
  27. name string
  28. latetol int64
  29. inputs []any // a tuple or a window
  30. outputs []any
  31. }{
  32. {
  33. name: "ordered tuple",
  34. inputs: []any{
  35. &xsql.Tuple{
  36. Emitter: "demo",
  37. Message: map[string]interface{}{
  38. "a": 6,
  39. "b": "aaaa",
  40. },
  41. Timestamp: 10,
  42. },
  43. &xsql.Tuple{
  44. Emitter: "demo",
  45. Message: map[string]interface{}{
  46. "a": 6,
  47. "b": "aaaa",
  48. },
  49. Timestamp: 20,
  50. },
  51. &xsql.Tuple{
  52. Emitter: "demo",
  53. Message: map[string]interface{}{
  54. "a": 6,
  55. "b": "aaaa",
  56. },
  57. Timestamp: 30,
  58. },
  59. },
  60. outputs: []any{
  61. &xsql.Tuple{
  62. Emitter: "demo",
  63. Message: map[string]interface{}{
  64. "a": 6,
  65. "b": "aaaa",
  66. },
  67. Timestamp: 10,
  68. },
  69. &xsql.Tuple{
  70. Emitter: "demo",
  71. Message: map[string]interface{}{
  72. "a": 6,
  73. "b": "aaaa",
  74. },
  75. Timestamp: 20,
  76. },
  77. &xsql.Tuple{
  78. Emitter: "demo",
  79. Message: map[string]interface{}{
  80. "a": 6,
  81. "b": "aaaa",
  82. },
  83. Timestamp: 30,
  84. },
  85. },
  86. }, {
  87. name: "disordered tuple",
  88. latetol: 5,
  89. inputs: []any{
  90. &xsql.Tuple{
  91. Emitter: "demo",
  92. Message: map[string]interface{}{
  93. "a": 6,
  94. "b": "aaaa",
  95. },
  96. Timestamp: 20,
  97. },
  98. &xsql.Tuple{
  99. Emitter: "demo",
  100. Message: map[string]interface{}{
  101. "a": 6,
  102. "b": "aaaa",
  103. },
  104. Timestamp: 10,
  105. },
  106. &xsql.Tuple{
  107. Emitter: "demo",
  108. Message: map[string]interface{}{
  109. "a": 6,
  110. "b": "aaaa",
  111. },
  112. Timestamp: 30,
  113. },
  114. &xsql.Tuple{
  115. Emitter: "demo",
  116. Message: map[string]interface{}{
  117. "a": 5,
  118. "b": "aaaa",
  119. },
  120. Timestamp: 32,
  121. },
  122. &xsql.Tuple{
  123. Emitter: "demo",
  124. Message: map[string]interface{}{
  125. "a": 6,
  126. "b": "aaaa",
  127. },
  128. Timestamp: 32,
  129. },
  130. &xsql.Tuple{
  131. Emitter: "demo",
  132. Message: map[string]interface{}{
  133. "a": 6,
  134. "b": "aaaa",
  135. },
  136. Timestamp: 28,
  137. },
  138. &xsql.Tuple{
  139. Emitter: "demo",
  140. Message: map[string]interface{}{
  141. "a": 6,
  142. "b": "aaaa",
  143. },
  144. Timestamp: 40,
  145. },
  146. },
  147. outputs: []any{
  148. &xsql.Tuple{
  149. Emitter: "demo",
  150. Message: map[string]interface{}{
  151. "a": 6,
  152. "b": "aaaa",
  153. },
  154. Timestamp: 20,
  155. },
  156. &xsql.Tuple{
  157. Emitter: "demo",
  158. Message: map[string]interface{}{
  159. "a": 6,
  160. "b": "aaaa",
  161. },
  162. Timestamp: 28,
  163. },
  164. &xsql.Tuple{
  165. Emitter: "demo",
  166. Message: map[string]interface{}{
  167. "a": 6,
  168. "b": "aaaa",
  169. },
  170. Timestamp: 30,
  171. },
  172. &xsql.Tuple{
  173. Emitter: "demo",
  174. Message: map[string]interface{}{
  175. "a": 5,
  176. "b": "aaaa",
  177. },
  178. Timestamp: 32,
  179. },
  180. &xsql.Tuple{
  181. Emitter: "demo",
  182. Message: map[string]interface{}{
  183. "a": 6,
  184. "b": "aaaa",
  185. },
  186. Timestamp: 32,
  187. },
  188. },
  189. },
  190. }
  191. for _, tt := range tests {
  192. t.Run(tt.name, func(t *testing.T) {
  193. contextLogger := conf.Log.WithField("rule", "TestWatermark")
  194. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  195. tempStore, _ := state.CreateStore("TestWatermark", api.AtMostOnce)
  196. nctx := ctx.WithMeta("TestWatermark", "test", tempStore)
  197. w := NewWatermarkOp("mock", false, []string{"demo"}, &api.RuleOption{
  198. IsEventTime: true,
  199. LateTol: tt.latetol,
  200. Concurrency: 0,
  201. BufferLength: 0,
  202. SendMetaToSink: false,
  203. SendError: false,
  204. Qos: 0,
  205. CheckpointInterval: 0,
  206. })
  207. errCh := make(chan error)
  208. outputCh := make(chan interface{}, 50)
  209. w.outputs["mock"] = outputCh
  210. w.Exec(nctx, errCh)
  211. in := 0
  212. out := 0
  213. result := make([]interface{}, len(tt.outputs))
  214. for in < len(tt.inputs) || out < len(tt.outputs) {
  215. // may send more than once
  216. if in < len(tt.inputs) {
  217. select {
  218. case err := <-errCh:
  219. t.Error(err)
  220. return
  221. case w.input <- tt.inputs[in]:
  222. in++
  223. case outval := <-outputCh:
  224. result[out] = outval
  225. out++
  226. case <-time.After(5 * time.Second):
  227. t.Error("send message timeout")
  228. return
  229. }
  230. } else {
  231. select {
  232. case err := <-errCh:
  233. t.Error(err)
  234. return
  235. case outval := <-outputCh:
  236. result[out] = outval
  237. out++
  238. case <-time.After(5 * time.Second):
  239. t.Error("send message timeout")
  240. return
  241. }
  242. }
  243. }
  244. assert.Equal(t, tt.outputs, result)
  245. })
  246. }
  247. }
  248. func TestMultiStreamWatermark(t *testing.T) {
  249. tests := []struct {
  250. name string
  251. latetol int64
  252. inputs []any // a tuple or a window
  253. outputs []any
  254. }{
  255. {
  256. name: "ordered tuple",
  257. inputs: []any{
  258. &xsql.Tuple{
  259. Emitter: "demo1",
  260. Message: map[string]interface{}{
  261. "a": 6,
  262. "b": "aaaa",
  263. },
  264. Timestamp: 10,
  265. },
  266. &xsql.Tuple{
  267. Emitter: "demo2",
  268. Message: map[string]interface{}{
  269. "a": 6,
  270. "b": "aaaa",
  271. },
  272. Timestamp: 20,
  273. },
  274. &xsql.Tuple{
  275. Emitter: "demo1",
  276. Message: map[string]interface{}{
  277. "a": 6,
  278. "b": "aaaa",
  279. },
  280. Timestamp: 30,
  281. },
  282. },
  283. outputs: []any{
  284. &xsql.Tuple{
  285. Emitter: "demo1",
  286. Message: map[string]interface{}{
  287. "a": 6,
  288. "b": "aaaa",
  289. },
  290. Timestamp: 10,
  291. },
  292. &xsql.WatermarkTuple{
  293. Timestamp: 10,
  294. },
  295. &xsql.Tuple{
  296. Emitter: "demo2",
  297. Message: map[string]interface{}{
  298. "a": 6,
  299. "b": "aaaa",
  300. },
  301. Timestamp: 20,
  302. },
  303. &xsql.WatermarkTuple{
  304. Timestamp: 20,
  305. },
  306. },
  307. }, {
  308. name: "disordered tuple",
  309. latetol: 5,
  310. inputs: []any{
  311. &xsql.Tuple{
  312. Emitter: "demo1",
  313. Message: map[string]interface{}{
  314. "a": 6,
  315. "b": "aaaa",
  316. },
  317. Timestamp: 20,
  318. },
  319. &xsql.Tuple{
  320. Emitter: "demo2",
  321. Message: map[string]interface{}{
  322. "a": 6,
  323. "b": "aaaa",
  324. },
  325. Timestamp: 10,
  326. },
  327. &xsql.Tuple{
  328. Emitter: "demo1",
  329. Message: map[string]interface{}{
  330. "a": 6,
  331. "b": "aaaa",
  332. },
  333. Timestamp: 30,
  334. },
  335. &xsql.Tuple{
  336. Emitter: "demo2",
  337. Message: map[string]interface{}{
  338. "a": 5,
  339. "b": "aaaa",
  340. },
  341. Timestamp: 32,
  342. },
  343. &xsql.Tuple{
  344. Emitter: "demo1",
  345. Message: map[string]interface{}{
  346. "a": 6,
  347. "b": "aaaa",
  348. },
  349. Timestamp: 32,
  350. },
  351. &xsql.Tuple{
  352. Emitter: "demo2",
  353. Message: map[string]interface{}{
  354. "a": 6,
  355. "b": "aaaa",
  356. },
  357. Timestamp: 28,
  358. },
  359. &xsql.Tuple{
  360. Emitter: "demo1",
  361. Message: map[string]interface{}{
  362. "a": 6,
  363. "b": "aaaa",
  364. },
  365. Timestamp: 40,
  366. },
  367. &xsql.Tuple{
  368. Emitter: "demo2",
  369. Message: map[string]interface{}{
  370. "a": 6,
  371. "b": "aaaa",
  372. },
  373. Timestamp: 45,
  374. },
  375. },
  376. outputs: []any{
  377. &xsql.WatermarkTuple{
  378. Timestamp: 5,
  379. },
  380. &xsql.Tuple{
  381. Emitter: "demo2",
  382. Message: map[string]interface{}{
  383. "a": 6,
  384. "b": "aaaa",
  385. },
  386. Timestamp: 10,
  387. },
  388. &xsql.Tuple{
  389. Emitter: "demo1",
  390. Message: map[string]interface{}{
  391. "a": 6,
  392. "b": "aaaa",
  393. },
  394. Timestamp: 20,
  395. },
  396. &xsql.WatermarkTuple{
  397. Timestamp: 25,
  398. },
  399. &xsql.WatermarkTuple{
  400. Timestamp: 27,
  401. },
  402. &xsql.Tuple{
  403. Emitter: "demo2",
  404. Message: map[string]interface{}{
  405. "a": 6,
  406. "b": "aaaa",
  407. },
  408. Timestamp: 28,
  409. },
  410. &xsql.Tuple{
  411. Emitter: "demo1",
  412. Message: map[string]interface{}{
  413. "a": 6,
  414. "b": "aaaa",
  415. },
  416. Timestamp: 30,
  417. },
  418. &xsql.Tuple{
  419. Emitter: "demo2",
  420. Message: map[string]interface{}{
  421. "a": 5,
  422. "b": "aaaa",
  423. },
  424. Timestamp: 32,
  425. },
  426. &xsql.Tuple{
  427. Emitter: "demo1",
  428. Message: map[string]interface{}{
  429. "a": 6,
  430. "b": "aaaa",
  431. },
  432. Timestamp: 32,
  433. },
  434. &xsql.WatermarkTuple{
  435. Timestamp: 35,
  436. },
  437. },
  438. },
  439. }
  440. for _, tt := range tests {
  441. t.Run(tt.name, func(t *testing.T) {
  442. contextLogger := conf.Log.WithField("rule", "TestWatermark")
  443. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  444. tempStore, _ := state.CreateStore("TestWatermark", api.AtMostOnce)
  445. nctx := ctx.WithMeta("TestWatermark", "test", tempStore)
  446. w := NewWatermarkOp("mock", true, []string{"demo1", "demo2"}, &api.RuleOption{
  447. IsEventTime: true,
  448. LateTol: tt.latetol,
  449. Concurrency: 0,
  450. BufferLength: 0,
  451. SendMetaToSink: false,
  452. SendError: false,
  453. Qos: 0,
  454. CheckpointInterval: 0,
  455. })
  456. errCh := make(chan error)
  457. outputCh := make(chan interface{}, 50)
  458. w.outputs["mock"] = outputCh
  459. w.Exec(nctx, errCh)
  460. in := 0
  461. out := 0
  462. result := make([]interface{}, len(tt.outputs))
  463. for in < len(tt.inputs) || out < len(tt.outputs) {
  464. // may send more than once
  465. if in < len(tt.inputs) {
  466. select {
  467. case err := <-errCh:
  468. t.Error(err)
  469. return
  470. case w.input <- tt.inputs[in]:
  471. in++
  472. case outval := <-outputCh:
  473. // fmt.Printf("%v\n", outval)
  474. result[out] = outval
  475. out++
  476. case <-time.After(5 * time.Second):
  477. t.Error("send message timeout")
  478. return
  479. }
  480. } else {
  481. select {
  482. case err := <-errCh:
  483. t.Error(err)
  484. return
  485. case outval := <-outputCh:
  486. // fmt.Printf("%v\n", outval)
  487. result[out] = outval
  488. out++
  489. case <-time.After(5 * time.Second):
  490. t.Error("send message timeout")
  491. return
  492. }
  493. }
  494. }
  495. assert.Equal(t, tt.outputs, result)
  496. })
  497. }
  498. }