Преглед изворни кода

fea(config): Full data export import (#1573)

* fix(config): initial commit for rules/streams/config reset

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add reset rest api

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add batch rules/streams rest api

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add export/import configuration interface

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add restart method

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>

* fix(config): add service/schema reset import export

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add restart support for plugin/schema

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): add rest api to get import status

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): fix meta UT error

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(test): schema test now needs store

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(config): rename config export/import to data

Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>

* fix(config): add get import status rpc call

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(config): fix stream/rule export error

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Co-authored-by: Jiyong Huang <huangjy@emqx.io>
Co-authored-by: Jianxiang Ran <jianxiang.ran@emqx.io>
superxan пре 2 година
родитељ
комит
119d49f22b

+ 76 - 3
cmd/kuiper/main.go

@@ -769,7 +769,7 @@ func main() {
 		{
 			Name:    "getstatus",
 			Aliases: []string{"getstatus"},
-			Usage:   "getstatus rule $rule_name",
+			Usage:   "getstatus rule $rule_name | import",
 			Subcommands: []cli.Command{
 				{
 					Name:  "rule",
@@ -791,6 +791,21 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "import",
+					Usage: "getstatus import",
+					//Flags: nflag,
+					Action: func(c *cli.Context) error {
+						var reply string
+						err = client.Call("Server.GetStatusImport", 0, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 		{
@@ -966,7 +981,7 @@ func main() {
 		{
 			Name:    "import",
 			Aliases: []string{"import"},
-			Usage:   "import ruleset -f ruleset_file",
+			Usage:   "import ruleset | data -f file",
 			Subcommands: []cli.Command{
 				{
 					Name:  "ruleset",
@@ -994,12 +1009,52 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "data",
+					Usage: "\"import data -f configuration_file -s stop",
+					Flags: []cli.Flag{
+						cli.StringFlag{
+							Name:     "file, f",
+							Usage:    "the location of the configuration json file",
+							FilePath: "/home/ekuiper_configuration.json",
+						},
+						cli.StringFlag{
+							Name:  "stop, s",
+							Usage: "stop kuiper after the action",
+						},
+					},
+					Action: func(c *cli.Context) error {
+						sfile := c.String("file")
+						if sfile == "" {
+							fmt.Print("Required configuration json file to import")
+							return nil
+						}
+						r := c.String("stop")
+						if r != "true" && r != "false" {
+							fmt.Printf("Expect s flag to be a boolean value.\n")
+							return nil
+						}
+						args := &model.ImportDataDesc{
+							FileName: sfile,
+							Stop:     r == "true",
+						}
+
+						var reply string
+						err = client.Call("Server.ImportConfiguration", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 		{
 			Name:    "export",
 			Aliases: []string{"export"},
-			Usage:   "export ruleset $ruleset_file",
+			Usage:   "export ruleset | data $ruleset_file",
 			Subcommands: []cli.Command{
 				{
 					Name:  "ruleset",
@@ -1019,6 +1074,24 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "data",
+					Usage: "\"export data $configuration_file",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) < 1 {
+							fmt.Printf("Require exported file name.\n")
+							return nil
+						}
+						var reply string
+						err = client.Call("Server.ExportConfiguration", c.Args()[0], &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 	}

+ 22 - 0
internal/conf/yaml_config_ops.go

@@ -31,6 +31,7 @@ type ConfKeysOperator interface {
 	CopyConfContent() map[string]map[string]interface{}
 	CopyReadOnlyConfContent() map[string]map[string]interface{}
 	CopyUpdatableConfContent() map[string]map[string]interface{}
+	LoadConfContent(cf map[string]map[string]interface{})
 	GetConfKeys() (keys []string)
 	GetReadOnlyConfKeys() (keys []string)
 	GetUpdatableConfKeys() (keys []string)
@@ -38,6 +39,7 @@ type ConfKeysOperator interface {
 	DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
 	AddConfKey(confKey string, reqField map[string]interface{}) error
 	AddConfKeyField(confKey string, reqField map[string]interface{}) error
+	ClearConfKeys()
 }
 
 //ConfigOperator define interface to query/add/update/delete the configs in disk
@@ -108,6 +110,19 @@ func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
 	return cf
 }
 
+func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{}) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for key, kvs := range cf {
+		aux := make(map[string]interface{})
+		for k, v := range kvs {
+			aux[k] = v
+		}
+		c.dataCfg[key] = aux
+	}
+}
+
 func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{} {
 	cf := make(map[string]map[string]interface{})
 	c.lock.RLock()
@@ -177,6 +192,13 @@ func (c *ConfigKeys) DeleteConfKey(confKey string) {
 	delete(c.dataCfg, confKey)
 }
 
+func (c *ConfigKeys) ClearConfKeys() {
+	keys := c.GetUpdatableConfKeys()
+	for _, key := range keys {
+		c.DeleteConfKey(key)
+	}
+}
+
 func recursionDelMap(cf, fields map[string]interface{}) error {
 	for k, v := range fields {
 		if nil == v {

+ 4 - 0
internal/converter/custom/converter_test.go

@@ -26,6 +26,10 @@ import (
 	"testing"
 )
 
+func init() {
+	testx.InitEnv()
+}
+
 func TestCustomConverter(t *testing.T) {
 	dataDir, err := conf.GetDataLoc()
 	if err != nil {

+ 234 - 6
internal/meta/yamlConfigMeta.go

@@ -18,21 +18,33 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"strings"
 	"sync"
 )
 
 type configManager struct {
-	lock         sync.RWMutex
-	cfgOperators map[string]conf.ConfigOperator
+	lock                     sync.RWMutex
+	cfgOperators             map[string]conf.ConfigOperator
+	sourceConfigStatusDb     kv.KeyValue
+	sinkConfigStatusDb       kv.KeyValue
+	connectionConfigStatusDb kv.KeyValue
 }
 
 //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
 // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
 // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
-var ConfigManager = configManager{
-	lock:         sync.RWMutex{},
-	cfgOperators: make(map[string]conf.ConfigOperator),
+var ConfigManager *configManager
+
+func InitYamlConfigManager() {
+	ConfigManager = &configManager{
+		lock:         sync.RWMutex{},
+		cfgOperators: make(map[string]conf.ConfigOperator),
+	}
+	_, ConfigManager.sourceConfigStatusDb = store.GetKV("sourceConfigStatus")
+	_, ConfigManager.sinkConfigStatusDb = store.GetKV("sinkConfigStatus")
+	_, ConfigManager.connectionConfigStatusDb = store.GetKV("connectionConfigStatus")
 }
 
 const SourceCfgOperatorKeyTemplate = "sources.%s"
@@ -129,7 +141,6 @@ func delYamlConf(configOperatorKey string) {
 }
 
 func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
-
 	ConfigManager.lock.RLock()
 	defer ConfigManager.lock.RUnlock()
 
@@ -146,6 +157,30 @@ func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
 	}
 }
 
+func addSourceConfKeys(plgName string, configurations YamlConfigurations) (err error) {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
+
+	configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
+
+	var cfgOps conf.ConfigOperator
+	var found bool
+
+	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
+	if !found {
+		cfgOps = conf.NewConfigOperatorForSource(plgName)
+		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
+	}
+
+	cfgOps.LoadConfContent(configurations)
+
+	err = cfgOps.SaveCfgToFile()
+	if err != nil {
+		return fmt.Errorf(`%s.%v`, configOperatorKey, err)
+	}
+	return nil
+}
+
 func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
 	ConfigManager.lock.Lock()
 	defer ConfigManager.lock.Unlock()
@@ -210,6 +245,30 @@ func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
 	return nil
 }
 
+func addSinkConfKeys(plgName string, cf YamlConfigurations) error {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
+
+	configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
+
+	var cfgOps conf.ConfigOperator
+	var found bool
+
+	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
+	if !found {
+		cfgOps = conf.NewConfigOperatorForSink(plgName)
+		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
+	}
+
+	cfgOps.LoadConfContent(cf)
+
+	err := cfgOps.SaveCfgToFile()
+	if err != nil {
+		return fmt.Errorf(`%s.%v`, configOperatorKey, err)
+	}
+	return nil
+}
+
 func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
 	ConfigManager.lock.Lock()
 	defer ConfigManager.lock.Unlock()
@@ -242,6 +301,30 @@ func AddConnectionConfKey(plgName, confKey, language string, content []byte) err
 	return nil
 }
 
+func addConnectionConfKeys(plgName string, cf YamlConfigurations) error {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
+
+	configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
+
+	var cfgOps conf.ConfigOperator
+	var found bool
+
+	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
+	if !found {
+		cfgOps = conf.NewConfigOperatorForConnection(plgName)
+		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
+	}
+
+	cfgOps.LoadConfContent(cf)
+
+	err := cfgOps.SaveCfgToFile()
+	if err != nil {
+		return fmt.Errorf(`%s.%v`, configOperatorKey, err)
+	}
+	return nil
+}
+
 func GetResources(language string) (b []byte, err error) {
 	ConfigManager.lock.RLock()
 	defer ConfigManager.lock.RUnlock()
@@ -288,3 +371,148 @@ func GetResources(language string) (b []byte, err error) {
 		return b, err
 	}
 }
+
+func ResetConfigs() {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
+
+	for _, ops := range ConfigManager.cfgOperators {
+		ops.ClearConfKeys()
+		_ = ops.SaveCfgToFile()
+	}
+}
+
+type YamlConfigurations map[string]map[string]interface{}
+
+type YamlConfigurationSet struct {
+	Sources     map[string]string `json:"sources"`
+	Sinks       map[string]string `json:"sinks"`
+	Connections map[string]string `json:"connections"`
+}
+
+func GetConfigurations() YamlConfigurationSet {
+	ConfigManager.lock.RLock()
+	defer ConfigManager.lock.RUnlock()
+	result := YamlConfigurationSet{
+		Sources:     map[string]string{},
+		Sinks:       map[string]string{},
+		Connections: map[string]string{},
+	}
+	srcResources := map[string]string{}
+	sinkResources := map[string]string{}
+	connectionResources := map[string]string{}
+
+	for key, ops := range ConfigManager.cfgOperators {
+		if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
+			plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
+			cfs := ops.CopyUpdatableConfContent()
+			if len(cfs) > 0 {
+				jsonByte, _ := json.Marshal(cfs)
+				connectionResources[plugin] = string(jsonByte)
+			}
+			continue
+		}
+		if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
+			plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
+			cfs := ops.CopyUpdatableConfContent()
+			if len(cfs) > 0 {
+				jsonByte, _ := json.Marshal(cfs)
+				srcResources[plugin] = string(jsonByte)
+			}
+			continue
+		}
+		if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
+			plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
+			cfs := ops.CopyUpdatableConfContent()
+			if len(cfs) > 0 {
+				jsonByte, _ := json.Marshal(cfs)
+				sinkResources[plugin] = string(jsonByte)
+			}
+			continue
+		}
+	}
+
+	result.Sources = srcResources
+	result.Sinks = sinkResources
+	result.Connections = connectionResources
+
+	return result
+}
+
+func GetConfigurationStatus() YamlConfigurationSet {
+	result := YamlConfigurationSet{
+		Sources:     map[string]string{},
+		Sinks:       map[string]string{},
+		Connections: map[string]string{},
+	}
+
+	all, err := ConfigManager.sourceConfigStatusDb.All()
+	if err == nil {
+		result.Sources = all
+	}
+
+	all, err = ConfigManager.sinkConfigStatusDb.All()
+	if err == nil {
+		result.Sinks = all
+	}
+
+	all, err = ConfigManager.connectionConfigStatusDb.All()
+	if err == nil {
+		result.Connections = all
+	}
+
+	return result
+}
+
+func LoadConfigurations(configSets YamlConfigurationSet) {
+
+	var srcResources = configSets.Sources
+	var sinkResources = configSets.Sinks
+	var connectionResources = configSets.Connections
+
+	_ = ConfigManager.sourceConfigStatusDb.Clean()
+	_ = ConfigManager.sinkConfigStatusDb.Clean()
+	_ = ConfigManager.connectionConfigStatusDb.Clean()
+
+	for key, val := range srcResources {
+		configs := YamlConfigurations{}
+		err := json.Unmarshal([]byte(val), &configs)
+		if err != nil {
+			_ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+		err = addSourceConfKeys(key, configs)
+		if err != nil {
+			_ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+	}
+
+	for key, val := range sinkResources {
+		configs := YamlConfigurations{}
+		err := json.Unmarshal([]byte(val), &configs)
+		if err != nil {
+			_ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+		err = addSinkConfKeys(key, configs)
+		if err != nil {
+			_ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+	}
+
+	for key, val := range connectionResources {
+		configs := YamlConfigurations{}
+		err := json.Unmarshal([]byte(val), &configs)
+		if err != nil {
+			_ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+		err = addConnectionConfKeys(key, configs)
+		if err != nil {
+			_ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
+			continue
+		}
+	}
+}

+ 4 - 0
internal/meta/yamlConfigMeta_test.go

@@ -21,6 +21,10 @@ import (
 	"testing"
 )
 
+func init() {
+	InitYamlConfigManager()
+}
+
 func createPaths() {
 	dataDir, err := conf.GetDataLoc()
 	if err != nil {

+ 5 - 0
internal/pkg/model/data.go

@@ -27,3 +27,8 @@ type PluginDesc struct {
 	Type int
 	Stop bool
 }
+
+type ImportDataDesc struct {
+	FileName string
+	Stop     bool
+}

+ 155 - 33
internal/plugin/native/manager.go

@@ -19,6 +19,7 @@ package native
 import (
 	"archive/zip"
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"os"
 	"os/exec"
@@ -59,8 +60,12 @@ type Manager struct {
 	// dirs
 	pluginDir     string
 	pluginConfDir string
-	// the access to db
-	db kv.KeyValue
+	// the access to func symbols db
+	funcSymbolsDb kv.KeyValue
+	// the access to plugin install script db
+	plgInstallDb kv.KeyValue
+	// the access to plugin install status db
+	plgStatusDb kv.KeyValue
 }
 
 // InitManager must only be called once
@@ -73,30 +78,47 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot find data folder: %s", err)
 	}
-	err, db := store.GetKV("pluginFuncs")
+	err, func_db := store.GetKV("pluginFuncs")
 	if err != nil {
-		return nil, fmt.Errorf("error when opening db: %v", err)
+		return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
 	}
-	registry := &Manager{symbols: make(map[string]string), db: db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
+	err, plg_db := store.GetKV("nativePlugin")
+	if err != nil {
+		return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
+	}
+	err, plg_status_db := store.GetKV("nativePluginStatus")
+	if err != nil {
+		return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
+	}
+	registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
 	manager = registry
-	plugins := make([]map[string]string, 3)
-	for i := range plugin2.PluginTypes {
-		names, err := findAll(plugin2.PluginType(i), pluginDir)
-		if err != nil {
-			return nil, fmt.Errorf("fail to find existing plugins: %s", err)
+	if manager.hasInstallFlag() {
+		manager.plugins = make([]map[string]string, 3)
+		for i := range manager.plugins {
+			manager.plugins[i] = make(map[string]string)
 		}
-		plugins[i] = names
-	}
-	registry.plugins = plugins
+		manager.pluginInstallWhenReboot()
+		manager.clearInstallFlag()
+	} else {
+		plugins := make([]map[string]string, 3)
+		for i := range plugins {
+			names, err := findAll(plugin2.PluginType(i), pluginDir)
+			if err != nil {
+				return nil, fmt.Errorf("fail to find existing plugins: %s", err)
+			}
+			plugins[i] = names
+		}
+		registry.plugins = plugins
 
-	for pf := range plugins[plugin2.FUNCTION] {
-		l := make([]string, 0)
-		if ok, err := db.Get(pf, &l); ok {
-			registry.storeSymbols(pf, l)
-		} else if err != nil {
-			return nil, fmt.Errorf("error when querying kv: %s", err)
-		} else {
-			registry.storeSymbols(pf, []string{pf})
+		for pf := range plugins[plugin2.FUNCTION] {
+			l := make([]string, 0)
+			if ok, err := func_db.Get(pf, &l); ok {
+				registry.storeSymbols(pf, l)
+			} else if err != nil {
+				return nil, fmt.Errorf("error when querying kv: %s", err)
+			} else {
+				registry.storeSymbols(pf, []string{pf})
+			}
 		}
 	}
 	return registry, nil
@@ -220,6 +242,17 @@ func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (s
 	}
 }
 
+func (rr *Manager) storePluginInstallScript(name string, t plugin2.PluginType, j plugin2.Plugin) {
+	key := plugin2.PluginTypes[t] + "_" + name
+	val := string(j.GetInstallScripts())
+	_ = rr.plgInstallDb.Set(key, val)
+}
+
+func (rr *Manager) removePluginInstallScript(name string, t plugin2.PluginType) {
+	key := plugin2.PluginTypes[t] + "_" + name
+	_ = rr.plgInstallDb.Delete(key)
+}
+
 func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 	name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
 	//Validation
@@ -252,7 +285,7 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 
 	if t == plugin2.FUNCTION {
 		if len(j.GetSymbols()) > 0 {
-			err = rr.db.Set(name, j.GetSymbols())
+			err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
 			if err != nil {
 				return err
 			}
@@ -268,7 +301,7 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 	//unzip and copy to destination
 	version, err := rr.install(t, name, zipPath, shellParas)
 	if err == nil && len(j.GetSymbols()) > 0 {
-		err = rr.db.Set(name, j.GetSymbols())
+		err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
 	}
 	if err != nil { //Revert for any errors
 		if len(j.GetSymbols()) > 0 {
@@ -279,6 +312,7 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 		return fmt.Errorf("fail to install plugin: %s", err)
 	}
 	rr.store(t, name, version)
+	rr.storePluginInstallScript(name, t, j)
 
 	switch t {
 	case plugin2.SINK:
@@ -309,14 +343,14 @@ func (rr *Manager) RegisterFuncs(name string, functions []string) error {
 		return fmt.Errorf("property 'functions' must not be empty")
 	}
 	old := make([]string, 0)
-	if ok, err := rr.db.Get(name, &old); err != nil {
+	if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
 		return err
 	} else if ok {
 		rr.removeSymbols(old)
 	} else if !ok {
 		rr.removeSymbols([]string{name})
 	}
-	err := rr.db.Set(name, functions)
+	err := rr.funcSymbolsDb.Set(name, functions)
 	if err != nil {
 		return err
 	}
@@ -360,11 +394,11 @@ func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
 		funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
 		_ = os.Remove(funcJsonPath)
 		old := make([]string, 0)
-		if ok, err := rr.db.Get(name, &old); err != nil {
+		if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
 			return err
 		} else if ok {
 			rr.removeSymbols(old)
-			err := rr.db.Delete(name)
+			err := rr.funcSymbolsDb.Delete(name)
 			if err != nil {
 				return err
 			}
@@ -384,6 +418,7 @@ func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
 			results = append(results, fmt.Sprintf("can't find %s", p))
 		}
 	}
+	rr.removePluginInstallScript(name, t)
 
 	if len(results) > 0 {
 		return fmt.Errorf(strings.Join(results, "\n"))
@@ -410,7 +445,7 @@ func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]
 		}
 		if t == plugin2.FUNCTION {
 			l := make([]string, 0)
-			if ok, _ := rr.db.Get(name, &l); ok {
+			if ok, _ := rr.funcSymbolsDb.Get(name, &l); ok {
 				r["functions"] = l
 			}
 			// ignore the error
@@ -500,13 +535,16 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 		return version, err
 	} else if haveInstallFile {
 		//run install script if there is
+		var shell = make([]string, len(shellParas))
+		copy(shell, shellParas)
 		spath := path.Join(tempPath, "install.sh")
-		shellParas = append(shellParas, spath)
-		if 1 != len(shellParas) {
-			copy(shellParas[1:], shellParas[0:])
-			shellParas[0] = spath
+		shell = append(shell, spath)
+		if 1 != len(shell) {
+			copy(shell[1:], shell[0:])
+			shell[0] = spath
 		}
-		cmd := exec.Command("/bin/sh", shellParas...)
+		conf.Log.Infof("run install script %s", strings.Join(shell, " "))
+		cmd := exec.Command("/bin/sh", shell...)
 		var outb, errb bytes.Buffer
 		cmd.Stdout = &outb
 		cmd.Stderr = &errb
@@ -728,3 +766,87 @@ func lcFirst(str string) string {
 	}
 	return ""
 }
+
+func (rr *Manager) UninstallAllPlugins() {
+	keys, err := rr.plgInstallDb.Keys()
+	if err != nil {
+		return
+	}
+	for _, v := range keys {
+		plgType := plugin2.PluginTypeMap[strings.Split(v, "_")[0]]
+		plgName := strings.Split(v, "_")[1]
+		_ = rr.Delete(plgType, plgName, false)
+	}
+}
+
+func (rr *Manager) GetAllPlugins() map[string]string {
+	allPlgs, err := rr.plgInstallDb.All()
+	if err != nil {
+		return nil
+	}
+	delete(allPlgs, BOOT_INSTALL)
+	return allPlgs
+}
+
+func (rr *Manager) GetAllPluginsStatus() map[string]string {
+	allPlgs, err := rr.plgStatusDb.All()
+	if err != nil {
+		return nil
+	}
+	return allPlgs
+}
+
+const BOOT_INSTALL = "$boot_install"
+
+func (rr *Manager) PluginImport(plugins map[string]string) error {
+	if len(plugins) == 0 {
+		return nil
+	}
+	for k, v := range plugins {
+		err := rr.plgInstallDb.Set(k, v)
+		if err != nil {
+			return err
+		}
+	}
+	//set the flag to install the plugins when eKuiper reboot
+	err := rr.plgInstallDb.Set(BOOT_INSTALL, BOOT_INSTALL)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (rr *Manager) hasInstallFlag() bool {
+	var val = ""
+	found, _ := rr.plgInstallDb.Get(BOOT_INSTALL, &val)
+	return found
+}
+
+func (rr *Manager) clearInstallFlag() {
+	_ = rr.plgInstallDb.Delete(BOOT_INSTALL)
+}
+
+func (rr *Manager) pluginInstallWhenReboot() {
+	allPlgs, err := rr.plgInstallDb.All()
+	if err != nil {
+		return
+	}
+
+	delete(allPlgs, BOOT_INSTALL)
+	_ = rr.plgStatusDb.Clean()
+
+	for k, v := range allPlgs {
+		plgType := plugin2.PluginTypeMap[strings.Split(k, "_")[0]]
+		sd := plugin2.NewPluginByType(plgType)
+		err := json.Unmarshal([]byte(v), &sd)
+		if err != nil {
+			_ = rr.plgStatusDb.Set(k, err.Error())
+			continue
+		}
+		err = rr.Register(plgType, sd)
+		if err != nil {
+			_ = rr.plgStatusDb.Set(k, err.Error())
+			continue
+		}
+	}
+}

+ 2 - 0
internal/plugin/native/manager_test.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/internal/binder/function"
+	"github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"net/http"
@@ -32,6 +33,7 @@ import (
 
 func init() {
 	testx.InitEnv()
+	meta.InitYamlConfigManager()
 	nativeManager, err := InitManager()
 	if err != nil {
 		panic(err)

+ 20 - 1
internal/plugin/plugin.go

@@ -14,6 +14,8 @@
 
 package plugin
 
+import "encoding/json"
+
 type PluginType int
 
 const (
@@ -24,7 +26,15 @@ const (
 	WASM
 )
 
-var PluginTypes = []string{"sources", "sinks", "functions"}
+var PluginTypes = []string{"sources", "sinks", "functions", "portable", "wasm"}
+
+var PluginTypeMap = map[string]PluginType{
+	"sources":   SOURCE,
+	"sinks":     SINK,
+	"functions": FUNCTION,
+	"portable":  PORTABLE,
+	"wasm":      WASM,
+}
 
 type Plugin interface {
 	GetName() string
@@ -32,6 +42,7 @@ type Plugin interface {
 	GetShellParas() []string
 	GetSymbols() []string
 	SetName(n string)
+	GetInstallScripts() []byte
 }
 
 // IOPlugin Unify model. Flat all properties for each kind.
@@ -61,6 +72,14 @@ func (p *IOPlugin) SetName(n string) {
 	p.Name = n
 }
 
+func (p *IOPlugin) GetInstallScripts() []byte {
+	marshal, err := json.Marshal(p)
+	if err != nil {
+		return nil
+	}
+	return marshal
+}
+
 func NewPluginByType(t PluginType) Plugin {
 	switch t {
 	case FUNCTION:

+ 78 - 6
internal/plugin/portable/manager.go

@@ -19,6 +19,8 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"io"
 	"os"
 	"os/exec"
@@ -41,6 +43,10 @@ type Manager struct {
 	pluginDir     string
 	pluginConfDir string
 	reg           *registry // can be replaced with kv
+	// the access to plugin install script db
+	plgInstallDb kv.KeyValue
+	// the access to plugin install status db
+	plgStatusDb kv.KeyValue
 }
 
 // InitManager must only be called once
@@ -71,6 +77,16 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, err
 	}
+	err, plg_db := store.GetKV("portablePlugin")
+	if err != nil {
+		return nil, fmt.Errorf("error when opening portablePlugin: %v", err)
+	}
+	err, plg_status_db := store.GetKV("portablePluginStatus")
+	if err != nil {
+		return nil, fmt.Errorf("error when opening portablePluginStatus: %v", err)
+	}
+	m.plgInstallDb = plg_db
+	m.plgStatusDb = plg_status_db
 	manager = m
 	return m, nil
 }
@@ -164,6 +180,15 @@ func (m *Manager) parsePluginJson(name string) (*PluginInfo, error) {
 	return pi, nil
 }
 
+func (m *Manager) storePluginInstallScript(name string, j plugin.Plugin) {
+	val := string(j.GetInstallScripts())
+	_ = m.plgInstallDb.Set(name, val)
+}
+
+func (m *Manager) removePluginInstallScript(name string) {
+	_ = m.plgInstallDb.Delete(name)
+}
+
 func (m *Manager) Register(p plugin.Plugin) error {
 	name, uri, shellParas := p.GetName(), p.GetFile(), p.GetShellParas()
 	name = strings.Trim(name, " ")
@@ -191,6 +216,7 @@ func (m *Manager) Register(p plugin.Plugin) error {
 	if err != nil { //Revert for any errors
 		return fmt.Errorf("fail to install plugin: %s", err)
 	}
+	m.storePluginInstallScript(name, p)
 	return nil
 }
 
@@ -290,14 +316,16 @@ func (m *Manager) install(name, src string, shellParas []string) (resultErr erro
 
 	if needInstall {
 		//run install script if there is
+		var shell = make([]string, len(shellParas))
+		copy(shell, shellParas)
 		spath := path.Join(pluginTarget, "install.sh")
-		shellParas = append(shellParas, spath)
-		if 1 != len(shellParas) {
-			copy(shellParas[1:], shellParas[0:])
-			shellParas[0] = spath
+		shell = append(shell, spath)
+		if 1 != len(shell) {
+			copy(shell[1:], shell[0:])
+			shell[0] = spath
 		}
-		cmd := exec.Command("/bin/sh", shellParas...)
-		conf.Log.Infof("run install script %s", strings.Join(shellParas, " "))
+		cmd := exec.Command("/bin/sh", shell...)
+		conf.Log.Infof("run install script %s", strings.Join(shell, " "))
 		var outb, errb bytes.Buffer
 		cmd.Stdout = &outb
 		cmd.Stderr = &errb
@@ -362,6 +390,7 @@ func (m *Manager) Delete(name string) error {
 		os.Remove(p)
 	}
 	_ = os.RemoveAll(path.Join(m.pluginDir, name))
+	m.removePluginInstallScript(name)
 	// Kill the process in the end, and return error if it cannot be deleted
 	pm := runtime.GetPluginInsManager()
 	err := pm.Kill(name)
@@ -370,3 +399,46 @@ func (m *Manager) Delete(name string) error {
 	}
 	return nil
 }
+
+func (m *Manager) UninstallAllPlugins() {
+	keys, err := m.plgInstallDb.Keys()
+	if err != nil {
+		return
+	}
+	for _, v := range keys {
+		_ = m.Delete(v)
+	}
+}
+
+func (m *Manager) GetAllPlugins() map[string]string {
+	allPlgs, err := m.plgInstallDb.All()
+	if err != nil {
+		return nil
+	}
+	return allPlgs
+}
+
+func (m *Manager) GetAllPluginsStatus() map[string]string {
+	allPlgs, err := m.plgInstallDb.All()
+	if err != nil {
+		return nil
+	}
+	return allPlgs
+}
+
+func (m *Manager) PluginImport(plugins map[string]string) {
+	_ = m.plgStatusDb.Clean()
+	for k, v := range plugins {
+		sd := plugin.NewPluginByType(plugin.PORTABLE)
+		err := json.Unmarshal([]byte(v), &sd)
+		if err != nil {
+			_ = m.plgStatusDb.Set(k, err.Error())
+			continue
+		}
+		err = m.Register(sd)
+		if err != nil {
+			_ = m.plgStatusDb.Set(k, err.Error())
+			continue
+		}
+	}
+}

+ 4 - 0
internal/plugin/portable/manager_test.go

@@ -17,8 +17,10 @@ package portable
 import (
 	"errors"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
+	"github.com/lf-edge/ekuiper/internal/testx"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -31,7 +33,9 @@ import (
 // Test only install API. Install from file is tested in the integration test in test/portable_rule_test
 
 func init() {
+	testx.InitEnv()
 	InitManager()
+	meta.InitYamlConfigManager()
 }
 
 func TestManager_Install(t *testing.T) {

+ 8 - 2
internal/processor/rule.go

@@ -27,7 +27,8 @@ import (
 )
 
 type RuleProcessor struct {
-	db kv.KeyValue
+	db           kv.KeyValue
+	ruleStatusDb kv.KeyValue
 }
 
 func NewRuleProcessor() *RuleProcessor {
@@ -35,8 +36,13 @@ func NewRuleProcessor() *RuleProcessor {
 	if err != nil {
 		panic(fmt.Sprintf("Can not initalize store for the rule processor at path 'rule': %v", err))
 	}
+	err, ruleStatusDb := store.GetKV("ruleStatus")
+	if err != nil {
+		panic(fmt.Sprintf("Can not initalize store for the rule processor at path 'rule': %v", err))
+	}
 	processor := &RuleProcessor{
-		db: db,
+		db:           db,
+		ruleStatusDb: ruleStatusDb,
 	}
 	return processor
 }

+ 87 - 3
internal/processor/ruleset.go

@@ -27,7 +27,7 @@ type RulesetProcessor struct {
 	s *StreamProcessor
 }
 
-type ruleset struct {
+type Ruleset struct {
 	Streams map[string]string `json:"streams"`
 	Tables  map[string]string `json:"tables"`
 	Rules   map[string]string `json:"rules"`
@@ -41,7 +41,7 @@ func NewRulesetProcessor(r *RuleProcessor, s *StreamProcessor) *RulesetProcessor
 }
 
 func (rs *RulesetProcessor) Export() (io.ReadSeeker, []int, error) {
-	var all ruleset
+	var all Ruleset
 	allStreams, err := rs.s.GetAll()
 	if err != nil {
 		return nil, nil, fmt.Errorf("fail to get all streams: %v", err)
@@ -61,8 +61,49 @@ func (rs *RulesetProcessor) Export() (io.ReadSeeker, []int, error) {
 	return bytes.NewReader(jsonBytes), counts, nil
 }
 
+func (rs *RulesetProcessor) ExportRuleSet() *Ruleset {
+	all := &Ruleset{}
+	allStreams, err := rs.s.GetAll()
+	if err != nil {
+		conf.Log.Errorf("fail to get all streams: %v", err)
+		return nil
+	}
+	all.Streams = allStreams["streams"]
+	all.Tables = allStreams["tables"]
+	rules, err := rs.r.GetAllRulesJson()
+	if err != nil {
+		conf.Log.Errorf("fail to get all rules: %v", err)
+		return nil
+	}
+	all.Rules = rules
+	return all
+}
+
+func (rs *RulesetProcessor) ExportRuleSetStatus() *Ruleset {
+	all := &Ruleset{}
+	allStreams, err := rs.s.streamStatusDb.All()
+	if err != nil {
+		conf.Log.Errorf("fail to get all stream status: %v", err)
+		return nil
+	}
+	allTables, err := rs.s.tableStatusDb.All()
+	if err != nil {
+		conf.Log.Errorf("fail to get all table status: %v", err)
+		return nil
+	}
+	all.Streams = allStreams
+	all.Tables = allTables
+	rules, err := rs.r.ruleStatusDb.All()
+	if err != nil {
+		conf.Log.Errorf("fail to get all rule status: %v", err)
+		return nil
+	}
+	all.Rules = rules
+	return all
+}
+
 func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
-	all := &ruleset{}
+	all := &Ruleset{}
 	err := json.Unmarshal(content, all)
 	if err != nil {
 		return nil, nil, fmt.Errorf("invalid import file: %v", err)
@@ -99,3 +140,46 @@ func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
 	}
 	return rules, counts, nil
 }
+
+func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
+	_ = rs.s.streamStatusDb.Clean()
+	_ = rs.s.tableStatusDb.Clean()
+	_ = rs.r.ruleStatusDb.Clean()
+
+	counts := make([]int, 3)
+	// restore streams
+	for k, v := range all.Streams {
+		_, e := rs.s.ExecStreamSql(v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
+			_ = rs.s.streamStatusDb.Set(k, e.Error())
+			continue
+		} else {
+			counts[0]++
+		}
+	}
+	// restore tables
+	for k, v := range all.Tables {
+		_, e := rs.s.ExecStreamSql(v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
+			_ = rs.s.tableStatusDb.Set(k, e.Error())
+			continue
+		} else {
+			counts[1]++
+		}
+	}
+	var rules []string
+	// restore rules
+	for k, v := range all.Rules {
+		_, e := rs.r.ExecCreateWithValidation(k, v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
+			_ = rs.r.ruleStatusDb.Set(k, e.Error())
+			continue
+		} else {
+			rules = append(rules, k)
+			counts[2]++
+		}
+	}
+}

+ 14 - 2
internal/processor/stream.go

@@ -34,7 +34,9 @@ var (
 )
 
 type StreamProcessor struct {
-	db kv.KeyValue
+	db             kv.KeyValue
+	streamStatusDb kv.KeyValue
+	tableStatusDb  kv.KeyValue
 }
 
 func NewStreamProcessor() *StreamProcessor {
@@ -42,8 +44,18 @@ func NewStreamProcessor() *StreamProcessor {
 	if err != nil {
 		panic(fmt.Sprintf("Can not initalize store for the stream processor at path 'stream': %v", err))
 	}
+	err, streamDb := store.GetKV("streamStatus")
+	if err != nil {
+		panic(fmt.Sprintf("Can not initalize store for the stream processor at path 'stream': %v", err))
+	}
+	err, tableDb := store.GetKV("tableStatus")
+	if err != nil {
+		panic(fmt.Sprintf("Can not initalize store for the stream processor at path 'stream': %v", err))
+	}
 	processor := &StreamProcessor{
-		db: db,
+		db:             db,
+		streamStatusDb: streamDb,
+		tableStatusDb:  tableDb,
 	}
 	return processor
 }

+ 123 - 26
internal/schema/registry.go

@@ -15,7 +15,10 @@
 package schema
 
 import (
+	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"os"
 	"path/filepath"
 	"strings"
@@ -28,6 +31,8 @@ import (
 
 // Initialize in the server startup
 var registry *Registry
+var schemaDb kv.KeyValue
+var schemaStatusDb kv.KeyValue
 
 type Files struct {
 	SchemaFile string
@@ -54,34 +59,47 @@ func InitRegistry() error {
 	if err != nil {
 		return fmt.Errorf("cannot find etc folder: %s", err)
 	}
-	for _, schemaType := range def.SchemaTypes {
-		schemaDir := filepath.Join(dataDir, "schemas", string(schemaType))
-		var newSchemas map[string]*Files
-		files, err := os.ReadDir(schemaDir)
-		if err != nil {
-			conf.Log.Warnf("cannot read schema directory: %s", err)
-			newSchemas = make(map[string]*Files)
-		} else {
-			newSchemas = make(map[string]*Files, len(files))
-			for _, file := range files {
-				fileName := filepath.Base(file.Name())
-				ext := filepath.Ext(fileName)
-				schemaId := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-				ffs, ok := newSchemas[schemaId]
-				if !ok {
-					ffs = &Files{}
-					newSchemas[schemaId] = ffs
-				}
-				switch ext {
-				case ".so":
-					ffs.SoFile = filepath.Join(schemaDir, file.Name())
-				default:
-					ffs.SchemaFile = filepath.Join(schemaDir, file.Name())
+	err, schemaDb = store.GetKV("schema")
+	if err != nil {
+		return fmt.Errorf("cannot open schema db: %s", err)
+	}
+	err, schemaStatusDb = store.GetKV("schemaStatus")
+	if err != nil {
+		return fmt.Errorf("cannot open schemaStatus db: %s", err)
+	}
+	if hasInstallFlag() {
+		schemaInstallWhenReboot()
+		clearInstallFlag()
+	} else {
+		for _, schemaType := range def.SchemaTypes {
+			schemaDir := filepath.Join(dataDir, "schemas", string(schemaType))
+			var newSchemas map[string]*Files
+			files, err := os.ReadDir(schemaDir)
+			if err != nil {
+				conf.Log.Warnf("cannot read schema directory: %s", err)
+				newSchemas = make(map[string]*Files)
+			} else {
+				newSchemas = make(map[string]*Files, len(files))
+				for _, file := range files {
+					fileName := filepath.Base(file.Name())
+					ext := filepath.Ext(fileName)
+					schemaId := strings.TrimSuffix(fileName, filepath.Ext(fileName))
+					ffs, ok := newSchemas[schemaId]
+					if !ok {
+						ffs = &Files{}
+						newSchemas[schemaId] = ffs
+					}
+					switch ext {
+					case ".so":
+						ffs.SoFile = filepath.Join(schemaDir, file.Name())
+					default:
+						ffs.SchemaFile = filepath.Join(schemaDir, file.Name())
+					}
+					conf.Log.Infof("schema file %s.%s loaded", schemaType, schemaId)
 				}
-				conf.Log.Infof("schema file %s.%s loaded", schemaType, fileName)
 			}
+			registry.schemas[schemaType] = newSchemas
 		}
-		registry.schemas[schemaType] = newSchemas
 	}
 	return nil
 }
@@ -186,7 +204,7 @@ func GetSchemaFile(schemaType def.SchemaType, name string) (*Files, error) {
 	registry.RLock()
 	defer registry.RUnlock()
 	if _, ok := registry.schemas[schemaType]; !ok {
-		return nil, fmt.Errorf("schema type %s not found", schemaType)
+		return nil, fmt.Errorf("schema type %s not found in registry", schemaType)
 	}
 	if _, ok := registry.schemas[schemaType][name]; !ok {
 		return nil, fmt.Errorf("schema type %s, file %s not found", schemaType, name)
@@ -220,3 +238,82 @@ func DeleteSchema(schemaType def.SchemaType, name string) error {
 	delete(registry.schemas[schemaType], name)
 	return nil
 }
+
+const BOOT_INSTALL = "$boot_install"
+
+func GetAllSchema() map[string]string {
+	all, err := schemaDb.All()
+	if err != nil {
+		return nil
+	}
+	delete(all, BOOT_INSTALL)
+	return all
+}
+
+func GetAllSchemaStatus() map[string]string {
+	all, err := schemaStatusDb.All()
+	if err != nil {
+		return nil
+	}
+	return all
+}
+
+func UninstallAllSchema() {
+	schemaMaps, err := schemaDb.All()
+	if err != nil {
+		return
+	}
+	for key, value := range schemaMaps {
+		info := &Info{}
+		_ = json.Unmarshal([]byte(value), info)
+		_ = DeleteSchema(info.Type, key)
+	}
+}
+
+func hasInstallFlag() bool {
+	var val = ""
+	found, _ := schemaDb.Get(BOOT_INSTALL, &val)
+	return found
+}
+
+func clearInstallFlag() {
+	_ = schemaDb.Delete(BOOT_INSTALL)
+}
+
+func ImportSchema(schema map[string]string) error {
+	if len(schema) == 0 {
+		return nil
+	}
+	for k, v := range schema {
+		err := schemaDb.Set(k, v)
+		if err != nil {
+			return err
+		}
+	}
+	//set the flag to install the plugins when eKuiper reboot
+	return schemaDb.Set(BOOT_INSTALL, BOOT_INSTALL)
+}
+
+func schemaInstallWhenReboot() {
+	allPlgs, err := schemaDb.All()
+	if err != nil {
+		return
+	}
+
+	delete(allPlgs, BOOT_INSTALL)
+	_ = schemaStatusDb.Clean()
+
+	for k, v := range allPlgs {
+		info := &Info{}
+		err := json.Unmarshal([]byte(v), info)
+		if err != nil {
+			_ = schemaStatusDb.Set(k, err.Error())
+			continue
+		}
+		err = CreateOrUpdateSchema(info)
+		if err != nil {
+			_ = schemaStatusDb.Set(k, err.Error())
+			continue
+		}
+	}
+}

+ 9 - 0
internal/schema/schema.go

@@ -15,6 +15,7 @@
 package schema
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
 )
@@ -27,6 +28,14 @@ type Info struct {
 	SoPath   string         `json:"soFile"`
 }
 
+func (i *Info) InstallScript() string {
+	marshal, err := json.Marshal(i)
+	if err != nil {
+		return ""
+	}
+	return string(marshal)
+}
+
 func (i *Info) Validate() error {
 	if i.Name == "" {
 		return fmt.Errorf("name is required")

+ 76 - 0
internal/server/core.go

@@ -0,0 +1,76 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build core
+// +build core
+
+package server
+
+func pluginReset() {
+}
+
+func pluginExport() map[string]string {
+	return nil
+}
+
+func pluginStatusExport() map[string]string {
+	return nil
+}
+
+func pluginImport(plugins map[string]string) error {
+	return nil
+}
+
+func portablePluginsReset() {
+}
+
+func portablePluginExport() map[string]string {
+	return nil
+}
+
+func portablePluginStatusExport() map[string]string {
+	return nil
+}
+
+func portablePluginImport(plugins map[string]string) {
+}
+
+func serviceReset() {
+}
+
+func serviceExport() map[string]string {
+	return nil
+}
+
+func serviceStatusExport() map[string]string {
+	return nil
+}
+
+func serviceImport(services map[string]string) {
+}
+
+func schemaReset() {
+}
+
+func schemaExport() map[string]string {
+	return nil
+}
+
+func schemaStatusExport() map[string]string {
+	return nil
+}
+
+func schemaImport(s map[string]string) error {
+	return nil
+}

+ 17 - 0
internal/server/plugin_init.go

@@ -267,3 +267,20 @@ func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, re
 	}
 	return
 }
+
+func pluginReset() {
+	nativeManager.UninstallAllPlugins()
+}
+
+func pluginExport() map[string]string {
+	return nativeManager.GetAllPlugins()
+}
+
+func pluginStatusExport() map[string]string {
+	return nativeManager.GetAllPluginsStatus()
+}
+
+func pluginImport(plugins map[string]string) error {
+	nativeManager.PluginImport(plugins)
+	return nil
+}

+ 16 - 0
internal/server/portable_init.go

@@ -117,3 +117,19 @@ func portableHandler(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte(fmt.Sprintf("portable plugin %s is updated", sd.GetName())))
 	}
 }
+
+func portablePluginsReset() {
+	portableManager.UninstallAllPlugins()
+}
+
+func portablePluginExport() map[string]string {
+	return portableManager.GetAllPlugins()
+}
+
+func portablePluginStatusExport() map[string]string {
+	return portableManager.GetAllPlugins()
+}
+
+func portablePluginImport(plugins map[string]string) {
+	portableManager.PluginImport(plugins)
+}

+ 214 - 1
internal/server/rest.go

@@ -19,7 +19,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/internal/processor"
 	"io"
 	"net/http"
 	"os"
@@ -137,7 +139,9 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
-
+	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet)
+	r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
+	r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
 	// Register extended routes
 	for k, v := range components {
 		logger.Infof("register rest endpoint for component %s", k)
@@ -630,3 +634,212 @@ func exportHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Add("Content-Disposition", "Attachment")
 	http.ServeContent(w, r, name, time.Now(), exported)
 }
+
+type Configuration struct {
+	Streams          map[string]string `json:"streams"`
+	Tables           map[string]string `json:"tables"`
+	Rules            map[string]string `json:"rules"`
+	NativePlugins    map[string]string `json:"nativePlugins"`
+	PortablePlugins  map[string]string `json:"portablePlugins"`
+	SourceConfig     map[string]string `json:"sourceConfig"`
+	SinkConfig       map[string]string `json:"sinkConfig"`
+	ConnectionConfig map[string]string `json:"connectionConfig"`
+	Service          map[string]string `json:"Service"`
+	Schema           map[string]string `json:"Schema"`
+}
+
+func configurationExport() ([]byte, error) {
+	conf := &Configuration{
+		Streams:          make(map[string]string),
+		Tables:           make(map[string]string),
+		Rules:            make(map[string]string),
+		NativePlugins:    make(map[string]string),
+		PortablePlugins:  make(map[string]string),
+		SourceConfig:     make(map[string]string),
+		SinkConfig:       make(map[string]string),
+		ConnectionConfig: make(map[string]string),
+		Service:          make(map[string]string),
+		Schema:           make(map[string]string),
+	}
+	ruleSet := rulesetProcessor.ExportRuleSet()
+	if ruleSet != nil {
+		conf.Streams = ruleSet.Streams
+		conf.Tables = ruleSet.Tables
+		conf.Rules = ruleSet.Rules
+	}
+
+	conf.NativePlugins = pluginExport()
+	conf.PortablePlugins = portablePluginExport()
+	conf.Service = serviceExport()
+	conf.Schema = schemaExport()
+
+	yamlCfg := meta.GetConfigurations()
+	conf.SourceConfig = yamlCfg.Sources
+	conf.SinkConfig = yamlCfg.Sinks
+	conf.ConnectionConfig = yamlCfg.Connections
+
+	return json.Marshal(conf)
+}
+
+func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
+	const name = "ekuiper_export.json"
+	jsonBytes, _ := configurationExport()
+	w.Header().Set("Content-Type", "application/octet-stream")
+	w.Header().Add("Content-Disposition", "Attachment")
+	http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
+}
+
+func configurationReset() {
+	_ = resetAllRules()
+	_ = resetAllStreams()
+	pluginReset()
+	portablePluginsReset()
+	serviceReset()
+	schemaReset()
+	meta.ResetConfigs()
+}
+
+func configurationImport(data []byte, reboot bool) error {
+	conf := &Configuration{
+		Streams:          make(map[string]string),
+		Tables:           make(map[string]string),
+		Rules:            make(map[string]string),
+		NativePlugins:    make(map[string]string),
+		PortablePlugins:  make(map[string]string),
+		SourceConfig:     make(map[string]string),
+		SinkConfig:       make(map[string]string),
+		ConnectionConfig: make(map[string]string),
+		Service:          make(map[string]string),
+		Schema:           make(map[string]string),
+	}
+
+	err := json.Unmarshal(data, conf)
+	if err != nil {
+		return fmt.Errorf("configuration unmarshal with error %v", err)
+	}
+
+	if reboot {
+		err = pluginImport(conf.NativePlugins)
+		if err != nil {
+			return fmt.Errorf("pluginImportHandler with error %v", err)
+		}
+		err = schemaImport(conf.Schema)
+		if err != nil {
+			return fmt.Errorf("schemaImport with error %v", err)
+		}
+	}
+
+	portablePluginImport(conf.PortablePlugins)
+
+	serviceImport(conf.Service)
+
+	yamlCfgSet := meta.YamlConfigurationSet{
+		Sources:     conf.SourceConfig,
+		Sinks:       conf.SinkConfig,
+		Connections: conf.ConnectionConfig,
+	}
+
+	meta.LoadConfigurations(yamlCfgSet)
+
+	ruleSet := processor.Ruleset{
+		Streams: conf.Streams,
+		Tables:  conf.Tables,
+		Rules:   conf.Rules,
+	}
+
+	rulesetProcessor.ImportRuleSet(ruleSet)
+
+	return nil
+}
+
+type configurationInfo struct {
+	Content  string `json:"content"`
+	FilePath string `json:"file"`
+}
+
+func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
+	cb := r.URL.Query().Get("stop")
+	stop := cb == "1"
+	rsi := &configurationInfo{}
+	err := json.NewDecoder(r.Body).Decode(rsi)
+	if err != nil {
+		handleError(w, err, "Invalid body: Error decoding json", logger)
+		return
+	}
+	if rsi.Content != "" && rsi.FilePath != "" {
+		handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+		return
+	} else if rsi.Content == "" && rsi.FilePath == "" {
+		handleError(w, nil, "Invalid body: must specify content or file", logger)
+		return
+	}
+	content := []byte(rsi.Content)
+	if rsi.FilePath != "" {
+		reader, err := httpx.ReadFile(rsi.FilePath)
+		if err != nil {
+			handleError(w, nil, "Fail to read file", logger)
+			return
+		}
+		defer reader.Close()
+		buf := new(bytes.Buffer)
+		_, err = io.Copy(buf, reader)
+		if err != nil {
+			handleError(w, err, "fail to convert file", logger)
+			return
+		}
+		content = buf.Bytes()
+	}
+	configurationReset()
+	err = configurationImport(content, stop)
+	if err != nil {
+		handleError(w, err, "Import configuration error", logger)
+		return
+	}
+	if stop {
+		go func() {
+			time.Sleep(1 * time.Second)
+			os.Exit(100)
+		}()
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
+
+func configurationStatusExport() Configuration {
+	conf := Configuration{
+		Streams:          make(map[string]string),
+		Tables:           make(map[string]string),
+		Rules:            make(map[string]string),
+		NativePlugins:    make(map[string]string),
+		PortablePlugins:  make(map[string]string),
+		SourceConfig:     make(map[string]string),
+		SinkConfig:       make(map[string]string),
+		ConnectionConfig: make(map[string]string),
+		Service:          make(map[string]string),
+		Schema:           make(map[string]string),
+	}
+	ruleSet := rulesetProcessor.ExportRuleSetStatus()
+	if ruleSet != nil {
+		conf.Streams = ruleSet.Streams
+		conf.Tables = ruleSet.Tables
+		conf.Rules = ruleSet.Rules
+	}
+
+	conf.NativePlugins = pluginStatusExport()
+	conf.PortablePlugins = portablePluginStatusExport()
+	conf.Service = serviceStatusExport()
+	conf.Schema = schemaStatusExport()
+
+	yamlCfgStatus := meta.GetConfigurationStatus()
+	conf.SourceConfig = yamlCfgStatus.Sources
+	conf.SinkConfig = yamlCfgStatus.Sinks
+	conf.ConnectionConfig = yamlCfgStatus.Connections
+
+	return conf
+}
+
+func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	content := configurationStatusExport()
+	jsonResponse(content, w, logger)
+}

+ 52 - 0
internal/server/rpc.go

@@ -297,6 +297,58 @@ func (t *Server) Export(file string, reply *string) error {
 	return nil
 }
 
+func (t *Server) ImportConfiguration(arg *model.ImportDataDesc, reply *string) error {
+	file := arg.FileName
+	f, err := os.Open(file)
+	if err != nil {
+		return fmt.Errorf("fail to read file %s: %v", file, err)
+	}
+	defer f.Close()
+	buf := new(bytes.Buffer)
+	_, err = io.Copy(buf, f)
+	if err != nil {
+		return fmt.Errorf("fail to convert file %s: %v", file, err)
+	}
+	content := buf.Bytes()
+
+	configurationReset()
+	err = configurationImport(content, arg.Stop)
+	if err != nil {
+		return fmt.Errorf("import configuration error: %v", err)
+	}
+	*reply = fmt.Sprintf("import configuration success")
+	return nil
+}
+
+func (t *Server) GetStatusImport(_ int, reply *string) error {
+	jsonRsp := configurationStatusExport()
+	result, err := json.Marshal(jsonRsp)
+	if err != nil {
+		return fmt.Errorf("Show rule error : %s.", err)
+	}
+	dst := &bytes.Buffer{}
+	if err := json.Indent(dst, result, "", "  "); err != nil {
+		return fmt.Errorf("Show rule error : %s.", err)
+	}
+	*reply = dst.String()
+
+	return nil
+}
+
+func (t *Server) ExportConfiguration(file string, reply *string) error {
+	f, err := os.Create(file)
+	if err != nil {
+		return err
+	}
+	jsonBytes, err := configurationExport()
+	_, err = io.Copy(f, bytes.NewReader(jsonBytes))
+	if err != nil {
+		return fmt.Errorf("fail to save to file %s:%v", file, err)
+	}
+	*reply = fmt.Sprintf("export configuration success")
+	return nil
+}
+
 func marshalDesc(m interface{}) (string, error) {
 	s, err := json.Marshal(m)
 	if err != nil {

+ 16 - 0
internal/server/schema_init.go

@@ -127,3 +127,19 @@ func schemaHandler(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte(fmt.Sprintf("%s schema %s is updated", sch.Type, sch.Name)))
 	}
 }
+
+func schemaReset() {
+	schema.UninstallAllSchema()
+}
+
+func schemaExport() map[string]string {
+	return schema.GetAllSchema()
+}
+
+func schemaStatusExport() map[string]string {
+	return schema.GetAllSchemaStatus()
+}
+
+func schemaImport(s map[string]string) error {
+	return schema.ImportSchema(s)
+}

+ 44 - 0
internal/server/server.go

@@ -22,10 +22,12 @@ import (
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/binder/meta"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	meta2 "github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"net/http"
 	"os"
 	"os/signal"
@@ -88,6 +90,7 @@ func StartUp(Version, LoadFileType string) {
 		panic(err)
 	}
 
+	meta2.InitYamlConfigManager()
 	ruleProcessor = processor.NewRuleProcessor()
 	streamProcessor = processor.NewStreamProcessor()
 	rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
@@ -201,3 +204,44 @@ func initRuleset() error {
 	}
 	return nil
 }
+
+func resetAllRules() error {
+	rules, err := ruleProcessor.GetAllRules()
+	if err != nil {
+		return err
+	}
+	for _, name := range rules {
+		_ = deleteRule(name)
+		_, err := ruleProcessor.ExecDrop(name)
+		if err != nil {
+			logger.Warnf("delete rule: %s with error %v", name, err)
+			continue
+		}
+	}
+	return nil
+}
+
+func resetAllStreams() error {
+	allStreams, err := streamProcessor.GetAll()
+	if err != nil {
+		return err
+	}
+	Streams := allStreams["streams"]
+	Tables := allStreams["tables"]
+
+	for name, _ := range Streams {
+		_, err2 := streamProcessor.DropStream(name, ast.TypeStream)
+		if err2 != nil {
+			logger.Warnf("streamProcessor DropStream %s error: %v", name, err2)
+			continue
+		}
+	}
+	for name, _ := range Tables {
+		_, err2 := streamProcessor.DropStream(name, ast.TypeTable)
+		if err2 != nil {
+			logger.Warnf("streamProcessor DropTable %s error: %v", name, err2)
+			continue
+		}
+	}
+	return nil
+}

+ 16 - 0
internal/server/service_init.go

@@ -138,3 +138,19 @@ func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	jsonResponse(j, w, logger)
 }
+
+func serviceReset() {
+	serviceManager.UninstallAllServices()
+}
+
+func serviceExport() map[string]string {
+	return serviceManager.GetAllServices()
+}
+
+func serviceStatusExport() map[string]string {
+	return serviceManager.GetAllServicesStatus()
+}
+
+func serviceImport(services map[string]string) {
+	serviceManager.ImportServices(services)
+}

+ 73 - 6
internal/service/manager.go

@@ -16,6 +16,7 @@ package service
 
 import (
 	"archive/zip"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path"
@@ -43,9 +44,11 @@ type Manager struct {
 	serviceBuf   *sync.Map
 	functionBuf  *sync.Map
 
-	etcDir     string
-	serviceKV  kv.KeyValue
-	functionKV kv.KeyValue
+	etcDir                 string
+	serviceInstallKV       kv.KeyValue
+	serviceStatusInstallKV kv.KeyValue
+	serviceKV              kv.KeyValue
+	functionKV             kv.KeyValue
 }
 
 func InitManager() (*Manager, error) {
@@ -68,14 +71,24 @@ func InitManager() (*Manager, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot open function db: %s", err)
 		}
+		err, sInstallDb := store.GetKV("serviceInstall")
+		if err != nil {
+			return nil, fmt.Errorf("cannot open service db: %s", err)
+		}
+		err, statusDb := store.GetKV("serviceInstallStatus")
+		if err != nil {
+			return nil, fmt.Errorf("cannot open service db: %s", err)
+		}
 		singleton = &Manager{
 			executorPool: &sync.Map{},
 			serviceBuf:   &sync.Map{},
 			functionBuf:  &sync.Map{},
 
-			etcDir:     etcDir,
-			serviceKV:  sdb,
-			functionKV: fdb,
+			etcDir:                 etcDir,
+			serviceStatusInstallKV: statusDb,
+			serviceInstallKV:       sInstallDb,
+			serviceKV:              sdb,
+			functionKV:             fdb,
 		}
 	}
 	if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
@@ -301,6 +314,14 @@ type ServiceCreationRequest struct {
 	File string `json:"file"`
 }
 
+func (s *ServiceCreationRequest) InstallScript() string {
+	marshal, err := json.Marshal(s)
+	if err != nil {
+		return ""
+	}
+	return string(marshal)
+}
+
 func (m *Manager) List() ([]string, error) {
 	return m.serviceKV.Keys()
 }
@@ -326,6 +347,8 @@ func (m *Manager) Create(r *ServiceCreationRequest) error {
 	if err != nil {
 		return err
 	}
+	//save the install script
+	m.serviceInstallKV.Set(name, r.InstallScript())
 	// init file to serviceKV
 	return m.initFile(name + ".json")
 }
@@ -341,6 +364,7 @@ func (m *Manager) Delete(name string) error {
 	if err != nil {
 		return err
 	}
+	_ = m.serviceInstallKV.Delete(name)
 	path := path.Join(m.etcDir, name+".json")
 	err = os.Remove(path)
 	if err != nil {
@@ -412,3 +436,46 @@ func (m *Manager) GetFunction(name string) (*functionContainer, error) {
 	}
 	return r, nil
 }
+
+func (m *Manager) GetAllServices() map[string]string {
+	all, err := m.serviceInstallKV.All()
+	if err != nil {
+		return nil
+	}
+	return all
+}
+
+func (m *Manager) GetAllServicesStatus() map[string]string {
+	all, err := m.serviceStatusInstallKV.All()
+	if err != nil {
+		return nil
+	}
+	return all
+}
+
+func (m *Manager) UninstallAllServices() {
+	keys, err := m.serviceInstallKV.Keys()
+	if err != nil {
+		return
+	}
+	for _, v := range keys {
+		_ = m.Delete(v)
+	}
+}
+
+func (m *Manager) ImportServices(services map[string]string) {
+	_ = m.serviceStatusInstallKV.Clean()
+	for k, v := range services {
+		req := &ServiceCreationRequest{}
+		err := json.Unmarshal([]byte(v), req)
+		if err != nil {
+			m.serviceStatusInstallKV.Set(k, err.Error())
+			continue
+		}
+		err = m.Create(req)
+		if err != nil {
+			m.serviceStatusInstallKV.Set(k, err.Error())
+			continue
+		}
+	}
+}

+ 10 - 1
internal/topo/node/sink_node_test.go

@@ -20,6 +20,7 @@ package node
 import (
 	"errors"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/testx"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -34,6 +35,10 @@ import (
 	"github.com/lf-edge/ekuiper/internal/xsql"
 )
 
+func init() {
+	testx.InitEnv()
+}
+
 func TestSinkTemplate_Apply(t *testing.T) {
 	conf.InitConf()
 	transform.RegisterAdditionalFuncs()
@@ -254,7 +259,11 @@ func TestFormat_Apply(t *testing.T) {
 			t.Fatal(err)
 		}
 	}()
-	schema.InitRegistry()
+
+	err = schema.InitRegistry()
+	if err != nil {
+		t.Fatal(err)
+	}
 	transform.RegisterAdditionalFuncs()
 	var tests = []struct {
 		config map[string]interface{}