// file_sink.go

  // Copyright 2021 EMQ Technologies Co., Ltd.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  // http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "context"
  18. "fmt"
  19. "os"
  20. "sync"
  21. "time"
  22. "github.com/lf-edge/ekuiper/sdk/go/api"
  23. )
  // fileSink buffers the byte payloads it is handed and appends them to a
  // file, one payload per line, each time its flush interval elapses.
  type fileSink struct {
  	// Settings populated by Configure.
  	path     string // file the buffered payloads are appended to
  	interval int    // flush period, used as a number of milliseconds

  	// Runtime state populated by Open.
  	mux     sync.Mutex         // taken around reads and writes of results
  	results [][]byte           // payloads collected since the last flush
  	file    *os.File           // handle on path, opened for appending
  	cancel  context.CancelFunc // cancels the context watched by the flush goroutine
  }
  32. func (m *fileSink) Configure(props map[string]interface{}) error {
  33. m.interval = 1000
  34. m.path = "cache"
  35. if i, ok := props["interval"]; ok {
  36. if i, ok := i.(float64); ok {
  37. m.interval = int(i)
  38. }
  39. }
  40. if i, ok := props["path"]; ok {
  41. if i, ok := i.(string); ok {
  42. m.path = i
  43. }
  44. }
  45. return nil
  46. }
  47. func (m *fileSink) Open(ctx api.StreamContext) error {
  48. logger := ctx.GetLogger()
  49. logger.Debug("Opening file sink")
  50. m.results = make([][]byte, 0)
  51. var f *os.File
  52. var err error
  53. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  54. _, err = os.Create(m.path)
  55. }
  56. f, err = os.OpenFile(m.path, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
  57. if err != nil {
  58. return fmt.Errorf("fail to open file sink for %v", err)
  59. }
  60. m.file = f
  61. t := time.NewTicker(time.Duration(m.interval) * time.Millisecond)
  62. exeCtx, cancel := ctx.WithCancel()
  63. m.cancel = cancel
  64. go func() {
  65. defer t.Stop()
  66. for {
  67. select {
  68. case <-t.C:
  69. m.save(logger)
  70. case <-exeCtx.Done():
  71. logger.Info("file sink done")
  72. return
  73. }
  74. }
  75. }()
  76. return nil
  77. }
  78. func (m *fileSink) save(logger api.Logger) {
  79. if len(m.results) == 0 {
  80. return
  81. }
  82. logger.Debugf("file sink is saving to file %s", m.path)
  83. var strings []string
  84. m.mux.Lock()
  85. for _, b := range m.results {
  86. strings = append(strings, string(b)+"\n")
  87. }
  88. m.results = make([][]byte, 0)
  89. m.mux.Unlock()
  90. w := bufio.NewWriter(m.file)
  91. for _, s := range strings {
  92. _, err := m.file.WriteString(s)
  93. if err != nil {
  94. logger.Errorf("file sink fails to write out result '%s' with error %s.", s, err)
  95. }
  96. }
  97. w.Flush()
  98. logger.Debugf("file sink has saved to file %s", m.path)
  99. }
  100. func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
  101. logger := ctx.GetLogger()
  102. if v, ok := item.([]byte); ok {
  103. logger.Debugf("file sink receive %s", item)
  104. m.mux.Lock()
  105. m.results = append(m.results, v)
  106. m.mux.Unlock()
  107. } else {
  108. logger.Debug("file sink receive non byte data")
  109. }
  110. return nil
  111. }
  112. func (m *fileSink) Close(ctx api.StreamContext) error {
  113. if m.cancel != nil {
  114. m.cancel()
  115. }
  116. if m.file != nil {
  117. m.save(ctx.GetLogger())
  118. return m.file.Close()
  119. }
  120. return nil
  121. }