file_sink.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
  15. import (
  16. "errors"
  17. "fmt"
  18. "path/filepath"
  19. "sort"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/cast"
  26. "github.com/lf-edge/ekuiper/pkg/message"
  27. )
// sinkConf is the parsed configuration of the file sink. It is populated
// from the sink properties map in Configure via cast.MapToStruct.
type sinkConf struct {
	// Interval is deprecated, will remove in the next release.
	Interval *int `json:"interval"`
	// RollingInterval is the maximum age (ms) of a file before it is rolled.
	RollingInterval int64 `json:"rollingInterval"`
	// RollingCount is the maximum number of records per file before rolling.
	RollingCount int `json:"rollingCount"`
	// RollingNamePattern decides where to add the timestamp to the file name
	// ("prefix", "suffix" or "none").
	RollingNamePattern string `json:"rollingNamePattern"`
	// CheckInterval is the period (ms) of the rolling check ticker.
	// Once interval removed, this will be NOT nullable.
	CheckInterval *int64 `json:"checkInterval"`
	// Path supports dynamic property; when rolling, make sure the path is updated.
	Path     string   `json:"path"`
	FileType FileType `json:"fileType"`
	// HasHeader controls whether a csv header line is written.
	HasHeader bool   `json:"hasHeader"`
	Delimiter string `json:"delimiter"`
	// Format is only used for validation; transformation is done in sink_node.
	Format      string `json:"format"`
	Compression string `json:"compression"`
	// Fields is only used for extracting the header for csv; transformation
	// is done in sink_node.
	Fields []string `json:"fields"`
}
// fileSink writes collected items into files, one writer per resolved path.
type fileSink struct {
	c *sinkConf
	// mux guards fws and the per-writer state (Written, Count) that is shared
	// between Collect and the rolling goroutine started in Open.
	mux sync.Mutex
	// fws maps the template-resolved file name to its open writer.
	fws map[string]*fileWriter
}
  47. func (m *fileSink) Configure(props map[string]interface{}) error {
  48. c := &sinkConf{
  49. RollingCount: 1000000,
  50. Path: "cache",
  51. FileType: LINES_TYPE,
  52. }
  53. if err := cast.MapToStruct(props, c); err != nil {
  54. return err
  55. }
  56. if c.Interval != nil {
  57. if *c.Interval < 0 {
  58. return fmt.Errorf("interval must be positive")
  59. } else if c.CheckInterval == nil {
  60. conf.Log.Warnf("interval is deprecated, use checkInterval instead. automatically set checkInterval to %d", c.Interval)
  61. t := int64(*c.Interval)
  62. c.CheckInterval = &t
  63. } else {
  64. conf.Log.Warnf("interval is deprecated and ignored, use checkInterval instead.")
  65. }
  66. } else if c.CheckInterval == nil { // set checkInterval default value if both interval and checkInerval are not set
  67. t := (5 * time.Minute).Milliseconds()
  68. c.CheckInterval = &t
  69. }
  70. if c.RollingInterval < 0 {
  71. return fmt.Errorf("rollingInterval must be positive")
  72. }
  73. if c.RollingCount < 0 {
  74. return fmt.Errorf("rollingCount must be positive")
  75. }
  76. if *c.CheckInterval < 0 {
  77. return fmt.Errorf("checkInterval must be positive")
  78. }
  79. if c.RollingInterval == 0 && c.RollingCount == 0 {
  80. return fmt.Errorf("one of rollingInterval and rollingCount must be set")
  81. }
  82. if c.RollingNamePattern != "" && c.RollingNamePattern != "prefix" && c.RollingNamePattern != "suffix" && c.RollingNamePattern != "none" {
  83. return fmt.Errorf("rollingNamePattern must be one of prefix, suffix or none")
  84. }
  85. if c.Path == "" {
  86. return fmt.Errorf("path must be set")
  87. }
  88. if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
  89. return fmt.Errorf("fileType must be one of json, csv or lines")
  90. }
  91. if c.FileType == CSV_TYPE {
  92. if c.Format != message.FormatDelimited {
  93. return fmt.Errorf("format must be delimited when fileType is csv")
  94. }
  95. if c.Delimiter == "" {
  96. conf.Log.Warnf("delimiter is not set, use default ','")
  97. c.Delimiter = ","
  98. }
  99. }
  100. if _, ok := compressionTypes[c.Compression]; !ok && c.Compression != "" {
  101. return fmt.Errorf("compression must be one of gzip, zstd")
  102. }
  103. m.c = c
  104. m.fws = make(map[string]*fileWriter)
  105. return nil
  106. }
  107. func (m *fileSink) Open(ctx api.StreamContext) error {
  108. ctx.GetLogger().Debug("Opening file sink")
  109. // Check if the files have opened longer than the rolling interval, if so close it and create a new one
  110. if *m.c.CheckInterval > 0 {
  111. t := conf.GetTicker(*m.c.CheckInterval)
  112. go func() {
  113. defer t.Stop()
  114. for {
  115. select {
  116. case now := <-t.C:
  117. m.mux.Lock()
  118. for k, v := range m.fws {
  119. if now.Sub(v.Start) > time.Duration(m.c.RollingInterval)*time.Millisecond {
  120. ctx.GetLogger().Debugf("rolling file %s", k)
  121. err := v.Close(ctx)
  122. // TODO how to inform this error to the rule
  123. if err != nil {
  124. ctx.GetLogger().Errorf("file sink fails to close file %s with error %s.", k, err)
  125. }
  126. delete(m.fws, k)
  127. // The file will be created when the next item comes
  128. v.Written = false
  129. }
  130. }
  131. m.mux.Unlock()
  132. case <-ctx.Done():
  133. ctx.GetLogger().Info("file sink done")
  134. return
  135. }
  136. }
  137. }()
  138. }
  139. return nil
  140. }
  141. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  142. ctx.GetLogger().Debugf("file sink receive %s", item)
  143. fn, err := ctx.ParseTemplate(m.c.Path, item)
  144. if err != nil {
  145. return err
  146. }
  147. fw, err := m.GetFws(ctx, fn, item)
  148. if err != nil {
  149. return err
  150. }
  151. if v, _, err := ctx.TransformOutput(item); err == nil {
  152. ctx.GetLogger().Debugf("file sink transform data %s", v)
  153. m.mux.Lock()
  154. defer m.mux.Unlock()
  155. if fw.Written {
  156. _, e := fw.Writer.Write(fw.Hook.Line())
  157. if e != nil {
  158. return e
  159. }
  160. } else {
  161. fw.Written = true
  162. }
  163. _, e := fw.Writer.Write(v)
  164. if e != nil {
  165. return e
  166. }
  167. if m.c.RollingCount > 0 {
  168. fw.Count++
  169. if fw.Count >= m.c.RollingCount {
  170. e = fw.Close(ctx)
  171. if e != nil {
  172. return e
  173. }
  174. delete(m.fws, fn)
  175. fw.Count = 0
  176. fw.Written = false
  177. }
  178. }
  179. } else {
  180. return fmt.Errorf("file sink transform data error: %v", err)
  181. }
  182. return nil
  183. }
  184. func (m *fileSink) Close(ctx api.StreamContext) error {
  185. ctx.GetLogger().Infof("Closing file sink")
  186. var errs []error
  187. for k, v := range m.fws {
  188. if e := v.Close(ctx); e != nil {
  189. ctx.GetLogger().Errorf("failed to close file %s: %v", k, e)
  190. errs = append(errs, e)
  191. }
  192. }
  193. return errors.Join(errs...)
  194. }
  195. // GetFws returns the file writer for the given file name, if the file writer does not exist, it will create one
  196. // The item is used to get the csv header if needed
  197. func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*fileWriter, error) {
  198. m.mux.Lock()
  199. defer m.mux.Unlock()
  200. fws, ok := m.fws[fn]
  201. if !ok {
  202. var e error
  203. // extract header for csv
  204. var headers string
  205. if m.c.FileType == CSV_TYPE && m.c.HasHeader {
  206. var header []string
  207. if len(m.c.Fields) > 0 {
  208. header = m.c.Fields
  209. } else {
  210. switch v := item.(type) {
  211. case map[string]interface{}:
  212. header = make([]string, len(v))
  213. i := 0
  214. for k := range item.(map[string]interface{}) {
  215. header[i] = k
  216. i++
  217. }
  218. case []map[string]interface{}:
  219. if len(v) > 0 {
  220. header = make([]string, len(v[0]))
  221. i := 0
  222. for k := range v[0] {
  223. header[i] = k
  224. i++
  225. }
  226. }
  227. }
  228. sort.Strings(header)
  229. }
  230. headers = strings.Join(header, m.c.Delimiter)
  231. }
  232. nfn := fn
  233. if m.c.RollingNamePattern != "" {
  234. switch m.c.RollingNamePattern {
  235. case "prefix":
  236. nfn = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fn)
  237. case "suffix":
  238. ext := filepath.Ext(fn)
  239. nfn = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fn, ext), conf.GetNowInMilli(), ext)
  240. }
  241. }
  242. fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers, m.c.Compression)
  243. if e != nil {
  244. return nil, e
  245. }
  246. m.fws[fn] = fws
  247. }
  248. return fws, nil
  249. }
  250. func File() api.Sink {
  251. return &fileSink{}
  252. }