file_sink.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
  15. import (
  16. "errors"
  17. "fmt"
  18. "path/filepath"
  19. "sort"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/cast"
  26. "github.com/lf-edge/ekuiper/pkg/message"
  27. )
// sinkConf is the user-facing configuration of the file sink.
type sinkConf struct {
	RollingInterval int64 `json:"rollingInterval"` // max open time of a file in milliseconds; 0 disables time-based rolling
	RollingCount int `json:"rollingCount"` // max item count per file; 0 disables count-based rolling
	RollingNamePattern string `json:"rollingNamePattern"` // where to add the timestamp to the file name: prefix, suffix or none
	CheckInterval *int64 `json:"checkInterval"` // ms between rolling checks; defaults to 5 minutes. Once interval removed, this will be NOT nullable
	Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated
	FileType FileType `json:"fileType"` // json, csv or lines
	HasHeader bool `json:"hasHeader"` // csv only: write a header line when the file is created
	Delimiter string `json:"delimiter"` // csv only: column separator, defaults to ","
	Format string `json:"format"` // only use for validation; transformation is done in sink_node
	Compression string `json:"compression"` // optional; must be a key of compressionTypes (e.g. gzip, zstd)
	Fields []string `json:"fields"` // only use for extracting header for csv; transformation is done in sink_node
}
// fileSink writes transformed sink items into files, keeping one open
// fileWriter per resolved file path and rolling them by time and/or count.
type fileSink struct {
	c *sinkConf // validated configuration, set in Configure
	mux sync.Mutex // guards fws and writer access shared by Collect, Close and the rolling goroutine started in Open
	fws map[string]*fileWriter // open writers keyed by the template-resolved path (pre-rolling name)
}
  46. func (m *fileSink) Configure(props map[string]interface{}) error {
  47. c := &sinkConf{
  48. RollingCount: 1000000,
  49. Path: "cache",
  50. FileType: LINES_TYPE,
  51. }
  52. if err := cast.MapToStruct(props, c); err != nil {
  53. return err
  54. }
  55. if c.CheckInterval == nil { // set checkInterval default value if both interval and checkInerval are not set
  56. t := (5 * time.Minute).Milliseconds()
  57. c.CheckInterval = &t
  58. }
  59. if c.RollingInterval < 0 {
  60. return fmt.Errorf("rollingInterval must be positive")
  61. }
  62. if c.RollingCount < 0 {
  63. return fmt.Errorf("rollingCount must be positive")
  64. }
  65. if *c.CheckInterval < 0 {
  66. return fmt.Errorf("checkInterval must be positive")
  67. }
  68. if c.RollingInterval == 0 && c.RollingCount == 0 {
  69. return fmt.Errorf("one of rollingInterval and rollingCount must be set")
  70. }
  71. if c.RollingNamePattern != "" && c.RollingNamePattern != "prefix" && c.RollingNamePattern != "suffix" && c.RollingNamePattern != "none" {
  72. return fmt.Errorf("rollingNamePattern must be one of prefix, suffix or none")
  73. }
  74. if c.Path == "" {
  75. return fmt.Errorf("path must be set")
  76. }
  77. if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
  78. return fmt.Errorf("fileType must be one of json, csv or lines")
  79. }
  80. if c.FileType == CSV_TYPE {
  81. if c.Format != message.FormatDelimited {
  82. return fmt.Errorf("format must be delimited when fileType is csv")
  83. }
  84. if c.Delimiter == "" {
  85. conf.Log.Warnf("delimiter is not set, use default ','")
  86. c.Delimiter = ","
  87. }
  88. }
  89. if _, ok := compressionTypes[c.Compression]; !ok && c.Compression != "" {
  90. return fmt.Errorf("compression must be one of gzip, zstd")
  91. }
  92. m.c = c
  93. m.fws = make(map[string]*fileWriter)
  94. return nil
  95. }
// Open starts a background goroutine that periodically (every checkInterval
// milliseconds) closes files which have been open longer than rollingInterval,
// so that a fresh file is created when the next item arrives. The goroutine
// exits when the rule context is done. No goroutine is started when
// checkInterval is 0.
func (m *fileSink) Open(ctx api.StreamContext) error {
	ctx.GetLogger().Debug("Opening file sink")
	// Check if the files have opened longer than the rolling interval, if so close it and create a new one
	if *m.c.CheckInterval > 0 {
		t := conf.GetTicker(*m.c.CheckInterval)
		go func() {
			defer t.Stop()
			for {
				select {
				case now := <-t.C:
					// Lock out Collect/GetFws while scanning and rolling writers.
					m.mux.Lock()
					for k, v := range m.fws {
						// NOTE(review): when RollingInterval is 0 this condition is
						// true for any non-zero file age, so files roll on every
						// tick — confirm this is the intended behavior.
						if now.Sub(v.Start) > time.Duration(m.c.RollingInterval)*time.Millisecond {
							ctx.GetLogger().Debugf("rolling file %s", k)
							err := v.Close(ctx)
							// TODO how to inform this error to the rule
							if err != nil {
								ctx.GetLogger().Errorf("file sink fails to close file %s with error %s.", k, err)
							}
							delete(m.fws, k)
							// The file will be created when the next item comes
							// (v is already removed from the map; this reset only
							// affects code still holding a reference to v).
							v.Written = false
						}
					}
					m.mux.Unlock()
				case <-ctx.Done():
					ctx.GetLogger().Info("file sink done")
					return
				}
			}
		}()
	}
	return nil
}
  130. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  131. ctx.GetLogger().Debugf("file sink receive %s", item)
  132. fn, err := ctx.ParseTemplate(m.c.Path, item)
  133. if err != nil {
  134. return err
  135. }
  136. fw, err := m.GetFws(ctx, fn, item)
  137. if err != nil {
  138. return err
  139. }
  140. if v, _, err := ctx.TransformOutput(item); err == nil {
  141. ctx.GetLogger().Debugf("file sink transform data %s", v)
  142. m.mux.Lock()
  143. defer m.mux.Unlock()
  144. if fw.Written {
  145. _, e := fw.Writer.Write(fw.Hook.Line())
  146. if e != nil {
  147. return e
  148. }
  149. } else {
  150. fw.Written = true
  151. }
  152. _, e := fw.Writer.Write(v)
  153. if e != nil {
  154. return e
  155. }
  156. if m.c.RollingCount > 0 {
  157. fw.Count++
  158. if fw.Count >= m.c.RollingCount {
  159. e = fw.Close(ctx)
  160. if e != nil {
  161. return e
  162. }
  163. delete(m.fws, fn)
  164. fw.Count = 0
  165. fw.Written = false
  166. }
  167. }
  168. } else {
  169. return fmt.Errorf("file sink transform data error: %v", err)
  170. }
  171. return nil
  172. }
  173. func (m *fileSink) Close(ctx api.StreamContext) error {
  174. ctx.GetLogger().Infof("Closing file sink")
  175. var errs []error
  176. for k, v := range m.fws {
  177. if e := v.Close(ctx); e != nil {
  178. ctx.GetLogger().Errorf("failed to close file %s: %v", k, e)
  179. errs = append(errs, e)
  180. }
  181. }
  182. return errors.Join(errs...)
  183. }
  184. // GetFws returns the file writer for the given file name, if the file writer does not exist, it will create one
  185. // The item is used to get the csv header if needed
  186. func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*fileWriter, error) {
  187. m.mux.Lock()
  188. defer m.mux.Unlock()
  189. fws, ok := m.fws[fn]
  190. if !ok {
  191. var e error
  192. // extract header for csv
  193. var headers string
  194. if m.c.FileType == CSV_TYPE && m.c.HasHeader {
  195. var header []string
  196. if len(m.c.Fields) > 0 {
  197. header = m.c.Fields
  198. } else {
  199. switch v := item.(type) {
  200. case map[string]interface{}:
  201. header = make([]string, len(v))
  202. i := 0
  203. for k := range item.(map[string]interface{}) {
  204. header[i] = k
  205. i++
  206. }
  207. case []map[string]interface{}:
  208. if len(v) > 0 {
  209. header = make([]string, len(v[0]))
  210. i := 0
  211. for k := range v[0] {
  212. header[i] = k
  213. i++
  214. }
  215. }
  216. }
  217. sort.Strings(header)
  218. }
  219. headers = strings.Join(header, m.c.Delimiter)
  220. }
  221. nfn := fn
  222. if m.c.RollingNamePattern != "" {
  223. newFile := ""
  224. fileDir := filepath.Dir(fn)
  225. fileName := filepath.Base(fn)
  226. switch m.c.RollingNamePattern {
  227. case "prefix":
  228. newFile = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fileName)
  229. case "suffix":
  230. ext := filepath.Ext(fn)
  231. newFile = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fileName, ext), conf.GetNowInMilli(), ext)
  232. default:
  233. newFile = fileName
  234. }
  235. nfn = filepath.Join(fileDir, newFile)
  236. }
  237. fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers, m.c.Compression)
  238. if e != nil {
  239. return nil, e
  240. }
  241. m.fws[fn] = fws
  242. }
  243. return fws, nil
  244. }
  245. func File() api.Sink {
  246. return &fileSink{}
  247. }