image.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/lf-edge/ekuiper/pkg/api"
  8. "image/jpeg"
  9. "image/png"
  10. "io/ioutil"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "time"
  15. )
  // imageSink holds the settings and runtime state of the image sink, which
  // writes received images to files in a directory and prunes old ones.
  type imageSink struct {
  	// path is the directory image files are written to; format is the image
  	// encoding name, also used as the file extension.
  	path, format string
  	// maxAge is the retention time in hours and maxCount the number of files
  	// kept by the cleanup pass.
  	maxAge, maxCount int
  	// cancel stops the background cleanup goroutine; it is set by Open and
  	// invoked by Close.
  	cancel context.CancelFunc
  }
  23. func (m *imageSink) Configure(props map[string]interface{}) error {
  24. if i, ok := props["format"]; ok {
  25. if i, ok := i.(string); ok {
  26. if "png" != i && "jpeg" != i {
  27. return fmt.Errorf("%s image type is not currently supported", i)
  28. }
  29. m.format = i
  30. }
  31. } else {
  32. return fmt.Errorf("Field not found format.")
  33. }
  34. if i, ok := props["path"]; ok {
  35. if i, ok := i.(string); ok {
  36. m.path = i
  37. } else {
  38. return fmt.Errorf("%s image type is not supported", i)
  39. }
  40. } else {
  41. return fmt.Errorf("Field not found path.")
  42. }
  43. m.maxAge = 72
  44. if i, ok := props["maxAge"]; ok {
  45. if i, ok := i.(int); ok {
  46. m.maxAge = i
  47. }
  48. }
  49. m.maxCount = 1000
  50. if i, ok := props["maxCount"]; ok {
  51. if i, ok := i.(int); ok {
  52. m.maxCount = i
  53. }
  54. }
  55. return nil
  56. }
  57. func (m *imageSink) Open(ctx api.StreamContext) error {
  58. logger := ctx.GetLogger()
  59. logger.Debug("Opening image sink")
  60. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  61. if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
  62. return fmt.Errorf("fail to open image sink for %v", err)
  63. }
  64. }
  65. t := time.NewTicker(time.Duration(3) * time.Minute)
  66. exeCtx, cancel := ctx.WithCancel()
  67. m.cancel = cancel
  68. go func() {
  69. defer t.Stop()
  70. for {
  71. select {
  72. case <-t.C:
  73. m.delFile(logger)
  74. case <-exeCtx.Done():
  75. logger.Info("image sink done")
  76. return
  77. }
  78. }
  79. }()
  80. return nil
  81. }
  82. func (m *imageSink) delFile(logger api.Logger) error {
  83. files, err := ioutil.ReadDir(m.path)
  84. if nil != err || 0 == len(files) {
  85. return err
  86. }
  87. pos := m.maxCount
  88. delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
  89. for i := 0; i < len(files); i++ {
  90. for j := i + 1; j < len(files); j++ {
  91. if files[i].ModTime().Before(files[j].ModTime()) {
  92. files[i], files[j] = files[j], files[i]
  93. }
  94. }
  95. if files[i].ModTime().Before(delTime) && i < pos {
  96. pos = i
  97. break
  98. }
  99. }
  100. for i := pos; i < len(files); i++ {
  101. fname := files[i].Name()
  102. if strings.HasSuffix(fname, m.format) {
  103. fpath := filepath.Join(m.path, fname)
  104. os.Remove(fpath)
  105. }
  106. }
  107. return nil
  108. }
  109. func (m *imageSink) getSuffix() string {
  110. now := time.Now()
  111. year, month, day := now.Date()
  112. hour, minute, second := now.Clock()
  113. nsecond := now.Nanosecond()
  114. return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
  115. }
  116. func (m *imageSink) saveFile(b []byte, fpath string) error {
  117. reader := bytes.NewReader(b)
  118. fp, err := os.Create(fpath)
  119. if nil != err {
  120. return err
  121. }
  122. defer fp.Close()
  123. if "png" == m.format {
  124. if img, err := png.Decode(reader); nil != err {
  125. return err
  126. } else if err = png.Encode(fp, img); nil != err {
  127. return err
  128. }
  129. } else if "jpeg" == m.format {
  130. if img, err := jpeg.Decode(reader); nil != err {
  131. return err
  132. } else if err = jpeg.Encode(fp, img, nil); nil != err {
  133. return err
  134. }
  135. }
  136. return nil
  137. }
  138. func (m *imageSink) saveFiles(msg []map[string][]byte) error {
  139. for _, images := range msg {
  140. for k, v := range images {
  141. suffix := m.getSuffix()
  142. fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
  143. fpath := filepath.Join(m.path, fname)
  144. m.saveFile(v, fpath)
  145. }
  146. }
  147. return nil
  148. }
  149. func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
  150. logger := ctx.GetLogger()
  151. if v, ok := item.([]byte); ok {
  152. var msg []map[string][]byte
  153. if err := json.Unmarshal(v, &msg); nil != err {
  154. return fmt.Errorf("The sink only accepts bytea field, other types are not supported.")
  155. }
  156. return m.saveFiles(msg)
  157. } else {
  158. logger.Debug("image sink receive non byte data")
  159. }
  160. return nil
  161. }
  162. func (m *imageSink) Close(ctx api.StreamContext) error {
  163. if m.cancel != nil {
  164. m.cancel()
  165. }
  166. return m.delFile(ctx.GetLogger())
  167. }
  168. func Image() api.Sink {
  169. return &imageSink{}
  170. }