file_sink.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "github.com/lf-edge/ekuiper/pkg/message"
  22. "path/filepath"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "time"
  27. )
// sinkConf holds the file sink properties decoded and validated by Configure.
type sinkConf struct {
	// Interval is the deprecated name of CheckInterval. Configure copies it into
	// CheckInterval when CheckInterval is unset and ignores it otherwise.
	Interval *int `json:"interval"` // deprecated, will remove in the next release
	// RollingInterval is the maximum age, in milliseconds, of an open file before
	// the checker goroutine started in Open closes it. Configure rejects negative
	// values and requires RollingInterval or RollingCount to be non-zero.
	RollingInterval int64 `json:"rollingInterval"`
	// RollingCount is the number of items written to one file before it is
	// closed; 0 disables count-based rolling. Configure defaults it to 1000000.
	RollingCount int `json:"rollingCount"`
	// RollingNamePattern is "", "prefix", "suffix" or "none"; GetFws uses it to
	// decide where a timestamp is inserted into the name of a newly created file.
	RollingNamePattern string `json:"rollingNamePattern"` // where to add the timestamp to the file name
	// CheckInterval is the period of the age check performed in Open; 0 disables
	// it. When neither it nor Interval is set, Configure defaults it to five
	// minutes expressed in milliseconds.
	CheckInterval *int64 `json:"checkInterval"` // Once interval removed, this will be NOT nullable
	// Path is the output path. It is rendered with ctx.ParseTemplate for each
	// item, so it may depend on the data. Configure defaults it to "cache".
	Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated
	// FileType is json, csv or lines; Configure defaults it to lines.
	FileType FileType `json:"fileType"`
	// HasHeader, for csv files, makes GetFws build a header from the sorted item
	// keys and pass it to createFileWriter when a new file is created.
	HasHeader bool `json:"hasHeader"`
	// Delimiter joins the csv header fields; Configure defaults it to "," for csv.
	Delimiter string `json:"delimiter"`
	// Format must be delimited when FileType is csv.
	Format string `json:"format"` // only use for validation; transformation is done in sink_node
}
// fileSink writes every collected item to a file selected by the (possibly
// templated) path, rolling files by count and/or by age.
type fileSink struct {
	// c is the validated configuration assigned by Configure.
	c *sinkConf
	// mux guards fws, which is read and modified both by Collect/GetFws and by
	// the rolling goroutine started in Open.
	mux sync.Mutex
	// fws maps a resolved (templated) path to its open writer. The file the
	// writer created may carry a timestamp in its name depending on
	// RollingNamePattern, so the key is not necessarily the on-disk name.
	fws map[string]*fileWriter
}
  45. func (m *fileSink) Configure(props map[string]interface{}) error {
  46. c := &sinkConf{
  47. RollingCount: 1000000,
  48. Path: "cache",
  49. FileType: LINES_TYPE,
  50. }
  51. if err := cast.MapToStruct(props, c); err != nil {
  52. return err
  53. }
  54. if c.Interval != nil {
  55. if *c.Interval < 0 {
  56. return fmt.Errorf("interval must be positive")
  57. } else if c.CheckInterval == nil {
  58. conf.Log.Warnf("interval is deprecated, use checkInterval instead. automatically set checkInterval to %d", c.Interval)
  59. t := int64(*c.Interval)
  60. c.CheckInterval = &t
  61. } else {
  62. conf.Log.Warnf("interval is deprecated and ignored, use checkInterval instead.")
  63. }
  64. } else if c.CheckInterval == nil { // set checkInterval default value if both interval and checkInerval are not set
  65. t := (5 * time.Minute).Milliseconds()
  66. c.CheckInterval = &t
  67. }
  68. if c.RollingInterval < 0 {
  69. return fmt.Errorf("rollingInterval must be positive")
  70. }
  71. if c.RollingCount < 0 {
  72. return fmt.Errorf("rollingCount must be positive")
  73. }
  74. if *c.CheckInterval < 0 {
  75. return fmt.Errorf("checkInterval must be positive")
  76. }
  77. if c.RollingInterval == 0 && c.RollingCount == 0 {
  78. return fmt.Errorf("one of rollingInterval and rollingCount must be set")
  79. }
  80. if c.RollingNamePattern != "" && c.RollingNamePattern != "prefix" && c.RollingNamePattern != "suffix" && c.RollingNamePattern != "none" {
  81. return fmt.Errorf("rollingNamePattern must be one of prefix, suffix or none")
  82. }
  83. if c.Path == "" {
  84. return fmt.Errorf("path must be set")
  85. }
  86. if c.FileType != JSON_TYPE && c.FileType != CSV_TYPE && c.FileType != LINES_TYPE {
  87. return fmt.Errorf("fileType must be one of json, csv or lines")
  88. }
  89. if c.FileType == CSV_TYPE {
  90. if c.Format != message.FormatDelimited {
  91. return fmt.Errorf("format must be delimited when fileType is csv")
  92. }
  93. if c.Delimiter == "" {
  94. conf.Log.Warnf("delimiter is not set, use default ','")
  95. c.Delimiter = ","
  96. }
  97. }
  98. m.c = c
  99. m.fws = make(map[string]*fileWriter)
  100. return nil
  101. }
  102. func (m *fileSink) Open(ctx api.StreamContext) error {
  103. ctx.GetLogger().Debug("Opening file sink")
  104. // Check if the files have opened longer than the rolling interval, if so close it and create a new one
  105. if *m.c.CheckInterval > 0 {
  106. t := conf.GetTicker(int(*m.c.CheckInterval))
  107. go func() {
  108. defer t.Stop()
  109. for {
  110. select {
  111. case now := <-t.C:
  112. m.mux.Lock()
  113. for k, v := range m.fws {
  114. if now.Sub(v.Start) > time.Duration(m.c.RollingInterval)*time.Millisecond {
  115. ctx.GetLogger().Debugf("rolling file %s", k)
  116. err := v.Close(ctx)
  117. // TODO how to inform this error to the rule
  118. if err != nil {
  119. ctx.GetLogger().Errorf("file sink fails to close file %s with error %s.", k, err)
  120. }
  121. delete(m.fws, k)
  122. // The file will be created when the next item comes
  123. }
  124. }
  125. m.mux.Unlock()
  126. case <-ctx.Done():
  127. ctx.GetLogger().Info("file sink done")
  128. return
  129. }
  130. }
  131. }()
  132. }
  133. return nil
  134. }
  135. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  136. ctx.GetLogger().Debugf("file sink receive %s", item)
  137. fn, err := ctx.ParseTemplate(m.c.Path, item)
  138. if err != nil {
  139. return err
  140. }
  141. fw, err := m.GetFws(ctx, fn, item)
  142. if err != nil {
  143. return err
  144. }
  145. if v, _, err := ctx.TransformOutput(item); err == nil {
  146. ctx.GetLogger().Debugf("file sink transform data %s", v)
  147. m.mux.Lock()
  148. defer m.mux.Unlock()
  149. _, e := fw.Writer.Write(v)
  150. if e != nil {
  151. return e
  152. }
  153. _, e = fw.Writer.Write(fw.Hook.Line())
  154. if e != nil {
  155. return e
  156. }
  157. if m.c.RollingCount > 0 {
  158. fw.Count++
  159. if fw.Count >= m.c.RollingCount {
  160. e = fw.Close(ctx)
  161. if e != nil {
  162. return e
  163. }
  164. delete(m.fws, fn)
  165. fw.Count = 0
  166. }
  167. }
  168. } else {
  169. return fmt.Errorf("file sink transform data error: %v", err)
  170. }
  171. return nil
  172. }
  173. func (m *fileSink) Close(ctx api.StreamContext) error {
  174. ctx.GetLogger().Infof("Closing file sink")
  175. var errs []error
  176. for k, v := range m.fws {
  177. if e := v.Close(ctx); e != nil {
  178. ctx.GetLogger().Errorf("failed to close file %s: %v", k, e)
  179. errs = append(errs, e)
  180. }
  181. }
  182. return errors.Join(errs...)
  183. }
  184. // GetFws returns the file writer for the given file name, if the file writer does not exist, it will create one
  185. // The item is used to get the csv header if needed
  186. func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*fileWriter, error) {
  187. m.mux.Lock()
  188. defer m.mux.Unlock()
  189. fws, ok := m.fws[fn]
  190. if !ok {
  191. var e error
  192. // extract header for csv
  193. var headers string
  194. if m.c.FileType == CSV_TYPE && m.c.HasHeader {
  195. var header []string
  196. switch v := item.(type) {
  197. case map[string]interface{}:
  198. header = make([]string, len(v))
  199. i := 0
  200. for k := range item.(map[string]interface{}) {
  201. header[i] = k
  202. i++
  203. }
  204. case []map[string]interface{}:
  205. if len(v) > 0 {
  206. header = make([]string, len(v[0]))
  207. i := 0
  208. for k := range v[0] {
  209. header[i] = k
  210. i++
  211. }
  212. }
  213. }
  214. sort.Strings(header)
  215. headers = strings.Join(header, m.c.Delimiter)
  216. }
  217. nfn := fn
  218. if m.c.RollingNamePattern != "" {
  219. switch m.c.RollingNamePattern {
  220. case "prefix":
  221. nfn = fmt.Sprintf("%d-%s", conf.GetNowInMilli(), fn)
  222. case "suffix":
  223. ext := filepath.Ext(fn)
  224. nfn = fmt.Sprintf("%s-%d%s", strings.TrimSuffix(fn, ext), conf.GetNowInMilli(), ext)
  225. }
  226. }
  227. fws, e = createFileWriter(ctx, nfn, m.c.FileType, headers)
  228. if e != nil {
  229. return nil, e
  230. }
  231. m.fws[fn] = fws
  232. }
  233. return fws, nil
  234. }
  235. func File() api.Sink {
  236. return &fileSink{}
  237. }