Jelajahi Sumber

feat(sink): file sink rolling support

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 tahun lalu
induk
melakukan
62aea9ba4f

+ 147 - 104
internal/io/file/file_sink.go

@@ -15,50 +15,77 @@
 package file
 
 import (
-	"bufio"
+	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"io"
-	"os"
+	"path/filepath"
+	"sort"
 	"strings"
 	"sync"
 	"time"
 )
 
 type sinkConf struct {
-	Interval  int      `json:"interval"`
-	Path      string   `json:"path"`
-	FileType  FileType `json:"fileType"`
-	HasHeader bool     `json:"hasHeader"`
-	Delimiter string   `json:"delimiter"`
-	Format    string   `json:"format"` // only use for validation; transformation is done in sink_node
+	Interval           *int     `json:"interval"` // deprecated, will remove in the next release
+	RollingInterval    int64    `json:"rollingInterval"`
+	RollingCount       int      `json:"rollingCount"`
+	RollingNamePattern string   `json:"rollingNamePattern"` // where to add the timestamp to the file name
+	CheckInterval      *int64   `json:"checkInterval"`      // Once interval removed, this will be NOT nullable
+	Path               string   `json:"path"`               // support dynamic property, when rolling, make sure the path is updated
+	FileType           FileType `json:"fileType"`
+	HasHeader          bool     `json:"hasHeader"`
+	Delimiter          string   `json:"delimiter"`
+	Format             string   `json:"format"` // only use for validation; transformation is done in sink_node
 }
 
 type fileSink struct {
 	c *sinkConf
-	// If firstLine is true, it means the file is newly created and the first line is not written yet.
-	// Do not write line feed for the first line.
-	firstLine bool
-	mux       sync.Mutex
-	file      *os.File
-	writer    io.Writer
-	hook      writerHooks
+
+	mux sync.Mutex
+	fws map[string]*fileWriter
 }
 
 func (m *fileSink) Configure(props map[string]interface{}) error {
 	c := &sinkConf{
-		Interval: 1000,
-		Path:     "cache",
-		FileType: LINES_TYPE,
+		RollingCount: 1000000,
+		Path:         "cache",
+		FileType:     LINES_TYPE,
 	}
 	if err := cast.MapToStruct(props, c); err != nil {
 		return err
 	}
-	if c.Interval <= 0 {
-		return fmt.Errorf("interval must be positive")
+	if c.Interval != nil {
+		if *c.Interval < 0 {
+			return fmt.Errorf("interval must be positive")
+		} else if c.CheckInterval == nil {
+			conf.Log.Warnf("interval is deprecated, use checkInterval instead. automatically set checkInterval to %d", c.Interval)
+			t := int64(*c.Interval)
+			c.CheckInterval = &t
+		} else {
+			conf.Log.Warnf("interval is deprecated and ignored, use checkInterval instead.")
+		}
+	} else if c.CheckInterval == nil { // set checkInterval default value if both interval and checkInerval are not set
+		t := (5 * time.Minute).Milliseconds()
+		c.CheckInterval = &t
+	}
+	if c.RollingInterval < 0 {
+		return fmt.Errorf("rollingInterval must be positive")
+	}
+	if c.RollingCount < 0 {
+		return fmt.Errorf("rollingCount must be positive")
+	}
+
+	if *c.CheckInterval < 0 {
+		return fmt.Errorf("checkInterval must be positive")
+	}
+	if c.RollingInterval == 0 && c.RollingCount == 0 {
+		return fmt.Errorf("one of rollingInterval and rollingCount must be set")
+	}
+	if c.RollingNamePattern != "" && c.RollingNamePattern != "prefix" && c.RollingNamePattern != "suffix" && c.RollingNamePattern != "none" {
+		return fmt.Errorf("rollingNamePattern must be one of prefix, suffix or none")
 	}
 	if c.Path == "" {
 		return fmt.Errorf("path must be set")
@@ -76,106 +103,76 @@ func (m *fileSink) Configure(props map[string]interface{}) error {
 		}
 	}
 	m.c = c
+	m.fws = make(map[string]*fileWriter)
 	return nil
 }
 
 func (m *fileSink) Open(ctx api.StreamContext) error {
-	logger := ctx.GetLogger()
-	logger.Debug("Opening file sink")
-	var (
-		f   *os.File
-		err error
-	)
-	if _, err = os.Stat(m.c.Path); os.IsNotExist(err) {
-		_, err = os.Create(m.c.Path)
-	}
-	f, err = os.OpenFile(m.c.Path, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
-	if err != nil {
-		return fmt.Errorf("fail to open file sink for %v", err)
-	}
-	m.file = f
-	m.firstLine = true
-	switch m.c.FileType {
-	case JSON_TYPE:
-		m.hook = &jsonWriterHooks{}
-	case CSV_TYPE:
-		m.hook = &csvWriterHooks{}
-	case LINES_TYPE:
-		m.hook = &linesWriterHooks{}
-	}
-	if m.c.Interval > 0 {
-		m.writer = bufio.NewWriter(f)
-		t := time.NewTicker(time.Duration(m.c.Interval) * time.Millisecond)
+	ctx.GetLogger().Debug("Opening file sink")
+	// Check if the files have opened longer than the rolling interval, if so close it and create a new one
+	if *m.c.CheckInterval > 0 {
+		t := conf.GetTicker(int(*m.c.CheckInterval))
 		go func() {
 			defer t.Stop()
 			for {
 				select {
-				case <-t.C:
+				case now := <-t.C:
 					m.mux.Lock()
-					err := m.writer.(*bufio.Writer).Flush()
-					if err != nil {
-						logger.Errorf("file sink fails to flush with error %s.", err)
+					for k, v := range m.fws {
+						if now.Sub(v.Start) > time.Duration(m.c.RollingInterval)*time.Millisecond {
+							ctx.GetLogger().Debugf("rolling file %s", k)
+							err := v.Close(ctx)
+							// TODO how to inform this error to the rule
+							if err != nil {
+								ctx.GetLogger().Errorf("file sink fails to close file %s with error %s.", k, err)
+							}
+							delete(m.fws, k)
+							// The file will be created when the next item comes
+						}
 					}
 					m.mux.Unlock()
 				case <-ctx.Done():
-					logger.Info("file sink done")
+					ctx.GetLogger().Info("file sink done")
 					return
 				}
 			}
 		}()
-	} else {
-		m.writer = f
 	}
 	return nil
 }
 
 func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
-	logger := ctx.GetLogger()
-	logger.Debugf("file sink receive %s", item)
-	// extract header for csv
-	if m.c.FileType == CSV_TYPE && m.c.HasHeader && m.hook.Header() == nil {
-		var header []string
-		switch v := item.(type) {
-		case map[string]interface{}:
-			header = make([]string, len(v))
-			for k := range item.(map[string]interface{}) {
-				header = append(header, k)
-			}
-		case []map[string]interface{}:
-			if len(v) > 0 {
-				header = make([]string, len(v[0]))
-				for k := range v[0] {
-					header = append(header, k)
-				}
-			}
-		}
-		m.hook.(*csvWriterHooks).SetHeader(strings.Join(header, m.c.Delimiter))
+	ctx.GetLogger().Debugf("file sink receive %s", item)
+	fn, err := ctx.ParseTemplate(m.c.Path, item)
+	if err != nil {
+		return err
+	}
+	fw, err := m.GetFws(ctx, fn, item)
+	if err != nil {
+		return err
 	}
 	if v, _, err := ctx.TransformOutput(item); err == nil {
-		logger.Debugf("file sink transform data %s", v)
+		ctx.GetLogger().Debugf("file sink transform data %s", v)
 		m.mux.Lock()
 		defer m.mux.Unlock()
-		if !m.firstLine {
-			_, e := m.writer.Write(m.hook.Line())
-			if e != nil {
-				return err
-			}
-		} else {
-			n, err := m.writer.Write(m.hook.Header())
-			if err != nil {
-				return err
-			}
-			if n > 0 {
-				_, e := m.writer.Write(m.hook.Line())
+		_, e := fw.Writer.Write(v)
+		if e != nil {
+			return e
+		}
+		_, e = fw.Writer.Write(fw.Hook.Line())
+		if e != nil {
+			return e
+		}
+		if m.c.RollingCount > 0 {
+			fw.Count++
+			if fw.Count >= m.c.RollingCount {
+				e = fw.Close(ctx)
 				if e != nil {
-					return err
+					return e
 				}
+				delete(m.fws, fn)
+				fw.Count = 0
 			}
-			m.firstLine = false
-		}
-		_, e := m.writer.Write(v)
-		if e != nil {
-			return err
 		}
 	} else {
 		return fmt.Errorf("file sink transform data error: %v", err)
@@ -185,20 +182,66 @@ func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
 
 func (m *fileSink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("Closing file sink")
-	if m.file != nil {
-		ctx.GetLogger().Infof("File sync before close")
-		_, e := m.writer.Write(m.hook.Footer())
-		if e != nil {
-			ctx.GetLogger().Errorf("file sink fails to write footer with error %s.", e)
+	var errs []error
+	for k, v := range m.fws {
+		if e := v.Close(ctx); e != nil {
+			ctx.GetLogger().Errorf("failed to close file %s: %v", k, e)
+			errs = append(errs, e)
 		}
-		if m.c.Interval > 0 {
-			ctx.GetLogger().Infof("flush at close")
-			m.writer.(*bufio.Writer).Flush()
+	}
+	return errors.Join(errs...)
+}
+
+// GetFws returns the file writer for the given file name, if the file writer does not exist, it will create one
+// The item is used to get the csv header if needed
+func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*fileWriter, error) {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+	fws, ok := m.fws[fn]
+	if !ok {
+		var e error
+		// extract header for csv
+		var headers string
+		if m.c.FileType == CSV_TYPE && m.c.HasHeader {
+			var header []string
+			switch v := item.(type) {
+			case map[string]interface{}:
+				header = make([]string, len(v))
+				i := 0
+				for k := range item.(map[string]interface{}) {
+					header[i] = k
+					i++
+				}
+			case []map[string]interface{}:
+				if len(v) > 0 {
+					header = make([]string, len(v[0]))
+					i := 0
+					for k := range v[0] {
+						header[i] = k
+						i++
+					}
+				}
+			}
+			sort.Strings(header)
+			headers = strings.Join(header, m.c.Delimiter)
 		}
-		m.file.Sync()
-		return m.file.Close()
+		nfn := fn
+		if m.c.RollingNamePattern != "" {
+			switch m.c.RollingNamePattern {
+			case "prefix":
+				nfn = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fn)
+			case "suffix":
+				ext := filepath.Ext(fn)
+				nfn = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fn, ext), conf.GetNowInMilli(), ext)
+			}
+		}
+		fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers)
+		if e != nil {
+			return nil, e
+		}
+		m.fws[fn] = fws
 	}
-	return nil
+	return fws, nil
 }
 
 func File() api.Sink {

+ 284 - 22
internal/io/file/file_sink_test.go

@@ -15,13 +15,18 @@
 package file
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"os"
+	"path/filepath"
 	"reflect"
+	"strconv"
 	"testing"
+	"time"
 )
 
 // Unit test for Configure function
@@ -30,12 +35,12 @@ func TestConfigure(t *testing.T) {
 		"interval": 500,
 		"path":     "test",
 	}
-	m := &fileSink{}
+	m := File().(*fileSink)
 	err := m.Configure(props)
 	if err != nil {
 		t.Errorf("Configure() error = %v, wantErr nil", err)
 	}
-	if m.c.Interval != 500 {
+	if *m.c.Interval != 500 {
 		t.Errorf("Configure() Interval = %v, want 500", m.c.Interval)
 	}
 	if m.c.Path != "test" {
@@ -59,9 +64,39 @@ func TestConfigure(t *testing.T) {
 	if err == nil {
 		t.Errorf("Configure() error = %v, wantErr not nil", err)
 	}
+	err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"rollingInterval": -1})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"rollingCount": -1})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
 }
 
 func TestFileSink_Configure(t *testing.T) {
+	var (
+		defaultCheckInterval = (5 * time.Minute).Milliseconds()
+		int500               = 500
+		int64_500            = int64(int500)
+	)
+
 	tests := []struct {
 		name string
 		c    *sinkConf
@@ -70,18 +105,21 @@ func TestFileSink_Configure(t *testing.T) {
 		{
 			name: "default configurations",
 			c: &sinkConf{
-				Interval: 1000,
-				Path:     "cache",
-				FileType: LINES_TYPE,
+				CheckInterval: &defaultCheckInterval,
+				Path:          "cache",
+				FileType:      LINES_TYPE,
+				RollingCount:  1000000,
 			},
 			p: map[string]interface{}{},
 		},
 		{
 			name: "previous setting",
 			c: &sinkConf{
-				Interval: 500,
-				Path:     "test",
-				FileType: LINES_TYPE,
+				Interval:      &int500,
+				CheckInterval: &int64_500,
+				Path:          "test",
+				FileType:      LINES_TYPE,
+				RollingCount:  1000000,
 			},
 
 			p: map[string]interface{}{
@@ -92,17 +130,34 @@ func TestFileSink_Configure(t *testing.T) {
 		{
 			name: "new props",
 			c: &sinkConf{
-				Interval:  500,
-				Path:      "test",
-				FileType:  CSV_TYPE,
-				Format:    message.FormatDelimited,
-				Delimiter: ",",
+				CheckInterval:      &int64_500,
+				Path:               "test",
+				FileType:           CSV_TYPE,
+				Format:             message.FormatDelimited,
+				Delimiter:          ",",
+				RollingCount:       1000000,
+				RollingNamePattern: "none",
 			},
 			p: map[string]interface{}{
-				"interval": 500,
-				"path":     "test",
-				"fileType": "csv",
-				"format":   message.FormatDelimited,
+				"checkInterval":      500,
+				"path":               "test",
+				"fileType":           "csv",
+				"format":             message.FormatDelimited,
+				"rollingNamePattern": "none",
+			},
+		},
+		{ // only set rolling interval
+			name: "rolling",
+			c: &sinkConf{
+				CheckInterval:   &defaultCheckInterval,
+				Path:            "cache",
+				FileType:        LINES_TYPE,
+				RollingInterval: 500,
+				RollingCount:    0,
+			},
+			p: map[string]interface{}{
+				"rollingInterval": 500,
+				"rollingCount":    0,
 			},
 		},
 	}
@@ -120,6 +175,7 @@ func TestFileSink_Configure(t *testing.T) {
 	}
 }
 
+// Test single file writing and flush by close
 func TestFileSink_Collect(t *testing.T) {
 	tests := []struct {
 		name    string
@@ -131,7 +187,7 @@ func TestFileSink_Collect(t *testing.T) {
 			name:    "lines",
 			ft:      LINES_TYPE,
 			fname:   "test_lines",
-			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
+			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n"),
 		}, {
 			name:    "json",
 			ft:      JSON_TYPE,
@@ -141,7 +197,7 @@ func TestFileSink_Collect(t *testing.T) {
 			name:    "csv",
 			ft:      CSV_TYPE,
 			fname:   "test_csv",
-			content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
+			content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n"),
 		},
 	}
 
@@ -161,8 +217,25 @@ func TestFileSink_Collect(t *testing.T) {
 			}
 			defer os.Remove(tmpfile.Name())
 			// Create a file sink with the temporary file path
-			sink := &fileSink{c: &sinkConf{Path: tmpfile.Name(), FileType: tt.ft, HasHeader: true}}
-			sink.Open(ctx)
+			sink := &fileSink{}
+			f := message.FormatJson
+			if tt.ft == CSV_TYPE {
+				f = message.FormatDelimited
+			}
+			err = sink.Configure(map[string]interface{}{
+				"path":               tmpfile.Name(),
+				"fileType":           tt.ft,
+				"hasHeader":          true,
+				"format":             f,
+				"rollingNamePattern": "none",
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = sink.Open(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
 
 			// Test collecting a map item
 			m := map[string]interface{}{"key": "value1"}
@@ -184,7 +257,196 @@ func TestFileSink_Collect(t *testing.T) {
 				t.Fatal(err)
 			}
 			if !reflect.DeepEqual(contents, tt.content) {
-				t.Errorf("expected %q but got %q", tt.content, string(contents))
+				t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
+			}
+		})
+	}
+}
+
+// Test file rolling by time
+func TestFileSinkRolling_Collect(t *testing.T) {
+	// Remove existing files
+	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if filepath.Ext(path) == ".log" {
+			fmt.Println("Deleting file:", path)
+			return os.Remove(path)
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	conf.IsTesting = true
+	tests := []struct {
+		name     string
+		ft       FileType
+		fname    string
+		contents [2][]byte
+	}{
+		{
+			name:  "lines",
+			ft:    LINES_TYPE,
+			fname: "test_lines.log",
+			contents: [2][]byte{
+				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}\n"),
+				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}\n"),
+			},
+		}, {
+			name:  "json",
+			ft:    JSON_TYPE,
+			fname: "test_json.log",
+			contents: [2][]byte{
+				[]byte("[{\"key\":\"value0\",\"ts\":460}{\"key\":\"value1\",\"ts\":910}{\"key\":\"value2\",\"ts\":1360}]"),
+				[]byte("[{\"key\":\"value3\",\"ts\":1810}{\"key\":\"value4\",\"ts\":2260}]"),
+			},
+		},
+	}
+
+	// Create a stream context for testing
+	contextLogger := conf.Log.WithField("rule", "testRolling")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+
+	tf, _ := transform.GenTransform("", "json", "", "")
+	vCtx := context.WithValue(ctx, context.TransKey, tf)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a file sink with the temporary file path
+			sink := &fileSink{}
+			err := sink.Configure(map[string]interface{}{
+				"path":               tt.fname,
+				"fileType":           tt.ft,
+				"rollingInterval":    1000,
+				"checkInterval":      500,
+				"rollingCount":       0,
+				"rollingNamePattern": "suffix",
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			mockclock.ResetClock(10)
+			err = sink.Open(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			c := mockclock.GetMockClock()
+
+			for i := 0; i < 5; i++ {
+				c.Add(450 * time.Millisecond)
+				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
+				if err := sink.Collect(vCtx, m); err != nil {
+					t.Errorf("unexpected error: %s", err)
+				}
+			}
+			c.After(2000 * time.Millisecond)
+			if err = sink.Close(ctx); err != nil {
+				t.Errorf("unexpected close error: %s", err)
+			}
+			// Should write to 2 files
+			for i := 0; i < 2; i++ {
+				// Read the contents of the temporary file and check if they match the collected items
+				fn := fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
+				contents, err := os.ReadFile(fn)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if !reflect.DeepEqual(contents, tt.contents[i]) {
+					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
+				}
+			}
+		})
+	}
+}
+
+// Test file rolling by count
+func TestFileSinkRollingCount_Collect(t *testing.T) {
+	// Remove existing files
+	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if filepath.Ext(path) == ".dd" {
+			fmt.Println("Deleting file:", path)
+			return os.Remove(path)
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	conf.IsTesting = true
+	tests := []struct {
+		name     string
+		ft       FileType
+		fname    string
+		contents [3][]byte
+	}{
+		{
+			name:  "csv",
+			ft:    CSV_TYPE,
+			fname: "test_csv_{{.ts}}.dd",
+			contents: [3][]byte{
+				[]byte("key,ts\nvalue0,460\n"),
+				[]byte("key,ts\nvalue1,910\n"),
+				[]byte("key,ts\nvalue2,1360\n"),
+			},
+		},
+	}
+	// Create a stream context for testing
+	contextLogger := conf.Log.WithField("rule", "testRollingCount")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+
+	tf, _ := transform.GenTransform("", "delimited", "", ",")
+	vCtx := context.WithValue(ctx, context.TransKey, tf)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a file sink with the temporary file path
+			sink := &fileSink{}
+			err := sink.Configure(map[string]interface{}{
+				"path":               tt.fname,
+				"fileType":           tt.ft,
+				"rollingInterval":    0,
+				"rollingCount":       1,
+				"rollingNamePattern": "none",
+				"hasHeader":          true,
+				"format":             "delimited",
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			mockclock.ResetClock(10)
+			err = sink.Open(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			c := mockclock.GetMockClock()
+
+			for i := 0; i < 3; i++ {
+				c.Add(450 * time.Millisecond)
+				m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
+				if err := sink.Collect(vCtx, m); err != nil {
+					t.Errorf("unexpected error: %s", err)
+				}
+			}
+			c.After(2000 * time.Millisecond)
+			if err = sink.Close(ctx); err != nil {
+				t.Errorf("unexpected close error: %s", err)
+			}
+			// Should write to 2 files
+			for i := 0; i < 3; i++ {
+				// Read the contents of the temporary file and check if they match the collected items
+				fn := fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
+				contents, err := os.ReadFile(fn)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if !reflect.DeepEqual(contents, tt.contents[i]) {
+					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
+				}
 			}
 		})
 	}

+ 96 - 0
internal/io/file/file_writer.go

@@ -0,0 +1,96 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+	"bufio"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"io"
+	"os"
+	"time"
+)
+
+type fileWriter struct {
+	File   *os.File
+	Writer io.Writer
+	Hook   writerHooks
+	Start  time.Time
+	Count  int
+}
+
+func createFileWriter(ctx api.StreamContext, fn string, ft FileType, headers string) (_ *fileWriter, ge error) {
+	ctx.GetLogger().Infof("Create new file writer for %s", fn)
+	fws := &fileWriter{Start: conf.GetNow()}
+	var (
+		f   *os.File
+		err error
+	)
+	if _, err = os.Stat(fn); os.IsNotExist(err) {
+		_, err = os.Create(fn)
+	}
+	f, err = os.OpenFile(fn, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
+	if err != nil {
+		return nil, fmt.Errorf("fail to open file sink for %s: %v", fn, err)
+	}
+	defer func() {
+		if ge != nil {
+			_ = f.Close()
+		}
+	}()
+	fws.File = f
+	switch ft {
+	case JSON_TYPE:
+		fws.Hook = jsonHooks
+	case CSV_TYPE:
+		fws.Hook = &csvWriterHooks{header: []byte(headers)}
+	case LINES_TYPE:
+		fws.Hook = linesHooks
+	}
+	fws.Writer = bufio.NewWriter(f)
+	n, err := fws.Writer.Write(fws.Hook.Header())
+	if err != nil {
+		return nil, err
+	}
+	if n > 0 {
+		_, e := fws.Writer.Write(fws.Hook.Line())
+		if e != nil {
+			return nil, err
+		}
+	}
+	return fws, nil
+}
+
+func (fw *fileWriter) Close(ctx api.StreamContext) error {
+	if fw.File != nil {
+		ctx.GetLogger().Debugf("File sync before close")
+		_, e := fw.Writer.Write(fw.Hook.Footer())
+		if e != nil {
+			ctx.GetLogger().Errorf("file sink fails to write footer with error %s.", e)
+		}
+		err := fw.Writer.(*bufio.Writer).Flush()
+		if err != nil {
+			ctx.GetLogger().Errorf("file sink fails to flush with error %s.", err)
+		}
+		err = fw.File.Sync()
+		if err != nil {
+			ctx.GetLogger().Errorf("file sink fails to sync with error %s.", err)
+		}
+		ctx.GetLogger().Infof("Close file %s", fw.File.Name())
+		return fw.File.Close()
+	}
+	return nil
+}

+ 4 - 0
internal/io/file/writer_hooks.go

@@ -34,6 +34,8 @@ func (j *jsonWriterHooks) Footer() []byte {
 	return []byte("]")
 }
 
+var jsonHooks = &jsonWriterHooks{}
+
 type linesWriterHooks struct{}
 
 func (l *linesWriterHooks) Header() []byte {
@@ -48,6 +50,8 @@ func (l *linesWriterHooks) Footer() []byte {
 	return nil
 }
 
+var linesHooks = &linesWriterHooks{}
+
 type csvWriterHooks struct {
 	header []byte
 }

+ 1 - 4
internal/topo/topotest/plugin_rule_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build !windows
-// +build !windows
-
 package topotest
 
 import (