file_sink_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
  15. import (
  16. "fmt"
  17. "os"
  18. "path/filepath"
  19. "reflect"
  20. "strconv"
  21. "testing"
  22. "time"
  23. "github.com/lf-edge/ekuiper/internal/compressor"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/topo/context"
  26. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  27. "github.com/lf-edge/ekuiper/internal/topo/transform"
  28. "github.com/lf-edge/ekuiper/pkg/message"
  29. )
  30. // Unit test for Configure function
  31. func TestConfigure(t *testing.T) {
  32. props := map[string]interface{}{
  33. "interval": 500,
  34. "path": "test",
  35. }
  36. m := File().(*fileSink)
  37. err := m.Configure(props)
  38. if err != nil {
  39. t.Errorf("Configure() error = %v, wantErr nil", err)
  40. }
  41. if *m.c.Interval != 500 {
  42. t.Errorf("Configure() Interval = %v, want 500", m.c.Interval)
  43. }
  44. if m.c.Path != "test" {
  45. t.Errorf("Configure() Path = %v, want test", m.c.Path)
  46. }
  47. err = m.Configure(map[string]interface{}{"interval": -1, "path": "test"})
  48. if err == nil {
  49. t.Errorf("Configure() error = %v, wantErr not nil", err)
  50. }
  51. err = m.Configure(map[string]interface{}{"interval": 500, "path": ""})
  52. if err == nil {
  53. t.Errorf("Configure() error = %v, wantErr not nil", err)
  54. }
  55. err = m.Configure(map[string]interface{}{"fileType": "csv2"})
  56. if err == nil {
  57. t.Errorf("Configure() error = %v, wantErr not nil", err)
  58. }
  59. err = m.Configure(map[string]interface{}{
  60. "interval": 500,
  61. "path": "test",
  62. "fileType": "csv",
  63. })
  64. if err == nil {
  65. t.Errorf("Configure() error = %v, wantErr not nil", err)
  66. }
  67. err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
  68. if err == nil {
  69. t.Errorf("Configure() error = %v, wantErr not nil", err)
  70. }
  71. err = m.Configure(map[string]interface{}{"rollingInterval": -1})
  72. if err == nil {
  73. t.Errorf("Configure() error = %v, wantErr not nil", err)
  74. }
  75. err = m.Configure(map[string]interface{}{"rollingCount": -1})
  76. if err == nil {
  77. t.Errorf("Configure() error = %v, wantErr not nil", err)
  78. }
  79. err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
  80. if err == nil {
  81. t.Errorf("Configure() error = %v, wantErr not nil", err)
  82. }
  83. err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
  84. if err == nil {
  85. t.Errorf("Configure() error = %v, wantErr not nil", err)
  86. }
  87. err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
  88. if err == nil {
  89. t.Errorf("Configure() error = %v, wantErr not nil", err)
  90. }
  91. for k := range compressionTypes {
  92. err = m.Configure(map[string]interface{}{
  93. "interval": 500,
  94. "path": "test",
  95. "compression": k,
  96. "rollingNamePattern": "suffix",
  97. })
  98. if err != nil {
  99. t.Errorf("Configure() error = %v, wantErr nil", err)
  100. }
  101. if m.c.Compression != k {
  102. t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, k)
  103. }
  104. }
  105. err = m.Configure(map[string]interface{}{
  106. "interval": 500,
  107. "path": "test",
  108. "compression": "",
  109. "rollingNamePattern": "suffix",
  110. })
  111. if err != nil {
  112. t.Errorf("Configure() error = %v, wantErr nil", err)
  113. }
  114. if m.c.Compression != "" {
  115. t.Errorf("Configure() Compression = %v, want %v", m.c.Compression, "")
  116. }
  117. err = m.Configure(map[string]interface{}{
  118. "interval": 500,
  119. "path": "test",
  120. "compression": "not_exist_algorithm",
  121. })
  122. if err == nil {
  123. t.Errorf("Configure() error = %v, wantErr not nil", err)
  124. }
  125. }
  126. func TestFileSink_Configure(t *testing.T) {
  127. var (
  128. defaultCheckInterval = (5 * time.Minute).Milliseconds()
  129. int500 = 500
  130. int64_500 = int64(int500)
  131. )
  132. tests := []struct {
  133. name string
  134. c *sinkConf
  135. p map[string]interface{}
  136. }{
  137. {
  138. name: "default configurations",
  139. c: &sinkConf{
  140. CheckInterval: &defaultCheckInterval,
  141. Path: "cache",
  142. FileType: LINES_TYPE,
  143. RollingCount: 1000000,
  144. },
  145. p: map[string]interface{}{},
  146. },
  147. {
  148. name: "previous setting",
  149. c: &sinkConf{
  150. Interval: &int500,
  151. CheckInterval: &int64_500,
  152. Path: "test",
  153. FileType: LINES_TYPE,
  154. RollingCount: 1000000,
  155. },
  156. p: map[string]interface{}{
  157. "interval": 500,
  158. "path": "test",
  159. },
  160. },
  161. {
  162. name: "new props",
  163. c: &sinkConf{
  164. CheckInterval: &int64_500,
  165. Path: "test",
  166. FileType: CSV_TYPE,
  167. Format: message.FormatDelimited,
  168. Delimiter: ",",
  169. RollingCount: 1000000,
  170. RollingNamePattern: "none",
  171. },
  172. p: map[string]interface{}{
  173. "checkInterval": 500,
  174. "path": "test",
  175. "fileType": "csv",
  176. "format": message.FormatDelimited,
  177. "rollingNamePattern": "none",
  178. },
  179. },
  180. { // only set rolling interval
  181. name: "rolling",
  182. c: &sinkConf{
  183. CheckInterval: &defaultCheckInterval,
  184. Path: "cache",
  185. FileType: LINES_TYPE,
  186. RollingInterval: 500,
  187. RollingCount: 0,
  188. },
  189. p: map[string]interface{}{
  190. "rollingInterval": 500,
  191. "rollingCount": 0,
  192. },
  193. },
  194. {
  195. name: "fields",
  196. c: &sinkConf{
  197. CheckInterval: &defaultCheckInterval,
  198. Path: "cache",
  199. FileType: LINES_TYPE,
  200. RollingInterval: 500,
  201. RollingCount: 0,
  202. Fields: []string{"c", "a", "b"},
  203. },
  204. p: map[string]interface{}{
  205. "rollingInterval": 500,
  206. "rollingCount": 0,
  207. "fields": []string{"c", "a", "b"},
  208. },
  209. },
  210. }
  211. for _, tt := range tests {
  212. t.Run(tt.name, func(t *testing.T) {
  213. m := &fileSink{}
  214. if err := m.Configure(tt.p); err != nil {
  215. t.Errorf("fileSink.Configure() error = %v", err)
  216. return
  217. }
  218. if !reflect.DeepEqual(m.c, tt.c) {
  219. t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
  220. }
  221. })
  222. }
  223. }
  224. // Test single file writing and flush by close
  225. func TestFileSink_Collect(t *testing.T) {
  226. tests := []struct {
  227. name string
  228. ft FileType
  229. fname string
  230. content []byte
  231. compress string
  232. }{
  233. {
  234. name: "lines",
  235. ft: LINES_TYPE,
  236. fname: "test_lines",
  237. content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  238. },
  239. {
  240. name: "json",
  241. ft: JSON_TYPE,
  242. fname: "test_json",
  243. content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
  244. },
  245. {
  246. name: "csv",
  247. ft: CSV_TYPE,
  248. fname: "test_csv",
  249. content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  250. },
  251. {
  252. name: "lines",
  253. ft: LINES_TYPE,
  254. fname: "test_lines",
  255. content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  256. compress: GZIP,
  257. },
  258. {
  259. name: "json",
  260. ft: JSON_TYPE,
  261. fname: "test_json",
  262. content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
  263. compress: GZIP,
  264. },
  265. {
  266. name: "csv",
  267. ft: CSV_TYPE,
  268. fname: "test_csv",
  269. content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  270. compress: GZIP,
  271. },
  272. {
  273. name: "lines",
  274. ft: LINES_TYPE,
  275. fname: "test_lines",
  276. content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  277. compress: ZSTD,
  278. },
  279. {
  280. name: "json",
  281. ft: JSON_TYPE,
  282. fname: "test_json",
  283. content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
  284. compress: ZSTD,
  285. },
  286. {
  287. name: "csv",
  288. ft: CSV_TYPE,
  289. fname: "test_csv",
  290. content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
  291. compress: ZSTD,
  292. },
  293. }
  294. // Create a stream context for testing
  295. contextLogger := conf.Log.WithField("rule", "test2")
  296. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  297. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  298. vCtx := context.WithValue(ctx, context.TransKey, tf)
  299. for _, tt := range tests {
  300. t.Run(tt.name, func(t *testing.T) {
  301. // Create a temporary file for testing
  302. tmpfile, err := os.CreateTemp("", tt.fname)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. defer os.Remove(tmpfile.Name())
  307. // Create a file sink with the temporary file path
  308. sink := &fileSink{}
  309. f := message.FormatJson
  310. if tt.ft == CSV_TYPE {
  311. f = message.FormatDelimited
  312. }
  313. err = sink.Configure(map[string]interface{}{
  314. "path": tmpfile.Name(),
  315. "fileType": tt.ft,
  316. "hasHeader": true,
  317. "format": f,
  318. "rollingNamePattern": "none",
  319. "compression": tt.compress,
  320. "fields": []string{"key"},
  321. })
  322. if err != nil {
  323. t.Fatal(err)
  324. }
  325. err = sink.Open(ctx)
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. // Test collecting a map item
  330. m := map[string]interface{}{"key": "value1"}
  331. if err := sink.Collect(vCtx, m); err != nil {
  332. t.Errorf("unexpected error: %s", err)
  333. }
  334. // Test collecting another map item
  335. m = map[string]interface{}{"key": "value2"}
  336. if err := sink.Collect(ctx, m); err != nil {
  337. t.Errorf("unexpected error: %s", err)
  338. }
  339. if err = sink.Close(ctx); err != nil {
  340. t.Errorf("unexpected close error: %s", err)
  341. }
  342. // Read the contents of the temporary file and check if they match the collected items
  343. contents, err := os.ReadFile(tmpfile.Name())
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. if tt.compress != "" {
  348. decompressor, _ := compressor.GetDecompressor(tt.compress)
  349. decompress, err := decompressor.Decompress(contents)
  350. if err != nil {
  351. t.Errorf("%v", err)
  352. }
  353. if !reflect.DeepEqual(decompress, tt.content) {
  354. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
  355. }
  356. } else {
  357. if !reflect.DeepEqual(contents, tt.content) {
  358. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
  359. }
  360. }
  361. })
  362. }
  363. }
  364. // Test file collect by fields
  365. func TestFileSinkFields_Collect(t *testing.T) {
  366. tests := []struct {
  367. name string
  368. ft FileType
  369. fname string
  370. format string
  371. delimiter string
  372. fields []string
  373. content []byte
  374. }{
  375. {
  376. name: "test1",
  377. ft: CSV_TYPE,
  378. fname: "test_csv",
  379. format: "delimited",
  380. delimiter: ",",
  381. fields: []string{"temperature", "humidity"},
  382. content: []byte("temperature,humidity\n31.2,40"),
  383. },
  384. {
  385. name: "test2",
  386. ft: CSV_TYPE,
  387. fname: "test_csv",
  388. format: "delimited",
  389. delimiter: ",",
  390. content: []byte("humidity,temperature\n40,31.2"),
  391. },
  392. }
  393. // Create a stream context for testing
  394. contextLogger := conf.Log.WithField("rule", "testFields")
  395. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  396. for _, tt := range tests {
  397. t.Run(tt.name, func(t *testing.T) {
  398. tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, "", tt.fields)
  399. vCtx := context.WithValue(ctx, context.TransKey, tf)
  400. // Create a temporary file for testing
  401. tmpfile, err := os.CreateTemp("", tt.fname)
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. defer os.Remove(tmpfile.Name())
  406. // Create a file sink with the temporary file path
  407. sink := &fileSink{}
  408. err = sink.Configure(map[string]interface{}{
  409. "path": tmpfile.Name(),
  410. "fileType": tt.ft,
  411. "hasHeader": true,
  412. "format": tt.format,
  413. "rollingNamePattern": "none",
  414. "fields": tt.fields,
  415. })
  416. if err != nil {
  417. t.Fatal(err)
  418. }
  419. err = sink.Open(ctx)
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. // Test collecting a map item
  424. m := map[string]interface{}{"humidity": 40, "temperature": 31.2}
  425. if err := sink.Collect(vCtx, m); err != nil {
  426. t.Errorf("unexpected error: %s", err)
  427. }
  428. if err = sink.Close(ctx); err != nil {
  429. t.Errorf("unexpected close error: %s", err)
  430. }
  431. // Read the contents of the temporary file and check if they match the collected items
  432. contents, err := os.ReadFile(tmpfile.Name())
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. if !reflect.DeepEqual(contents, tt.content) {
  437. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
  438. }
  439. })
  440. }
  441. }
  442. // Test file rolling by time
  443. func TestFileSinkRolling_Collect(t *testing.T) {
  444. // Remove existing files
  445. err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  446. if err != nil {
  447. return err
  448. }
  449. if filepath.Ext(path) == ".log" {
  450. fmt.Println("Deleting file:", path)
  451. return os.Remove(path)
  452. }
  453. return nil
  454. })
  455. if err != nil {
  456. t.Fatal(err)
  457. }
  458. conf.IsTesting = true
  459. tests := []struct {
  460. name string
  461. ft FileType
  462. fname string
  463. contents [2][]byte
  464. compress string
  465. }{
  466. {
  467. name: "lines",
  468. ft: LINES_TYPE,
  469. fname: "test_lines.log",
  470. contents: [2][]byte{
  471. []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
  472. []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
  473. },
  474. },
  475. {
  476. name: "json",
  477. ft: JSON_TYPE,
  478. fname: "test_json.log",
  479. contents: [2][]byte{
  480. []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
  481. []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
  482. },
  483. },
  484. {
  485. name: "lines",
  486. ft: LINES_TYPE,
  487. fname: "test_lines_gzip.log",
  488. contents: [2][]byte{
  489. []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
  490. []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
  491. },
  492. compress: GZIP,
  493. },
  494. {
  495. name: "json",
  496. ft: JSON_TYPE,
  497. fname: "test_json_gzip.log",
  498. contents: [2][]byte{
  499. []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
  500. []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
  501. },
  502. compress: GZIP,
  503. },
  504. {
  505. name: "lines",
  506. ft: LINES_TYPE,
  507. fname: "test_lines_zstd.log",
  508. contents: [2][]byte{
  509. []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
  510. []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
  511. },
  512. compress: ZSTD,
  513. },
  514. {
  515. name: "json",
  516. ft: JSON_TYPE,
  517. fname: "test_json_zstd.log",
  518. contents: [2][]byte{
  519. []byte("[{\"key\":\"value0\",\"ts\":460},{\"key\":\"value1\",\"ts\":910},{\"key\":\"value2\",\"ts\":1360}]"),
  520. []byte("[{\"key\":\"value3\",\"ts\":1810},{\"key\":\"value4\",\"ts\":2260}]"),
  521. },
  522. compress: ZSTD,
  523. },
  524. }
  525. // Create a stream context for testing
  526. contextLogger := conf.Log.WithField("rule", "testRolling")
  527. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  528. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  529. vCtx := context.WithValue(ctx, context.TransKey, tf)
  530. for _, tt := range tests {
  531. t.Run(tt.name, func(t *testing.T) {
  532. // Create a file sink with the temporary file path
  533. sink := &fileSink{}
  534. err := sink.Configure(map[string]interface{}{
  535. "path": tt.fname,
  536. "fileType": tt.ft,
  537. "rollingInterval": 1000,
  538. "checkInterval": 500,
  539. "rollingCount": 0,
  540. "rollingNamePattern": "suffix",
  541. "compression": tt.compress,
  542. })
  543. if err != nil {
  544. t.Fatal(err)
  545. }
  546. mockclock.ResetClock(10)
  547. err = sink.Open(ctx)
  548. if err != nil {
  549. t.Fatal(err)
  550. }
  551. c := mockclock.GetMockClock()
  552. for i := 0; i < 5; i++ {
  553. c.Add(450 * time.Millisecond)
  554. m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
  555. if err := sink.Collect(vCtx, m); err != nil {
  556. t.Errorf("unexpected error: %s", err)
  557. }
  558. }
  559. c.After(2000 * time.Millisecond)
  560. if err = sink.Close(ctx); err != nil {
  561. t.Errorf("unexpected close error: %s", err)
  562. }
  563. // Should write to 2 files
  564. for i := 0; i < 2; i++ {
  565. // Read the contents of the temporary file and check if they match the collected items
  566. var fn string
  567. if tt.compress != "" {
  568. fn = fmt.Sprintf("test_%s_%s-%d.log", tt.ft, tt.compress, 460+1350*i)
  569. } else {
  570. fn = fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
  571. }
  572. var contents []byte
  573. contents, err := os.ReadFile(fn)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. if tt.compress != "" {
  578. decompressor, _ := compressor.GetDecompressor(tt.compress)
  579. contents, err = decompressor.Decompress(contents)
  580. if err != nil {
  581. t.Errorf("%v", err)
  582. }
  583. }
  584. if !reflect.DeepEqual(contents, tt.contents[i]) {
  585. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
  586. }
  587. }
  588. })
  589. }
  590. }
  591. // Test file rolling by count
  592. func TestFileSinkRollingCount_Collect(t *testing.T) {
  593. // Remove existing files
  594. err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  595. if err != nil {
  596. return err
  597. }
  598. if filepath.Ext(path) == ".dd" {
  599. fmt.Println("Deleting file:", path)
  600. return os.Remove(path)
  601. }
  602. return nil
  603. })
  604. if err != nil {
  605. t.Fatal(err)
  606. }
  607. conf.IsTesting = true
  608. tests := []struct {
  609. name string
  610. ft FileType
  611. fname string
  612. contents [3][]byte
  613. compress string
  614. }{
  615. {
  616. name: "csv",
  617. ft: CSV_TYPE,
  618. fname: "test_csv_{{.ts}}.dd",
  619. contents: [3][]byte{
  620. []byte("key,ts\nvalue0,460"),
  621. []byte("key,ts\nvalue1,910"),
  622. []byte("key,ts\nvalue2,1360"),
  623. },
  624. },
  625. {
  626. name: "csv",
  627. ft: CSV_TYPE,
  628. fname: "test_csv_gzip_{{.ts}}.dd",
  629. contents: [3][]byte{
  630. []byte("key,ts\nvalue0,460"),
  631. []byte("key,ts\nvalue1,910"),
  632. []byte("key,ts\nvalue2,1360"),
  633. },
  634. compress: GZIP,
  635. },
  636. {
  637. name: "csv",
  638. ft: CSV_TYPE,
  639. fname: "test_csv_zstd_{{.ts}}.dd",
  640. contents: [3][]byte{
  641. []byte("key,ts\nvalue0,460"),
  642. []byte("key,ts\nvalue1,910"),
  643. []byte("key,ts\nvalue2,1360"),
  644. },
  645. compress: ZSTD,
  646. },
  647. }
  648. // Create a stream context for testing
  649. contextLogger := conf.Log.WithField("rule", "testRollingCount")
  650. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  651. tf, _ := transform.GenTransform("", "delimited", "", ",", "", []string{})
  652. vCtx := context.WithValue(ctx, context.TransKey, tf)
  653. for _, tt := range tests {
  654. t.Run(tt.name, func(t *testing.T) {
  655. // Create a file sink with the temporary file path
  656. sink := &fileSink{}
  657. err := sink.Configure(map[string]interface{}{
  658. "path": tt.fname,
  659. "fileType": tt.ft,
  660. "rollingInterval": 0,
  661. "rollingCount": 1,
  662. "rollingNamePattern": "none",
  663. "hasHeader": true,
  664. "format": "delimited",
  665. "compression": tt.compress,
  666. })
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. mockclock.ResetClock(10)
  671. err = sink.Open(ctx)
  672. if err != nil {
  673. t.Fatal(err)
  674. }
  675. c := mockclock.GetMockClock()
  676. for i := 0; i < 3; i++ {
  677. c.Add(450 * time.Millisecond)
  678. m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
  679. if err := sink.Collect(vCtx, m); err != nil {
  680. t.Errorf("unexpected error: %s", err)
  681. }
  682. }
  683. c.After(2000 * time.Millisecond)
  684. if err = sink.Close(ctx); err != nil {
  685. t.Errorf("unexpected close error: %s", err)
  686. }
  687. // Should write to 2 files
  688. for i := 0; i < 3; i++ {
  689. // Read the contents of the temporary file and check if they match the collected items
  690. var fn string
  691. if tt.compress != "" {
  692. fn = fmt.Sprintf("test_%s_%s_%d.dd", tt.ft, tt.compress, 460+450*i)
  693. } else {
  694. fn = fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
  695. }
  696. contents, err := os.ReadFile(fn)
  697. if err != nil {
  698. t.Fatal(err)
  699. }
  700. if tt.compress != "" {
  701. decompressor, _ := compressor.GetDecompressor(tt.compress)
  702. contents, err = decompressor.Decompress(contents)
  703. if err != nil {
  704. t.Errorf("%v", err)
  705. }
  706. }
  707. if !reflect.DeepEqual(contents, tt.contents[i]) {
  708. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
  709. }
  710. }
  711. })
  712. }
  713. }
  714. func TestFileSinkReopen(t *testing.T) {
  715. // Remove existing files
  716. err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  717. if err != nil {
  718. return err
  719. }
  720. if filepath.Ext(path) == ".log" {
  721. fmt.Println("Deleting file:", path)
  722. return os.Remove(path)
  723. }
  724. return nil
  725. })
  726. if err != nil {
  727. t.Fatal(err)
  728. }
  729. conf.IsTesting = true
  730. tmpfile, err := os.CreateTemp("", "reopen.log")
  731. if err != nil {
  732. t.Fatal(err)
  733. }
  734. defer os.Remove(tmpfile.Name())
  735. // Create a stream context for testing
  736. contextLogger := conf.Log.WithField("rule", "testRollingCount")
  737. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  738. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  739. vCtx := context.WithValue(ctx, context.TransKey, tf)
  740. sink := &fileSink{}
  741. err = sink.Configure(map[string]interface{}{
  742. "path": tmpfile.Name(),
  743. "fileType": LINES_TYPE,
  744. "format": "json",
  745. "rollingNamePattern": "none",
  746. })
  747. if err != nil {
  748. t.Fatal(err)
  749. }
  750. err = sink.Open(vCtx)
  751. if err != nil {
  752. t.Fatal(err)
  753. }
  754. // Test collecting a map item
  755. m := map[string]interface{}{"key": "value1"}
  756. if err := sink.Collect(vCtx, m); err != nil {
  757. t.Errorf("unexpected error: %s", err)
  758. }
  759. sink.Close(vCtx)
  760. exp := []byte(`{"key":"value1"}`)
  761. contents, err := os.ReadFile(tmpfile.Name())
  762. if err != nil {
  763. t.Fatal(err)
  764. }
  765. if !reflect.DeepEqual(contents, exp) {
  766. t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
  767. }
  768. sink = &fileSink{}
  769. err = sink.Configure(map[string]interface{}{
  770. "path": tmpfile.Name(),
  771. "fileType": LINES_TYPE,
  772. "hasHeader": true,
  773. "format": "json",
  774. "rollingNamePattern": "none",
  775. })
  776. if err != nil {
  777. t.Fatal(err)
  778. }
  779. err = sink.Open(vCtx)
  780. if err != nil {
  781. t.Fatal(err)
  782. }
  783. // Test collecting another map item
  784. m = map[string]interface{}{"key": "value2"}
  785. if err := sink.Collect(vCtx, m); err != nil {
  786. t.Errorf("unexpected error: %s", err)
  787. }
  788. if err = sink.Close(vCtx); err != nil {
  789. t.Errorf("unexpected close error: %s", err)
  790. }
  791. exp = []byte(`{"key":"value2"}`)
  792. contents, err = os.ReadFile(tmpfile.Name())
  793. if err != nil {
  794. t.Fatal(err)
  795. }
  796. if !reflect.DeepEqual(contents, exp) {
  797. t.Errorf("\nexpected\t %q \nbut got\t\t %q", string(exp), string(contents))
  798. }
  799. }