file_sink_test.go

// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
    "fmt"
    "os"
    "path/filepath"
    "reflect"
    "strconv"
    "testing"
    "time"

    "github.com/lf-edge/ekuiper/internal/compressor"
    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/internal/topo/context"
    "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
    "github.com/lf-edge/ekuiper/internal/topo/transform"
    "github.com/lf-edge/ekuiper/pkg/message"
)

// Unit test for Configure function
func TestConfigure(t *testing.T) {
    props := map[string]interface{}{
        "interval": 500,
        "path":     "test",
    }
    m := File().(*fileSink)
    err := m.Configure(props)
    if err != nil {
        t.Errorf("Configure() error = %v, wantErr nil", err)
    }
    if m.c.Path != "test" {
        t.Errorf("Configure() Path = %v, want test", m.c.Path)
    }
    err = m.Configure(map[string]interface{}{"interval": 500, "path": ""})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"fileType": "csv2"})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{
        "interval": 500,
        "path":     "test",
        "fileType": "csv",
    })
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"rollingInterval": -1})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"rollingCount": -1})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
    err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }

    for k := range compressionTypes {
        err = m.Configure(map[string]interface{}{
            "interval":           500,
            "path":               "test",
            "compression":        k,
            "rollingNamePattern": "suffix",
        })
        if err != nil {
            t.Errorf("Configure() error = %v, wantErr nil", err)
        }
        if m.c.Compression != k {
            t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, k)
        }
    }

    err = m.Configure(map[string]interface{}{
        "interval":           500,
        "path":               "test",
        "compression":        "",
        "rollingNamePattern": "suffix",
    })
    if err != nil {
        t.Errorf("Configure() error = %v, wantErr nil", err)
    }
    if m.c.Compression != "" {
        t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, "")
    }

    err = m.Configure(map[string]interface{}{
        "interval":    500,
        "path":        "test",
        "compression": "not_exist_algorithm",
    })
    if err == nil {
        t.Errorf("Configure() error = %v, wantErr not nil", err)
    }
}

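// Table-driven test checking the full sinkConf that Configure produces for
// default and custom property maps.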
func TestFileSink_Configure(t *testing.T) {
    defaultCheckInterval := (5 * time.Minute).Milliseconds()

    tests := []struct {
        name string
        c    *sinkConf
        p    map[string]interface{}
    }{
        {
            name: "default configurations",
            c: &sinkConf{
                CheckInterval: defaultCheckInterval,
                Path:          "cache",
                FileType:      LINES_TYPE,
                RollingCount:  1000000,
            },
            p: map[string]interface{}{},
        },
        {
            name: "new props",
            c: &sinkConf{
                CheckInterval:      500,
                Path:               "test",
                FileType:           CSV_TYPE,
                Format:             message.FormatDelimited,
                Delimiter:          ",",
                RollingCount:       1000000,
                RollingNamePattern: "none",
            },
            p: map[string]interface{}{
                "checkInterval":      500,
                "path":               "test",
                "fileType":           "csv",
                "format":             message.FormatDelimited,
                "rollingNamePattern": "none",
            },
        },
        { // only set rolling interval
            name: "rolling",
            c: &sinkConf{
                CheckInterval:   defaultCheckInterval,
                Path:            "cache",
                FileType:        LINES_TYPE,
                RollingInterval: 500,
                RollingCount:    0,
            },
            p: map[string]interface{}{
                "rollingInterval": 500,
                "rollingCount":    0,
            },
        },
        {
            name: "fields",
            c: &sinkConf{
                CheckInterval:   defaultCheckInterval,
                Path:            "cache",
                FileType:        LINES_TYPE,
                RollingInterval: 500,
                RollingCount:    0,
                Fields:          []string{"c", "a", "b"},
            },
            p: map[string]interface{}{
                "rollingInterval": 500,
                "rollingCount":    0,
                "fields":          []string{"c", "a", "b"},
            },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            m := &fileSink{}
            if err := m.Configure(tt.p); err != nil {
                t.Errorf("fileSink.Configure() error = %v", err)
                return
            }
            if !reflect.DeepEqual(m.c, tt.c) {
                t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
            }
        })
    }
}

// Test single file writing and flush by close
func TestFileSink_Collect(t *testing.T) {
    tests := []struct {
        name     string
        ft       FileType
        fname    string
        content  []byte
        compress string
    }{
        {
            name:    "lines",
            ft:      LINES_TYPE,
            fname:   "test_lines",
            content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
        },
        {
            name:    "json",
            ft:      JSON_TYPE,
            fname:   "test_json",
            content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
        },
        {
            name:    "csv",
            ft:      CSV_TYPE,
            fname:   "test_csv",
            content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
        },
        {
            name:     "lines",
            ft:       LINES_TYPE,
            fname:    "test_lines",
            content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
            compress: GZIP,
        },
        {
            name:     "json",
            ft:       JSON_TYPE,
            fname:    "test_json",
            content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
            compress: GZIP,
        },
        {
            name:     "csv",
            ft:       CSV_TYPE,
            fname:    "test_csv",
            content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
            compress: GZIP,
        },
        {
            name:     "lines",
            ft:       LINES_TYPE,
            fname:    "test_lines",
            content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
            compress: ZSTD,
        },
        {
            name:     "json",
            ft:       JSON_TYPE,
            fname:    "test_json",
            content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
            compress: ZSTD,
        },
        {
            name:     "csv",
            ft:       CSV_TYPE,
            fname:    "test_csv",
            content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
            compress: ZSTD,
        },
    }
    // Create a stream context for testing
    contextLogger := conf.Log.WithField("rule", "test2")
    ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
    tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
    vCtx := context.WithValue(ctx, context.TransKey, tf)

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Create a temporary file for testing
            tmpfile, err := os.CreateTemp("", tt.fname)
            if err != nil {
                t.Fatal(err)
            }
            defer os.Remove(tmpfile.Name())
            // Create a file sink with the temporary file path
            sink := &fileSink{}
            f := message.FormatJson
            if tt.ft == CSV_TYPE {
                f = message.FormatDelimited
            }
            err = sink.Configure(map[string]interface{}{
                "path":               tmpfile.Name(),
                "fileType":           tt.ft,
                "hasHeader":          true,
                "format":             f,
                "rollingNamePattern": "none",
                "compression":        tt.compress,
                "fields":             []string{"key"},
            })
            if err != nil {
                t.Fatal(err)
            }
            err = sink.Open(ctx)
            if err != nil {
                t.Fatal(err)
            }
            // Test collecting a map item
            m := map[string]interface{}{"key": "value1"}
            if err := sink.Collect(vCtx, m); err != nil {
                t.Errorf("unexpected error: %s", err)
            }
            // Test collecting another map item
            m = map[string]interface{}{"key": "value2"}
            if err := sink.Collect(ctx, m); err != nil {
                t.Errorf("unexpected error: %s", err)
            }
            if err = sink.Close(ctx); err != nil {
                t.Errorf("unexpected close error: %s", err)
            }
            // Read the contents of the temporary file and check if they match the collected items
            contents, err := os.ReadFile(tmpfile.Name())
            if err != nil {
                t.Fatal(err)
            }
            if tt.compress != "" {
                decompressor, _ := compressor.GetDecompressor(tt.compress)
                decompress, err := decompressor.Decompress(contents)
                if err != nil {
                    t.Errorf("%v", err)
                }
                if !reflect.DeepEqual(decompress, tt.content) {
                    // Report the decompressed bytes that were compared, not the raw
                    // compressed file contents.
                    t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(decompress))
                }
            } else {
                if !reflect.DeepEqual(contents, tt.content) {
                    t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
                }
            }
        })
    }
}

// Test file collect by fields
func TestFileSinkFields_Collect(t *testing.T) {
    tests := []struct {
        name      string
        ft        FileType
        fname     string
        format    string
        delimiter string
        fields    []string
        content   []byte
    }{
        {
            name:      "test1",
            ft:        CSV_TYPE,
            fname:     "test_csv",
            format:    "delimited",
            delimiter: ",",
            fields:    []string{"temperature", "humidity"},
            content:   []byte("temperature,humidity\n31.2,40"),
        },
        {
            name:      "test2",
            ft:        CSV_TYPE,
            fname:     "test_csv",
            format:    "delimited",
            delimiter: ",",
            content:   []byte("humidity,temperature\n40,31.2"),
        },
    }
    // Create a stream context for testing
    contextLogger := conf.Log.WithField("rule", "testFields")
    ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, "", tt.fields)
            vCtx := context.WithValue(ctx, context.TransKey, tf)
            // Create a temporary file for testing
            tmpfile, err := os.CreateTemp("", tt.fname)
            if err != nil {
                t.Fatal(err)
            }
            defer os.Remove(tmpfile.Name())
            // Create a file sink with the temporary file path
            sink := &fileSink{}
            err = sink.Configure(map[string]interface{}{
                "path":               tmpfile.Name(),
                "fileType":           tt.ft,
                "hasHeader":          true,
                "format":             tt.format,
                "rollingNamePattern": "none",
                "fields":             tt.fields,
            })
            if err != nil {
                t.Fatal(err)
            }
            err = sink.Open(ctx)
            if err != nil {
                t.Fatal(err)
            }
            // Test collecting a map item
            m := map[string]interface{}{"humidity": 40, "temperature": 31.2}
            if err := sink.Collect(vCtx, m); err != nil {
                t.Errorf("unexpected error: %s", err)
            }
            if err = sink.Close(ctx); err != nil {
                t.Errorf("unexpected close error: %s", err)
            }
            // Read the contents of the temporary file and check if they match the collected items
            contents, err := os.ReadFile(tmpfile.Name())
            if err != nil {
                t.Fatal(err)
            }
            if !reflect.DeepEqual(contents, tt.content) {
                t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
            }
        })
    }
}

// Test file rolling by time
func TestFileSinkRolling_Collect(t *testing.T) {
    // Remove existing files
    err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if filepath.Ext(path) == ".log" {
            fmt.Println("Deleting file:", path)
            return os.Remove(path)
        }
        return nil
    })
    if err != nil {
        t.Fatal(err)
    }
    conf.IsTesting = true
    tests := []struct {
        name     string
        ft       FileType
        fname    string
        contents [2][]byte
        compress string
    }{
        {
            name:  "lines",
            ft:    LINES_TYPE,
            fname: "test_lines.log",
            contents: [2][]byte{
                []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
                []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
            },
        },
        {
            name:  "json",
            ft:    JSON_TYPE,
            fname: "test_json.log",
            contents: [2][]byte{
                []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
                []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
            },
        },
        {
            name:  "lines",
            ft:    LINES_TYPE,
            fname: "test_lines_gzip.log",
            contents: [2][]byte{
                []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
                []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
            },
            compress: GZIP,
        },
        {
            name:  "json",
            ft:    JSON_TYPE,
            fname: "test_json_gzip.log",
            contents: [2][]byte{
                []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
                []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
            },
            compress: GZIP,
        },
        {
            name:  "lines",
            ft:    LINES_TYPE,
            fname: "test_lines_zstd.log",
            contents: [2][]byte{
                []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
                []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
            },
            compress: ZSTD,
        },
        {
            name:  "json",
            ft:    JSON_TYPE,
            fname: "test_json_zstd.log",
            contents: [2][]byte{
                []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
                []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
            },
            compress: ZSTD,
        },
    }
    // Create a stream context for testing
    contextLogger := conf.Log.WithField("rule", "testRolling")
    ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
    tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
    vCtx := context.WithValue(ctx, context.TransKey, tf)
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Create a file sink with the temporary file path
            sink := &fileSink{}
            err := sink.Configure(map[string]interface{}{
                "path":               tt.fname,
                "fileType":           tt.ft,
                "rollingInterval":    1000,
                "checkInterval":      500,
                "rollingCount":       0,
                "rollingNamePattern": "suffix",
                "compression":        tt.compress,
            })
            if err != nil {
                t.Fatal(err)
            }
            mockclock.ResetClock(10)
            err = sink.Open(ctx)
            if err != nil {
                t.Fatal(err)
            }
            c := mockclock.GetMockClock()
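            // The mock clock starts at 10 ms and advances 450 ms per event, so the
            // collected records carry ts 460, 910, 1360, 1810 and 2260. With
            // rollingInterval=1000 and checkInterval=500 the sink is expected to roll
            // once, producing the two files asserted below, suffixed with 460 and
            // 1810 (the ts of each file's first record).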
            for i := 0; i < 5; i++ {
                c.Add(450 * time.Millisecond)
                m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
                if err := sink.Collect(vCtx, m); err != nil {
                    t.Errorf("unexpected error: %s", err)
                }
            }
            c.After(2000 * time.Millisecond)
            if err = sink.Close(ctx); err != nil {
                t.Errorf("unexpected close error: %s", err)
            }
            // Should write to 2 files
            for i := 0; i < 2; i++ {
                // Read the contents of the temporary file and check if they match the collected items
                var fn string
                if tt.compress != "" {
                    fn = fmt.Sprintf("test_%s_%s-%d.log", tt.ft, tt.compress, 460+1350*i)
                } else {
                    fn = fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
                }
                var contents []byte
                contents, err := os.ReadFile(fn)
                if err != nil {
                    t.Fatal(err)
                }
                if tt.compress != "" {
                    decompressor, _ := compressor.GetDecompressor(tt.compress)
                    contents, err = decompressor.Decompress(contents)
                    if err != nil {
                        t.Errorf("%v", err)
                    }
                }
                if !reflect.DeepEqual(contents, tt.contents[i]) {
                    t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
                }
            }
        })
    }
}

// Test file rolling by count
func TestFileSinkRollingCount_Collect(t *testing.T) {
    // Remove existing files
    err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if filepath.Ext(path) == ".dd" {
            fmt.Println("Deleting file:", path)
            return os.Remove(path)
        }
        return nil
    })
    if err != nil {
        t.Fatal(err)
    }
    conf.IsTesting = true
    tests := []struct {
        name     string
        ft       FileType
        fname    string
        contents [3][]byte
        compress string
    }{
        {
            name:  "csv",
            ft:    CSV_TYPE,
            fname: "test_csv_{{.ts}}.dd",
            contents: [3][]byte{
                []byte("key,ts\nvalue0,460"),
                []byte("key,ts\nvalue1,910"),
                []byte("key,ts\nvalue2,1360"),
            },
        },
        {
            name:  "csv",
            ft:    CSV_TYPE,
            fname: "test_csv_gzip_{{.ts}}.dd",
            contents: [3][]byte{
                []byte("key,ts\nvalue0,460"),
                []byte("key,ts\nvalue1,910"),
                []byte("key,ts\nvalue2,1360"),
            },
            compress: GZIP,
        },
        {
            name:  "csv",
            ft:    CSV_TYPE,
            fname: "test_csv_zstd_{{.ts}}.dd",
            contents: [3][]byte{
                []byte("key,ts\nvalue0,460"),
                []byte("key,ts\nvalue1,910"),
                []byte("key,ts\nvalue2,1360"),
            },
            compress: ZSTD,
        },
    }
    // Create a stream context for testing
    contextLogger := conf.Log.WithField("rule", "testRollingCount")
    ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
    tf, _ := transform.GenTransform("", "delimited", "", ",", "", []string{})
    vCtx := context.WithValue(ctx, context.TransKey, tf)
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Create a file sink with the temporary file path
            sink := &fileSink{}
            err := sink.Configure(map[string]interface{}{
                "path":               tt.fname,
                "fileType":           tt.ft,
                "rollingInterval":    0,
                "rollingCount":       1,
                "rollingNamePattern": "none",
                "hasHeader":          true,
                "format":             "delimited",
                "compression":        tt.compress,
            })
            if err != nil {
                t.Fatal(err)
            }
            mockclock.ResetClock(10)
            err = sink.Open(ctx)
            if err != nil {
                t.Fatal(err)
            }
            c := mockclock.GetMockClock()
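            // With rollingCount=1 the sink rolls after every record, so each of the
            // three events (ts 460, 910, 1360: the 10 ms start plus 450 ms per step)
            // is expected to land in its own file named via the {{.ts}} pattern.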
            for i := 0; i < 3; i++ {
                c.Add(450 * time.Millisecond)
                m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
                if err := sink.Collect(vCtx, m); err != nil {
                    t.Errorf("unexpected error: %s", err)
                }
            }
            c.After(2000 * time.Millisecond)
            if err = sink.Close(ctx); err != nil {
                t.Errorf("unexpected close error: %s", err)
            }
            // Should write to 3 files
            for i := 0; i < 3; i++ {
                // Read the contents of the temporary file and check if they match the collected items
                var fn string
                if tt.compress != "" {
                    fn = fmt.Sprintf("test_%s_%s_%d.dd", tt.ft, tt.compress, 460+450*i)
                } else {
                    fn = fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
                }
                contents, err := os.ReadFile(fn)
                if err != nil {
                    t.Fatal(err)
                }
                if tt.compress != "" {
                    decompressor, _ := compressor.GetDecompressor(tt.compress)
                    contents, err = decompressor.Decompress(contents)
                    if err != nil {
                        t.Errorf("%v", err)
                    }
                }
                if !reflect.DeepEqual(contents, tt.contents[i]) {
                    t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
                }
            }
        })
    }
}

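// Test reopening the sink on an existing file. The assertions below expect the
// second Open/Collect/Close cycle to overwrite the first record, leaving only
// the newly collected one in the file.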
func TestFileSinkReopen(t *testing.T) {
    // Remove existing files
    err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if filepath.Ext(path) == ".log" {
            fmt.Println("Deleting file:", path)
            return os.Remove(path)
        }
        return nil
    })
    if err != nil {
        t.Fatal(err)
    }
    conf.IsTesting = true
    tmpfile, err := os.CreateTemp("", "reopen.log")
    if err != nil {
        t.Fatal(err)
    }
    defer os.Remove(tmpfile.Name())
    // Create a stream context for testing
    contextLogger := conf.Log.WithField("rule", "testRollingCount")
    ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
    tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
    vCtx := context.WithValue(ctx, context.TransKey, tf)

    sink := &fileSink{}
    err = sink.Configure(map[string]interface{}{
        "path":               tmpfile.Name(),
        "fileType":           LINES_TYPE,
        "format":             "json",
        "rollingNamePattern": "none",
    })
    if err != nil {
        t.Fatal(err)
    }
    err = sink.Open(vCtx)
    if err != nil {
        t.Fatal(err)
    }
    // Test collecting a map item
    m := map[string]interface{}{"key": "value1"}
    if err := sink.Collect(vCtx, m); err != nil {
        t.Errorf("unexpected error: %s", err)
    }
    sink.Close(vCtx)
    exp := []byte(`{"key":"value1"}`)
    contents, err := os.ReadFile(tmpfile.Name())
    if err != nil {
        t.Fatal(err)
    }
    if !reflect.DeepEqual(contents, exp) {
        t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
    }

    // Reopen the same file with a fresh sink and write a second record
    sink = &fileSink{}
    err = sink.Configure(map[string]interface{}{
        "path":               tmpfile.Name(),
        "fileType":           LINES_TYPE,
        "hasHeader":          true,
        "format":             "json",
        "rollingNamePattern": "none",
    })
    if err != nil {
        t.Fatal(err)
    }
    err = sink.Open(vCtx)
    if err != nil {
        t.Fatal(err)
    }
    // Test collecting another map item
    m = map[string]interface{}{"key": "value2"}
    if err := sink.Collect(vCtx, m); err != nil {
        t.Errorf("unexpected error: %s", err)
    }
    if err = sink.Close(vCtx); err != nil {
        t.Errorf("unexpected close error: %s", err)
    }
    exp = []byte(`{"key":"value2"}`)
    contents, err = os.ReadFile(tmpfile.Name())
    if err != nil {
        t.Fatal(err)
    }
    if !reflect.DeepEqual(contents, exp) {
        t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
    }
}