file_sink.go

// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
    "errors"
    "fmt"
    "path/filepath"
    "sort"
    "strings"
    "sync"
    "time"

    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/cast"
    "github.com/lf-edge/ekuiper/pkg/message"
)

type sinkConf struct {
    RollingInterval    int64    `json:"rollingInterval"`
    RollingCount       int      `json:"rollingCount"`
    RollingNamePattern string   `json:"rollingNamePattern"` // where to add the timestamp to the file name
    CheckInterval      int64    `json:"checkInterval"`
    Path               string   `json:"path"` // supports dynamic properties; when rolling, make sure the path is updated
    FileType           FileType `json:"fileType"`
    HasHeader          bool     `json:"hasHeader"`
    Delimiter          string   `json:"delimiter"`
    Format             string   `json:"format"` // only used for validation; transformation is done in sink_node
    Compression        string   `json:"compression"`
    Fields             []string `json:"fields"` // only used for extracting the csv header; transformation is done in sink_node
}
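
// An illustrative configuration for this sink (property names follow the json tags
// above; the values and the template path are examples only):
//
//    {
//        "path": "/tmp/result/{{.device}}.csv",
//        "fileType": "csv",
//        "format": "delimited",
//        "hasHeader": true,
//        "rollingInterval": 3600000,
//        "rollingNamePattern": "suffix",
//        "checkInterval": 60000
//    }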

type fileSink struct {
    c   *sinkConf
    mux sync.Mutex
    fws map[string]*fileWriter
}
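
// Configure parses and validates the sink properties, applying the defaults declared
// below (path "cache", lines file type, a rolling count of 1,000,000 and a 5-minute
// check interval) before any file is opened.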
func (m *fileSink) Configure(props map[string]interface{}) error {
    c := &sinkConf{
        RollingCount:  1000000,
        Path:          "cache",
        FileType:      LINES_TYPE,
        CheckInterval: (5 * time.Minute).Milliseconds(),
    }
    if err := cast.MapToStruct(props, c); err != nil {
        return err
    }
    if c.RollingInterval < 0 {
        return fmt.Errorf("rollingInterval cannot be negative")
    }
    if c.RollingCount < 0 {
        return fmt.Errorf("rollingCount cannot be negative")
    }
    if c.CheckInterval < 0 {
        return fmt.Errorf("checkInterval cannot be negative")
    }
    if c.RollingInterval == 0 && c.RollingCount == 0 {
        return fmt.Errorf("one of rollingInterval and rollingCount must be set")
    }
    if c.RollingNamePattern != "" && c.RollingNamePattern != "prefix" && c.RollingNamePattern != "suffix" && c.RollingNamePattern != "none" {
        return fmt.Errorf("rollingNamePattern must be one of prefix, suffix or none")
    }
    if c.Path == "" {
        return fmt.Errorf("path must be set")
    }
    if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
        return fmt.Errorf("fileType must be one of json, csv or lines")
    }
    if c.FileType == CSV_TYPE {
        if c.Format != message.FormatDelimited {
            return fmt.Errorf("format must be delimited when fileType is csv")
        }
        if c.Delimiter == "" {
            conf.Log.Warnf("delimiter is not set, use default ','")
            c.Delimiter = ","
        }
    }
    if _, ok := compressionTypes[c.Compression]; !ok && c.Compression != "" {
        return fmt.Errorf("compression must be one of gzip, zstd")
    }
    m.c = c
    m.fws = make(map[string]*fileWriter)
    return nil
}
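
// Open starts the background check for rolling by time: a ticker fires every
// CheckInterval milliseconds and closes any writer that has been open longer than
// RollingInterval, so the next write creates a fresh file.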
func (m *fileSink) Open(ctx api.StreamContext) error {
    ctx.GetLogger().Debug("Opening file sink")
    // Check whether any file has been open longer than the rolling interval; if so, close it so that a new one is created on the next write
    if m.c.CheckInterval > 0 {
        t := conf.GetTicker(m.c.CheckInterval)
        go func() {
            defer t.Stop()
            for {
                select {
                case now := <-t.C:
                    m.mux.Lock()
                    for k, v := range m.fws {
                        if now.Sub(v.Start) > time.Duration(m.c.RollingInterval)*time.Millisecond {
                            ctx.GetLogger().Debugf("rolling file %s", k)
                            err := v.Close(ctx)
                            // TODO how to inform this error to the rule
                            if err != nil {
                                ctx.GetLogger().Errorf("file sink fails to close file %s with error %s.", k, err)
                            }
                            delete(m.fws, k)
                            // The file will be created when the next item comes
                            v.Written = false
                        }
                    }
                    m.mux.Unlock()
                case <-ctx.Done():
                    ctx.GetLogger().Info("file sink done")
                    return
                }
            }
        }()
    }
    return nil
}
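
// Collect writes one item (or a batch) to the file resolved from the path template.
// The writer is looked up, or lazily created, by GetFws, so a file that was rolled
// by the interval checker is transparently recreated on the next call.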
func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
    ctx.GetLogger().Debugf("file sink receive %s", item)
    fn, err := ctx.ParseTemplate(m.c.Path, item)
    if err != nil {
        return err
    }
    fw, err := m.GetFws(ctx, fn, item)
    if err != nil {
        return err
    }
    if v, _, err := ctx.TransformOutput(item); err == nil {
        ctx.GetLogger().Debugf("file sink transform data %s", v)
        m.mux.Lock()
        defer m.mux.Unlock()
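        // Prepend the writer's line separator before every record except the first, so
        // records are joined rather than terminated (fw.Hook.Line() is assumed to return
        // e.g. a newline for lines/csv or a comma inside a JSON array).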
        if fw.Written {
            _, e := fw.Writer.Write(fw.Hook.Line())
            if e != nil {
                return e
            }
        } else {
            fw.Written = true
        }
        _, e := fw.Writer.Write(v)
        if e != nil {
            return e
        }
        if m.c.RollingCount > 0 {
            fw.Count++
            if fw.Count >= m.c.RollingCount {
                e = fw.Close(ctx)
                if e != nil {
                    return e
                }
                delete(m.fws, fn)
                fw.Count = 0
                fw.Written = false
            }
        }
    } else {
        return fmt.Errorf("file sink transform data error: %v", err)
    }
    return nil
}
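
// Close closes every open file writer, collecting and joining any errors.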
func (m *fileSink) Close(ctx api.StreamContext) error {
    ctx.GetLogger().Infof("Closing file sink")
    var errs []error
    for k, v := range m.fws {
        if e := v.Close(ctx); e != nil {
            ctx.GetLogger().Errorf("failed to close file %s: %v", k, e)
            errs = append(errs, e)
        }
    }
    return errors.Join(errs...)
}

// GetFws returns the file writer for the given file name; if it does not exist yet, one is created.
// The item is used to extract the csv header if needed.
func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*fileWriter, error) {
    m.mux.Lock()
    defer m.mux.Unlock()
    fws, ok := m.fws[fn]
    if !ok {
        var e error
        // extract header for csv
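        // e.g. for item {"humidity": 50, "temperature": 20} and delimiter ",", headers
        // becomes "humidity,temperature" (keys are sorted unless Fields is configured)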
        var headers string
        if m.c.FileType == CSV_TYPE && m.c.HasHeader {
            var header []string
            if len(m.c.Fields) > 0 {
                header = m.c.Fields
            } else {
                switch v := item.(type) {
                case map[string]interface{}:
                    header = make([]string, len(v))
                    i := 0
                    for k := range v {
                        header[i] = k
                        i++
                    }
                case []map[string]interface{}:
                    if len(v) > 0 {
                        header = make([]string, len(v[0]))
                        i := 0
                        for k := range v[0] {
                            header[i] = k
                            i++
                        }
                    }
                }
                sort.Strings(header)
            }
            headers = strings.Join(header, m.c.Delimiter)
        }
        nfn := fn
        if m.c.RollingNamePattern != "" {
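            // e.g. with pattern "prefix", /tmp/result.csv becomes /tmp/1700000000000-result.csv;
            // with "suffix" it becomes /tmp/result-1700000000000.csv (timestamp in milliseconds)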
            newFile := ""
            fileDir := filepath.Dir(fn)
            fileName := filepath.Base(fn)
            switch m.c.RollingNamePattern {
            case "prefix":
                newFile = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fileName)
            case "suffix":
                ext := filepath.Ext(fn)
                newFile = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fileName, ext), conf.GetNowInMilli(), ext)
            default:
                newFile = fileName
            }
            nfn = filepath.Join(fileDir, newFile)
        }
        fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers, m.c.Compression)
        if e != nil {
            return nil, e
        }
        m.fws[fn] = fws
    }
    return fws, nil
}
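
// File returns a new file sink instance.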
func File() api.Sink {
    return &fileSink{}
}