image.go 4.8 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
import (
	"bytes"
	"context"
	"fmt"
	"image/jpeg"
	"image/png"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/lf-edge/ekuiper/pkg/api"
)
  27. type imageSink struct {
  28. path string
  29. format string
  30. maxAge int
  31. maxCount int
  32. cancel context.CancelFunc
  33. }
  34. func (m *imageSink) Configure(props map[string]interface{}) error {
  35. if i, ok := props["imageFormat"]; ok {
  36. if i, ok := i.(string); ok {
  37. if "png" != i && "jpeg" != i {
  38. return fmt.Errorf("%s image type is not currently supported", i)
  39. }
  40. m.format = i
  41. }
  42. } else {
  43. return fmt.Errorf("Field not found format.")
  44. }
  45. if i, ok := props["path"]; ok {
  46. if i, ok := i.(string); ok {
  47. m.path = i
  48. } else {
  49. return fmt.Errorf("%s image type is not supported", i)
  50. }
  51. } else {
  52. return fmt.Errorf("Field not found path.")
  53. }
  54. m.maxAge = 72
  55. if i, ok := props["maxAge"]; ok {
  56. if i, ok := i.(int); ok {
  57. m.maxAge = i
  58. }
  59. }
  60. m.maxCount = 1000
  61. if i, ok := props["maxCount"]; ok {
  62. if i, ok := i.(int); ok {
  63. m.maxCount = i
  64. }
  65. }
  66. return nil
  67. }
  68. func (m *imageSink) Open(ctx api.StreamContext) error {
  69. logger := ctx.GetLogger()
  70. logger.Debug("Opening image sink")
  71. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  72. if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
  73. return fmt.Errorf("fail to open image sink for %v", err)
  74. }
  75. }
  76. t := time.NewTicker(time.Duration(3) * time.Minute)
  77. exeCtx, cancel := ctx.WithCancel()
  78. m.cancel = cancel
  79. go func() {
  80. defer t.Stop()
  81. for {
  82. select {
  83. case <-t.C:
  84. m.delFile(logger)
  85. case <-exeCtx.Done():
  86. logger.Info("image sink done")
  87. return
  88. }
  89. }
  90. }()
  91. return nil
  92. }
  93. func (m *imageSink) delFile(logger api.Logger) error {
  94. dirEntries, err := os.ReadDir(m.path)
  95. if nil != err || 0 == len(dirEntries) {
  96. return err
  97. }
  98. files := make([]os.FileInfo, 0, len(dirEntries))
  99. for _, entry := range dirEntries {
  100. info, err := entry.Info()
  101. if err != nil {
  102. continue
  103. }
  104. files = append(files, info)
  105. }
  106. pos := m.maxCount
  107. delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
  108. for i := 0; i < len(files); i++ {
  109. for j := i + 1; j < len(files); j++ {
  110. if files[i].ModTime().Before(files[j].ModTime()) {
  111. files[i], files[j] = files[j], files[i]
  112. }
  113. }
  114. if files[i].ModTime().Before(delTime) && i < pos {
  115. pos = i
  116. break
  117. }
  118. }
  119. for i := pos; i < len(files); i++ {
  120. fname := files[i].Name()
  121. if strings.HasSuffix(fname, m.format) {
  122. fpath := filepath.Join(m.path, fname)
  123. os.Remove(fpath)
  124. }
  125. }
  126. return nil
  127. }
  128. func (m *imageSink) getSuffix() string {
  129. now := time.Now()
  130. year, month, day := now.Date()
  131. hour, minute, second := now.Clock()
  132. nsecond := now.Nanosecond()
  133. return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
  134. }
  135. func (m *imageSink) saveFile(b []byte, fpath string) error {
  136. reader := bytes.NewReader(b)
  137. fp, err := os.Create(fpath)
  138. if nil != err {
  139. return err
  140. }
  141. defer fp.Close()
  142. if "png" == m.format {
  143. if img, err := png.Decode(reader); nil != err {
  144. return err
  145. } else if err = png.Encode(fp, img); nil != err {
  146. return err
  147. }
  148. } else if "jpeg" == m.format {
  149. if img, err := jpeg.Decode(reader); nil != err {
  150. return err
  151. } else if err = jpeg.Encode(fp, img, nil); nil != err {
  152. return err
  153. }
  154. }
  155. return nil
  156. }
  157. func (m *imageSink) saveFiles(images map[string]interface{}) error {
  158. for k, v := range images {
  159. image, ok := v.([]byte)
  160. if !ok {
  161. return fmt.Errorf("found none bytes data %v for path %s", image, k)
  162. }
  163. suffix := m.getSuffix()
  164. fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
  165. fpath := filepath.Join(m.path, fname)
  166. m.saveFile(image, fpath)
  167. }
  168. return nil
  169. }
  170. func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
  171. logger := ctx.GetLogger()
  172. switch v := item.(type) {
  173. case []map[string]interface{}:
  174. var outer error
  175. for _, vm := range v {
  176. err := m.saveFiles(vm)
  177. if err != nil {
  178. outer = err
  179. logger.Error(err)
  180. }
  181. }
  182. return outer
  183. case map[string]interface{}:
  184. return m.saveFiles(v)
  185. default:
  186. fmt.Errorf("image sink receive invalid data %v", item)
  187. }
  188. return nil
  189. }
  190. func (m *imageSink) Close(ctx api.StreamContext) error {
  191. if m.cancel != nil {
  192. m.cancel()
  193. }
  194. return m.delFile(ctx.GetLogger())
  195. }
  196. func Image() api.Sink {
  197. return &imageSink{}
  198. }