소스 검색

fix: keep the order for delimited format (#1866)

* keep the order for delimited format

Signed-off-by: Rui-Gan <1171530954@qq.com>

* add ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1년 전
부모
커밋
55e8bbe2a5
4개의 변경된 파일, 106개의 추가 및 2개의 삭제
  1. 1 1
      internal/io/file/file_sink.go
  2. 96 0
      internal/io/file/file_sink_test.go
  3. 7 0
      internal/topo/node/sink_node_test.go
  4. 2 1
      internal/topo/transform/template.go

+ 1 - 1
internal/io/file/file_sink.go

@@ -238,8 +238,8 @@ func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*
 						}
 					}
 				}
+				sort.Strings(header)
 			}
-			sort.Strings(header)
 			headers = strings.Join(header, m.c.Delimiter)
 		}
 		nfn := fn

+ 96 - 0
internal/io/file/file_sink_test.go

@@ -198,6 +198,22 @@ func TestFileSink_Configure(t *testing.T) {
 				"rollingCount":    0,
 			},
 		},
+		{
+			name: "fields",
+			c: &sinkConf{
+				CheckInterval:   &defaultCheckInterval,
+				Path:            "cache",
+				FileType:        LINES_TYPE,
+				RollingInterval: 500,
+				RollingCount:    0,
+				Fields:          []string{"c", "a", "b"},
+			},
+			p: map[string]interface{}{
+				"rollingInterval": 500,
+				"rollingCount":    0,
+				"fields":          []string{"c", "a", "b"},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -356,6 +372,86 @@ func TestFileSink_Collect(t *testing.T) {
 	}
 }
 
+// Test file collect by fields
+func TestFileSinkFields_Collect(t *testing.T) {
+	tests := []struct {
+		name      string
+		ft        FileType
+		fname     string
+		format    string
+		delimiter string
+		fields    []string
+		content   []byte
+	}{
+		{
+			name:      "test1",
+			ft:        CSV_TYPE,
+			fname:     "test_csv",
+			format:    "delimited",
+			delimiter: ",",
+			fields:    []string{"temperature", "humidity"},
+			content:   []byte("temperature,humidity\n31.2,40"),
+		},
+		{
+			name:      "test2",
+			ft:        CSV_TYPE,
+			fname:     "test_csv",
+			format:    "delimited",
+			delimiter: ",",
+			content:   []byte("humidity,temperature\n40,31.2"),
+		},
+	}
+	// Create a stream context for testing
+	contextLogger := conf.Log.WithField("rule", "testFields")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, tt.fields)
+			vCtx := context.WithValue(ctx, context.TransKey, tf)
+			// Create a temporary file for testing
+			tmpfile, err := os.CreateTemp("", tt.fname)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer os.Remove(tmpfile.Name())
+			// Create a file sink with the temporary file path
+			sink := &fileSink{}
+			err = sink.Configure(map[string]interface{}{
+				"path":               tmpfile.Name(),
+				"fileType":           tt.ft,
+				"hasHeader":          true,
+				"format":             tt.format,
+				"rollingNamePattern": "none",
+				"fields":             tt.fields,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = sink.Open(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			// Test collecting a map item
+			m := map[string]interface{}{"humidity": 40, "temperature": 31.2}
+			if err := sink.Collect(vCtx, m); err != nil {
+				t.Errorf("unexpected error: %s", err)
+			}
+			if err = sink.Close(ctx); err != nil {
+				t.Errorf("unexpected close error: %s", err)
+			}
+			// Read the contents of the temporary file and check if they match the collected items
+			contents, err := os.ReadFile(tmpfile.Name())
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(contents, tt.content) {
+				t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
+			}
+		})
+	}
+}
+
 // Test file rolling by time
 func TestFileSinkRolling_Collect(t *testing.T) {
 	// Remove existing files

+ 7 - 0
internal/topo/node/sink_node_test.go

@@ -523,6 +523,13 @@ func TestSinkFields_Apply(t *testing.T) {
 			result:    [][]byte{[]byte(`1,2`)},
 		},
 		{
+			format:    "delimited",
+			delimiter: ",",
+			fields:    []string{"b", "c", "a"},
+			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result:    [][]byte{[]byte(`2,3,1`)},
+		},
+		{
 			format:   "json",
 			schemaId: "",
 			fields:   []string{"ax", "bx"},

+ 2 - 1
internal/topo/transform/template.go

@@ -22,6 +22,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/converter"
+	"github.com/lf-edge/ekuiper/internal/converter/delimited"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
@@ -51,6 +52,7 @@ func GenTransform(dt string, format string, schemaId string, delimiter string, f
 		if err != nil {
 			return nil, err
 		}
+		c.(*delimited.Converter).SetColumns(fields)
 	case message.FormatJson:
 		c, err = converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
 		if err != nil {
@@ -120,7 +122,6 @@ func GenTransform(dt string, format string, schemaId string, delimiter string, f
 				}
 				d = m
 			}
-			// TODO: if headers are defined by user, find a way to keep the order
 			outBytes, err := c.Encode(d)
 			return outBytes, transformed || selected, err
 		default: // should not happen