Browse Source

feat: file source supports enabling parallel reads (#1948)

* feat: file source supports enabling parallel reads

Signed-off-by: L-607 <i@1l.fit>

* test(file): test parallel file source

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* doc(file): add parallel property

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

---------

Signed-off-by: L-607 <i@1l.fit>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Co-authored-by: Jiyong Huang <huangjy@emqx.io>
L 1 year ago
parent
commit
dfa9b5e16b

+ 2 - 0
docs/en_US/guide/sources/builtin/file.md

@@ -31,6 +31,8 @@ default:
   interval: 0
   # The sending interval between each event in millisecond
   sendInterval: 0
+  # Read the files in a directory in parallel or not
+  parallel: false
   # After read
   # 0: keep the file
   # 1: delete the file

+ 2 - 0
docs/zh_CN/guide/sources/builtin/file.md

@@ -29,6 +29,8 @@ default:
   interval: 0
   # 读取后,两条数据发送的间隔时间
   sendInterval: 0
+  # 是否并行读取目录中的文件
+  parallel: false
   # 文件读取后的操作
   # 0: 文件保持不变
   # 1: 删除文件

+ 2 - 0
etc/sources/file.yaml

@@ -8,6 +8,8 @@ default:
   interval: 0
   # The sending interval between each event in millisecond
   sendInterval: 0
+  # Read the files in a directory in parallel or not
+  parallel: false
   # After read
   # 0: keep the file
   # 1: delete the file

+ 28 - 8
internal/io/file/file_source.go

@@ -25,6 +25,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/lf-edge/ekuiper/internal/compressor"
@@ -39,6 +40,7 @@ type FileSourceConfig struct {
 	Path             string   `json:"path"`
 	Interval         int      `json:"interval"`
 	IsTable          bool     `json:"isTable"`
+	Parallel         bool     `json:"parallel"`
 	SendInterval     int      `json:"sendInterval"`
 	ActionAfterRead  int      `json:"actionAfterRead"`
 	MoveTo           string   `json:"moveTo"`
@@ -178,15 +180,33 @@ func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTupl
 		if err != nil {
 			return err
 		}
-		for _, entry := range entries {
-			if entry.IsDir() {
-				continue
+		if fs.config.Parallel {
+			var wg sync.WaitGroup
+			for _, entry := range entries {
+				if entry.IsDir() {
+					continue
+				}
+				wg.Add(1)
+				go func(file string) {
+					defer wg.Done()
+					err := fs.parseFile(ctx, file, consumer)
+					if err != nil {
+						ctx.GetLogger().Errorf("Failed to parse file %s: %v", file, err)
+					}
+				}(filepath.Join(fs.file, entry.Name()))
 			}
-			file := filepath.Join(fs.file, entry.Name())
-			err := fs.parseFile(ctx, file, consumer)
-			if err != nil {
-				ctx.GetLogger().Errorf("parse file %s fail with error: %v", file, err)
-				continue
+			wg.Wait()
+		} else {
+			for _, entry := range entries {
+				if entry.IsDir() {
+					continue
+				}
+				file := filepath.Join(fs.file, entry.Name())
+				err := fs.parseFile(ctx, file, consumer)
+				if err != nil {
+					ctx.GetLogger().Errorf("parse file %s fail with error: %v", file, err)
+					continue
+				}
 			}
 		}
 	} else {

+ 46 - 0
internal/io/file/file_source_test.go

@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/benbjohnson/clock"
+	"github.com/stretchr/testify/assert"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
@@ -96,6 +97,51 @@ func TestJsonFolder(t *testing.T) {
 	}
 }
 
+func TestJsonFolderParallel(t *testing.T) {
+	path, err := os.Getwd()
+	if err != nil {
+		t.Fatal(err)
+	}
+	mc := conf.Clock.(*clock.Mock)
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John Doe", "height": 1.82}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Jane Doe", "height": 1.65}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(3), "name": "Will Doe", "height": 1.76}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f2.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(4), "name": "Dude Doe", "height": 1.92}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(5), "name": "Jane Doe", "height": 1.72}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(6), "name": "John Smith", "height": 2.22}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
+	}
+	p := map[string]interface{}{
+		"path":     filepath.Join(path, "test"),
+		"parallel": true,
+	}
+	r := &FileSource{}
+	err = r.Configure("json", p)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	result, err := mock.RunMockSource(r, len(exp))
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	checkIds := make([]bool, len(exp))
+	// Parallel reads deliver tuples in no fixed order, so match each one to exp by its 1-based "id" and reject duplicates.
+	for i, m := range result {
+		id, ok := m.Message()["id"]
+		if !ok {
+			t.Fatalf("missing id in message %d: %v", i, m.Message())
+		}
+		idInt := int(id.(float64)) - 1
+		if checkIds[idInt] {
+			t.Errorf("id %d already exists", idInt+1)
+		}
+		checkIds[idInt] = true
+		assert.Equal(t, exp[idInt], m)
+	}
+}
+
 func TestCSVFolder(t *testing.T) {
 	// Move test files to temp folder
 	path, err := os.Getwd()

+ 16 - 13
internal/io/mock/test_source.go

@@ -31,6 +31,18 @@ import (
 var count atomic.Value
 
 func TestSourceOpen(r api.Source, exp []api.SourceTuple, t *testing.T) {
+	result, err := RunMockSource(r, len(exp))
+	if err != nil {
+		t.Error(err)
+	}
+	for i, v := range result {
+		if !reflect.DeepEqual(exp[i].Message(), v.Message()) || !reflect.DeepEqual(exp[i].Meta(), v.Meta()) {
+			t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp[i], v)
+		}
+	}
+}
+
+func RunMockSource(r api.Source, limit int) ([]api.SourceTuple, error) {
 	c := count.Load()
 	if c == nil {
 		count.Store(1)
@@ -44,15 +56,13 @@ func TestSourceOpen(r api.Source, exp []api.SourceTuple, t *testing.T) {
 	errCh := make(chan error)
 	go r.Open(ctx, consumer, errCh)
 	ticker := time.After(10 * time.Second)
-	limit := len(exp)
 	var result []api.SourceTuple
 outerloop:
 	for {
 		select {
 		case err := <-errCh:
-			t.Errorf("received error: %v", err)
 			cancel()
-			return
+			return nil, err
 		case tuple := <-consumer:
 			result = append(result, tuple)
 			limit--
@@ -60,21 +70,14 @@ outerloop:
 				break outerloop
 			}
 		case <-ticker:
-			t.Errorf("stop after timeout")
-			t.Errorf("expect %v, but got %v", exp, result)
 			cancel()
-			return
+			return nil, fmt.Errorf("timeout")
 		}
 	}
 	err := r.Close(ctx)
 	if err != nil {
-		t.Errorf(err.Error())
-		return
+		return nil, err
 	}
 	cancel()
-	for i, v := range result {
-		if !reflect.DeepEqual(exp[i].Message(), v.Message()) || !reflect.DeepEqual(exp[i].Meta(), v.Meta()) {
-			t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp[i], v)
-		}
-	}
+	return result, nil
 }