123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package file
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/cast"
	"github.com/lf-edge/ekuiper/pkg/message"
)
- type sinkConf struct {
- Interval int `json:"interval"`
- Path string `json:"path"`
- FileType FileType `json:"fileType"`
- HasHeader bool `json:"hasHeader"`
- Delimiter string `json:"delimiter"`
- Format string `json:"format"` // only use for validation; transformation is done in sink_node
- }
- type fileSink struct {
- c *sinkConf
- // If firstLine is true, it means the file is newly created and the first line is not written yet.
- // Do not write line feed for the first line.
- firstLine bool
- mux sync.Mutex
- file *os.File
- writer io.Writer
- hook writerHooks
- }
- func (m *fileSink) Configure(props map[string]interface{}) error {
- c := &sinkConf{
- Interval: 1000,
- Path: "cache",
- FileType: LINES_TYPE,
- }
- if err := cast.MapToStruct(props, c); err != nil {
- return err
- }
- if c.Interval <= 0 {
- return fmt.Errorf("interval must be positive")
- }
- if c.Path == "" {
- return fmt.Errorf("path must be set")
- }
- if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
- return fmt.Errorf("fileType must be one of json, csv or lines")
- }
- if c.FileType == CSV_TYPE {
- if c.Format != message.FormatDelimited {
- return fmt.Errorf("format must be delimited when fileType is csv")
- }
- if c.Delimiter == "" {
- conf.Log.Warnf("delimiter is not set, use default ','")
- c.Delimiter = ","
- }
- }
- m.c = c
- return nil
- }
- func (m *fileSink) Open(ctx api.StreamContext) error {
- logger := ctx.GetLogger()
- logger.Debug("Opening file sink")
- var (
- f *os.File
- err error
- )
- if _, err = os.Stat(m.c.Path); os.IsNotExist(err) {
- _, err = os.Create(m.c.Path)
- }
- f, err = os.OpenFile(m.c.Path, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
- if err != nil {
- return fmt.Errorf("fail to open file sink for %v", err)
- }
- m.file = f
- m.firstLine = true
- switch m.c.FileType {
- case JSON_TYPE:
- m.hook = &jsonWriterHooks{}
- case CSV_TYPE:
- m.hook = &csvWriterHooks{}
- case LINES_TYPE:
- m.hook = &linesWriterHooks{}
- }
- if m.c.Interval > 0 {
- m.writer = bufio.NewWriter(f)
- t := time.NewTicker(time.Duration(m.c.Interval) * time.Millisecond)
- go func() {
- defer t.Stop()
- for {
- select {
- case <-t.C:
- m.mux.Lock()
- err := m.writer.(*bufio.Writer).Flush()
- if err != nil {
- logger.Errorf("file sink fails to flush with error %s.", err)
- }
- m.mux.Unlock()
- case <-ctx.Done():
- logger.Info("file sink done")
- return
- }
- }
- }()
- } else {
- m.writer = f
- }
- return nil
- }
- func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
- logger := ctx.GetLogger()
- logger.Debugf("file sink receive %s", item)
- // extract header for csv
- if m.c.FileType == CSV_TYPE && m.c.HasHeader && m.hook.Header() == nil {
- var header []string
- switch v := item.(type) {
- case map[string]interface{}:
- header = make([]string, len(v))
- for k := range item.(map[string]interface{}) {
- header = append(header, k)
- }
- case []map[string]interface{}:
- if len(v) > 0 {
- header = make([]string, len(v[0]))
- for k := range v[0] {
- header = append(header, k)
- }
- }
- }
- m.hook.(*csvWriterHooks).SetHeader(strings.Join(header, m.c.Delimiter))
- }
- if v, _, err := ctx.TransformOutput(item); err == nil {
- logger.Debugf("file sink transform data %s", v)
- m.mux.Lock()
- defer m.mux.Unlock()
- if !m.firstLine {
- _, e := m.writer.Write(m.hook.Line())
- if e != nil {
- return err
- }
- } else {
- n, err := m.writer.Write(m.hook.Header())
- if err != nil {
- return err
- }
- if n > 0 {
- _, e := m.writer.Write(m.hook.Line())
- if e != nil {
- return err
- }
- }
- m.firstLine = false
- }
- _, e := m.writer.Write(v)
- if e != nil {
- return err
- }
- } else {
- return fmt.Errorf("file sink transform data error: %v", err)
- }
- return nil
- }
- func (m *fileSink) Close(ctx api.StreamContext) error {
- ctx.GetLogger().Infof("Closing file sink")
- if m.file != nil {
- ctx.GetLogger().Infof("File sync before close")
- _, e := m.writer.Write(m.hook.Footer())
- if e != nil {
- ctx.GetLogger().Errorf("file sink fails to write footer with error %s.", e)
- }
- if m.c.Interval > 0 {
- ctx.GetLogger().Infof("flush at close")
- m.writer.(*bufio.Writer).Flush()
- }
- m.file.Sync()
- return m.file.Close()
- }
- return nil
- }
- func File() api.Sink {
- return &fileSink{}
- }
|