file_sink.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/cast"
	"github.com/lf-edge/ekuiper/pkg/message"
)
  28. type sinkConf struct {
  29. Interval int `json:"interval"`
  30. Path string `json:"path"`
  31. FileType FileType `json:"fileType"`
  32. HasHeader bool `json:"hasHeader"`
  33. Delimiter string `json:"delimiter"`
  34. Format string `json:"format"` // only use for validation; transformation is done in sink_node
  35. }
  36. type fileSink struct {
  37. c *sinkConf
  38. // If firstLine is true, it means the file is newly created and the first line is not written yet.
  39. // Do not write line feed for the first line.
  40. firstLine bool
  41. mux sync.Mutex
  42. file *os.File
  43. writer io.Writer
  44. hook writerHooks
  45. }
  46. func (m *fileSink) Configure(props map[string]interface{}) error {
  47. c := &sinkConf{
  48. Interval: 1000,
  49. Path: "cache",
  50. FileType: LINES_TYPE,
  51. }
  52. if err := cast.MapToStruct(props, c); err != nil {
  53. return err
  54. }
  55. if c.Interval <= 0 {
  56. return fmt.Errorf("interval must be positive")
  57. }
  58. if c.Path == "" {
  59. return fmt.Errorf("path must be set")
  60. }
  61. if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
  62. return fmt.Errorf("fileType must be one of json, csv or lines")
  63. }
  64. if c.FileType == CSV_TYPE {
  65. if c.Format != message.FormatDelimited {
  66. return fmt.Errorf("format must be delimited when fileType is csv")
  67. }
  68. if c.Delimiter == "" {
  69. conf.Log.Warnf("delimiter is not set, use default ','")
  70. c.Delimiter = ","
  71. }
  72. }
  73. m.c = c
  74. return nil
  75. }
  76. func (m *fileSink) Open(ctx api.StreamContext) error {
  77. logger := ctx.GetLogger()
  78. logger.Debug("Opening file sink")
  79. var (
  80. f *os.File
  81. err error
  82. )
  83. if _, err = os.Stat(m.c.Path); os.IsNotExist(err) {
  84. _, err = os.Create(m.c.Path)
  85. }
  86. f, err = os.OpenFile(m.c.Path, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
  87. if err != nil {
  88. return fmt.Errorf("fail to open file sink for %v", err)
  89. }
  90. m.file = f
  91. m.firstLine = true
  92. switch m.c.FileType {
  93. case JSON_TYPE:
  94. m.hook = &jsonWriterHooks{}
  95. case CSV_TYPE:
  96. m.hook = &csvWriterHooks{}
  97. case LINES_TYPE:
  98. m.hook = &linesWriterHooks{}
  99. }
  100. if m.c.Interval > 0 {
  101. m.writer = bufio.NewWriter(f)
  102. t := time.NewTicker(time.Duration(m.c.Interval) * time.Millisecond)
  103. go func() {
  104. defer t.Stop()
  105. for {
  106. select {
  107. case <-t.C:
  108. m.mux.Lock()
  109. err := m.writer.(*bufio.Writer).Flush()
  110. if err != nil {
  111. logger.Errorf("file sink fails to flush with error %s.", err)
  112. }
  113. m.mux.Unlock()
  114. case <-ctx.Done():
  115. logger.Info("file sink done")
  116. return
  117. }
  118. }
  119. }()
  120. } else {
  121. m.writer = f
  122. }
  123. return nil
  124. }
  125. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  126. logger := ctx.GetLogger()
  127. logger.Debugf("file sink receive %s", item)
  128. // extract header for csv
  129. if m.c.FileType == CSV_TYPE && m.c.HasHeader && m.hook.Header() == nil {
  130. var header []string
  131. switch v := item.(type) {
  132. case map[string]interface{}:
  133. header = make([]string, len(v))
  134. for k := range item.(map[string]interface{}) {
  135. header = append(header, k)
  136. }
  137. case []map[string]interface{}:
  138. if len(v) > 0 {
  139. header = make([]string, len(v[0]))
  140. for k := range v[0] {
  141. header = append(header, k)
  142. }
  143. }
  144. }
  145. m.hook.(*csvWriterHooks).SetHeader(strings.Join(header, m.c.Delimiter))
  146. }
  147. if v, _, err := ctx.TransformOutput(item); err == nil {
  148. logger.Debugf("file sink transform data %s", v)
  149. m.mux.Lock()
  150. defer m.mux.Unlock()
  151. if !m.firstLine {
  152. _, e := m.writer.Write(m.hook.Line())
  153. if e != nil {
  154. return err
  155. }
  156. } else {
  157. n, err := m.writer.Write(m.hook.Header())
  158. if err != nil {
  159. return err
  160. }
  161. if n > 0 {
  162. _, e := m.writer.Write(m.hook.Line())
  163. if e != nil {
  164. return err
  165. }
  166. }
  167. m.firstLine = false
  168. }
  169. _, e := m.writer.Write(v)
  170. if e != nil {
  171. return err
  172. }
  173. } else {
  174. return fmt.Errorf("file sink transform data error: %v", err)
  175. }
  176. return nil
  177. }
  178. func (m *fileSink) Close(ctx api.StreamContext) error {
  179. ctx.GetLogger().Infof("Closing file sink")
  180. if m.file != nil {
  181. ctx.GetLogger().Infof("File sync before close")
  182. _, e := m.writer.Write(m.hook.Footer())
  183. if e != nil {
  184. ctx.GetLogger().Errorf("file sink fails to write footer with error %s.", e)
  185. }
  186. if m.c.Interval > 0 {
  187. ctx.GetLogger().Infof("flush at close")
  188. m.writer.(*bufio.Writer).Flush()
  189. }
  190. m.file.Sync()
  191. return m.file.Close()
  192. }
  193. return nil
  194. }
  195. func File() api.Sink {
  196. return &fileSink{}
  197. }