file_sink_test.go

// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"strconv"
	"testing"
	"time"

	"github.com/lf-edge/ekuiper/internal/compressor"
	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/topo/context"
	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
	"github.com/lf-edge/ekuiper/internal/topo/transform"
	"github.com/lf-edge/ekuiper/pkg/message"
)

// Unit test for the Configure function
func TestConfigure(t *testing.T) {
	props := map[string]interface{}{
		"interval": 500,
		"path":     "test",
	}
	m := File().(*fileSink)
	err := m.Configure(props)
	if err != nil {
		t.Errorf("Configure() error = %v, wantErr nil", err)
	}
	if *m.c.Interval != 500 {
		t.Errorf("Configure() Interval = %v, want 500", m.c.Interval)
	}
	if m.c.Path != "test" {
		t.Errorf("Configure() Path = %v, want test", m.c.Path)
	}
	err = m.Configure(map[string]interface{}{"interval": -1, "path": "test"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"interval": 500, "path": ""})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"fileType": "csv2"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{
		"interval": 500,
		"path":     "test",
		"fileType": "csv",
	})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingInterval": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingCount": -1})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
	for k := range compressionTypes {
		err = m.Configure(map[string]interface{}{
			"interval":           500,
			"path":               "test",
			"compression":        k,
			"rollingNamePattern": "suffix",
		})
		if err != nil {
			t.Errorf("Configure() error = %v, wantErr nil", err)
		}
		if m.c.Compression != k {
			t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, k)
		}
	}
	err = m.Configure(map[string]interface{}{
		"interval":           500,
		"path":               "test",
		"compression":        "",
		"rollingNamePattern": "suffix",
	})
	if err != nil {
		t.Errorf("Configure() error = %v, wantErr nil", err)
	}
	if m.c.Compression != "" {
		t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, "")
	}
	err = m.Configure(map[string]interface{}{
		"interval":    500,
		"path":        "test",
		"compression": "not_exist_algorithm",
	})
	if err == nil {
		t.Errorf("Configure() error = %v, wantErr not nil", err)
	}
}

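// TestFileSink_Configure checks that Configure builds the expected sinkConf for
// default, legacy, and newer property maps (added doc comment; the cases below are unchanged).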
func TestFileSink_Configure(t *testing.T) {
	var (
		defaultCheckInterval = (5 * time.Minute).Milliseconds()
		int500               = 500
		int64_500            = int64(int500)
	)
	tests := []struct {
		name string
		c    *sinkConf
		p    map[string]interface{}
	}{
		{
			name: "default configurations",
			c: &sinkConf{
				CheckInterval: &defaultCheckInterval,
				Path:          "cache",
				FileType:      LINES_TYPE,
				RollingCount:  1000000,
			},
			p: map[string]interface{}{},
		},
		{
			name: "previous setting",
			c: &sinkConf{
				Interval:      &int500,
				CheckInterval: &int64_500,
				Path:          "test",
				FileType:      LINES_TYPE,
				RollingCount:  1000000,
			},
			p: map[string]interface{}{
				"interval": 500,
				"path":     "test",
			},
		},
		{
			name: "new props",
			c: &sinkConf{
				CheckInterval:      &int64_500,
				Path:               "test",
				FileType:           CSV_TYPE,
				Format:             message.FormatDelimited,
				Delimiter:          ",",
				RollingCount:       1000000,
				RollingNamePattern: "none",
			},
			p: map[string]interface{}{
				"checkInterval":      500,
				"path":               "test",
				"fileType":           "csv",
				"format":             message.FormatDelimited,
				"rollingNamePattern": "none",
			},
		},
		{ // only set rolling interval
			name: "rolling",
			c: &sinkConf{
				CheckInterval:   &defaultCheckInterval,
				Path:            "cache",
				FileType:        LINES_TYPE,
				RollingInterval: 500,
				RollingCount:    0,
			},
			p: map[string]interface{}{
				"rollingInterval": 500,
				"rollingCount":    0,
			},
		},
		{
			name: "fields",
			c: &sinkConf{
				CheckInterval:   &defaultCheckInterval,
				Path:            "cache",
				FileType:        LINES_TYPE,
				RollingInterval: 500,
				RollingCount:    0,
				Fields:          []string{"c", "a", "b"},
			},
			p: map[string]interface{}{
				"rollingInterval": 500,
				"rollingCount":    0,
				"fields":          []string{"c", "a", "b"},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			m := &fileSink{}
			if err := m.Configure(tt.p); err != nil {
				t.Errorf("fileSink.Configure() error = %v", err)
				return
			}
			if !reflect.DeepEqual(m.c, tt.c) {
				t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
			}
		})
	}
}

// Test single file writing and flush by close
func TestFileSink_Collect(t *testing.T) {
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		content  []byte
		compress string
	}{
		{
			name:    "lines",
			ft:      LINES_TYPE,
			fname:   "test_lines",
			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
		}, {
			name:    "json",
			ft:      JSON_TYPE,
			fname:   "test_json",
			content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
		}, {
			name:    "csv",
			ft:      CSV_TYPE,
			fname:   "test_csv",
			content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
		},
		{
			name:     "lines",
			ft:       LINES_TYPE,
			fname:    "test_lines",
			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: GZIP,
		}, {
			name:     "json",
			ft:       JSON_TYPE,
			fname:    "test_json",
			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
			compress: GZIP,
		}, {
			name:     "csv",
			ft:       CSV_TYPE,
			fname:    "test_csv",
			content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: GZIP,
		},
		{
			name:     "lines",
			ft:       LINES_TYPE,
			fname:    "test_lines",
			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: ZSTD,
		}, {
			name:     "json",
			ft:       JSON_TYPE,
			fname:    "test_json",
			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
			compress: ZSTD,
		}, {
			name:     "csv",
			ft:       CSV_TYPE,
			fname:    "test_csv",
			content:  []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "test2")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a temporary file for testing
			tmpfile, err := os.CreateTemp("", tt.fname)
			if err != nil {
				t.Fatal(err)
			}
			defer os.Remove(tmpfile.Name())
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			f := message.FormatJson
			if tt.ft == CSV_TYPE {
				f = message.FormatDelimited
			}
			err = sink.Configure(map[string]interface{}{
				"path":               tmpfile.Name(),
				"fileType":           tt.ft,
				"hasHeader":          true,
				"format":             f,
				"rollingNamePattern": "none",
				"compression":        tt.compress,
				"fields":             []string{"key"},
			})
			if err != nil {
				t.Fatal(err)
			}
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			// Test collecting a map item
			m := map[string]interface{}{"key": "value1"}
			if err := sink.Collect(vCtx, m); err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			// Test collecting another map item
			m = map[string]interface{}{"key": "value2"}
			if err := sink.Collect(ctx, m); err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
			// Read the contents of the temporary file and check if they match the collected items
			contents, err := os.ReadFile(tmpfile.Name())
			if err != nil {
				t.Fatal(err)
			}
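			// For compressed cases, decompress the file before comparing with the expected plain content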
			if tt.compress != "" {
				decompressor, _ := compressor.GetDecompressor(tt.compress)
				decompress, err := decompressor.Decompress(contents)
				if err != nil {
					t.Errorf("%v", err)
				}
				if !reflect.DeepEqual(decompress, tt.content) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(decompress))
				}
			} else {
				if !reflect.DeepEqual(contents, tt.content) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
				}
			}
		})
	}
}

// Test file collect by fields
func TestFileSinkFields_Collect(t *testing.T) {
	tests := []struct {
		name      string
		ft        FileType
		fname     string
		format    string
		delimiter string
		fields    []string
		content   []byte
	}{
		{
			name:      "test1",
			ft:        CSV_TYPE,
			fname:     "test_csv",
			format:    "delimited",
			delimiter: ",",
			fields:    []string{"temperature", "humidity"},
			content:   []byte("temperature,humidity\n31.2,40"),
		},
		{
			name:      "test2",
			ft:        CSV_TYPE,
			fname:     "test_csv",
			format:    "delimited",
			delimiter: ",",
			content:   []byte("humidity,temperature\n40,31.2"),
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testFields")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, tt.fields)
			vCtx := context.WithValue(ctx, context.TransKey, tf)
			// Create a temporary file for testing
			tmpfile, err := os.CreateTemp("", tt.fname)
			if err != nil {
				t.Fatal(err)
			}
			defer os.Remove(tmpfile.Name())
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			err = sink.Configure(map[string]interface{}{
				"path":               tmpfile.Name(),
				"fileType":           tt.ft,
				"hasHeader":          true,
				"format":             tt.format,
				"rollingNamePattern": "none",
				"fields":             tt.fields,
			})
			if err != nil {
				t.Fatal(err)
			}
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			// Test collecting a map item
			m := map[string]interface{}{"humidity": 40, "temperature": 31.2}
			if err := sink.Collect(vCtx, m); err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
			// Read the contents of the temporary file and check if they match the collected items
			contents, err := os.ReadFile(tmpfile.Name())
			if err != nil {
				t.Fatal(err)
			}
			if !reflect.DeepEqual(contents, tt.content) {
				t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
			}
		})
	}
}

// Test file rolling by time
func TestFileSinkRolling_Collect(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".log" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		contents [2][]byte
		compress string
	}{
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
		},
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines_gzip.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
			compress: GZIP,
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json_gzip.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
			compress: GZIP,
		},
		{
			name:  "lines",
			ft:    LINES_TYPE,
			fname: "test_lines_zstd.log",
			contents: [2][]byte{
				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
			},
			compress: ZSTD,
		}, {
			name:  "json",
			ft:    JSON_TYPE,
			fname: "test_json_zstd.log",
			contents: [2][]byte{
				[]byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
				[]byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
			},
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRolling")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			err := sink.Configure(map[string]interface{}{
				"path":               tt.fname,
				"fileType":           tt.ft,
				"rollingInterval":    1000,
				"checkInterval":      500,
				"rollingCount":       0,
				"rollingNamePattern": "suffix",
				"compression":        tt.compress,
			})
			if err != nil {
				t.Fatal(err)
			}
			mockclock.ResetClock(10)
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			c := mockclock.GetMockClock()
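			// Advance the mock clock by 450ms per message; with a 1000ms rolling interval and
			// a 500ms check interval, the five messages are expected to split across two files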
			for i := 0; i < 5; i++ {
				c.Add(450 * time.Millisecond)
				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
				if err := sink.Collect(vCtx, m); err != nil {
					t.Errorf("unexpected error: %s", err)
				}
			}
			c.After(2000 * time.Millisecond)
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
			// Should write to 2 files
			for i := 0; i < 2; i++ {
				// Read the contents of the temporary file and check if they match the collected items
				var fn string
				if tt.compress != "" {
					fn = fmt.Sprintf("test_%s_%s-%d.log", tt.ft, tt.compress, 460+1350*i)
				} else {
					fn = fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
				}
				var contents []byte
				contents, err := os.ReadFile(fn)
				if err != nil {
					t.Fatal(err)
				}
				if tt.compress != "" {
					decompressor, _ := compressor.GetDecompressor(tt.compress)
					contents, err = decompressor.Decompress(contents)
					if err != nil {
						t.Errorf("%v", err)
					}
				}
				if !reflect.DeepEqual(contents, tt.contents[i]) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
				}
			}
		})
	}
}

// Test file rolling by count
func TestFileSinkRollingCount_Collect(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".dd" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tests := []struct {
		name     string
		ft       FileType
		fname    string
		contents [3][]byte
		compress string
	}{
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
		},
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_gzip_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
			compress: GZIP,
		},
		{
			name:  "csv",
			ft:    CSV_TYPE,
			fname: "test_csv_zstd_{{.ts}}.dd",
			contents: [3][]byte{
				[]byte("key,ts\nvalue0,460"),
				[]byte("key,ts\nvalue1,910"),
				[]byte("key,ts\nvalue2,1360"),
			},
			compress: ZSTD,
		},
	}
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRollingCount")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "delimited", "", ",", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a file sink with the temporary file path
			sink := &fileSink{}
			err := sink.Configure(map[string]interface{}{
				"path":               tt.fname,
				"fileType":           tt.ft,
				"rollingInterval":    0,
				"rollingCount":       1,
				"rollingNamePattern": "none",
				"hasHeader":          true,
				"format":             "delimited",
				"compression":        tt.compress,
			})
			if err != nil {
				t.Fatal(err)
			}
			mockclock.ResetClock(10)
			err = sink.Open(ctx)
			if err != nil {
				t.Fatal(err)
			}
			c := mockclock.GetMockClock()
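			// With rollingCount set to 1, every collected message rolls into its own file
			// named from the {{.ts}} template in the configured path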
			for i := 0; i < 3; i++ {
				c.Add(450 * time.Millisecond)
				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
				if err := sink.Collect(vCtx, m); err != nil {
					t.Errorf("unexpected error: %s", err)
				}
			}
			c.After(2000 * time.Millisecond)
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}
			// Should write to 3 files
			for i := 0; i < 3; i++ {
				// Read the contents of the temporary file and check if they match the collected items
				var fn string
				if tt.compress != "" {
					fn = fmt.Sprintf("test_%s_%s_%d.dd", tt.ft, tt.compress, 460+450*i)
				} else {
					fn = fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
				}
				contents, err := os.ReadFile(fn)
				if err != nil {
					t.Fatal(err)
				}
				if tt.compress != "" {
					decompressor, _ := compressor.GetDecompressor(tt.compress)
					contents, err = decompressor.Decompress(contents)
					if err != nil {
						t.Errorf("%v", err)
					}
				}
				if !reflect.DeepEqual(contents, tt.contents[i]) {
					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
				}
			}
		})
	}
}

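// TestFileSinkReopen verifies that reopening the sink on an existing file overwrites the
// previous content rather than appending to it (added doc comment; the test body is unchanged).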
func TestFileSinkReopen(t *testing.T) {
	// Remove existing files
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".log" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true
	tmpfile, err := os.CreateTemp("", "reopen.log")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(tmpfile.Name())
	// Create a stream context for testing
	contextLogger := conf.Log.WithField("rule", "testRollingCount")
	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
	tf, _ := transform.GenTransform("", "json", "", "", []string{})
	vCtx := context.WithValue(ctx, context.TransKey, tf)
	sink := &fileSink{}
	err = sink.Configure(map[string]interface{}{
		"path":               tmpfile.Name(),
		"fileType":           LINES_TYPE,
		"format":             "json",
		"rollingNamePattern": "none",
	})
	if err != nil {
		t.Fatal(err)
	}
	err = sink.Open(vCtx)
	if err != nil {
		t.Fatal(err)
	}
	// Test collecting a map item
	m := map[string]interface{}{"key": "value1"}
	if err := sink.Collect(vCtx, m); err != nil {
		t.Errorf("unexpected error: %s", err)
	}
	sink.Close(vCtx)
	exp := []byte(`{"key":"value1"}`)
	contents, err := os.ReadFile(tmpfile.Name())
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(contents, exp) {
		t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
	}
	sink = &fileSink{}
	err = sink.Configure(map[string]interface{}{
		"path":               tmpfile.Name(),
		"fileType":           LINES_TYPE,
		"hasHeader":          true,
		"format":             "json",
		"rollingNamePattern": "none",
	})
	if err != nil {
		t.Fatal(err)
	}
	err = sink.Open(vCtx)
	if err != nil {
		t.Fatal(err)
	}
	// Test collecting another map item
	m = map[string]interface{}{"key": "value2"}
	if err := sink.Collect(vCtx, m); err != nil {
		t.Errorf("unexpected error: %s", err)
	}
	if err = sink.Close(vCtx); err != nil {
		t.Errorf("unexpected close error: %s", err)
	}
	exp = []byte(`{"key":"value2"}`)
	contents, err = os.ReadFile(tmpfile.Name())
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(contents, exp) {
		t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
	}
}