// file_source.go (2.1 KB)

  1. package extensions
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "os"
  8. "path"
  9. "path/filepath"
  10. )
  // FileType names the format of the file that a FileSource reads.
  type FileType string

  // JSON_TYPE is the only file type registered in fileTypes.
  const JSON_TYPE FileType = "json"

  // fileTypes is the set of file types that Configure accepts; a type is
  // supported when it is present as a key.
  var fileTypes = map[FileType]bool{JSON_TYPE: true}
  18. type FileSourceConfig struct {
  19. FileType FileType `json:"fileType"`
  20. Path string `json:"Path"`
  21. }
  22. // The BATCH to load data from file at once
  23. type FileSource struct {
  24. file string
  25. config *FileSourceConfig
  26. }
  27. func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
  28. cfg := &FileSourceConfig{}
  29. err := common.MapToStruct(props, cfg)
  30. if err != nil {
  31. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  32. }
  33. if cfg.FileType == "" {
  34. return errors.New("missing or invalid property fileType, must be 'json'")
  35. }
  36. if _, ok := fileTypes[cfg.FileType]; !ok {
  37. return fmt.Errorf("invalid property fileType: %s", cfg.FileType)
  38. }
  39. if cfg.Path == "" {
  40. return errors.New("missing property Path")
  41. }
  42. if fileName == "" {
  43. return errors.New("source must be specified")
  44. }
  45. if !filepath.IsAbs(cfg.Path) {
  46. cfg.Path, err = common.GetLoc("/" + cfg.Path)
  47. if err != nil {
  48. return fmt.Errorf("invalid path %s", cfg.Path)
  49. }
  50. }
  51. fs.file = path.Join(cfg.Path, fileName)
  52. if fi, err := os.Stat(fs.file); err != nil {
  53. if os.IsNotExist(err) {
  54. return fmt.Errorf("file %s not exist", fs.file)
  55. } else if !fi.Mode().IsRegular() {
  56. return fmt.Errorf("file %s is not a regular file", fs.file)
  57. }
  58. }
  59. fs.config = cfg
  60. return nil
  61. }
  62. func (fs *FileSource) Load(ctx api.StreamContext) ([]api.SourceTuple, error) {
  63. switch fs.config.FileType {
  64. case JSON_TYPE:
  65. ctx.GetLogger().Debugf("Start to load from file %s", fs.file)
  66. resultMap := make([]map[string]interface{}, 0)
  67. err := common.ReadJsonUnmarshal(fs.file, &resultMap)
  68. result := make([]api.SourceTuple, len(resultMap))
  69. for i, m := range resultMap {
  70. result[i] = api.NewDefaultSourceTuple(m, nil)
  71. }
  72. ctx.GetLogger().Debugf("loaded %s, check error %s", fs.file, err)
  73. return result, err
  74. }
  75. return nil, fmt.Errorf("invalid file type %s", fs.config.FileType)
  76. }