Просмотр исходного кода

fix(upload): support uploads export/import (#2104)

Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>
Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 1 год назад
Родитель
Сommit
a7217aad65

+ 2 - 0
docs/en_US/api/cli/data.md

@@ -35,6 +35,8 @@ The file format for importing and exporting Data is JSON, which can contain : `s
   "Service":{
   },
   "Schema":{
+  },
+  "uploads":{
   }
 }
 ```

+ 7 - 2
docs/en_US/api/restapi/data.md

@@ -35,6 +35,8 @@ The file format for importing and exporting data is JSON, which can contain : `s
     "Service":{
     },
     "Schema":{
+    },
+    "uploads":{
     }
 }
 ```
@@ -112,7 +114,8 @@ Content-Type: application/json
   "sinkConfig":{},
   "connectionConfig":{},
   "Service":{},
-  "Schema":{}
+  "Schema":{},
+  "uploads":{}
 }
 
 ```
@@ -135,7 +138,9 @@ Content-Type: application/json
   "sinkConfig":{},
   "connectionConfig":{},
   "Service":{},
-  "Schema":{}}
+  "Schema":{},
+  "uploads":{}
+}
 ```
 
 ## Data Export

+ 2 - 0
docs/zh_CN/api/cli/data.md

@@ -35,6 +35,8 @@ eKuiper 命令行工具允许您导入导出当前数据。
   "Service":{
   },
   "Schema":{
+  },
+  "uploads":{
   }
 }
 ```

+ 8 - 2
docs/zh_CN/api/restapi/data.md

@@ -35,6 +35,8 @@ eKuiper REST api 允许您导入导出数据。
     "Service":{
     },
     "Schema":{
+    },
+    "uploads":{
     }
 }
 ```
@@ -111,7 +113,9 @@ Content-Type: application/json
   "sinkConfig":{},
   "connectionConfig":{},
   "Service":{},
-  "Schema":{}}
+  "Schema":{},
+  "uploads":{}
+}
 ```
 
 示例2:导入插件失败
@@ -132,7 +136,9 @@ Content-Type: application/json
   "sinkConfig":{},
   "connectionConfig":{},
   "Service":{},
-  "Schema":{}}
+  "Schema":{},
+  "uploads":{}
+}
 ```
 
 ## 导出数据

+ 85 - 3
internal/server/rest.go

@@ -36,6 +36,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/server/middleware"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -43,6 +44,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/infra"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
 const (
@@ -50,7 +52,11 @@ const (
 	ContentTypeJSON = "application/json"
 )
 
-var uploadDir string
+var (
+	uploadDir       string
+	uploadsDb       kv.KeyValue
+	uploadsStatusDb kv.KeyValue
+)
 
 type statementDescriptor struct {
 	Sql string `json:"sql,omitempty"`
@@ -123,6 +129,14 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 		panic(err)
 	}
 	uploadDir = filepath.Join(dataDir, "uploads")
+	uploadsDb, err = store.GetKV("uploads")
+	if err != nil {
+		panic(err)
+	}
+	uploadsStatusDb, err = store.GetKV("uploadsStatusDb")
+	if err != nil {
+		panic(err)
+	}
 
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
@@ -175,6 +189,14 @@ type fileContent struct {
 	FilePath string `json:"file"`
 }
 
+func (f *fileContent) InstallScript() string {
+	marshal, err := json.Marshal(f)
+	if err != nil {
+		return ""
+	}
+	return string(marshal)
+}
+
 func (f *fileContent) Validate() error {
 	if f.Content == "" && f.FilePath == "" {
 		return fmt.Errorf("invalid body: content or FilePath is required")
@@ -188,10 +210,11 @@ func (f *fileContent) Validate() error {
 func upload(file *fileContent) error {
 	err := getFile(file)
 	if err != nil {
+		_ = uploadsStatusDb.Set(file.Name, err.Error())
 		return err
 	}
 
-	return nil
+	return uploadsDb.Set(file.Name, file.InstallScript())
 }
 
 func getFile(file *fileContent) error {
@@ -303,6 +326,8 @@ func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
 		handleError(w, e, "Error deleting the file", logger)
 		return
 	}
+	_ = uploadsDb.Delete(name)
+	_ = uploadsStatusDb.Delete(name)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte("ok"))
 }
@@ -689,6 +714,7 @@ type Configuration struct {
 	ConnectionConfig map[string]string `json:"connectionConfig"`
 	Service          map[string]string `json:"Service"`
 	Schema           map[string]string `json:"Schema"`
+	Uploads          map[string]string `json:"uploads"`
 }
 
 func configurationExport() ([]byte, error) {
@@ -703,6 +729,7 @@ func configurationExport() ([]byte, error) {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 	ruleSet := rulesetProcessor.ExportRuleSet()
 	if ruleSet != nil {
@@ -715,6 +742,7 @@ func configurationExport() ([]byte, error) {
 	conf.PortablePlugins = portablePluginExport()
 	conf.Service = serviceExport()
 	conf.Schema = schemaExport()
+	conf.Uploads = uploadsExport()
 
 	yamlCfg := meta.GetConfigurations()
 	conf.SourceConfig = yamlCfg.Sources
@@ -749,6 +777,7 @@ func configurationReset() {
 	serviceReset()
 	schemaReset()
 	meta.ResetConfigs()
+	uploadsReset()
 }
 
 type ImportConfigurationStatus struct {
@@ -768,6 +797,7 @@ func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	importStatus := ImportConfigurationStatus{}
@@ -783,6 +813,7 @@ func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	ResponseNil := Configuration{
@@ -796,6 +827,7 @@ func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	err := json.Unmarshal(data, conf)
@@ -803,6 +835,7 @@ func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
 		importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
 		return importStatus
 	}
+	configResponse.Uploads = uploadsImport(conf.Uploads)
 
 	if reboot {
 		err = pluginImport(conf.NativePlugins)
@@ -818,7 +851,6 @@ func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
 	}
 
 	configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
-
 	configResponse.Service = serviceImport(conf.Service)
 
 	yamlCfgSet := meta.YamlConfigurationSet{
@@ -882,6 +914,7 @@ func configurationPartialImport(data []byte) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	importStatus := ImportConfigurationStatus{}
@@ -897,6 +930,7 @@ func configurationPartialImport(data []byte) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	ResponseNil := Configuration{
@@ -910,6 +944,7 @@ func configurationPartialImport(data []byte) ImportConfigurationStatus {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 
 	err := json.Unmarshal(data, conf)
@@ -926,6 +961,7 @@ func configurationPartialImport(data []byte) ImportConfigurationStatus {
 
 	confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
 
+	configResponse.Uploads = uploadsImport(conf.Uploads)
 	configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
 	configResponse.Schema = schemaPartialImport(conf.Schema)
 	configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
@@ -1035,6 +1071,7 @@ func configurationStatusExport() Configuration {
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 	ruleSet := rulesetProcessor.ExportRuleSetStatus()
 	if ruleSet != nil {
@@ -1047,6 +1084,7 @@ func configurationStatusExport() Configuration {
 	conf.PortablePlugins = portablePluginStatusExport()
 	conf.Service = serviceStatusExport()
 	conf.Schema = schemaStatusExport()
+	conf.Uploads = uploadsStatusExport()
 
 	yamlCfgStatus := meta.GetConfigurationStatus()
 	conf.SourceConfig = yamlCfgStatus.Sources
@@ -1113,3 +1151,47 @@ func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
 
 	return ruleSetRsp
 }
+
+func uploadsReset() {
+	_ = uploadsDb.Clean()
+	_ = uploadsStatusDb.Clean()
+}
+
+func uploadsExport() map[string]string {
+	conf, _ := uploadsDb.All()
+	return conf
+}
+
+func uploadsStatusExport() map[string]string {
+	status, _ := uploadsDb.All()
+	return status
+}
+
+func uploadsImport(s map[string]string) map[string]string {
+	errMap := map[string]string{}
+	_ = uploadsStatusDb.Clean()
+	for k, v := range s {
+		fc := &fileContent{}
+		err := json.Unmarshal([]byte(v), fc)
+		if err != nil {
+			errMsg := fmt.Sprintf("invalid body: Error decoding file json: %s", err.Error())
+			errMap[k] = errMsg
+			_ = uploadsStatusDb.Set(k, errMsg)
+			continue
+		}
+
+		err = fc.Validate()
+		if err != nil {
+			errMap[k] = err.Error()
+			_ = uploadsStatusDb.Set(k, err.Error())
+			continue
+		}
+
+		err = upload(fc)
+		if err != nil {
+			errMap[k] = err.Error()
+			continue
+		}
+	}
+	return errMap
+}

+ 3 - 0
internal/server/rest_test.go

@@ -31,6 +31,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
@@ -42,6 +43,8 @@ func init() {
 	ruleProcessor = processor.NewRuleProcessor()
 	rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
 	registry = &RuleRegistry{internal: make(map[string]*rule.RuleState)}
+	uploadsDb, _ = store.GetKV("uploads")
+	uploadsStatusDb, _ = store.GetKV("uploadsStatusDb")
 }
 
 type RestTestSuite struct {

+ 2 - 2
internal/server/rpc_test.go

@@ -161,12 +161,12 @@ func (suite *ServerTestSuite) TestConfigurarion() {
 	var reply string
 	err := suite.s.ImportConfiguration(&importArg, &reply)
 	assert.Nil(suite.T(), err)
-	assert.Equal(suite.T(), "{\n  \"ErrorMsg\": \"\",\n  \"ConfigResponse\": {\n    \"streams\": {},\n    \"tables\": {},\n    \"rules\": {},\n    \"nativePlugins\": {},\n    \"portablePlugins\": {},\n    \"sourceConfig\": {},\n    \"sinkConfig\": {},\n    \"connectionConfig\": {},\n    \"Service\": {},\n    \"Schema\": {}\n  }\n}", reply)
+	assert.Equal(suite.T(), "{\n  \"ErrorMsg\": \"\",\n  \"ConfigResponse\": {\n    \"streams\": {},\n    \"tables\": {},\n    \"rules\": {},\n    \"nativePlugins\": {},\n    \"portablePlugins\": {},\n    \"sourceConfig\": {},\n    \"sinkConfig\": {},\n    \"connectionConfig\": {},\n    \"Service\": {},\n    \"Schema\": {},\n    \"uploads\": {}\n  }\n}", reply)
 
 	reply = ""
 	err = suite.s.GetStatusImport(1, &reply)
 	assert.Nil(suite.T(), err)
-	assert.Equal(suite.T(), "{\n  \"streams\": {},\n  \"tables\": {},\n  \"rules\": {},\n  \"nativePlugins\": {},\n  \"portablePlugins\": {},\n  \"sourceConfig\": {},\n  \"sinkConfig\": {},\n  \"connectionConfig\": {},\n  \"Service\": {},\n  \"Schema\": {}\n}", reply)
+	assert.Equal(suite.T(), "{\n  \"streams\": {},\n  \"tables\": {},\n  \"rules\": {},\n  \"nativePlugins\": {},\n  \"portablePlugins\": {},\n  \"sourceConfig\": {},\n  \"sinkConfig\": {},\n  \"connectionConfig\": {},\n  \"Service\": {},\n  \"Schema\": {},\n  \"uploads\": {}\n}", reply)
 
 	reply = ""
 	exportArg := model.ExportDataDesc{

+ 2 - 0
internal/server/rule_migration.go

@@ -320,6 +320,7 @@ func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]b
 		ConnectionConfig: make(map[string]string),
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
+		Uploads:          make(map[string]string),
 	}
 	config.Rules = p.exportRules(rules)
 
@@ -419,6 +420,7 @@ func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Config
 		schName, schInfo := getSchemaInstallScript(v)
 		config.Schema[schName] = schInfo
 	}
+	config.Uploads = uploadsExport()
 }
 
 func parsePick(props map[string]interface{}) (*ast.SelectStatement, error) {

Разница между файлами не показана из-за своего большого размера
+ 12 - 4
test/management_test/data_import_export.jmx