file_source.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package source
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "time"
  26. )
  // FileType names the format of the file consumed by the file source.
  type FileType string

  // JSON_TYPE selects a JSON encoded file.
  const JSON_TYPE FileType = "json"

  // fileTypes is the set of supported file formats. It is consulted when the
  // source configuration is validated.
  var fileTypes = map[FileType]bool{JSON_TYPE: true}
  34. type FileSourceConfig struct {
  35. FileType FileType `json:"fileType"`
  36. Path string `json:"path"`
  37. Interval int `json:"interval"`
  38. RetainSize int `json:"$retainSize"`
  39. }
  40. // The BATCH to load data from file at once
  41. type FileSource struct {
  42. file string
  43. config *FileSourceConfig
  44. }
  45. func (fs *FileSource) Close(ctx api.StreamContext) error {
  46. ctx.GetLogger().Infof("Close file source")
  47. // do nothing
  48. return nil
  49. }
  50. func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
  51. cfg := &FileSourceConfig{}
  52. err := cast.MapToStruct(props, cfg)
  53. if err != nil {
  54. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  55. }
  56. if cfg.FileType == "" {
  57. return errors.New("missing or invalid property fileType, must be 'json'")
  58. }
  59. if _, ok := fileTypes[cfg.FileType]; !ok {
  60. return fmt.Errorf("invalid property fileType: %s", cfg.FileType)
  61. }
  62. if cfg.Path == "" {
  63. return errors.New("missing property Path")
  64. }
  65. if fileName == "" {
  66. return errors.New("file name must be specified")
  67. }
  68. if !filepath.IsAbs(cfg.Path) {
  69. cfg.Path, err = conf.GetLoc(cfg.Path)
  70. if err != nil {
  71. return fmt.Errorf("invalid path %s", cfg.Path)
  72. }
  73. }
  74. if fileName != "/$$TEST_CONNECTION$$" {
  75. fs.file = path.Join(cfg.Path, fileName)
  76. if fi, err := os.Stat(fs.file); err != nil {
  77. if os.IsNotExist(err) {
  78. return fmt.Errorf("file %s not exist", fs.file)
  79. } else if !fi.Mode().IsRegular() {
  80. return fmt.Errorf("file %s is not a regular file", fs.file)
  81. }
  82. }
  83. }
  84. fs.config = cfg
  85. return nil
  86. }
  87. func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  88. err := fs.Load(ctx, consumer)
  89. if err != nil {
  90. errCh <- err
  91. return
  92. }
  93. if fs.config.Interval > 0 {
  94. ticker := time.NewTicker(time.Millisecond * time.Duration(fs.config.Interval))
  95. logger := ctx.GetLogger()
  96. defer ticker.Stop()
  97. for {
  98. select {
  99. case <-ticker.C:
  100. logger.Debugf("Load file source again at %v", conf.GetNowInMilli())
  101. err := fs.Load(ctx, consumer)
  102. if err != nil {
  103. errCh <- err
  104. return
  105. }
  106. case <-ctx.Done():
  107. return
  108. }
  109. }
  110. }
  111. }
  112. func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
  113. switch fs.config.FileType {
  114. case JSON_TYPE:
  115. ctx.GetLogger().Debugf("Start to load from file %s", fs.file)
  116. resultMap := make([]map[string]interface{}, 0)
  117. err := filex.ReadJsonUnmarshal(fs.file, &resultMap)
  118. if err != nil {
  119. return fmt.Errorf("loaded %s, check error %s", fs.file, err)
  120. }
  121. ctx.GetLogger().Debug("Sending tuples")
  122. if fs.config.RetainSize > 0 && fs.config.RetainSize < len(resultMap) {
  123. resultMap = resultMap[(len(resultMap) - fs.config.RetainSize):]
  124. ctx.GetLogger().Debug("Sending tuples for retain size %d", fs.config.RetainSize)
  125. }
  126. for _, m := range resultMap {
  127. select {
  128. case consumer <- api.NewDefaultSourceTuple(m, nil):
  129. // do nothing
  130. case <-ctx.Done():
  131. return nil
  132. }
  133. }
  134. // Send EOF if retain size not set
  135. if fs.config.RetainSize == 0 {
  136. select {
  137. case consumer <- api.NewDefaultSourceTuple(nil, nil):
  138. // do nothing
  139. case <-ctx.Done():
  140. return nil
  141. }
  142. }
  143. ctx.GetLogger().Debug("All tuples sent")
  144. return nil
  145. }
  146. return fmt.Errorf("invalid file type %s", fs.config.FileType)
  147. }