file.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package main
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "os"
  8. "sync"
  9. "time"
  10. )
  // fileSink buffers the results it collects in memory and periodically
  // appends them to a file.
  type fileSink struct {
  	// Configuration, filled in by Configure.
  	interval int    // flush period in milliseconds
  	path     string // file the results are appended to

  	// Runtime state, set up by Open.
  	file   *os.File           // destination file handle; closed by Close
  	cancel context.CancelFunc // stops the periodic flush goroutine

  	mux     sync.Mutex // held while results is appended to or swapped out
  	results [][]byte   // results collected since the last save
  }
  19. func (m *fileSink) Configure(props map[string]interface{}) error {
  20. m.interval = 1000
  21. m.path = "cache"
  22. if i, ok := props["interval"]; ok {
  23. if i, ok := i.(float64); ok {
  24. m.interval = int(i)
  25. }
  26. }
  27. if i, ok := props["path"]; ok {
  28. if i, ok := i.(string); ok {
  29. m.path = i
  30. }
  31. }
  32. return nil
  33. }
  34. func (m *fileSink) Open(ctx api.StreamContext) error {
  35. logger := ctx.GetLogger()
  36. logger.Debug("Opening file sink")
  37. m.results = make([][]byte, 0)
  38. var f *os.File
  39. var err error
  40. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  41. _, err = os.Create(m.path)
  42. }
  43. f, err = os.OpenFile(m.path, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
  44. if err != nil {
  45. return fmt.Errorf("fail to open file sink for %v", err)
  46. }
  47. m.file = f
  48. t := time.NewTicker(time.Duration(m.interval) * time.Millisecond)
  49. exeCtx, cancel := ctx.WithCancel()
  50. m.cancel = cancel
  51. go func() {
  52. defer t.Stop()
  53. for {
  54. select {
  55. case <-t.C:
  56. m.save(logger)
  57. case <-exeCtx.Done():
  58. logger.Info("file sink done")
  59. return
  60. }
  61. }
  62. }()
  63. return nil
  64. }
  65. func (m *fileSink) save(logger api.Logger) {
  66. if len(m.results) == 0 {
  67. return
  68. }
  69. logger.Debugf("file sink is saving to file %s", m.path)
  70. var strings []string
  71. m.mux.Lock()
  72. for _, b := range m.results {
  73. strings = append(strings, string(b)+"\n")
  74. }
  75. m.results = make([][]byte, 0)
  76. m.mux.Unlock()
  77. w := bufio.NewWriter(m.file)
  78. for _, s := range strings {
  79. _, err := m.file.WriteString(s)
  80. if err != nil {
  81. logger.Errorf("file sink fails to write out result '%s' with error %s.", s, err)
  82. }
  83. }
  84. w.Flush()
  85. logger.Debugf("file sink has saved to file %s", m.path)
  86. }
  87. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  88. logger := ctx.GetLogger()
  89. if v, ok := item.([]byte); ok {
  90. logger.Debugf("file sink receive %s", item)
  91. m.mux.Lock()
  92. m.results = append(m.results, v)
  93. m.mux.Unlock()
  94. } else {
  95. logger.Debug("file sink receive non byte data")
  96. }
  97. return nil
  98. }
  99. func (m *fileSink) Close(ctx api.StreamContext) error {
  100. if m.cancel != nil {
  101. m.cancel()
  102. }
  103. if m.file != nil {
  104. m.save(ctx.GetLogger())
  105. return m.file.Close()
  106. }
  107. return nil
  108. }
  109. func File() api.Sink {
  110. return &fileSink{}
  111. }