Ver código fonte

feat(source): add support for the lines format to the file source

The typical use case is a JSON Lines file.
Binary line-delimited files, such as lines of serialized protobuf messages, are also supported.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 anos atrás
pai
commit
0335602187

+ 5 - 0
internal/topo/mock/test_source.go

@@ -16,7 +16,10 @@ package mock
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/converter"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"reflect"
 	"sync/atomic"
 	"testing"
@@ -32,6 +35,8 @@ func TestSourceOpen(r api.Source, exp []api.SourceTuple, t *testing.T) {
 		c = 0
 	}
 	ctx, cancel := NewMockContext(fmt.Sprintf("rule%d", c), "op1").WithCancel()
+	cv, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: "json"})
+	ctx = context.WithValue(ctx.(*context.DefaultContext), context.DecodeKey, cv)
 	count.Store(c.(int) + 1)
 	consumer := make(chan api.SourceTuple)
 	errCh := make(chan error)

+ 22 - 0
internal/topo/source/file_source.go

@@ -322,6 +322,28 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 				time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
 			}
 		}
+	case LINES_TYPE:
+		scanner := bufio.NewScanner(file)
+		scanner.Split(bufio.ScanLines)
+		for scanner.Scan() {
+			var tuple api.SourceTuple
+			m, err := ctx.Decode(scanner.Bytes())
+			if err != nil {
+				tuple = &xsql.ErrorSourceTuple{
+					Error: fmt.Errorf("Invalid data format, cannot decode %s with error %s", scanner.Text(), err),
+				}
+			} else {
+				tuple = api.NewDefaultSourceTuple(m, meta)
+			}
+			select {
+			case consumer <- tuple:
+			case <-ctx.Done():
+				return nil
+			}
+			if fs.config.SendInterval > 0 {
+				time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
+			}
+		}
 	default:
 		return fmt.Errorf("invalid file type %s", fs.config.FileType)
 	}

+ 28 - 0
internal/topo/source/file_source_test.go

@@ -76,6 +76,8 @@ func TestJsonFolder(t *testing.T) {
 		return
 	}
 	mock.TestSourceOpen(r, exp, t)
+	// wait for the move to finish
+	time.Sleep(100 * time.Millisecond)
 	files, err := os.ReadDir(moveToFolder)
 	if err != nil {
 		t.Error(err)
@@ -194,3 +196,29 @@ func TestCSVFile(t *testing.T) {
 	}
 	mock.TestSourceOpen(r, exp, t)
 }
+
+// TestJsonLines configures a FileSource with fileType "lines" on test/test.lines
+// and checks, via mock.TestSourceOpen, that each line yields one expected tuple.
+func TestJsonLines(t *testing.T) {
+	path, err := os.Getwd()
+	if err != nil {
+		t.Fatal(err)
+	}
+	meta := map[string]interface{}{
+		"file": filepath.Join(path, "test", "test.lines"),
+	}
+	exp := []api.SourceTuple{ // one tuple per fixture line; ids are expected as float64
+		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John Doe"}, meta),
+		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Jane Doe"}, meta),
+		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(3), "name": "John Smith"}, meta),
+	}
+	p := map[string]interface{}{
+		"path":     filepath.Join(path, "test"),
+		"fileType": "lines",
+	}
+	r := &FileSource{}
+	if err = r.Configure("test.lines", p); err != nil {
+		t.Fatal(err) // never use the error text as a format string
+	}
+	mock.TestSourceOpen(r, exp, t)
+}

+ 3 - 0
internal/topo/source/test/test.lines

@@ -0,0 +1,3 @@
+{"id": 1,"name": "John Doe"}
+{"id": 2,"name": "Jane Doe"}
+{"id": 3,"name": "John Smith"}