// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"strconv"
	"testing"
	"time"

	"github.com/lf-edge/ekuiper/internal/compressor"
	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/topo/context"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
	"github.com/lf-edge/ekuiper/internal/topo/transform"
	"github.com/lf-edge/ekuiper/pkg/message"
)

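// The tests below exercise the file sink end to end: property validation in
// Configure, single-file writes flushed on Close, and file rolling driven by
// either time or message count, with optional gzip/zstd compression.
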
// Unit test for the Configure function covering valid and invalid property sets.
func TestConfigure(t *testing.T) {
	props := map[string]interface{}{
		"interval": 500,
		"path":     "test",
	}
	m := File().(*fileSink)
	err := m.Configure(props)
	if err != nil {
		t.Errorf("Configure() error = %v, wantErr nil", err)
	}
	if *m.c.Interval != 500 {
		t.Errorf("Configure() Interval = %v, want 500", *m.c.Interval)
	}
	if m.c.Path != "test" {
		t.Errorf("Configure() Path = %v, want test", m.c.Path)
	}
	// Invalid configurations must be rejected.
	err = m.Configure(map[string]interface{}{"interval": -1, "path": "test"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"interval": 500, "path": ""})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"fileType": "csv2"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{
		"interval": 500,
		"path":     "test",
		"fileType": "csv",
	})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingInterval": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingCount": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	// Every supported compression algorithm should be accepted.
	for k := range compressionTypes {
		err = m.Configure(map[string]interface{}{
			"interval":           500,
			"path":               "test",
			"compression":        k,
			"rollingNamePattern": "suffix",
		})
		if err != nil {
			t.Errorf("Configure() error = %v, wantErr nil", err)
		}
		if m.c.Compression != k {
			t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, k)
		}
	}
	err = m.Configure(map[string]interface{}{
		"interval":           500,
		"path":               "test",
		"compression":        "",
		"rollingNamePattern": "suffix",
	})
	if err != nil {
		t.Errorf("Configure() error = %v, wantErr nil", err)
	}
	if m.c.Compression != "" {
		t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, "")
	}
	err = m.Configure(map[string]interface{}{
		"interval":    500,
		"path":        "test",
		"compression": "not_exist_algorithm",
	})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
}
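// TestFileSink_Configure checks the fully resolved sinkConf for a set of
// property maps: an empty map should fall back to the defaults, while explicit
// props should override them.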
func TestFileSink_Configure(t *testing.T) {
	var (
		defaultCheckInterval = (5 * time.Minute).Milliseconds()
		int500               = 500
		int64_500            = int64(int500)
	)
	tests := []struct {
		name string
		c    *sinkConf
		p    map[string]interface{}
	}{
		{
			name: "default configurations",
			c: &sinkConf{
				CheckInterval: &defaultCheckInterval,
				Path:          "cache",
				FileType:      LINES_TYPE,
				RollingCount:  1000000,
			},
			p: map[string]interface{}{},
		},
		{
			name: "previous setting",
			c: &sinkConf{
				Interval:      &int500,
				CheckInterval: &int64_500,
				Path:          "test",
				FileType:      LINES_TYPE,
				RollingCount:  1000000,
			},
			p: map[string]interface{}{
				"interval": 500,
				"path":     "test",
			},
		},
		{
			name: "new props",
			c: &sinkConf{
				CheckInterval:      &int64_500,
				Path:               "test",
				FileType:           CSV_TYPE,
				Format:             message.FormatDelimited,
				Delimiter:          ",",
				RollingCount:       1000000,
				RollingNamePattern: "none",
			},
			p: map[string]interface{}{
				"checkInterval":      500,
				"path":               "test",
				"fileType":           "csv",
				"format":             message.FormatDelimited,
				"rollingNamePattern": "none",
			},
		},
		{ // only set rolling interval
			name: "rolling",
			c: &sinkConf{
				CheckInterval:   &defaultCheckInterval,
				Path:            "cache",
				FileType:        LINES_TYPE,
				RollingInterval: 500,
				RollingCount:    0,
			},
			p: map[string]interface{}{
				"rollingInterval": 500,
				"rollingCount":    0,
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			m := &fileSink{}
			if err := m.Configure(tt.p); err != nil {
				t.Errorf("fileSink.Configure() error = %v", err)
				return
			}
			if !reflect.DeepEqual(m.c, tt.c) {
				t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
			}
		})
	}
}
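// TestFileSink_Collect covers the lines, json and csv file types, each with no
// compression, gzip and zstd; the sink is also configured to keep only the
// "key" field and to write a csv header.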
// Test single file writing and flush by close
func TestFileSink_Collect(t *testing.T) {
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		content  []byte
		compress string
	}{
		{
			name:    "lines",
			ft:      LINES_TYPE,
			fname:   "test_lines",
			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
		}, {
			name:    "json",
			ft:      JSON_TYPE,
			fname:   "test_json",
			content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
		}, {
			name:    "csv",
			ft:      CSV_TYPE,
			fname:   "test_csv",
			content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
		},
		{
			name:     "lines",
			ft:       LINES_TYPE,
			fname:    "test_lines",
			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: GZIP,
		}, {
			name:     "json",
			ft:       JSON_TYPE,
			fname:    "test_json",
			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
			compress: GZIP,
		}, {
			name:     "csv",
			ft:       CSV_TYPE,
			fname:    "test_csv",
			content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: GZIP,
		},
		{
			name:     "lines",
			ft:       LINES_TYPE,
			fname:    "test_lines",
			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: ZSTD,
		}, {
			name:     "json",
			ft:       JSON_TYPE,
			fname:    "test_json",
			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
			compress: ZSTD,
		}, {
			name:     "csv",
			ft:       CSV_TYPE,
			fname:    "test_csv",
			content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "test2")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a temporary file for testing
			tmpfile, err := os.CreateTemp("", tt.fname)
			if err != nil {
				t.Fatal(err)
			}
			defer os.Remove(tmpfile.Name())
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			f := message.FormatJson
			if tt.ft == CSV_TYPE {
				f = message.FormatDelimited
			}
			err = sink.Configure(map[string]interface{}{
				"path":               tmpfile.Name(),
				"fileType":           tt.ft,
				"hasHeader":          true,
				"format":             f,
				"rollingNamePattern": "none",
				"compression":        tt.compress,
				"fields":             []string{"key"},
			})
			if err != nil {
				t.Fatal(err)
			}
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			// Test collecting a map item
			m := map[string]interface{}{"key": "value1"}
			if err := sink.Collect(vCtx, m); err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			// Test collecting another map item
			m = map[string]interface{}{"key": "value2"}
			if err := sink.Collect(vCtx, m); err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
			// Read the contents of the temporary file and check if they match the collected items
			contents, err := os.ReadFile(tmpfile.Name())
			if err != nil {
				t.Fatal(err)
			}
			if tt.compress != "" {
				decompressor, _ := compressor.GetDecompressor(tt.compress)
				decompress, err := decompressor.Decompress(contents)
				if err != nil {
					t.Errorf("%v", err)
				}
				if !reflect.DeepEqual(decompress, tt.content) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(decompress))
				}
			} else {
				if !reflect.DeepEqual(contents, tt.content) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
				}
			}
		})
	}
}
// Test file rolling by time
func TestFileSinkRolling_Collect(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".log" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		contents [2][]byte
		compress string
	}{
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
		},
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines_gzip.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
			compress: GZIP,
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json_gzip.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
			compress: GZIP,
		},
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines_zstd.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
			compress: ZSTD,
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json_zstd.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRolling")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			err := sink.Configure(map[string]interface{}{
				"path":               tt.fname,
				"fileType":           tt.ft,
				"rollingInterval":    1000,
				"checkInterval":      500,
				"rollingCount":       0,
				"rollingNamePattern": "suffix",
				"compression":        tt.compress,
			})
			if err != nil {
				t.Fatal(err)
			}
			mockclock.ResetClock(10)
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			c := mockclock.GetMockClock()
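			// Advance the mocked clock by 450ms before each message. Starting
			// from 10ms, the five records get timestamps 460, 910, 1360, 1810
			// and 2260; with a 1000ms rolling interval the first three land in
			// one file and the last two in a second file.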
			for i := 0; i < 5; i++ {
				c.Add(450 * time.Millisecond)
				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
				if err := sink.Collect(vCtx, m); err != nil {
					t.Errorf("unexpected error: %s", err)
				}
			}
			c.After(2000 * time.Millisecond)
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
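			// With the "suffix" pattern the rolling timestamp is inserted
			// before the extension, so the two expected files end in -460.log
			// and -1810.log.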
			// Should write to 2 files
			for i := 0; i < 2; i++ {
				// Read the contents of the temporary file and check if they match the collected items
				var fn string
				if tt.compress != "" {
					fn = fmt.Sprintf("test_%s_%s-%d.log", tt.ft, tt.compress, 460+1350*i)
				} else {
					fn = fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
				}
				var contents []byte
				contents, err := os.ReadFile(fn)
				if err != nil {
					t.Fatal(err)
				}
				if tt.compress != "" {
					decompressor, _ := compressor.GetDecompressor(tt.compress)
					contents, err = decompressor.Decompress(contents)
					if err != nil {
						t.Errorf("%v", err)
					}
				}
				if !reflect.DeepEqual(contents, tt.contents[i]) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
				}
			}
		})
	}
}
// Test file rolling by count
func TestFileSinkRollingCount_Collect(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".dd" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		contents [3][]byte
		compress string
	}{
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
		},
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_gzip_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
			compress: GZIP,
		},
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_zstd_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRollingCount")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "delimited", "", ",", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			err := sink.Configure(map[string]interface{}{
				"path":               tt.fname,
				"fileType":           tt.ft,
				"rollingInterval":    0,
				"rollingCount":       1,
				"rollingNamePattern": "none",
				"hasHeader":          true,
				"format":             "delimited",
				"compression":        tt.compress,
			})
			if err != nil {
				t.Fatal(err)
			}
			mockclock.ResetClock(10)
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			c := mockclock.GetMockClock()
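			// rollingCount is 1 and rollingInterval is 0, so every record is
			// flushed into its own file.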
			for i := 0; i < 3; i++ {
				c.Add(450 * time.Millisecond)
				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
				if err := sink.Collect(vCtx, m); err != nil {
					t.Errorf("unexpected error: %s", err)
				}
			}
			c.After(2000 * time.Millisecond)
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
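			// The {{.ts}} placeholder in the path is resolved to the record
			// timestamp, so the expected files are test_csv_460.dd,
			// test_csv_910.dd and test_csv_1360.dd (plus the gzip/zstd variants).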
			// Should write to 3 files, one per record
			for i := 0; i < 3; i++ {
				// Read the contents of the temporary file and check if they match the collected items
				var fn string
				if tt.compress != "" {
					fn = fmt.Sprintf("test_%s_%s_%d.dd", tt.ft, tt.compress, 460+450*i)
				} else {
					fn = fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
				}
				contents, err := os.ReadFile(fn)
				if err != nil {
					t.Fatal(err)
				}
				if tt.compress != "" {
					decompressor, _ := compressor.GetDecompressor(tt.compress)
					contents, err = decompressor.Decompress(contents)
					if err != nil {
						t.Errorf("%v", err)
					}
				}
				if !reflect.DeepEqual(contents, tt.contents[i]) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
				}
			}
		})
	}
}
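// TestFileSinkReopen writes one record, closes the sink, then reopens it on
// the same file and writes another record.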
func TestFileSinkReopen(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".log" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tmpfile, err := os.CreateTemp("", "reopen.log")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(tmpfile.Name())
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRollingCount")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	sink := &fileSink{}
	err = sink.Configure(map[string]interface{}{
		"path":               tmpfile.Name(),
		"fileType":           LINES_TYPE,
		"format":             "json",
		"rollingNamePattern": "none",
	})
	if err != nil {
		t.Fatal(err)
	}
	err = sink.Open(vCtx)
	if err != nil {
		t.Fatal(err)
	}
	// Test collecting a map item
	m := map[string]interface{}{"key": "value1"}
	if err := sink.Collect(vCtx, m); err != nil {
		t.Errorf("unexpected error: %s", err)
	}
	if err = sink.Close(vCtx); err != nil {
		t.Errorf("unexpected close error: %s", err)
	}
	exp := []byte(`{"key":"value1"}`)
	contents, err := os.ReadFile(tmpfile.Name())
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(contents, exp) {
		t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
	}
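	// Reopen the sink on the same path; with rollingNamePattern "none" the
	// existing file is truncated, so only the second record should remain.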
	sink = &fileSink{}
	err = sink.Configure(map[string]interface{}{
		"path":               tmpfile.Name(),
		"fileType":           LINES_TYPE,
		"hasHeader":          true,
		"format":             "json",
		"rollingNamePattern": "none",
	})
	if err != nil {
		t.Fatal(err)
	}
	err = sink.Open(vCtx)
	if err != nil {
		t.Fatal(err)
	}
	// Test collecting another map item
	m = map[string]interface{}{"key": "value2"}
	if err := sink.Collect(vCtx, m); err != nil {
		t.Errorf("unexpected error: %s", err)
	}
	if err = sink.Close(vCtx); err != nil {
		t.Errorf("unexpected close error: %s", err)
	}
	exp = []byte(`{"key":"value2"}`)
	contents, err = os.ReadFile(tmpfile.Name())
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(contents, exp) {
		t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
	}
}