123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package file
- import (
- "bufio"
- "encoding/csv"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/lf-edge/ekuiper/internal/compressor"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/cast"
- )
- type FileSourceConfig struct {
- FileType FileType `json:"fileType"`
- Path string `json:"path"`
- Interval int `json:"interval"`
- IsTable bool `json:"isTable"`
- Parallel bool `json:"parallel"`
- SendInterval int `json:"sendInterval"`
- ActionAfterRead int `json:"actionAfterRead"`
- MoveTo string `json:"moveTo"`
- HasHeader bool `json:"hasHeader"`
- Columns []string `json:"columns"`
- IgnoreStartLines int `json:"ignoreStartLines"`
- IgnoreEndLines int `json:"ignoreEndLines"`
- Delimiter string `json:"delimiter"`
- Decompression string `json:"decompression"`
- }
- // FileSource The BATCH to load data from file at once
- type FileSource struct {
- file string
- isDir bool
- config *FileSourceConfig
- }
- func (fs *FileSource) Close(ctx api.StreamContext) error {
- ctx.GetLogger().Infof("Close file source")
- // do nothing
- return nil
- }
// Configure validates props, applies defaults, and stores the resulting
// configuration on fs.
//
// fileName is joined onto the configured path and must exist, unless it is
// the special name "/$$TEST_CONNECTION$$" (presumably used for connection
// tests), in which case the existence check is skipped. Negative ignore-line
// counts are clamped to 0, Delimiter defaults to ",", and when
// ActionAfterRead is 2 the MoveTo directory is created if it cannot be
// stat'ed.
func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
	cfg := &FileSourceConfig{
		FileType: JSON_TYPE,
	}
	if err := cast.MapToStruct(props, cfg); err != nil {
		return fmt.Errorf("read properties %v fail with error: %w", props, err)
	}
	if cfg.FileType == "" {
		return errors.New("missing or invalid property fileType, must be 'json'")
	}
	if _, ok := fileTypes[cfg.FileType]; !ok {
		return fmt.Errorf("invalid property fileType: %s", cfg.FileType)
	}
	if cfg.Path == "" {
		return errors.New("missing property Path")
	}
	if !filepath.IsAbs(cfg.Path) {
		loc, err := conf.GetLoc(cfg.Path)
		if err != nil {
			// Report the configured path and the cause; GetLoc's result is
			// not used on error.
			return fmt.Errorf("invalid path %s: %w", cfg.Path, err)
		}
		cfg.Path = loc
	}
	if fileName != "/$$TEST_CONNECTION$$" {
		fs.file = filepath.Join(cfg.Path, fileName)
		fi, err := os.Stat(fs.file)
		if err != nil {
			if os.IsNotExist(err) {
				return fmt.Errorf("file %s not exist", fs.file)
			}
			// Any other stat failure (e.g. permission denied) leaves fi nil;
			// it must not fall through to fi.IsDir().
			return fmt.Errorf("cannot access file %s: %w", fs.file, err)
		}
		fs.isDir = fi.IsDir()
	}
	if cfg.IgnoreStartLines < 0 {
		cfg.IgnoreStartLines = 0
	}
	if cfg.IgnoreEndLines < 0 {
		cfg.IgnoreEndLines = 0
	}
	if cfg.ActionAfterRead < 0 || cfg.ActionAfterRead > 2 {
		return fmt.Errorf("invalid actionAfterRead: %d", cfg.ActionAfterRead)
	}
	if cfg.ActionAfterRead == 2 {
		if cfg.MoveTo == "" {
			return fmt.Errorf("missing moveTo when actionAfterRead is 2")
		}
		if !filepath.IsAbs(cfg.MoveTo) {
			loc, err := conf.GetLoc(cfg.MoveTo)
			if err != nil {
				return fmt.Errorf("invalid moveTo %s: %w", cfg.MoveTo, err)
			}
			cfg.MoveTo = loc
		}
		fileInfo, err := os.Stat(cfg.MoveTo)
		if err != nil {
			// The target directory is created on demand.
			if err := os.MkdirAll(cfg.MoveTo, os.ModePerm); err != nil {
				return fmt.Errorf("fail to create dir for moveTo %s: %v", cfg.MoveTo, err)
			}
		} else if !fileInfo.IsDir() {
			return fmt.Errorf("moveTo %s is not a directory", cfg.MoveTo)
		}
	}
	if cfg.Delimiter == "" {
		cfg.Delimiter = ","
	}
	if _, ok := compressionTypes[cfg.Decompression]; !ok && cfg.Decompression != "" {
		return fmt.Errorf("decompression must be one of gzip, zstd")
	}
	fs.config = cfg
	return nil
}
// Open loads the configured file(s) once and, when Interval > 0, reloads them
// every Interval milliseconds until ctx is cancelled.
//
// A failure of the initial load is emitted to consumer as an error tuple and
// the source keeps running. A failure of a periodic reload is reported on
// errCh and Open returns. Every send also watches ctx so that Open never
// blocks after cancellation.
func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
	logger := ctx.GetLogger()
	if err := fs.Load(ctx, consumer); err != nil {
		select {
		case consumer <- &xsql.ErrorSourceTuple{Error: err}:
			logger.Errorf("find error when loading file %s with err %v", fs.file, err)
		case <-ctx.Done():
			return
		}
	}
	if fs.config.Interval <= 0 {
		return
	}
	ticker := time.NewTicker(time.Millisecond * time.Duration(fs.config.Interval))
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			logger.Debugf("Load file source again at %v", conf.GetNowInMilli())
			if err := fs.Load(ctx, consumer); err != nil {
				// A bare send would block forever if nobody drains errCh
				// once the rule is stopping.
				select {
				case errCh <- err:
				case <-ctx.Done():
				}
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// Load reads the configured file, or every non-directory entry of the
// configured directory, once and sends the decoded rows to consumer.
//
// In directory mode a file that fails to parse is logged and skipped. With
// Parallel set, the files are parsed concurrently and Load waits for all of
// them. Directory iteration stops as soon as ctx is cancelled, so no further
// files are opened (and subjected to ActionAfterRead) after the rule stops.
// When IsTable is set, an empty tuple stamped with the load start time is sent
// after all rows as an EOF marker.
func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
	rcvTime := conf.GetNow()
	logger := ctx.GetLogger()
	if fs.isDir {
		logger.Debugf("Monitor dir %s", fs.file)
		entries, err := os.ReadDir(fs.file)
		if err != nil {
			return err
		}
		// cancelled reports, without blocking, whether ctx has been cancelled.
		cancelled := func() bool {
			select {
			case <-ctx.Done():
				return true
			default:
				return false
			}
		}
		if fs.config.Parallel {
			var wg sync.WaitGroup
			for _, entry := range entries {
				if entry.IsDir() {
					continue
				}
				if cancelled() {
					break
				}
				wg.Add(1)
				go func(file string) {
					defer wg.Done()
					if err := fs.parseFile(ctx, file, consumer); err != nil {
						logger.Errorf("Failed to parse file %s: %v", file, err)
					}
				}(filepath.Join(fs.file, entry.Name()))
			}
			wg.Wait()
		} else {
			for _, entry := range entries {
				if entry.IsDir() {
					continue
				}
				if cancelled() {
					break
				}
				file := filepath.Join(fs.file, entry.Name())
				if err := fs.parseFile(ctx, file, consumer); err != nil {
					logger.Errorf("parse file %s fail with error: %v", file, err)
				}
			}
		}
	} else if err := fs.parseFile(ctx, fs.file, consumer); err != nil {
		return err
	}
	// Send EOF if retain size not set if used in table
	if fs.config.IsTable {
		select {
		case consumer <- api.NewDefaultSourceTupleWithTime(nil, nil, rcvTime):
		case <-ctx.Done():
			return nil
		}
	}
	logger.Debug("All tuples sent")
	return nil
}
// parseFile publishes the rows of a single file and then applies the
// configured ActionAfterRead to it: 1 removes the file, 2 moves it into
// fs.config.MoveTo, keeping its base name.
//
// The reader returned by prepareFile is closed before the action runs. The
// action is skipped when publishing failed or when ctx has been cancelled.
// publish returns nil on cancellation even if only part of the file was sent,
// and deleting or moving the file then would lose the unsent rows. A failure
// of the action itself becomes the returned error.
func (fs *FileSource) parseFile(ctx api.StreamContext, file string, consumer chan<- api.SourceTuple) (result error) {
	logger := ctx.GetLogger()
	r, err := fs.prepareFile(ctx, file)
	if err != nil {
		logger.Debugf("prepare file %s error: %v", file, err)
		return err
	}
	meta := map[string]interface{}{
		"file": file,
	}
	defer func() {
		logger.Debugf("Finish loading from file %s", file)
		if closer, ok := r.(io.Closer); ok {
			logger.Debugf("Close reader")
			if err := closer.Close(); err != nil {
				logger.Warnf("Close file %s fail with error: %v", file, err)
			}
		}
		if result != nil {
			return
		}
		select {
		case <-ctx.Done():
			logger.Debugf("Context done, skip action after read for file %s", file)
			return
		default:
		}
		switch fs.config.ActionAfterRead {
		case 1:
			if err := os.Remove(file); err != nil {
				result = err
				return
			}
			logger.Debugf("Remove file %s", file)
		case 2:
			targetFile := filepath.Join(fs.config.MoveTo, filepath.Base(file))
			if err := os.Rename(file, targetFile); err != nil {
				result = err
				return
			}
			logger.Debugf("Move file %s to %s", file, targetFile)
		}
	}()
	return fs.publish(ctx, r, consumer, meta)
}
// publish decodes the content read from file according to fs.config.FileType
// and sends one tuple per decoded row to consumer, each carrying meta.
//
//   - JSON_TYPE: the whole content must be a JSON array of objects.
//   - CSV_TYPE: one tuple per record. Column names come from the header row
//     (HasHeader), from Columns, or default to "cols0", "cols1", ....
//     Malformed records (csv.ParseError) are logged and skipped; any other read
//     error aborts. Short records leave their missing columns unset.
//   - LINES_TYPE: every line is decoded with ctx.DecodeIntoList. A line that
//     fails to decode is emitted as an error tuple.
//
// When SendInterval > 0, publish pauses that many milliseconds after each row.
// It returns nil as soon as ctx is cancelled, including while pausing.
func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer chan<- api.SourceTuple, meta map[string]interface{}) error {
	logger := ctx.GetLogger()
	logger.Debug("Start to load")
	// In dir mode fs.file is the directory; meta["file"] is the file being read.
	fileName := meta["file"]
	rcvTime := conf.GetNow()

	// send delivers one tuple and reports false if ctx was cancelled first.
	send := func(tuple api.SourceTuple) bool {
		select {
		case consumer <- tuple:
			return true
		case <-ctx.Done():
			return false
		}
	}
	// next applies SendInterval and refreshes rcvTime for the following row.
	// It reports false if ctx was cancelled while waiting.
	next := func() bool {
		if fs.config.SendInterval > 0 {
			timer := time.NewTimer(time.Millisecond * time.Duration(fs.config.SendInterval))
			defer timer.Stop()
			select {
			case <-timer.C:
			case <-ctx.Done():
				return false
			}
		}
		rcvTime = conf.GetNow()
		return true
	}

	switch fs.config.FileType {
	case JSON_TYPE:
		var rows []map[string]interface{}
		if err := json.NewDecoder(file).Decode(&rows); err != nil {
			return fmt.Errorf("loaded %v, check error %w", fileName, err)
		}
		logger.Debug("Sending tuples")
		for _, row := range rows {
			if !send(api.NewDefaultSourceTupleWithTime(row, meta, rcvTime)) || !next() {
				return nil
			}
		}
		return nil
	case CSV_TYPE:
		r := csv.NewReader(file)
		// Delimiter is defaulted non-empty by Configure. Taking its first rune
		// (not byte) keeps non-ASCII delimiters intact.
		r.Comma = []rune(fs.config.Delimiter)[0]
		r.TrimLeadingSpace = true
		r.FieldsPerRecord = -1
		cols := fs.config.Columns
		if fs.config.HasHeader {
			logger.Debug("Has header")
			header, err := r.Read()
			if err == io.EOF {
				return nil // empty file: no header and no rows
			}
			if err != nil {
				logger.Warnf("Read file %v encounter error: %v", fileName, err)
				return err
			}
			cols = header
			logger.Debugf("Got header %v", cols)
		}
		for {
			record, err := r.Read()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				logger.Warnf("Read file %v encounter error: %v", fileName, err)
				var parseErr *csv.ParseError
				if errors.As(err, &parseErr) {
					continue // malformed record: skip it, the reader can go on
				}
				// I/O or reader-configuration errors may repeat on every Read;
				// retrying them would spin forever.
				return err
			}
			logger.Debugf("Read %s", strings.Join(record, ","))
			m := make(map[string]interface{}, len(record))
			if len(cols) == 0 {
				for i, v := range record {
					m["cols"+strconv.Itoa(i)] = v
				}
			} else {
				// FieldsPerRecord = -1 allows records shorter than cols.
				for i := 0; i < len(cols) && i < len(record); i++ {
					m[cols[i]] = record[i]
				}
			}
			if !send(api.NewDefaultSourceTupleWithTime(m, meta, rcvTime)) || !next() {
				return nil
			}
		}
	case LINES_TYPE:
		scanner := bufio.NewScanner(file)
		scanner.Split(bufio.ScanLines)
		for scanner.Scan() {
			var tuples []api.SourceTuple
			m, err := ctx.DecodeIntoList(scanner.Bytes())
			if err != nil {
				tuples = []api.SourceTuple{&xsql.ErrorSourceTuple{
					Error: fmt.Errorf("Invalid data format, cannot decode %s with error %s", scanner.Text(), err),
				}}
			} else {
				for _, t := range m {
					tuples = append(tuples, api.NewDefaultSourceTupleWithTime(t, meta, rcvTime))
				}
			}
			for _, tuple := range tuples {
				if !send(tuple) {
					return nil
				}
			}
			if !next() {
				return nil
			}
		}
		// Scan stops silently on errors such as bufio.ErrTooLong; surface them.
		if err := scanner.Err(); err != nil {
			return fmt.Errorf("read file %v fail with error: %w", fileName, err)
		}
		return nil
	default:
		return fmt.Errorf("invalid file type %s", fs.config.FileType)
	}
}
- // prepareFile prepare file by deleting ignore lines
- func (fs *FileSource) prepareFile(ctx api.StreamContext, file string) (io.Reader, error) {
- f, err := os.Open(file)
- if err != nil {
- ctx.GetLogger().Error(err)
- return nil, err
- }
- var reader io.ReadCloser
- if fs.config.Decompression != "" {
- reader, err = compressor.GetDecompressReader(fs.config.Decompression, f)
- if err != nil {
- return nil, err
- }
- } else {
- reader = f
- }
- if fs.config.IgnoreStartLines > 0 || fs.config.IgnoreEndLines > 0 {
- r, w := io.Pipe()
- go func() {
- defer func() {
- ctx.GetLogger().Debugf("Close pipe files %s", file)
- w.Close()
- reader.Close()
- }()
- scanner := bufio.NewScanner(reader)
- scanner.Split(bufio.ScanLines)
- ln := 0
- // This is a queue to store the lines that should be ignored
- tempLines := make([][]byte, 0, fs.config.IgnoreEndLines)
- for scanner.Scan() {
- if ln >= fs.config.IgnoreStartLines {
- if fs.config.IgnoreEndLines > 0 { // the last n line are left in the tempLines
- slot := (ln - fs.config.IgnoreStartLines) % fs.config.IgnoreEndLines
- if len(tempLines) <= slot { // first round
- tempLines = append(tempLines, scanner.Bytes())
- } else {
- _, err := w.Write(tempLines[slot])
- if err != nil {
- ctx.GetLogger().Error(err)
- break
- }
- _, err = w.Write([]byte{'\n'})
- if err != nil {
- ctx.GetLogger().Error(err)
- break
- }
- tempLines[slot] = scanner.Bytes()
- }
- } else {
- _, err = w.Write(scanner.Bytes())
- if err != nil {
- ctx.GetLogger().Error(err)
- break
- }
- _, err = w.Write([]byte{'\n'})
- if err != nil {
- ctx.GetLogger().Error(err)
- break
- }
- }
- }
- ln++
- }
- }()
- return r, nil
- }
- return reader, nil
- }
|