@@ -15,38 +15,56 @@
 package source
 
 import (
+    "bufio"
+    "encoding/csv"
+    "encoding/json"
     "errors"
     "fmt"
     "github.com/lf-edge/ekuiper/internal/conf"
-    "github.com/lf-edge/ekuiper/internal/pkg/filex"
+    "github.com/lf-edge/ekuiper/internal/xsql"
     "github.com/lf-edge/ekuiper/pkg/api"
     "github.com/lf-edge/ekuiper/pkg/cast"
+    "io"
     "os"
-    "path"
     "path/filepath"
+    "strconv"
+    "strings"
     "time"
 )
 
 type FileType string
 
 const (
-    JSON_TYPE FileType = "json"
+    JSON_TYPE  FileType = "json"
+    CSV_TYPE   FileType = "csv"
+    LINES_TYPE FileType = "lines"
 )
 
-var fileTypes = map[FileType]bool{
-    JSON_TYPE: true,
+var fileTypes = map[FileType]struct{}{
+    JSON_TYPE:  {},
+    CSV_TYPE:   {},
+    LINES_TYPE: {},
 }
 
 type FileSourceConfig struct {
-    FileType   FileType `json:"fileType"`
-    Path       string   `json:"path"`
-    Interval   int      `json:"interval"`
-    RetainSize int      `json:"$retainSize"`
+    FileType         FileType `json:"fileType"`
+    Path             string   `json:"path"`
+    Interval         int      `json:"interval"`
+    IsTable          bool     `json:"isTable"`
+    SendInterval     int      `json:"sendInterval"`
+    ActionAfterRead  int      `json:"actionAfterRead"`
+    MoveTo           string   `json:"moveTo"`
+    HasHeader        bool     `json:"hasHeader"`
+    Columns          []string `json:"columns"`
+    IgnoreStartLines int      `json:"ignoreStartLines"`
+    IgnoreEndLines   int      `json:"ignoreEndLines"`
+    Delimiter        string   `json:"delimiter"`
 }
 
-// The BATCH to load data from file at once
+// FileSource is a batch source that loads all data from a file at once
 type FileSource struct {
     file   string
+    isDir  bool
     config *FileSourceConfig
 }
 
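For reviewers who want to see the new knobs end to end, here is a hypothetical usage sketch (not part of this patch): the property keys mirror the JSON tags of `FileSourceConfig`, the path and file names are invented, and `Configure` defaults `fileType` to `"json"` and `delimiter` to `","` when they are omitted.

```go
package source

import "fmt"

// Hypothetical example; assumes it sits alongside the source package of this patch.
func ExampleFileSource_Configure() {
    props := map[string]interface{}{
        "fileType":        "csv",
        "path":            "data", // relative paths are resolved via conf.GetLoc
        "hasHeader":       true,
        "delimiter":       ";",
        "actionAfterRead": 2, // 0: leave the file, 1: delete it, 2: move it to moveTo
        "moveTo":          "data/done",
    }
    fs := &FileSource{}
    if err := fs.Configure("input.csv", props); err != nil {
        fmt.Println(err)
    }
}
```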
@@ -57,7 +75,9 @@ func (fs *FileSource) Close(ctx api.StreamContext) error {
 }
 
 func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
-    cfg := &FileSourceConfig{}
+    cfg := &FileSourceConfig{
+        FileType: JSON_TYPE,
+    }
     err := cast.MapToStruct(props, cfg)
     if err != nil {
         return fmt.Errorf("read properties %v fail with error: %v", props, err)
@@ -71,9 +91,6 @@ func (fs *FileSource) Configure(fileName string, props map[string]interface{}) e
     if cfg.Path == "" {
         return errors.New("missing property Path")
     }
-    if fileName == "" {
-        return errors.New("file name must be specified")
-    }
     if !filepath.IsAbs(cfg.Path) {
         cfg.Path, err = conf.GetLoc(cfg.Path)
         if err != nil {
@@ -81,15 +98,49 @@ func (fs *FileSource) Configure(fileName string, props map[string]interface{}) e
         }
     }
     if fileName != "/$$TEST_CONNECTION$$" {
-        fs.file = path.Join(cfg.Path, fileName)
-
-        if fi, err := os.Stat(fs.file); err != nil {
+        fs.file = filepath.Join(cfg.Path, fileName)
+        fi, err := os.Stat(fs.file)
+        if err != nil {
             if os.IsNotExist(err) {
                 return fmt.Errorf("file %s not exist", fs.file)
-            } else if !fi.Mode().IsRegular() {
-                return fmt.Errorf("file %s is not a regular file", fs.file)
             }
         }
+        if fi.IsDir() {
+            fs.isDir = true
+        }
+    }
+    if cfg.IgnoreStartLines < 0 {
+        cfg.IgnoreStartLines = 0
+    }
+    if cfg.IgnoreEndLines < 0 {
+        cfg.IgnoreEndLines = 0
+    }
+    if cfg.ActionAfterRead < 0 || cfg.ActionAfterRead > 2 {
+        return fmt.Errorf("invalid actionAfterRead: %d", cfg.ActionAfterRead)
+    }
+    if cfg.ActionAfterRead == 2 {
+        if cfg.MoveTo == "" {
+            return fmt.Errorf("missing moveTo when actionAfterRead is 2")
+        } else {
+            if !filepath.IsAbs(cfg.MoveTo) {
+                cfg.MoveTo, err = conf.GetLoc(cfg.MoveTo)
+                if err != nil {
+                    return fmt.Errorf("invalid moveTo %s: %v", cfg.MoveTo, err)
+                }
+            }
+            fileInfo, err := os.Stat(cfg.MoveTo)
+            if err != nil {
+                err := os.MkdirAll(cfg.MoveTo, os.ModePerm)
+                if err != nil {
+                    return fmt.Errorf("fail to create dir for moveTo %s: %v", cfg.MoveTo, err)
+                }
+            } else if !fileInfo.IsDir() {
+                return fmt.Errorf("moveTo %s is not a directory", cfg.MoveTo)
+            }
+        }
+    }
+    if cfg.Delimiter == "" {
+        cfg.Delimiter = ","
+    }
     fs.config = cfg
     return nil
@@ -98,8 +149,12 @@ func (fs *FileSource) Configure(fileName string, props map[string]interface{}) e
 func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
     err := fs.Load(ctx, consumer)
     if err != nil {
-        errCh <- err
-        return
+        select {
+        case consumer <- &xsql.ErrorSourceTuple{Error: err}:
+            ctx.GetLogger().Errorf("find error when loading file %s with err %v", fs.file, err)
+        case <-ctx.Done():
+            return
+        }
     }
     if fs.config.Interval > 0 {
         ticker := time.NewTicker(time.Millisecond * time.Duration(fs.config.Interval))
@@ -122,38 +177,213 @@ func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 }
 
 func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
+    if fs.isDir {
+        ctx.GetLogger().Debugf("Monitor dir %s", fs.file)
+        entries, err := os.ReadDir(fs.file)
+        if err != nil {
+            return err
+        }
+        for _, entry := range entries {
+            if entry.IsDir() {
+                continue
+            }
+            file := filepath.Join(fs.file, entry.Name())
+            err := fs.parseFile(ctx, file, consumer)
+            if err != nil {
+                ctx.GetLogger().Errorf("parse file %s fail with error: %v", file, err)
+                continue
+            }
+        }
+    } else {
+        err := fs.parseFile(ctx, fs.file, consumer)
+        if err != nil {
+            return err
+        }
+    }
+    // Send an EOF tuple when used as a table to mark the end of the file
+    if fs.config.IsTable {
+        select {
+        case consumer <- api.NewDefaultSourceTuple(nil, nil):
+            // do nothing
+        case <-ctx.Done():
+            return nil
+        }
+    }
+    ctx.GetLogger().Debug("All tuples sent")
+    return nil
+}
+
+func (fs *FileSource) parseFile(ctx api.StreamContext, file string, consumer chan<- api.SourceTuple) (result error) {
+    r, err := fs.prepareFile(ctx, file)
+    if err != nil {
+        ctx.GetLogger().Debugf("prepare file %s error: %v", file, err)
+        return err
+    }
+    meta := map[string]interface{}{
+        "file": file,
+    }
+    defer func() {
+        ctx.GetLogger().Debugf("Finish loading from file %s", file)
+        if closer, ok := r.(io.Closer); ok {
+            ctx.GetLogger().Debugf("Close reader")
+            closer.Close()
+        }
+        if result == nil {
+            switch fs.config.ActionAfterRead {
+            case 1:
+                if err := os.Remove(file); err != nil {
+                    result = err
+                }
+                ctx.GetLogger().Debugf("Remove file %s", file)
+            case 2:
+                targetFile := filepath.Join(fs.config.MoveTo, filepath.Base(file))
+                if err := os.Rename(file, targetFile); err != nil {
+                    result = err
+                }
+                ctx.GetLogger().Debugf("Move file %s to %s", file, targetFile)
+            }
+        }
+    }()
+    if err := fs.publish(ctx, r, consumer, meta); err != nil {
+        return err
+    }
+    return nil
+}
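A side note on the pattern parseFile uses above: the named `result` return lets the deferred block see whether reading succeeded before it deletes or moves the file. A minimal standalone sketch of that idiom, with a hypothetical processThenRemove helper (not part of the patch):

```go
package main

import (
    "fmt"
    "os"
)

// processThenRemove reads a file and removes it only if the read succeeded.
func processThenRemove(path string) (result error) {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer func() {
        f.Close()
        if result == nil { // only act after a fully successful read
            if err := os.Remove(path); err != nil {
                result = err // surface the cleanup failure to the caller
            }
        }
    }()
    // ... read from f; any returned error keeps the file in place ...
    return nil
}

func main() {
    fmt.Println(processThenRemove("example.txt"))
}
```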
+
+func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer chan<- api.SourceTuple, meta map[string]interface{}) error {
+    ctx.GetLogger().Debug("Start to load")
     switch fs.config.FileType {
     case JSON_TYPE:
-        ctx.GetLogger().Debugf("Start to load from file %s", fs.file)
+        r := json.NewDecoder(file)
         resultMap := make([]map[string]interface{}, 0)
-        err := filex.ReadJsonUnmarshal(fs.file, &resultMap)
+        err := r.Decode(&resultMap)
         if err != nil {
             return fmt.Errorf("loaded %s, check error %s", fs.file, err)
         }
         ctx.GetLogger().Debug("Sending tuples")
-        if fs.config.RetainSize > 0 && fs.config.RetainSize < len(resultMap) {
-            resultMap = resultMap[(len(resultMap) - fs.config.RetainSize):]
-            ctx.GetLogger().Debug("Sending tuples for retain size %d", fs.config.RetainSize)
-        }
         for _, m := range resultMap {
             select {
-            case consumer <- api.NewDefaultSourceTuple(m, nil):
-                // do nothing
+            case consumer <- api.NewDefaultSourceTuple(m, meta):
             case <-ctx.Done():
                 return nil
             }
+            if fs.config.SendInterval > 0 {
+                time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
+            }
+        }
+        return nil
+    case CSV_TYPE:
+        r := csv.NewReader(file)
+        r.Comma = rune(fs.config.Delimiter[0])
+        r.TrimLeadingSpace = true
+        r.FieldsPerRecord = -1
+        cols := fs.config.Columns
+        if fs.config.HasHeader {
+            var err error
+            ctx.GetLogger().Debug("Has header")
+            cols, err = r.Read()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                ctx.GetLogger().Warnf("Read file %s encounter error: %v", fs.file, err)
+                return err
+            }
+            ctx.GetLogger().Debugf("Got header %v", cols)
         }
-        // Send EOF if retain size not set
-        if fs.config.RetainSize == 0 {
+        for {
+            record, err := r.Read()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                ctx.GetLogger().Warnf("Read file %s encounter error: %v", fs.file, err)
+                continue
+            }
+            ctx.GetLogger().Debugf("Read %s", strings.Join(record, ","))
+            var m map[string]interface{}
+            if cols == nil {
+                m = make(map[string]interface{}, len(record))
+                for i, v := range record {
+                    m["cols"+strconv.Itoa(i)] = v
+                }
+            } else {
+                m = make(map[string]interface{}, len(cols))
+                for i, v := range cols {
+                    m[v] = record[i]
+                }
+            }
             select {
-            case consumer <- api.NewDefaultSourceTuple(nil, nil):
-                // do nothing
+            case consumer <- api.NewDefaultSourceTuple(m, meta):
             case <-ctx.Done():
                 return nil
             }
+            if fs.config.SendInterval > 0 {
+                time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
+            }
         }
-        ctx.GetLogger().Debug("All tuples sent")
-        return nil
+    default:
+        return fmt.Errorf("invalid file type %s", fs.config.FileType)
+    }
+    return nil
+}
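To make the CSV branch concrete, here is a small self-contained sketch (illustration only, not patch code) of how a header row turns records into maps, mirroring the `cols` handling above; without a header the keys would instead fall back to `cols0`, `cols1`, ... as built with `"cols"+strconv.Itoa(i)`.

```go
package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "strings"
)

func main() {
    r := csv.NewReader(strings.NewReader("name,age\njoe,30\nanne,25\n"))
    cols, _ := r.Read() // header row: [name age]
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        // map each field to its header name, as the CSV branch does
        m := make(map[string]interface{}, len(cols))
        for i, v := range cols {
            m[v] = record[i]
        }
        fmt.Println(m) // map[age:30 name:joe], then map[age:25 name:anne]
    }
}
```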
+
+// prepareFile opens the file and skips the configured ignored leading and trailing lines
+func (fs *FileSource) prepareFile(ctx api.StreamContext, file string) (io.Reader, error) {
+    f, err := os.Open(file)
+    if err != nil {
+        ctx.GetLogger().Error(err)
+        return nil, err
+    }
+    if fs.config.IgnoreStartLines > 0 || fs.config.IgnoreEndLines > 0 {
+        r, w := io.Pipe()
+        go func() {
+            defer func() {
+                ctx.GetLogger().Debugf("Close pipe files %s", file)
+                w.Close()
+                f.Close()
+            }()
+            scanner := bufio.NewScanner(f)
+            scanner.Split(bufio.ScanLines)
+            ln := 0
+            // This is a queue to store the lines that should be ignored
+            tempLines := make([]string, 0, fs.config.IgnoreEndLines)
+            for scanner.Scan() {
+                if ln >= fs.config.IgnoreStartLines {
+                    if fs.config.IgnoreEndLines > 0 { // the last n lines are held back in tempLines
+                        slot := (ln - fs.config.IgnoreStartLines) % fs.config.IgnoreEndLines
+                        if len(tempLines) <= slot { // first round
+                            tempLines = append(tempLines, scanner.Text())
+                        } else {
+                            _, err := w.Write([]byte(tempLines[slot]))
+                            if err != nil {
+                                ctx.GetLogger().Error(err)
+                                break
+                            }
+                            _, err = w.Write([]byte{'\n'})
+                            if err != nil {
+                                ctx.GetLogger().Error(err)
+                                break
+                            }
+                            tempLines[slot] = scanner.Text()
+                        }
+                    } else {
+                        _, err = w.Write(scanner.Bytes())
+                        if err != nil {
+                            ctx.GetLogger().Error(err)
+                            break
+                        }
+                        _, err = w.Write([]byte{'\n'})
+                        if err != nil {
+                            ctx.GetLogger().Error(err)
+                            break
+                        }
+                    }
+                }
+                ln++
+            }
+        }()
+        return r, nil
     }
-    return fmt.Errorf("invalid file type %s", fs.config.FileType)
+    return f, nil
 }
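The tempLines handling above is easiest to see in isolation: it is a circular buffer of size ignoreEndLines that delays emission by that many lines, so the trailing lines never reach the reader even though the file length is unknown in advance. A minimal sketch of the same idea, with a hypothetical skipLines helper (not part of the patch):

```go
package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strings"
)

// skipLines copies every line of r to w except the first `start` and last `end` lines.
func skipLines(r io.Reader, w io.Writer, start, end int) {
    scanner := bufio.NewScanner(r)
    buf := make([]string, 0, end) // circular buffer of the most recent `end` candidate lines
    ln := 0
    for scanner.Scan() {
        if ln >= start {
            if end > 0 {
                slot := (ln - start) % end
                if len(buf) <= slot { // still filling the buffer: nothing can be emitted yet
                    buf = append(buf, scanner.Text())
                } else { // buffer full: the line `end` positions back is now safe to emit
                    fmt.Fprintln(w, buf[slot])
                    buf[slot] = scanner.Text()
                }
            } else {
                fmt.Fprintln(w, scanner.Text())
            }
        }
        ln++
    }
    // whatever remains in buf is exactly the trailing `end` lines, which are discarded
}

func main() {
    input := "header\nrow1\nrow2\nrow3\nfooter\n"
    skipLines(strings.NewReader(input), os.Stdout, 1, 1) // prints row1, row2, row3
}
```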