image.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"image/jpeg"
	"image/png"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/lf-edge/ekuiper/pkg/api"
)
  29. type imageSink struct {
  30. path string
  31. format string
  32. maxAge int
  33. maxCount int
  34. cancel context.CancelFunc
  35. }
  36. func (m *imageSink) Configure(props map[string]interface{}) error {
  37. if i, ok := props["format"]; ok {
  38. if i, ok := i.(string); ok {
  39. if "png" != i && "jpeg" != i {
  40. return fmt.Errorf("%s image type is not currently supported", i)
  41. }
  42. m.format = i
  43. }
  44. } else {
  45. return fmt.Errorf("Field not found format.")
  46. }
  47. if i, ok := props["path"]; ok {
  48. if i, ok := i.(string); ok {
  49. m.path = i
  50. } else {
  51. return fmt.Errorf("%s image type is not supported", i)
  52. }
  53. } else {
  54. return fmt.Errorf("Field not found path.")
  55. }
  56. m.maxAge = 72
  57. if i, ok := props["maxAge"]; ok {
  58. if i, ok := i.(int); ok {
  59. m.maxAge = i
  60. }
  61. }
  62. m.maxCount = 1000
  63. if i, ok := props["maxCount"]; ok {
  64. if i, ok := i.(int); ok {
  65. m.maxCount = i
  66. }
  67. }
  68. return nil
  69. }
  70. func (m *imageSink) Open(ctx api.StreamContext) error {
  71. logger := ctx.GetLogger()
  72. logger.Debug("Opening image sink")
  73. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  74. if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
  75. return fmt.Errorf("fail to open image sink for %v", err)
  76. }
  77. }
  78. t := time.NewTicker(time.Duration(3) * time.Minute)
  79. exeCtx, cancel := ctx.WithCancel()
  80. m.cancel = cancel
  81. go func() {
  82. defer t.Stop()
  83. for {
  84. select {
  85. case <-t.C:
  86. m.delFile(logger)
  87. case <-exeCtx.Done():
  88. logger.Info("image sink done")
  89. return
  90. }
  91. }
  92. }()
  93. return nil
  94. }
  95. func (m *imageSink) delFile(logger api.Logger) error {
  96. files, err := ioutil.ReadDir(m.path)
  97. if nil != err || 0 == len(files) {
  98. return err
  99. }
  100. pos := m.maxCount
  101. delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
  102. for i := 0; i < len(files); i++ {
  103. for j := i + 1; j < len(files); j++ {
  104. if files[i].ModTime().Before(files[j].ModTime()) {
  105. files[i], files[j] = files[j], files[i]
  106. }
  107. }
  108. if files[i].ModTime().Before(delTime) && i < pos {
  109. pos = i
  110. break
  111. }
  112. }
  113. for i := pos; i < len(files); i++ {
  114. fname := files[i].Name()
  115. if strings.HasSuffix(fname, m.format) {
  116. fpath := filepath.Join(m.path, fname)
  117. os.Remove(fpath)
  118. }
  119. }
  120. return nil
  121. }
  122. func (m *imageSink) getSuffix() string {
  123. now := time.Now()
  124. year, month, day := now.Date()
  125. hour, minute, second := now.Clock()
  126. nsecond := now.Nanosecond()
  127. return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
  128. }
  129. func (m *imageSink) saveFile(b []byte, fpath string) error {
  130. reader := bytes.NewReader(b)
  131. fp, err := os.Create(fpath)
  132. if nil != err {
  133. return err
  134. }
  135. defer fp.Close()
  136. if "png" == m.format {
  137. if img, err := png.Decode(reader); nil != err {
  138. return err
  139. } else if err = png.Encode(fp, img); nil != err {
  140. return err
  141. }
  142. } else if "jpeg" == m.format {
  143. if img, err := jpeg.Decode(reader); nil != err {
  144. return err
  145. } else if err = jpeg.Encode(fp, img, nil); nil != err {
  146. return err
  147. }
  148. }
  149. return nil
  150. }
  151. func (m *imageSink) saveFiles(msg []map[string][]byte) error {
  152. for _, images := range msg {
  153. for k, v := range images {
  154. suffix := m.getSuffix()
  155. fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
  156. fpath := filepath.Join(m.path, fname)
  157. m.saveFile(v, fpath)
  158. }
  159. }
  160. return nil
  161. }
  162. func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
  163. logger := ctx.GetLogger()
  164. if v, ok := item.([]byte); ok {
  165. var msg []map[string][]byte
  166. if err := json.Unmarshal(v, &msg); nil != err {
  167. return fmt.Errorf("The sink only accepts bytea field, other types are not supported.")
  168. }
  169. return m.saveFiles(msg)
  170. } else {
  171. logger.Debug("image sink receive non byte data")
  172. }
  173. return nil
  174. }
  175. func (m *imageSink) Close(ctx api.StreamContext) error {
  176. if m.cancel != nil {
  177. m.cancel()
  178. }
  179. return m.delFile(ctx.GetLogger())
  180. }
  181. func Image() api.Sink {
  182. return &imageSink{}
  183. }