file_source.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "time"
  26. )
  27. type FileType string
  28. const (
  29. JSON_TYPE FileType = "json"
  30. )
  31. var fileTypes = map[FileType]bool{
  32. JSON_TYPE: true,
  33. }
  34. type FileSourceConfig struct {
  35. FileType FileType `json:"fileType"`
  36. Path string `json:"path"`
  37. Interval int `json:"interval"`
  38. RetainSize int `json:"$retainSize"`
  39. }
  40. // The BATCH to load data from file at once
  41. type FileSource struct {
  42. file string
  43. config *FileSourceConfig
  44. }
  45. func (fs *FileSource) Close(ctx api.StreamContext) error {
  46. ctx.GetLogger().Infof("Close file source")
  47. // do nothing
  48. return nil
  49. }
  50. func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
  51. cfg := &FileSourceConfig{}
  52. err := cast.MapToStruct(props, cfg)
  53. if err != nil {
  54. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  55. }
  56. if cfg.FileType == "" {
  57. return errors.New("missing or invalid property fileType, must be 'json'")
  58. }
  59. if _, ok := fileTypes[cfg.FileType]; !ok {
  60. return fmt.Errorf("invalid property fileType: %s", cfg.FileType)
  61. }
  62. if cfg.Path == "" {
  63. return errors.New("missing property Path")
  64. }
  65. if fileName == "" {
  66. return errors.New("file name must be specified")
  67. }
  68. if !filepath.IsAbs(cfg.Path) {
  69. cfg.Path, err = conf.GetLoc(cfg.Path)
  70. if err != nil {
  71. return fmt.Errorf("invalid path %s", cfg.Path)
  72. }
  73. }
  74. fs.file = path.Join(cfg.Path, fileName)
  75. if fi, err := os.Stat(fs.file); err != nil {
  76. if os.IsNotExist(err) {
  77. return fmt.Errorf("file %s not exist", fs.file)
  78. } else if !fi.Mode().IsRegular() {
  79. return fmt.Errorf("file %s is not a regular file", fs.file)
  80. }
  81. }
  82. fs.config = cfg
  83. return nil
  84. }
  85. func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  86. err := fs.Load(ctx, consumer)
  87. if err != nil {
  88. errCh <- err
  89. return
  90. }
  91. if fs.config.Interval > 0 {
  92. ticker := time.NewTicker(time.Millisecond * time.Duration(fs.config.Interval))
  93. logger := ctx.GetLogger()
  94. defer ticker.Stop()
  95. for {
  96. select {
  97. case <-ticker.C:
  98. logger.Debugf("Load file source again at %v", conf.GetNowInMilli())
  99. err := fs.Load(ctx, consumer)
  100. if err != nil {
  101. errCh <- err
  102. return
  103. }
  104. case <-ctx.Done():
  105. return
  106. }
  107. }
  108. }
  109. }
  110. func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
  111. switch fs.config.FileType {
  112. case JSON_TYPE:
  113. ctx.GetLogger().Debugf("Start to load from file %s", fs.file)
  114. resultMap := make([]map[string]interface{}, 0)
  115. err := filex.ReadJsonUnmarshal(fs.file, &resultMap)
  116. if err != nil {
  117. return fmt.Errorf("loaded %s, check error %s", fs.file, err)
  118. }
  119. ctx.GetLogger().Debug("Sending tuples")
  120. if fs.config.RetainSize > 0 && fs.config.RetainSize < len(resultMap) {
  121. resultMap = resultMap[(len(resultMap) - fs.config.RetainSize):]
  122. ctx.GetLogger().Debug("Sending tuples for retain size %d", fs.config.RetainSize)
  123. }
  124. for _, m := range resultMap {
  125. select {
  126. case consumer <- api.NewDefaultSourceTuple(m, nil):
  127. // do nothing
  128. case <-ctx.Done():
  129. return nil
  130. }
  131. }
  132. // Send EOF if retain size not set
  133. if fs.config.RetainSize == 0 {
  134. select {
  135. case consumer <- api.NewDefaultSourceTuple(nil, nil):
  136. // do nothing
  137. case <-ctx.Done():
  138. return nil
  139. }
  140. }
  141. ctx.GetLogger().Debug("All tuples sent")
  142. return nil
  143. }
  144. return fmt.Errorf("invalid file type %s", fs.config.FileType)
  145. }