Преглед изворни кода

fix(sink): support none exist dir for file sink (#2061)

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan пре 1 година
родитељ
комит
f38a8a36ed
2 измењених фајлова са 17 додато и 2 уклоњено
  1. 9 2
      internal/io/file/file_sink.go
  2. 8 0
      internal/io/file/file_writer.go

+ 9 - 2
internal/io/file/file_sink.go

@@ -245,14 +245,21 @@ func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*
 		}
 		nfn := fn
 		if m.c.RollingNamePattern != "" {
+			newFile := ""
+			fileDir := filepath.Dir(fn)
+			fileName := filepath.Base(fn)
 			switch m.c.RollingNamePattern {
 			case "prefix":
-				nfn = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fn)
+				newFile = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fileName)
 			case "suffix":
 				ext := filepath.Ext(fn)
-				nfn = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fn, ext), conf.GetNowInMilli(), ext)
+				newFile = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fileName, ext), conf.GetNowInMilli(), ext)
+			default:
+				newFile = fileName
 			}
+			nfn = filepath.Join(fileDir, newFile)
 		}
+
 		fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers, m.c.Compression)
 		if e != nil {
 			return nil, e

+ 8 - 0
internal/io/file/file_writer.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path/filepath"
 	"time"
 
 	"github.com/lf-edge/ekuiper/internal/compressor"
@@ -45,6 +46,13 @@ func createFileWriter(ctx api.StreamContext, fn string, ft FileType, headers str
 		f   *os.File
 		err error
 	)
+	Dir := filepath.Dir(fn)
+	if _, err = os.Stat(Dir); os.IsNotExist(err) {
+		if err := os.Mkdir(Dir, 0o777); err != nil {
+			return nil, fmt.Errorf("fail to create file %s: %v", fn, err)
+		}
+	}
+
 	if _, err = os.Stat(fn); os.IsNotExist(err) {
 		if _, err := os.Create(fn); err != nil {
 			return nil, fmt.Errorf("fail to create file %s: %v", fn, err)