
fix(config): support rest api for export/import

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 years ago
parent
commit
fb6ff3f68c
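
A minimal usage sketch of the reworked endpoints (not part of the commit; the port 9081 and the rule IDs are placeholders): GET /data/export still returns the full configuration, POST to the same route with a JSON array of rule IDs now returns a partial export, and /data/import gains a partial=1 query flag alongside stop=1, as the rest.go diff below shows.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Full export: GET /data/export serves ekuiper_export.json as an attachment.
	resp, err := http.Get("http://127.0.0.1:9081/data/export")
	if err != nil {
		panic(err)
	}
	full, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Printf("full export: %d bytes\n", len(full))

	// Partial export: POST a JSON array of rule IDs, matching the []string
	// decoded by configurationExportHandler. "rule1"/"rule2" are placeholders.
	body := bytes.NewBufferString(`["rule1", "rule2"]`)
	resp, err = http.Post("http://127.0.0.1:9081/data/export", "application/json", body)
	if err != nil {
		panic(err)
	}
	partial, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Printf("partial export: %d bytes\n", len(partial))

	// Import: POST /data/import; add ?partial=1 to route into
	// configurationPartialImport, or ?stop=1 to exit after a full import so
	// native plugins and schemas take effect on restart. The request body is a
	// configurationInfo JSON object (content or file path); its exact JSON
	// field names are not shown in this diff.
}
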

+ 14 - 1
internal/meta/yamlConfigMeta.go

@@ -528,7 +528,13 @@ func GetConfigurationStatus() YamlConfigurationSet {
 	return result
 }
 
-func LoadConfigurations(configSets YamlConfigurationSet) {
+func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
+
+	configResponse := YamlConfigurationSet{
+		Sources:     map[string]string{},
+		Sinks:       map[string]string{},
+		Connections: map[string]string{},
+	}
 
 	var srcResources = configSets.Sources
 	var sinkResources = configSets.Sinks
@@ -543,11 +549,13 @@ func LoadConfigurations(configSets YamlConfigurationSet) {
 		err := json.Unmarshal([]byte(val), &configs)
 		if err != nil {
 			_ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
+			configResponse.Sources[key] = err.Error()
 			continue
 		}
 		err = addSourceConfKeys(key, configs)
 		if err != nil {
 			_ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
+			configResponse.Sources[key] = err.Error()
 			continue
 		}
 	}
@@ -557,11 +565,13 @@ func LoadConfigurations(configSets YamlConfigurationSet) {
 		err := json.Unmarshal([]byte(val), &configs)
 		if err != nil {
 			_ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
+			configResponse.Sinks[key] = err.Error()
 			continue
 		}
 		err = addSinkConfKeys(key, configs)
 		if err != nil {
 			_ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
+			configResponse.Sinks[key] = err.Error()
 			continue
 		}
 	}
@@ -571,14 +581,17 @@ func LoadConfigurations(configSets YamlConfigurationSet) {
 		err := json.Unmarshal([]byte(val), &configs)
 		if err != nil {
 			_ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
+			configResponse.Connections[key] = err.Error()
 			continue
 		}
 		err = addConnectionConfKeys(key, configs)
 		if err != nil {
 			_ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
+			configResponse.Connections[key] = err.Error()
 			continue
 		}
 	}
+	return configResponse
 }
 
 func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfigurationSet {

+ 4 - 1
internal/plugin/portable/manager.go

@@ -440,15 +440,18 @@ func (m *Manager) pluginRegisterForImport(k, v string) error {
 	return nil
 }
 
-func (m *Manager) PluginImport(plugins map[string]string) {
+func (m *Manager) PluginImport(plugins map[string]string) map[string]string {
+	errMap := map[string]string{}
 	_ = m.plgStatusDb.Clean()
 	for k, v := range plugins {
 		err := m.pluginRegisterForImport(k, v)
 		if err != nil {
 			_ = m.plgStatusDb.Set(k, err.Error())
+			errMap[k] = err.Error()
 			continue
 		}
 	}
+	return errMap
 }
 
 func (m *Manager) PluginPartialImport(plugins map[string]string) map[string]string {

+ 11 - 1
internal/processor/ruleset.go

@@ -141,7 +141,13 @@ func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
 	return rules, counts, nil
 }
 
-func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
+func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) Ruleset {
+	ruleSetRsp := Ruleset{
+		Rules:   map[string]string{},
+		Streams: map[string]string{},
+		Tables:  map[string]string{},
+	}
+
 	_ = rs.s.streamStatusDb.Clean()
 	_ = rs.s.tableStatusDb.Clean()
 	_ = rs.r.ruleStatusDb.Clean()
@@ -153,6 +159,7 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 		if e != nil {
 			conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
 			_ = rs.s.streamStatusDb.Set(k, e.Error())
+			ruleSetRsp.Streams[k] = e.Error()
 			continue
 		} else {
 			counts[0]++
@@ -164,6 +171,7 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 		if e != nil {
 			conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
 			_ = rs.s.tableStatusDb.Set(k, e.Error())
+			ruleSetRsp.Tables[k] = e.Error()
 			continue
 		} else {
 			counts[1]++
@@ -176,10 +184,12 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 		if e != nil {
 			conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
 			_ = rs.r.ruleStatusDb.Set(k, e.Error())
+			ruleSetRsp.Rules[k] = e.Error()
 			continue
 		} else {
 			rules = append(rules, k)
 			counts[2]++
 		}
 	}
+	return ruleSetRsp
 }

+ 4 - 0
internal/server/core.go

@@ -90,3 +90,7 @@ func portablePluginPartialImport(plugins map[string]string) map[string]string {
 func servicePartialImport(services map[string]string) map[string]string {
 	return serviceManager.ImportPartialServices(services)
 }
+
+func getSchemaInstallScript(s string) (string, string) {
+	return "", ""
+}

+ 2 - 2
internal/server/portable_init.go

@@ -130,8 +130,8 @@ func portablePluginStatusExport() map[string]string {
 	return portableManager.GetAllPlugins()
 }
 
-func portablePluginImport(plugins map[string]string) {
-	portableManager.PluginImport(plugins)
+func portablePluginImport(plugins map[string]string) map[string]string {
+	return portableManager.PluginImport(plugins)
 }
 
 func portablePluginPartialImport(plugins map[string]string) map[string]string {

+ 91 - 69
internal/server/rest.go

@@ -140,9 +140,8 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
-	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet)
-	r.HandleFunc("/data/partial/export", configurationPartialExportHandler).Methods(http.MethodPost)
-	r.HandleFunc("/data/partial/import", configurationPartialImportHandler).Methods(http.MethodPost)
+	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
+	//r.HandleFunc("/data/partial/import", configurationPartialImportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
 	// Register extended routes
@@ -685,20 +684,17 @@ func configurationExport() ([]byte, error) {
 }
 
 func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
+	var jsonBytes []byte
 	const name = "ekuiper_export.json"
-	jsonBytes, _ := configurationExport()
-	w.Header().Set("Content-Type", "application/octet-stream")
-	w.Header().Add("Content-Disposition", "Attachment")
-	http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
-}
 
-func configurationPartialExportHandler(w http.ResponseWriter, r *http.Request) {
-	const name = "ekuiper_export.json"
-
-	var rules []string
-	_ = json.NewDecoder(r.Body).Decode(&rules)
-
-	jsonBytes, _ := ruleMigrationProcessor.ConfigurationPartialExport(rules)
+	switch r.Method {
+	case http.MethodGet:
+		jsonBytes, _ = configurationExport()
+	case http.MethodPost:
+		var rules []string
+		_ = json.NewDecoder(r.Body).Decode(&rules)
+		jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
+	}
 	w.Header().Set("Content-Type", "application/octet-stream")
 	w.Header().Add("Content-Disposition", "Attachment")
 	http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
@@ -714,7 +710,7 @@ func configurationReset() {
 	meta.ResetConfigs()
 }
 
-func configurationImport(data []byte, reboot bool) error {
+func configurationImport(data []byte, reboot bool) Configuration {
 	conf := &Configuration{
 		Streams:          make(map[string]string),
 		Tables:           make(map[string]string),
@@ -728,25 +724,39 @@ func configurationImport(data []byte, reboot bool) error {
 		Schema:           make(map[string]string),
 	}
 
+	configResponse := Configuration{
+		Streams:          make(map[string]string),
+		Tables:           make(map[string]string),
+		Rules:            make(map[string]string),
+		NativePlugins:    make(map[string]string),
+		PortablePlugins:  make(map[string]string),
+		SourceConfig:     make(map[string]string),
+		SinkConfig:       make(map[string]string),
+		ConnectionConfig: make(map[string]string),
+		Service:          make(map[string]string),
+		Schema:           make(map[string]string),
+	}
+
 	err := json.Unmarshal(data, conf)
 	if err != nil {
-		return fmt.Errorf("configuration unmarshal with error %v", err)
+		configResponse.Rules["configuration"] = fmt.Errorf("configuration unmarshal with error %v", err).Error()
+		return configResponse
 	}
 
 	if reboot {
 		err = pluginImport(conf.NativePlugins)
 		if err != nil {
-			return fmt.Errorf("pluginImportHandler with error %v", err)
+			return configResponse
 		}
 		err = schemaImport(conf.Schema)
 		if err != nil {
-			return fmt.Errorf("schemaImport with error %v", err)
+			return configResponse
 		}
 	}
 
-	portablePluginImport(conf.PortablePlugins)
+	configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
 
-	serviceImport(conf.Service)
+	configResponse.Service = serviceImport(conf.Service)
 
 	yamlCfgSet := meta.YamlConfigurationSet{
 		Sources:     conf.SourceConfig,
@@ -754,7 +764,10 @@ func configurationImport(data []byte, reboot bool) error {
 		Connections: conf.ConnectionConfig,
 	}
 
-	meta.LoadConfigurations(yamlCfgSet)
+	confRsp := meta.LoadConfigurations(yamlCfgSet)
+	configResponse.SourceConfig = confRsp.Sources
+	configResponse.SinkConfig = confRsp.Sinks
+	configResponse.ConnectionConfig = confRsp.Connections
 
 	ruleSet := processor.Ruleset{
 		Streams: conf.Streams,
@@ -762,7 +775,11 @@ func configurationImport(data []byte, reboot bool) error {
 		Rules:   conf.Rules,
 	}
 
-	rulesetProcessor.ImportRuleSet(ruleSet)
+	result := rulesetProcessor.ImportRuleSet(ruleSet)
+	configResponse.Streams = result.Streams
+	configResponse.Tables = result.Tables
+	configResponse.Rules = result.Rules
+
 	if !reboot {
 		infra.SafeRun(func() error {
 			for name := range ruleSet.Rules {
@@ -780,7 +797,7 @@ func configurationImport(data []byte, reboot bool) error {
 		})
 	}
 
-	return nil
+	return configResponse
 }
 
 func configurationPartialImport(data []byte) Configuration {
@@ -854,6 +871,8 @@ type configurationInfo struct {
 func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
 	cb := r.URL.Query().Get("stop")
 	stop := cb == "1"
+	par := r.URL.Query().Get("partial")
+	partial := par == "1"
 	rsi := &configurationInfo{}
 	err := json.NewDecoder(r.Body).Decode(rsi)
 	if err != nil {
@@ -883,56 +902,59 @@ func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		content = buf.Bytes()
 	}
-	configurationReset()
-	err = configurationImport(content, stop)
-	if err != nil {
-		handleError(w, err, "Import configuration error", logger)
-		return
-	}
-	if stop {
-		go func() {
-			time.Sleep(1 * time.Second)
-			os.Exit(100)
-		}()
+	if !partial {
+		configurationReset()
+		result := configurationImport(content, stop)
+		jsonResponse(result, w, logger)
+
+		if stop {
+			go func() {
+				time.Sleep(1 * time.Second)
+				os.Exit(100)
+			}()
+		}
+	} else {
+		result := configurationPartialImport(content)
+		jsonResponse(result, w, logger)
 	}
 
 	w.WriteHeader(http.StatusOK)
 }
 
-func configurationPartialImportHandler(w http.ResponseWriter, r *http.Request) {
-	rsi := &configurationInfo{}
-	err := json.NewDecoder(r.Body).Decode(rsi)
-	if err != nil {
-		handleError(w, err, "Invalid body: Error decoding json", logger)
-		return
-	}
-	if rsi.Content != "" && rsi.FilePath != "" {
-		handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
-		return
-	} else if rsi.Content == "" && rsi.FilePath == "" {
-		handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
-		return
-	}
-	content := []byte(rsi.Content)
-	if rsi.FilePath != "" {
-		reader, err := httpx.ReadFile(rsi.FilePath)
-		if err != nil {
-			handleError(w, err, "Fail to read file", logger)
-			return
-		}
-		defer reader.Close()
-		buf := new(bytes.Buffer)
-		_, err = io.Copy(buf, reader)
-		if err != nil {
-			handleError(w, err, "fail to convert file", logger)
-			return
-		}
-		content = buf.Bytes()
-	}
-	result := configurationPartialImport(content)
-
-	jsonResponse(result, w, logger)
-}
+//func configurationPartialImportHandler(w http.ResponseWriter, r *http.Request) {
+//	rsi := &configurationInfo{}
+//	err := json.NewDecoder(r.Body).Decode(rsi)
+//	if err != nil {
+//		handleError(w, err, "Invalid body: Error decoding json", logger)
+//		return
+//	}
+//	if rsi.Content != "" && rsi.FilePath != "" {
+//		handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
+//		return
+//	} else if rsi.Content == "" && rsi.FilePath == "" {
+//		handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
+//		return
+//	}
+//	content := []byte(rsi.Content)
+//	if rsi.FilePath != "" {
+//		reader, err := httpx.ReadFile(rsi.FilePath)
+//		if err != nil {
+//			handleError(w, err, "Fail to read file", logger)
+//			return
+//		}
+//		defer reader.Close()
+//		buf := new(bytes.Buffer)
+//		_, err = io.Copy(buf, reader)
+//		if err != nil {
+//			handleError(w, err, "fail to convert file", logger)
+//			return
+//		}
+//		content = buf.Bytes()
+//	}
+//	result := configurationPartialImport(content)
+//
+//	jsonResponse(result, w, logger)
+//}
 
 func configurationStatusExport() Configuration {
 	conf := Configuration{

+ 4 - 4
internal/server/rpc.go

@@ -312,10 +312,10 @@ func (t *Server) ImportConfiguration(arg *model.ImportDataDesc, reply *string) e
 	content := buf.Bytes()
 
 	configurationReset()
-	err = configurationImport(content, arg.Stop)
-	if err != nil {
-		return fmt.Errorf("import configuration error: %v", err)
-	}
+	_ = configurationImport(content, arg.Stop)
+	//if err != nil {
+	//	return fmt.Errorf("import configuration error: %v", err)
+	//}
 	*reply = fmt.Sprintf("import configuration success")
 	return nil
 }

+ 68 - 243
internal/processor/rule_migration.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package processor
+package server
 
 import (
 	"encoding/json"
@@ -23,7 +23,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/meta"
 	store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/plugin"
-	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/topo/graph"
 	"github.com/lf-edge/ekuiper/internal/topo/node/conf"
 	"github.com/lf-edge/ekuiper/internal/xsql"
@@ -34,38 +34,38 @@ import (
 )
 
 type RuleMigrationProcessor struct {
-	r *RuleProcessor
-	s *StreamProcessor
+	r *processor.RuleProcessor
+	s *processor.StreamProcessor
 }
 
-func NewRuleMigrationProcessor(r *RuleProcessor, s *StreamProcessor) *RuleMigrationProcessor {
+func NewRuleMigrationProcessor(r *processor.RuleProcessor, s *processor.StreamProcessor) *RuleMigrationProcessor {
 	return &RuleMigrationProcessor{
 		r: r,
 		s: s,
 	}
 }
 
-func NewDependencies() *Dependencies {
-	return &Dependencies{
-		SourceConfigKeys: map[string][]string{},
-		SinkConfigKeys:   map[string][]string{},
+func newDependencies() *dependencies {
+	return &dependencies{
+		sourceConfigKeys: map[string][]string{},
+		sinkConfigKeys:   map[string][]string{},
 	}
 }
 
-// Dependencies copy all connections related configs by hardcode
-type Dependencies struct {
-	Rules            []string
-	Streams          []string
-	Tables           []string
-	Sources          []string
-	Sinks            []string
-	SourceConfigKeys map[string][]string
-	SinkConfigKeys   map[string][]string
-	Functions        []string
-	Schemas          []string
+// dependencies copy all connections related configs by hardcode
+type dependencies struct {
+	rules            []string
+	streams          []string
+	tables           []string
+	sources          []string
+	sinks            []string
+	sourceConfigKeys map[string][]string
+	sinkConfigKeys   map[string][]string
+	functions        []string
+	schemas          []string
 }
 
-func ruleTraverse(rule *api.Rule, de *Dependencies) {
+func ruleTraverse(rule *api.Rule, de *dependencies) {
 	sql := rule.Sql
 	if sql != "" {
 		stmt, err := xsql.GetStatementFromSql(sql)
@@ -85,44 +85,44 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 			}
 			if streamStmt.StreamType == ast.TypeStream {
 				//get streams
-				de.Streams = append(de.Streams, string(streamStmt.Name))
+				de.streams = append(de.streams, string(streamStmt.Name))
 			} else if streamStmt.StreamType == ast.TypeTable {
 				//get tables
-				de.Tables = append(de.Tables, string(streamStmt.Name))
+				de.tables = append(de.tables, string(streamStmt.Name))
 			}
 
 			//get source type
-			de.Sources = append(de.Sources, streamStmt.Options.TYPE)
+			de.sources = append(de.sources, streamStmt.Options.TYPE)
 			//get config key
-			_, ok := de.SourceConfigKeys[streamStmt.Options.TYPE]
+			_, ok := de.sourceConfigKeys[streamStmt.Options.TYPE]
 			if ok {
-				de.SourceConfigKeys[streamStmt.Options.TYPE] = append(de.SourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
+				de.sourceConfigKeys[streamStmt.Options.TYPE] = append(de.sourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
 			} else {
 				var confKeys []string
 				confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
-				de.SourceConfigKeys[streamStmt.Options.TYPE] = confKeys
+				de.sourceConfigKeys[streamStmt.Options.TYPE] = confKeys
 			}
 
 			//get schema id
 			if streamStmt.Options.SCHEMAID != "" {
 				r := strings.Split(streamStmt.Options.SCHEMAID, ".")
-				de.Schemas = append(de.Schemas, streamStmt.Options.FORMAT+"_"+r[0])
+				de.schemas = append(de.schemas, streamStmt.Options.FORMAT+"_"+r[0])
 			}
 		}
 		//actions
 		for _, m := range rule.Actions {
 			for name, action := range m {
 				props, _ := action.(map[string]interface{})
-				de.Sinks = append(de.Sinks, name)
+				de.sinks = append(de.sinks, name)
 				resourceId, ok := props[conf.ResourceID].(string)
 				if ok {
-					_, ok := de.SinkConfigKeys[name]
+					_, ok := de.sinkConfigKeys[name]
 					if ok {
-						de.SinkConfigKeys[name] = append(de.SinkConfigKeys[name], resourceId)
+						de.sinkConfigKeys[name] = append(de.sinkConfigKeys[name], resourceId)
 					} else {
 						var confKeys []string
 						confKeys = append(confKeys, resourceId)
-						de.SinkConfigKeys[name] = confKeys
+						de.sinkConfigKeys[name] = confKeys
 					}
 				}
 
@@ -131,7 +131,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					schemaId, ok := props["schemaId"].(string)
 					if ok {
 						r := strings.Split(schemaId, ".")
-						de.Schemas = append(de.Schemas, format+"_"+r[0])
+						de.schemas = append(de.schemas, format+"_"+r[0])
 					}
 				}
 			}
@@ -140,13 +140,13 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 		ast.WalkFunc(stmt, func(n ast.Node) bool {
 			switch f := n.(type) {
 			case *ast.Call:
-				de.Functions = append(de.Functions, f.Name)
+				de.functions = append(de.functions, f.Name)
 			}
 			return true
 		})
 
 		//Rules
-		de.Rules = append(de.Rules, rule.Id)
+		de.rules = append(de.rules, rule.Id)
 	}
 
 	ruleGraph := rule.Graph
@@ -161,34 +161,34 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 				}
 				sourceOption.TYPE = gn.NodeType
 
-				de.Sources = append(de.Sources, sourceOption.TYPE)
+				de.sources = append(de.sources, sourceOption.TYPE)
 				//get config key
-				_, ok := de.SourceConfigKeys[sourceOption.TYPE]
+				_, ok := de.sourceConfigKeys[sourceOption.TYPE]
 				if ok {
-					de.SourceConfigKeys[sourceOption.TYPE] = append(de.SourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
+					de.sourceConfigKeys[sourceOption.TYPE] = append(de.sourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
 				} else {
 					var confKeys []string
 					confKeys = append(confKeys, sourceOption.CONF_KEY)
-					de.SourceConfigKeys[sourceOption.TYPE] = confKeys
+					de.sourceConfigKeys[sourceOption.TYPE] = confKeys
 				}
 				//get schema id
 				if sourceOption.SCHEMAID != "" {
 					r := strings.Split(sourceOption.SCHEMAID, ".")
-					de.Schemas = append(de.Schemas, sourceOption.FORMAT+"_"+r[0])
+					de.schemas = append(de.schemas, sourceOption.FORMAT+"_"+r[0])
 				}
 			case "sink":
 				sinkType := gn.NodeType
 				props := gn.Props
-				de.Sinks = append(de.Sinks, sinkType)
+				de.sinks = append(de.sinks, sinkType)
 				resourceId, ok := props[conf.ResourceID].(string)
 				if ok {
-					_, ok := de.SinkConfigKeys[sinkType]
+					_, ok := de.sinkConfigKeys[sinkType]
 					if ok {
-						de.SinkConfigKeys[sinkType] = append(de.SinkConfigKeys[sinkType], resourceId)
+						de.sinkConfigKeys[sinkType] = append(de.sinkConfigKeys[sinkType], resourceId)
 					} else {
 						var confKeys []string
 						confKeys = append(confKeys, resourceId)
-						de.SinkConfigKeys[sinkType] = confKeys
+						de.sinkConfigKeys[sinkType] = confKeys
 					}
 				}
 
@@ -197,7 +197,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					schemaId, ok := props["schemaId"].(string)
 					if ok {
 						r := strings.Split(schemaId, ".")
-						de.Schemas = append(de.Schemas, format+"_"+r[0])
+						de.schemas = append(de.schemas, format+"_"+r[0])
 					}
 				}
 			case "operator":
@@ -211,7 +211,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(fop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -223,7 +223,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(fop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -235,7 +235,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(fop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -247,7 +247,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(pop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -259,7 +259,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(jop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -271,7 +271,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(gop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -283,7 +283,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 					ast.WalkFunc(oop, func(n ast.Node) bool {
 						switch f := n.(type) {
 						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
+							de.functions = append(de.functions, f.Name)
 						}
 						return true
 					})
@@ -296,7 +296,7 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 						ast.WalkFunc(op, func(n ast.Node) bool {
 							switch f := n.(type) {
 							case *ast.Call:
-								de.Functions = append(de.Functions, f.Name)
+								de.functions = append(de.functions, f.Name)
 							}
 							return true
 						})
@@ -309,21 +309,8 @@ func ruleTraverse(rule *api.Rule, de *Dependencies) {
 	}
 }
 
-type Configuration struct {
-	Streams          map[string]string `json:"streams"`
-	Tables           map[string]string `json:"tables"`
-	Rules            map[string]string `json:"rules"`
-	NativePlugins    map[string]string `json:"nativePlugins"`
-	PortablePlugins  map[string]string `json:"portablePlugins"`
-	SourceConfig     map[string]string `json:"sourceConfig"`
-	SinkConfig       map[string]string `json:"sinkConfig"`
-	ConnectionConfig map[string]string `json:"connectionConfig"`
-	Service          map[string]string `json:"Service"`
-	Schema           map[string]string `json:"Schema"`
-}
-
 func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
-	conf := &Configuration{
+	config := &Configuration{
 		Streams:          make(map[string]string),
 		Tables:           make(map[string]string),
 		Rules:            make(map[string]string),
@@ -335,9 +322,9 @@ func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]b
 		Service:          make(map[string]string),
 		Schema:           make(map[string]string),
 	}
-	conf.Rules = p.exportRules(rules)
+	config.Rules = p.exportRules(rules)
 
-	de := NewDependencies()
+	de := newDependencies()
 	for _, v := range rules {
 		rule, _ := p.r.GetRuleById(v)
 		if rule != nil {
@@ -345,9 +332,9 @@ func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]b
 		}
 	}
 
-	p.exportSelected(de, conf)
+	p.exportSelected(de, config)
 
-	return json.Marshal(conf)
+	return json.Marshal(config)
 }
 
 func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
@@ -380,12 +367,12 @@ func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string
 	return tableSet
 }
 
-func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Configuration) {
+func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Configuration) {
 	//get the stream and table
-	config.Streams = p.exportStreams(de.Streams)
-	config.Tables = p.exportTables(de.Tables)
+	config.Streams = p.exportStreams(de.streams)
+	config.Tables = p.exportTables(de.tables)
 	//get the sources
-	for _, v := range de.Sources {
+	for _, v := range de.sources {
 		t, srcName, srcInfo := io.GetSourcePlugin(v)
 		if t == plugin.NATIVE_EXTENSION {
 			config.NativePlugins[srcName] = srcInfo
@@ -395,7 +382,7 @@ func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Config
 		}
 	}
 	// get sinks
-	for _, v := range de.Sinks {
+	for _, v := range de.sinks {
 		t, sinkName, sinkInfo := io.GetSinkPlugin(v)
 		if t == plugin.NATIVE_EXTENSION {
 			config.NativePlugins[sinkName] = sinkInfo
@@ -406,7 +393,7 @@ func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Config
 	}
 
 	// get functions
-	for _, v := range de.Functions {
+	for _, v := range de.functions {
 		t, svcName, svcInfo := function.GetFunctionPlugin(v)
 		if t == plugin.NATIVE_EXTENSION {
 			config.NativePlugins[svcName] = svcInfo
@@ -421,16 +408,16 @@ func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Config
 
 	// get sourceCfg/sinkCfg
 	configKeys := meta.YamlConfigurationKeys{}
-	configKeys.Sources = de.SourceConfigKeys
-	configKeys.Sinks = de.SinkConfigKeys
+	configKeys.Sources = de.sourceConfigKeys
+	configKeys.Sinks = de.sinkConfigKeys
 	configSet := meta.GetConfigurationsFor(configKeys)
 	config.SourceConfig = configSet.Sources
 	config.SinkConfig = configSet.Sinks
 	config.ConnectionConfig = configSet.Connections
 
 	//get schema
-	for _, v := range de.Schemas {
-		schName, schInfo := schema.GetSchemaInstallScript(v)
+	for _, v := range de.schemas {
+		schName, schInfo := getSchemaInstallScript(v)
 		config.Schema[schName] = schInfo
 	}
 }
@@ -581,165 +568,3 @@ func parseJoin(props map[string]interface{}) (*ast.SelectStatement, error) {
 	}
 
 }
-
-// PlanByGraph returns a topo.Topo object by a graph
-func PlanByGraph(rule *api.Rule) {
-	var de *Dependencies = nil
-	ruleGraph := rule.Graph
-
-	for _, gn := range ruleGraph.Nodes {
-		switch gn.Type {
-		case "source":
-			sourceOption := &ast.Options{}
-			err := cast.MapToStruct(gn.Props, sourceOption)
-			if err != nil {
-				break
-			}
-			sourceOption.TYPE = gn.NodeType
-
-			de.Sources = append(de.Sources, sourceOption.TYPE)
-			//get config key
-			_, ok := de.SourceConfigKeys[sourceOption.TYPE]
-			if ok {
-				de.SourceConfigKeys[sourceOption.TYPE] = append(de.SourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
-			} else {
-				var confKeys []string
-				confKeys = append(confKeys, sourceOption.CONF_KEY)
-				de.SourceConfigKeys[sourceOption.TYPE] = confKeys
-			}
-			//get schema id
-			if sourceOption.SCHEMAID != "" {
-				r := strings.Split(sourceOption.SCHEMAID, ".")
-				de.Schemas = append(de.Schemas, sourceOption.FORMAT+"_"+r[0])
-			}
-		case "sink":
-			sinkType := gn.NodeType
-			props := gn.Props
-			de.Sinks = append(de.Sinks, sinkType)
-			resourceId, ok := props[conf.ResourceID].(string)
-			if ok {
-				_, ok := de.SinkConfigKeys[sinkType]
-				if ok {
-					de.SinkConfigKeys[sinkType] = append(de.SinkConfigKeys[sinkType], resourceId)
-				} else {
-					var confKeys []string
-					confKeys = append(confKeys, resourceId)
-					de.SinkConfigKeys[sinkType] = confKeys
-				}
-			}
-
-			format, ok := props["format"].(string)
-			if ok && format != "json" {
-				schemaId, ok := props["schemaId"].(string)
-				if ok {
-					r := strings.Split(schemaId, ".")
-					de.Schemas = append(de.Schemas, format+"_"+r[0])
-				}
-			}
-		case "operator":
-			nt := strings.ToLower(gn.NodeType)
-			switch nt {
-			case "function":
-				fop, err := parseFunc(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(fop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "aggfunc":
-				fop, err := parseFunc(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(fop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "filter":
-				fop, err := parseFilter(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(fop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "pick":
-				pop, err := parsePick(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(pop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "join":
-				jop, err := parseJoin(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(jop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "groupby":
-				gop, err := parseGroupBy(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(gop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "orderby":
-				oop, err := parseOrderBy(gn.Props)
-				if err != nil {
-					break
-				}
-				ast.WalkFunc(oop, func(n ast.Node) bool {
-					switch f := n.(type) {
-					case *ast.Call:
-						de.Functions = append(de.Functions, f.Name)
-					}
-					return true
-				})
-			case "switch":
-				opArray, err := parseSwitch(gn.Props)
-				if err != nil {
-					break
-				}
-				for _, op := range opArray {
-					ast.WalkFunc(op, func(n ast.Node) bool {
-						switch f := n.(type) {
-						case *ast.Call:
-							de.Functions = append(de.Functions, f.Name)
-						}
-						return true
-					})
-				}
-			}
-		default:
-			break
-		}
-	}
-}

+ 4 - 0
internal/server/schema_init.go

@@ -147,3 +147,7 @@ func schemaImport(s map[string]string) error {
 func schemaPartialImport(s map[string]string) map[string]string {
 	return schema.SchemaPartialImport(s)
 }
+
+func getSchemaInstallScript(s string) (string, string) {
+	return schema.GetSchemaInstallScript(s)
+}

+ 2 - 2
internal/server/server.go

@@ -44,7 +44,7 @@ var (
 	ruleProcessor          *processor.RuleProcessor
 	streamProcessor        *processor.StreamProcessor
 	rulesetProcessor       *processor.RulesetProcessor
-	ruleMigrationProcessor *processor.RuleMigrationProcessor
+	ruleMigrationProcessor *RuleMigrationProcessor
 )
 
 // Create path if mount an empty dir. For edgeX, all the folders must be created priorly
@@ -95,7 +95,7 @@ func StartUp(Version, LoadFileType string) {
 	ruleProcessor = processor.NewRuleProcessor()
 	streamProcessor = processor.NewStreamProcessor()
 	rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
-	ruleMigrationProcessor = processor.NewRuleMigrationProcessor(ruleProcessor, streamProcessor)
+	ruleMigrationProcessor = NewRuleMigrationProcessor(ruleProcessor, streamProcessor)
 
 	// register all extensions
 	for k, v := range components {

+ 2 - 2
internal/server/service_init.go

@@ -151,8 +151,8 @@ func serviceStatusExport() map[string]string {
 	return serviceManager.GetAllServicesStatus()
 }
 
-func serviceImport(services map[string]string) {
-	serviceManager.ImportServices(services)
+func serviceImport(services map[string]string) map[string]string {
+	return serviceManager.ImportServices(services)
 }
 
 func servicePartialImport(services map[string]string) map[string]string {

+ 4 - 1
internal/service/manager.go

@@ -489,14 +489,17 @@ func (m *Manager) servicesRegisterForImport(_, v string) error {
 	return nil
 }
 
-func (m *Manager) ImportServices(services map[string]string) {
+func (m *Manager) ImportServices(services map[string]string) map[string]string {
+	errMap := map[string]string{}
 	_ = m.serviceStatusInstallKV.Clean()
 	for k, v := range services {
 		err := m.servicesRegisterForImport(k, v)
 		if err != nil {
 			_ = m.serviceStatusInstallKV.Set(k, err.Error())
+			errMap[k] = err.Error()
 		}
 	}
+	return errMap
 }
 
 func (m *Manager) ImportPartialServices(services map[string]string) map[string]string {