Pārlūkot izejas kodu

feat(sink): file sink supports various file types

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 gadi atpakaļ
vecāks
revīzija
ab664b1510

+ 29 - 0
internal/io/file/constant.go

@@ -0,0 +1,29 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
// FileType identifies the on-disk layout the file source/sink works with.
type FileType string

// Supported file types.
const (
	JSON_TYPE  FileType = "json"
	CSV_TYPE   FileType = "csv"
	LINES_TYPE FileType = "lines"
)

// fileTypes is the set of valid FileType values, used for validation.
var fileTypes = map[FileType]struct{}{
	JSON_TYPE:  {},
	CSV_TYPE:   {},
	LINES_TYPE: {},
}

+ 84 - 10
internal/io/file/file_sink.go

@@ -17,31 +17,42 @@ package file
 import (
 import (
 	"bufio"
 	"bufio"
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/message"
 	"io"
 	"io"
 	"os"
 	"os"
+	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
 )
 )
 
 
 type sinkConf struct {
 type sinkConf struct {
-	Interval int    `json:"interval"`
-	Path     string `json:"path"`
+	Interval  int      `json:"interval"`
+	Path      string   `json:"path"`
+	FileType  FileType `json:"fileType"`
+	HasHeader bool     `json:"hasHeader"`
+	Delimiter string   `json:"delimiter"`
+	Format    string   `json:"format"` // only use for validation; transformation is done in sink_node
 }
 }
 
 
 type fileSink struct {
 type fileSink struct {
 	c *sinkConf
 	c *sinkConf
-
-	mux    sync.Mutex
-	file   *os.File
-	writer io.Writer
+	// If firstLine is true, it means the file is newly created and the first line is not written yet.
+	// Do not write line feed for the first line.
+	firstLine bool
+	mux       sync.Mutex
+	file      *os.File
+	writer    io.Writer
+	hook      writerHooks
 }
 }
 
 
 func (m *fileSink) Configure(props map[string]interface{}) error {
 func (m *fileSink) Configure(props map[string]interface{}) error {
 	c := &sinkConf{
 	c := &sinkConf{
 		Interval: 1000,
 		Interval: 1000,
 		Path:     "cache",
 		Path:     "cache",
+		FileType: LINES_TYPE,
 	}
 	}
 	if err := cast.MapToStruct(props, c); err != nil {
 	if err := cast.MapToStruct(props, c); err != nil {
 		return err
 		return err
@@ -52,6 +63,18 @@ func (m *fileSink) Configure(props map[string]interface{}) error {
 	if c.Path == "" {
 	if c.Path == "" {
 		return fmt.Errorf("path must be set")
 		return fmt.Errorf("path must be set")
 	}
 	}
+	if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
+		return fmt.Errorf("fileType must be one of json, csv or lines")
+	}
+	if c.FileType == CSV_TYPE {
+		if c.Format != message.FormatDelimited {
+			return fmt.Errorf("format must be delimited when fileType is csv")
+		}
+		if c.Delimiter == "" {
+			conf.Log.Warnf("delimiter is not set, use default ','")
+			c.Delimiter = ","
+		}
+	}
 	m.c = c
 	m.c = c
 	return nil
 	return nil
 }
 }
@@ -71,6 +94,15 @@ func (m *fileSink) Open(ctx api.StreamContext) error {
 		return fmt.Errorf("fail to open file sink for %v", err)
 		return fmt.Errorf("fail to open file sink for %v", err)
 	}
 	}
 	m.file = f
 	m.file = f
+	m.firstLine = true
+	switch m.c.FileType {
+	case JSON_TYPE:
+		m.hook = &jsonWriterHooks{}
+	case CSV_TYPE:
+		m.hook = &csvWriterHooks{}
+	case LINES_TYPE:
+		m.hook = &linesWriterHooks{}
+	}
 	if m.c.Interval > 0 {
 	if m.c.Interval > 0 {
 		m.writer = bufio.NewWriter(f)
 		m.writer = bufio.NewWriter(f)
 		t := time.NewTicker(time.Duration(m.c.Interval) * time.Millisecond)
 		t := time.NewTicker(time.Duration(m.c.Interval) * time.Millisecond)
@@ -94,19 +126,57 @@ func (m *fileSink) Open(ctx api.StreamContext) error {
 	} else {
 	} else {
 		m.writer = f
 		m.writer = f
 	}
 	}
-
 	return nil
 	return nil
 }
 }
 
 
 func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	logger.Debugf("file sink receive %s", item)
 	logger.Debugf("file sink receive %s", item)
+	// extract header for csv
+	if m.c.FileType == CSV_TYPE && m.c.HasHeader && m.hook.Header() == nil {
+		var header []string
+		switch v := item.(type) {
+		case map[string]interface{}:
+			header = make([]string, len(v))
+			for k := range item.(map[string]interface{}) {
+				header = append(header, k)
+			}
+		case []map[string]interface{}:
+			if len(v) > 0 {
+				header = make([]string, len(v[0]))
+				for k := range v[0] {
+					header = append(header, k)
+				}
+			}
+		}
+		m.hook.(*csvWriterHooks).SetHeader(strings.Join(header, m.c.Delimiter))
+	}
 	if v, _, err := ctx.TransformOutput(item); err == nil {
 	if v, _, err := ctx.TransformOutput(item); err == nil {
 		logger.Debugf("file sink transform data %s", v)
 		logger.Debugf("file sink transform data %s", v)
 		m.mux.Lock()
 		m.mux.Lock()
-		m.writer.Write(v)
-		m.writer.Write([]byte("\n"))
-		m.mux.Unlock()
+		defer m.mux.Unlock()
+		if !m.firstLine {
+			_, e := m.writer.Write(m.hook.Line())
+			if e != nil {
+				return err
+			}
+		} else {
+			n, err := m.writer.Write(m.hook.Header())
+			if err != nil {
+				return err
+			}
+			if n > 0 {
+				_, e := m.writer.Write(m.hook.Line())
+				if e != nil {
+					return err
+				}
+			}
+			m.firstLine = false
+		}
+		_, e := m.writer.Write(v)
+		if e != nil {
+			return err
+		}
 	} else {
 	} else {
 		return fmt.Errorf("file sink transform data error: %v", err)
 		return fmt.Errorf("file sink transform data error: %v", err)
 	}
 	}
@@ -117,6 +187,10 @@ func (m *fileSink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("Closing file sink")
 	ctx.GetLogger().Infof("Closing file sink")
 	if m.file != nil {
 	if m.file != nil {
 		ctx.GetLogger().Infof("File sync before close")
 		ctx.GetLogger().Infof("File sync before close")
+		_, e := m.writer.Write(m.hook.Footer())
+		if e != nil {
+			ctx.GetLogger().Errorf("file sink fails to write footer with error %s.", e)
+		}
 		if m.c.Interval > 0 {
 		if m.c.Interval > 0 {
 			ctx.GetLogger().Infof("flush at close")
 			ctx.GetLogger().Infof("flush at close")
 			m.writer.(*bufio.Writer).Flush()
 			m.writer.(*bufio.Writer).Flush()

+ 94 - 65
internal/io/file/file_sink_test.go

@@ -18,7 +18,9 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"github.com/lf-edge/ekuiper/pkg/message"
 	"os"
 	"os"
+	"reflect"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -47,77 +49,101 @@ func TestConfigure(t *testing.T) {
 	if err == nil {
 	if err == nil {
 		t.Errorf("Configure() error = %v, wantErr not nil", err)
 		t.Errorf("Configure() error = %v, wantErr not nil", err)
 	}
 	}
+	err = m.Configure(map[string]interface{}{"fileType": "csv2"})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
+	err = m.Configure(map[string]interface{}{"interval": 500,
+		"path":     "test",
+		"fileType": "csv"})
+	if err == nil {
+		t.Errorf("Configure() error = %v, wantErr not nil", err)
+	}
 }
 }
 
 
 func TestFileSink_Configure(t *testing.T) {
 func TestFileSink_Configure(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
-		name    string
-		c       *sinkConf
-		p       map[string]interface{}
-		wantErr bool
+		name string
+		c    *sinkConf
+		p    map[string]interface{}
 	}{
 	}{
 		{
 		{
-			name: "valid configuration",
+			name: "default configurations",
 			c: &sinkConf{
 			c: &sinkConf{
 				Interval: 1000,
 				Interval: 1000,
 				Path:     "cache",
 				Path:     "cache",
+				FileType: LINES_TYPE,
 			},
 			},
-			p: map[string]interface{}{
-				"interval": 500,
-				"path":     "test",
-			},
-			wantErr: false,
+			p: map[string]interface{}{},
 		},
 		},
 		{
 		{
-			name: "invalid interval",
-
+			name: "previous setting",
 			c: &sinkConf{
 			c: &sinkConf{
-				Interval: 1000,
-				Path:     "cache",
+				Interval: 500,
+				Path:     "test",
+				FileType: LINES_TYPE,
 			},
 			},
 
 
 			p: map[string]interface{}{
 			p: map[string]interface{}{
-				"interval": -500,
+				"interval": 500,
 				"path":     "test",
 				"path":     "test",
 			},
 			},
-			wantErr: true,
 		},
 		},
 		{
 		{
-			name: "empty path",
-
+			name: "new props",
 			c: &sinkConf{
 			c: &sinkConf{
-				Interval: 1000,
-				Path:     "cache",
+				Interval:  500,
+				Path:      "test",
+				FileType:  CSV_TYPE,
+				Format:    message.FormatDelimited,
+				Delimiter: ",",
 			},
 			},
-
 			p: map[string]interface{}{
 			p: map[string]interface{}{
 				"interval": 500,
 				"interval": 500,
-				"path":     "",
+				"path":     "test",
+				"fileType": "csv",
+				"format":   message.FormatDelimited,
 			},
 			},
-
-			wantErr: true,
 		},
 		},
 	}
 	}
 	for _, tt := range tests {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &fileSink{}
 			m := &fileSink{}
-			if err := m.Configure(tt.p); (err != nil) != tt.wantErr {
-				t.Errorf("fileSink.Configure() error = %v, wantErr %v", err, tt.wantErr)
+			if err := m.Configure(tt.p); err != nil {
+				t.Errorf("fileSink.Configure() error = %v", err)
+				return
+			}
+			if !reflect.DeepEqual(m.c, tt.c) {
+				t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
 			}
 			}
 		})
 		})
 	}
 	}
 }
 }
 
 
 func TestFileSink_Collect(t *testing.T) {
 func TestFileSink_Collect(t *testing.T) {
-	// Create a temporary file for testing
-	tmpfile, err := os.CreateTemp("", "test")
-	if err != nil {
-		t.Fatal(err)
+	tests := []struct {
+		name    string
+		ft      FileType
+		fname   string
+		content []byte
+	}{
+		{
+			name:    "lines",
+			ft:      LINES_TYPE,
+			fname:   "test_lines",
+			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
+		}, {
+			name:    "json",
+			ft:      JSON_TYPE,
+			fname:   "test_json",
+			content: []byte(`[{"key":"value1"}{"key":"value2"}]`),
+		}, {
+			name:    "csv",
+			ft:      CSV_TYPE,
+			fname:   "test_csv",
+			content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
+		},
 	}
 	}
-	defer os.Remove(tmpfile.Name())
-
-	// Create a file sink with the temporary file path
-	sink := &fileSink{c: &sinkConf{Path: tmpfile.Name()}}
 
 
 	// Create a stream context for testing
 	// Create a stream context for testing
 	contextLogger := conf.Log.WithField("rule", "test2")
 	contextLogger := conf.Log.WithField("rule", "test2")
@@ -125,38 +151,41 @@ func TestFileSink_Collect(t *testing.T) {
 
 
 	tf, _ := transform.GenTransform("", "json", "", "")
 	tf, _ := transform.GenTransform("", "json", "", "")
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
-	sink.Open(ctx)
-
-	// Test collecting a string item
-	str := "test string"
-	if err := sink.Collect(vCtx, str); err != nil {
-		t.Errorf("unexpected error: %s", err)
-	}
-
-	// Test collecting a map item
-	m := map[string]interface{}{"key": "value"}
-	if err := sink.Collect(ctx, m); err != nil {
-		t.Errorf("unexpected error: %s", err)
-	}
-
-	// Test collecting an invalid item
-	invalid := make(chan int)
-	if err := sink.Collect(ctx, invalid); err == nil {
-		t.Error("expected error but got nil")
-	}
 
 
-	// Close the file sink
-	if err := sink.Close(ctx); err != nil {
-		t.Errorf("unexpected error: %s", err)
-	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a temporary file for testing
+			tmpfile, err := os.CreateTemp("", tt.fname)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer os.Remove(tmpfile.Name())
+			// Create a file sink with the temporary file path
+			sink := &fileSink{c: &sinkConf{Path: tmpfile.Name(), FileType: tt.ft, HasHeader: true}}
+			sink.Open(ctx)
+
+			// Test collecting a map item
+			m := map[string]interface{}{"key": "value1"}
+			if err := sink.Collect(vCtx, m); err != nil {
+				t.Errorf("unexpected error: %s", err)
+			}
 
 
-	// Read the contents of the temporary file and check if they match the collected items
-	contents, err := os.ReadFile(tmpfile.Name())
-	if err != nil {
-		t.Fatal(err)
-	}
-	expected := "\"test string\"\n{\"key\":\"value\"}\n"
-	if string(contents) != expected {
-		t.Errorf("expected %q but got %q", expected, string(contents))
+			// Test collecting another map item
+			m = map[string]interface{}{"key": "value2"}
+			if err := sink.Collect(ctx, m); err != nil {
+				t.Errorf("unexpected error: %s", err)
+			}
+			if err = sink.Close(ctx); err != nil {
+				t.Errorf("unexpected close error: %s", err)
+			}
+			// Read the contents of the temporary file and check if they match the collected items
+			contents, err := os.ReadFile(tmpfile.Name())
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(contents, tt.content) {
+				t.Errorf("expected %q but got %q", tt.content, string(contents))
+			}
+		})
 	}
 	}
 }
 }

+ 0 - 14
internal/io/file/file_source.go

@@ -32,20 +32,6 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-type FileType string
-
-const (
-	JSON_TYPE  FileType = "json"
-	CSV_TYPE   FileType = "csv"
-	LINES_TYPE FileType = "lines"
-)
-
-var fileTypes = map[FileType]struct{}{
-	JSON_TYPE:  {},
-	CSV_TYPE:   {},
-	LINES_TYPE: {},
-}
-
 type FileSourceConfig struct {
 type FileSourceConfig struct {
 	FileType         FileType `json:"fileType"`
 	FileType         FileType `json:"fileType"`
 	Path             string   `json:"path"`
 	Path             string   `json:"path"`

+ 69 - 0
internal/io/file/writer_hooks.go

@@ -0,0 +1,69 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
// writerHooks supplies the format-specific decorations written around the
// payload lines of a sink file: a one-off header written before the first
// line, a separator written between lines, and a footer written at close.
// Any method may return nil (or an empty slice) when nothing is needed.
type writerHooks interface {
	// Header returns the bytes to write once at the start of the file.
	Header() []byte
	// Line returns the separator to write between payload lines.
	Line() []byte
	// Footer returns the bytes to write once when the sink closes.
	Footer() []byte
}
+
// jsonWriterHooks wraps the payload lines in "[" and "]" with no per-line
// separator.
// NOTE(review): without an element separator the resulting file looks like
// [{..}{..}], which is not parseable as standard JSON; emitting "," here
// would need a coordinated change in Collect's header handling — confirm
// the intended contract.
type jsonWriterHooks struct{}

// Header opens the JSON array.
func (j *jsonWriterHooks) Header() []byte { return []byte("[") }

// Line returns no separator between elements.
func (j *jsonWriterHooks) Line() []byte { return nil }

// Footer closes the JSON array.
func (j *jsonWriterHooks) Footer() []byte { return []byte("]") }
+
// linesWriterHooks writes one payload per line with no header or footer.
type linesWriterHooks struct{}

// Header returns nothing; lines files have no preamble.
func (l *linesWriterHooks) Header() []byte { return nil }

// Line separates consecutive payloads with a newline.
func (l *linesWriterHooks) Line() []byte { return []byte("\n") }

// Footer returns nothing; lines files have no trailer.
func (l *linesWriterHooks) Footer() []byte { return nil }
+
// csvWriterHooks writes newline-separated rows preceded by an optional
// header row. The header starts out nil and is injected once via SetHeader
// when the sink sees its first item.
type csvWriterHooks struct {
	header []byte // pre-joined header row; nil until SetHeader is called
}

// Header returns the header row set by SetHeader, or nil if none was set.
func (c *csvWriterHooks) Header() []byte { return c.header }

// Line separates consecutive rows with a newline.
func (c *csvWriterHooks) Line() []byte { return []byte("\n") }

// Footer returns nothing; csv files have no trailer.
func (c *csvWriterHooks) Footer() []byte { return nil }

// SetHeader stores the already-delimited header row to emit first.
func (c *csvWriterHooks) SetHeader(header string) { c.header = []byte(header) }