image.go 4.6 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "image/jpeg"
  21. "image/png"
  22. "io/ioutil"
  23. "os"
  24. "path/filepath"
  25. "strings"
  26. "time"
  27. )
  28. type imageSink struct {
  29. path string
  30. format string
  31. maxAge int
  32. maxCount int
  33. cancel context.CancelFunc
  34. }
  35. func (m *imageSink) Configure(props map[string]interface{}) error {
  36. if i, ok := props["format"]; ok {
  37. if i, ok := i.(string); ok {
  38. if "png" != i && "jpeg" != i {
  39. return fmt.Errorf("%s image type is not currently supported", i)
  40. }
  41. m.format = i
  42. }
  43. } else {
  44. return fmt.Errorf("Field not found format.")
  45. }
  46. if i, ok := props["path"]; ok {
  47. if i, ok := i.(string); ok {
  48. m.path = i
  49. } else {
  50. return fmt.Errorf("%s image type is not supported", i)
  51. }
  52. } else {
  53. return fmt.Errorf("Field not found path.")
  54. }
  55. m.maxAge = 72
  56. if i, ok := props["maxAge"]; ok {
  57. if i, ok := i.(int); ok {
  58. m.maxAge = i
  59. }
  60. }
  61. m.maxCount = 1000
  62. if i, ok := props["maxCount"]; ok {
  63. if i, ok := i.(int); ok {
  64. m.maxCount = i
  65. }
  66. }
  67. return nil
  68. }
  69. func (m *imageSink) Open(ctx api.StreamContext) error {
  70. logger := ctx.GetLogger()
  71. logger.Debug("Opening image sink")
  72. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  73. if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
  74. return fmt.Errorf("fail to open image sink for %v", err)
  75. }
  76. }
  77. t := time.NewTicker(time.Duration(3) * time.Minute)
  78. exeCtx, cancel := ctx.WithCancel()
  79. m.cancel = cancel
  80. go func() {
  81. defer t.Stop()
  82. for {
  83. select {
  84. case <-t.C:
  85. m.delFile(logger)
  86. case <-exeCtx.Done():
  87. logger.Info("image sink done")
  88. return
  89. }
  90. }
  91. }()
  92. return nil
  93. }
  94. func (m *imageSink) delFile(logger api.Logger) error {
  95. files, err := ioutil.ReadDir(m.path)
  96. if nil != err || 0 == len(files) {
  97. return err
  98. }
  99. pos := m.maxCount
  100. delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
  101. for i := 0; i < len(files); i++ {
  102. for j := i + 1; j < len(files); j++ {
  103. if files[i].ModTime().Before(files[j].ModTime()) {
  104. files[i], files[j] = files[j], files[i]
  105. }
  106. }
  107. if files[i].ModTime().Before(delTime) && i < pos {
  108. pos = i
  109. break
  110. }
  111. }
  112. for i := pos; i < len(files); i++ {
  113. fname := files[i].Name()
  114. if strings.HasSuffix(fname, m.format) {
  115. fpath := filepath.Join(m.path, fname)
  116. os.Remove(fpath)
  117. }
  118. }
  119. return nil
  120. }
  121. func (m *imageSink) getSuffix() string {
  122. now := time.Now()
  123. year, month, day := now.Date()
  124. hour, minute, second := now.Clock()
  125. nsecond := now.Nanosecond()
  126. return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
  127. }
  128. func (m *imageSink) saveFile(b []byte, fpath string) error {
  129. reader := bytes.NewReader(b)
  130. fp, err := os.Create(fpath)
  131. if nil != err {
  132. return err
  133. }
  134. defer fp.Close()
  135. if "png" == m.format {
  136. if img, err := png.Decode(reader); nil != err {
  137. return err
  138. } else if err = png.Encode(fp, img); nil != err {
  139. return err
  140. }
  141. } else if "jpeg" == m.format {
  142. if img, err := jpeg.Decode(reader); nil != err {
  143. return err
  144. } else if err = jpeg.Encode(fp, img, nil); nil != err {
  145. return err
  146. }
  147. }
  148. return nil
  149. }
  150. func (m *imageSink) saveFiles(images map[string]interface{}) error {
  151. for k, v := range images {
  152. image, ok := v.([]byte)
  153. if !ok {
  154. return fmt.Errorf("found none bytes data %v for path %s", image, k)
  155. }
  156. suffix := m.getSuffix()
  157. fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
  158. fpath := filepath.Join(m.path, fname)
  159. m.saveFile(image, fpath)
  160. }
  161. return nil
  162. }
  163. func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
  164. logger := ctx.GetLogger()
  165. switch v := item.(type) {
  166. case []map[string]interface{}:
  167. var outer error
  168. for _, vm := range v {
  169. err := m.saveFiles(vm)
  170. if err != nil {
  171. outer = err
  172. logger.Error(err)
  173. }
  174. }
  175. return outer
  176. case map[string]interface{}:
  177. return m.saveFiles(v)
  178. default:
  179. fmt.Errorf("image sink receive invalid data %v", item)
  180. }
  181. return nil
  182. }
  183. func (m *imageSink) Close(ctx api.StreamContext) error {
  184. if m.cancel != nil {
  185. m.cancel()
  186. }
  187. return m.delFile(ctx.GetLogger())
  188. }
  189. func Image() api.Sink {
  190. return &imageSink{}
  191. }