Преглед изворни кода

fix(etc): init uploads folder to data

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran пре 2 година
родитељ
комит
1ea5465bdb

+ 164 - 41
internal/meta/yaml_config_ops.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -12,12 +12,11 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-package meta
+package conf
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"path"
 	"path"
@@ -30,7 +29,11 @@ type ConfKeysOperator interface {
 	GetPluginName() string
 	GetPluginName() string
 	GetConfContentByte() ([]byte, error)
 	GetConfContentByte() ([]byte, error)
 	CopyConfContent() map[string]map[string]interface{}
 	CopyConfContent() map[string]map[string]interface{}
+	CopyReadOnlyConfContent() map[string]map[string]interface{}
+	CopyUpdatableConfContent() map[string]map[string]interface{}
 	GetConfKeys() (keys []string)
 	GetConfKeys() (keys []string)
+	GetReadOnlyConfKeys() (keys []string)
+	GetUpdatableConfKeys() (keys []string)
 	DeleteConfKey(confKey string)
 	DeleteConfKey(confKey string)
 	DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
 	DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
 	AddConfKey(confKey string, reqField map[string]interface{}) error
 	AddConfKey(confKey string, reqField map[string]interface{}) error
@@ -45,12 +48,13 @@ type ConfigOperator interface {
 }
 }
 
 
 // ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml
 // ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml
-// Hold the connection configs for each connection type in cf field
+// Hold the connection configs for each connection type in etcCfg field
 // Provide method to query/add/update/delete the configs
 // Provide method to query/add/update/delete the configs
 type ConfigKeys struct {
 type ConfigKeys struct {
 	lock       sync.RWMutex
 	lock       sync.RWMutex
 	pluginName string                            // source type, can be mqtt/edgex/httppull
 	pluginName string                            // source type, can be mqtt/edgex/httppull
-	cf         map[string]map[string]interface{} // configs defined in yaml
+	etcCfg     map[string]map[string]interface{} // configs defined in etc/sources/yaml
+	dataCfg    map[string]map[string]interface{} // configs defined in etc/sources/
 }
 }
 
 
 func (c *ConfigKeys) GetPluginName() string {
 func (c *ConfigKeys) GetPluginName() string {
@@ -61,7 +65,15 @@ func (c *ConfigKeys) GetConfContentByte() ([]byte, error) {
 	cf := make(map[string]map[string]interface{})
 	cf := make(map[string]map[string]interface{})
 	c.lock.RLock()
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 	defer c.lock.RUnlock()
-	for key, kvs := range c.cf {
+	for key, kvs := range c.etcCfg {
+		aux := make(map[string]interface{})
+		for k, v := range kvs {
+			aux[k] = v
+		}
+		cf[key] = aux
+	}
+
+	for key, kvs := range c.dataCfg {
 		aux := make(map[string]interface{})
 		aux := make(map[string]interface{})
 		for k, v := range kvs {
 		for k, v := range kvs {
 			aux[k] = v
 			aux[k] = v
@@ -77,7 +89,47 @@ func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
 	c.lock.RLock()
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 	defer c.lock.RUnlock()
 
 
-	for key, kvs := range c.cf {
+	for key, kvs := range c.etcCfg {
+		aux := make(map[string]interface{})
+		for k, v := range kvs {
+			aux[k] = v
+		}
+		cf[key] = aux
+	}
+
+	for key, kvs := range c.dataCfg {
+		aux := make(map[string]interface{})
+		for k, v := range kvs {
+			aux[k] = v
+		}
+		cf[key] = aux
+	}
+
+	return cf
+}
+
+func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{} {
+	cf := make(map[string]map[string]interface{})
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	for key, kvs := range c.etcCfg {
+		aux := make(map[string]interface{})
+		for k, v := range kvs {
+			aux[k] = v
+		}
+		cf[key] = aux
+	}
+
+	return cf
+}
+
+func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{} {
+	cf := make(map[string]map[string]interface{})
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	for key, kvs := range c.dataCfg {
 		aux := make(map[string]interface{})
 		aux := make(map[string]interface{})
 		for k, v := range kvs {
 		for k, v := range kvs {
 			aux[k] = v
 			aux[k] = v
@@ -89,10 +141,30 @@ func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
 }
 }
 
 
 func (c *ConfigKeys) GetConfKeys() (keys []string) {
 func (c *ConfigKeys) GetConfKeys() (keys []string) {
+	ro := c.GetReadOnlyConfKeys()
+	keys = append(keys, ro...)
+
+	up := c.GetUpdatableConfKeys()
+	keys = append(keys, up...)
+
+	return keys
+}
+
+func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	for k := range c.etcCfg {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
+func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string) {
 	c.lock.RLock()
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 	defer c.lock.RUnlock()
 
 
-	for k := range c.cf {
+	for k := range c.dataCfg {
 		keys = append(keys, k)
 		keys = append(keys, k)
 	}
 	}
 	return keys
 	return keys
@@ -102,7 +174,7 @@ func (c *ConfigKeys) DeleteConfKey(confKey string) {
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 
 
-	delete(c.cf, confKey)
+	delete(c.dataCfg, confKey)
 }
 }
 
 
 func recursionDelMap(cf, fields map[string]interface{}) error {
 func recursionDelMap(cf, fields map[string]interface{}) error {
@@ -117,7 +189,6 @@ func recursionDelMap(cf, fields map[string]interface{}) error {
 				delete(cf, k)
 				delete(cf, k)
 				continue
 				continue
 			}
 			}
-
 			var auxCf map[string]interface{}
 			var auxCf map[string]interface{}
 			if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
 			if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
 				return fmt.Errorf(`%s%s.%s`, "type_conversion_fail", k, delKey)
 				return fmt.Errorf(`%s%s.%s`, "type_conversion_fail", k, delKey)
@@ -147,7 +218,7 @@ func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]inte
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 
 
-	err := recursionDelMap(c.cf[confKey], reqField)
+	err := recursionDelMap(c.dataCfg[confKey], reqField)
 	if nil != err {
 	if nil != err {
 		return err
 		return err
 	}
 	}
@@ -159,7 +230,11 @@ func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{})
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 
 
-	c.cf[confKey] = reqField
+	if _, ok := c.etcCfg[confKey]; ok {
+		return fmt.Errorf("duplicate key %s, already exist in etc folder", confKey)
+	}
+
+	c.dataCfg[confKey] = reqField
 
 
 	return nil
 	return nil
 }
 }
@@ -168,12 +243,12 @@ func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interfa
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 
 
-	if nil == c.cf[confKey] {
+	if nil == c.dataCfg[confKey] {
 		return fmt.Errorf(`%s`, "not_found_confkey")
 		return fmt.Errorf(`%s`, "not_found_confkey")
 	}
 	}
 
 
 	for k, v := range reqField {
 	for k, v := range reqField {
-		c.cf[confKey][k] = v
+		c.dataCfg[confKey][k] = v
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -189,18 +264,14 @@ func (c *SourceConfigKeysOps) IsSource() bool {
 
 
 func (c *SourceConfigKeysOps) SaveCfgToFile() error {
 func (c *SourceConfigKeysOps) SaveCfgToFile() error {
 	pluginName := c.pluginName
 	pluginName := c.pluginName
-	confDir, err := conf.GetConfLoc()
+	confDir, err := GetDataLoc()
 	if nil != err {
 	if nil != err {
 		return err
 		return err
 	}
 	}
 
 
 	dir := path.Join(confDir, "sources")
 	dir := path.Join(confDir, "sources")
-	if "mqtt" == pluginName {
-		pluginName = "mqtt_source"
-		dir = confDir
-	}
 	filePath := path.Join(dir, pluginName+".yaml")
 	filePath := path.Join(dir, pluginName+".yaml")
-	cfg := c.CopyConfContent()
+	cfg := c.CopyUpdatableConfContent()
 	err = filex.WriteYamlMarshal(filePath, cfg)
 	err = filex.WriteYamlMarshal(filePath, cfg)
 	if nil != err {
 	if nil != err {
 		return err
 		return err
@@ -219,12 +290,12 @@ func (p *ConnectionConfigKeysOps) IsSource() bool {
 
 
 func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
 func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
 	pluginName := p.pluginName
 	pluginName := p.pluginName
-	confDir, err := conf.GetConfLoc()
+	confDir, err := GetDataLoc()
 	if nil != err {
 	if nil != err {
 		return err
 		return err
 	}
 	}
 
 
-	cfg := p.CopyConfContent()
+	cfg := p.CopyUpdatableConfContent()
 
 
 	yamlPath := path.Join(confDir, "connections/connection.yaml")
 	yamlPath := path.Join(confDir, "connections/connection.yaml")
 
 
@@ -239,21 +310,34 @@ func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
 	return filex.WriteYamlMarshal(yamlPath, yamlData)
 	return filex.WriteYamlMarshal(yamlPath, yamlData)
 }
 }
 
 
-// NewConfigOperatorFromSourceYaml construct function, Load the configs from etc/sources/xx.yaml
-func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error) {
-	confDir, err := conf.GetConfLoc()
-	if nil != err {
-		return nil, err
+// NewConfigOperatorForSource construct function
+func NewConfigOperatorForSource(pluginName string) ConfigOperator {
+	c := &SourceConfigKeysOps{
+		&ConfigKeys{
+			lock:       sync.RWMutex{},
+			pluginName: pluginName,
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
+		},
 	}
 	}
+	return c
+}
 
 
+// NewConfigOperatorFromSourceYaml construct function, Load the configs from etc/sources/xx.yaml
+func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error) {
 	c := &SourceConfigKeysOps{
 	c := &SourceConfigKeysOps{
 		&ConfigKeys{
 		&ConfigKeys{
 			lock:       sync.RWMutex{},
 			lock:       sync.RWMutex{},
 			pluginName: pluginName,
 			pluginName: pluginName,
-			cf:         map[string]map[string]interface{}{},
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
 		},
 		},
 	}
 	}
 
 
+	confDir, err := GetConfLoc()
+	if nil != err {
+		return nil, err
+	}
 	dir := path.Join(confDir, "sources")
 	dir := path.Join(confDir, "sources")
 	fileName := pluginName
 	fileName := pluginName
 	if "mqtt" == pluginName {
 	if "mqtt" == pluginName {
@@ -261,42 +345,60 @@ func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error)
 		dir = confDir
 		dir = confDir
 	}
 	}
 	filePath := path.Join(dir, fileName+`.yaml`)
 	filePath := path.Join(dir, fileName+`.yaml`)
-	err = filex.ReadYamlUnmarshal(filePath, &c.cf)
+	_ = filex.ReadYamlUnmarshal(filePath, &c.etcCfg)
+
+	dataDir, err := GetDataLoc()
 	if nil != err {
 	if nil != err {
 		return nil, err
 		return nil, err
 	}
 	}
+	dir = path.Join(dataDir, "sources")
+	fileName = pluginName
+
+	filePath = path.Join(dir, fileName+`.yaml`)
+	_ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
 
 
 	return c, nil
 	return c, nil
 }
 }
 
 
+// NewConfigOperatorForConnection construct function
+func NewConfigOperatorForConnection(pluginName string) ConfigOperator {
+	c := &ConnectionConfigKeysOps{
+		&ConfigKeys{
+			lock:       sync.RWMutex{},
+			pluginName: pluginName,
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
+		},
+	}
+	return c
+}
+
 // NewConfigOperatorFromConnectionYaml construct function, Load the configs from et/connections/connection.yaml
 // NewConfigOperatorFromConnectionYaml construct function, Load the configs from et/connections/connection.yaml
 func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, error) {
 func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, error) {
+	c := &ConnectionConfigKeysOps{
+		&ConfigKeys{
+			lock:       sync.RWMutex{},
+			pluginName: pluginName,
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
+		},
+	}
 
 
-	confDir, err := conf.GetConfLoc()
+	confDir, err := GetConfLoc()
 	if nil != err {
 	if nil != err {
 		return nil, err
 		return nil, err
 	}
 	}
-
 	yamlPath := path.Join(confDir, "connections/connection.yaml")
 	yamlPath := path.Join(confDir, "connections/connection.yaml")
 	yamlData := make(map[string]interface{})
 	yamlData := make(map[string]interface{})
 	err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
 	err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
 	if nil != err {
 	if nil != err {
 		return nil, err
 		return nil, err
 	}
 	}
-
-	c := &ConnectionConfigKeysOps{
-		&ConfigKeys{
-			lock:       sync.RWMutex{},
-			pluginName: pluginName,
-			cf:         map[string]map[string]interface{}{},
-		},
-	}
-
 	if plgCnfs, ok := yamlData[pluginName]; ok {
 	if plgCnfs, ok := yamlData[pluginName]; ok {
 		if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
 		if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
 			for confKey, confVal := range cf {
 			for confKey, confVal := range cf {
 				if conf, ok := confVal.(map[string]interface{}); ok {
 				if conf, ok := confVal.(map[string]interface{}); ok {
-					c.cf[confKey] = conf
+					c.etcCfg[confKey] = conf
 				} else {
 				} else {
 					return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
 					return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
 				}
 				}
@@ -308,5 +410,26 @@ func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, err
 		return nil, fmt.Errorf("not find the target connection type: %s", c.pluginName)
 		return nil, fmt.Errorf("not find the target connection type: %s", c.pluginName)
 	}
 	}
 
 
+	confDir, err = GetDataLoc()
+	if nil != err {
+		return nil, err
+	}
+	yamlPath = path.Join(confDir, "connections/connection.yaml")
+	yamlData = make(map[string]interface{})
+	_ = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
+
+	if plgCnfs, ok := yamlData[pluginName]; ok {
+		if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
+			for confKey, confVal := range cf {
+				if conf, ok := confVal.(map[string]interface{}); ok {
+					c.dataCfg[confKey] = conf
+				} else {
+					return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
+				}
+			}
+		} else {
+			return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
+		}
+	}
 	return c, nil
 	return c, nil
 }
 }

+ 2 - 2
internal/meta/yaml_config_ops_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-package meta
+package conf
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"

+ 9 - 21
internal/meta/yamlConfigMeta.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ import (
 
 
 type configManager struct {
 type configManager struct {
 	lock         sync.RWMutex
 	lock         sync.RWMutex
-	cfgOperators map[string]ConfigOperator
+	cfgOperators map[string]conf.ConfigOperator
 }
 }
 
 
 //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
 //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
@@ -31,7 +31,7 @@ type configManager struct {
 // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
 // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
 var ConfigManager = configManager{
 var ConfigManager = configManager{
 	lock:         sync.RWMutex{},
 	lock:         sync.RWMutex{},
-	cfgOperators: make(map[string]ConfigOperator),
+	cfgOperators: make(map[string]conf.ConfigOperator),
 }
 }
 
 
 const SourceCfgOperatorKeyTemplate = "sources.%s"
 const SourceCfgOperatorKeyTemplate = "sources.%s"
@@ -43,7 +43,7 @@ const ConnectionCfgOperatorKeyTemplate = "connections.%s"
 func loadConfigOperatorForSource(pluginName string) {
 func loadConfigOperatorForSource(pluginName string) {
 	yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
 	yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
 
 
-	if cfg, _ := NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
+	if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
 		ConfigManager.lock.Lock()
 		ConfigManager.lock.Lock()
 		ConfigManager.cfgOperators[yamlKey] = cfg
 		ConfigManager.cfgOperators[yamlKey] = cfg
 		ConfigManager.lock.Unlock()
 		ConfigManager.lock.Unlock()
@@ -57,7 +57,7 @@ func loadConfigOperatorForSource(pluginName string) {
 func loadConfigOperatorForConnection(pluginName string) {
 func loadConfigOperatorForConnection(pluginName string) {
 	yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
 	yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
 
 
-	if cfg, _ := NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
+	if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
 		ConfigManager.lock.Lock()
 		ConfigManager.lock.Lock()
 		ConfigManager.cfgOperators[yamlKey] = cfg
 		ConfigManager.cfgOperators[yamlKey] = cfg
 		ConfigManager.lock.Unlock()
 		ConfigManager.lock.Unlock()
@@ -123,18 +123,12 @@ func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
 		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
 		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
 	}
 	}
 
 
-	var cfgOps ConfigOperator
+	var cfgOps conf.ConfigOperator
 	var found bool
 	var found bool
 
 
 	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
 	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
 	if !found {
 	if !found {
-		cfgOps = &SourceConfigKeysOps{
-			ConfigKeys: &ConfigKeys{
-				lock:       sync.RWMutex{},
-				pluginName: plgName,
-				cf:         map[string]map[string]interface{}{},
-			},
-		}
+		cfgOps = conf.NewConfigOperatorForSource(plgName)
 		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
 		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
 	}
 	}
 
 
@@ -161,18 +155,12 @@ func AddConnectionConfKey(plgName, confKey, language string, content []byte) err
 		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
 		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
 	}
 	}
 
 
-	var cfgOps ConfigOperator
+	var cfgOps conf.ConfigOperator
 	var found bool
 	var found bool
 
 
 	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
 	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
 	if !found {
 	if !found {
-		cfgOps = &ConnectionConfigKeysOps{
-			ConfigKeys: &ConfigKeys{
-				lock:       sync.RWMutex{},
-				pluginName: plgName,
-				cf:         map[string]map[string]interface{}{},
-			},
-		}
+		cfgOps = conf.NewConfigOperatorForConnection(plgName)
 		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
 		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
 	}
 	}
 
 

+ 32 - 22
internal/meta/yamlConfigMeta_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -15,37 +15,47 @@
 package meta
 package meta
 
 
 import (
 import (
-	"fmt"
-	"sync"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"os"
+	"path/filepath"
 	"testing"
 	"testing"
 )
 )
 
 
-type MockSourcesConfigOps struct {
-	*ConfigKeys
-}
+func createPaths() {
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		panic(err)
+	}
+	dirs := []string{"sources", "sinks", "functions", "services", "services/schemas", "connections"}
 
 
-func (m MockSourcesConfigOps) IsSource() bool {
-	return true
-}
+	for _, v := range dirs {
+		// Create dir if not exist
+		realDir := filepath.Join(dataDir, v)
+		if _, err := os.Stat(realDir); os.IsNotExist(err) {
+			if err := os.MkdirAll(realDir, os.ModePerm); err != nil {
+				panic(err)
+			}
+		}
+	}
+
+	files := []string{"connections/connection.yaml"}
+	for _, v := range files {
+		// Create dir if not exist
+		realFile := filepath.Join(dataDir, v)
+		if _, err := os.Stat(realFile); os.IsNotExist(err) {
+			if _, err := os.Create(realFile); err != nil {
+				panic(err)
+			}
+		}
+	}
 
 
-func (m MockSourcesConfigOps) SaveCfgToFile() error {
-	return nil
 }
 }
 
 
 func TestYamlConfigMeta_Ops(t *testing.T) {
 func TestYamlConfigMeta_Ops(t *testing.T) {
-	plgName := "mocksource"
+	createPaths()
 
 
-	yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
+	plgName := "mocksource"
 	addData := `{"url":"127.0.0.1","method":"post","headers":{"Accept":"json"}}`
 	addData := `{"url":"127.0.0.1","method":"post","headers":{"Accept":"json"}}`
-
-	ConfigManager.cfgOperators[yamlKey] = MockSourcesConfigOps{
-		&ConfigKeys{
-			sync.RWMutex{},
-			plgName,
-			make(map[string]map[string]interface{}),
-		},
-	}
-
 	// init new ConfigOperator, success
 	// init new ConfigOperator, success
 	err := AddSourceConfKey(plgName, "new", "en_US", []byte(addData))
 	err := AddSourceConfKey(plgName, "new", "en_US", []byte(addData))
 	if err != nil {
 	if err != nil {

+ 7 - 0
internal/server/rest.go

@@ -17,6 +17,7 @@ package server
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
@@ -96,6 +97,12 @@ func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
 }
 }
 
 
 func createRestServer(ip string, port int, needToken bool) *http.Server {
 func createRestServer(ip string, port int, needToken bool) *http.Server {
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		panic(err)
+	}
+	uploadDir = filepath.Join(dataDir, "uploads")
+
 	r := mux.NewRouter()
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
 	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)

+ 13 - 1
internal/server/server.go

@@ -46,7 +46,7 @@ func createPaths() {
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
-	dirs := []string{"sources", "sinks", "functions", "services", "services/schemas"}
+	dirs := []string{"uploads", "sources", "sinks", "functions", "services", "services/schemas", "connections"}
 
 
 	for _, v := range dirs {
 	for _, v := range dirs {
 		// Create dir if not exist
 		// Create dir if not exist
@@ -57,6 +57,18 @@ func createPaths() {
 			}
 			}
 		}
 		}
 	}
 	}
+
+	files := []string{"connections/connection.yaml"}
+	for _, v := range files {
+		// Create dir if not exist
+		realFile := filepath.Join(dataDir, v)
+		if _, err := os.Stat(realFile); os.IsNotExist(err) {
+			if _, err := os.Create(realFile); err != nil {
+				panic(err)
+			}
+		}
+	}
+
 }
 }
 
 
 func StartUp(Version, LoadFileType string) {
 func StartUp(Version, LoadFileType string) {

+ 9 - 14
internal/topo/node/conf/source.go

@@ -22,29 +22,24 @@ import (
 
 
 func GetSourceConf(sourceType string, options *ast.Options) map[string]interface{} {
 func GetSourceConf(sourceType string, options *ast.Options) map[string]interface{} {
 	confkey := options.CONF_KEY
 	confkey := options.CONF_KEY
-	confPath := "sources/" + sourceType + ".yaml"
-	if sourceType == "mqtt" {
-		confPath = "mqtt_source.yaml"
+
+	yamlOps, err := conf.NewConfigOperatorFromSourceYaml(sourceType)
+	if err != nil {
+		conf.Log.Warnf("fail to parse yaml for source %s. Return error %v", sourceType, err)
 	}
 	}
 	props := make(map[string]interface{})
 	props := make(map[string]interface{})
-	cfg := make(map[string]interface{})
-	err := conf.LoadConfigByName(confPath, &cfg)
-	if err != nil {
+	cfg := yamlOps.CopyConfContent()
+	if len(cfg) == 0 {
 		conf.Log.Warnf("fail to parse yaml for source %s. Return an empty configuration", sourceType)
 		conf.Log.Warnf("fail to parse yaml for source %s. Return an empty configuration", sourceType)
 	} else {
 	} else {
 		def, ok := cfg["default"]
 		def, ok := cfg["default"]
 		if !ok {
 		if !ok {
 			conf.Log.Warnf("default conf %s is not found", confkey)
 			conf.Log.Warnf("default conf %s is not found", confkey)
 		} else {
 		} else {
-			if def1, ok1 := def.(map[string]interface{}); ok1 {
-				props = def1
-			}
+			props = def
 			if c, ok := cfg[strings.ToLower(confkey)]; ok {
 			if c, ok := cfg[strings.ToLower(confkey)]; ok {
-				if c1, ok := c.(map[string]interface{}); ok {
-					c2 := c1
-					for k, v := range c2 {
-						props[k] = v
-					}
+				for k, v := range c {
+					props[k] = v
 				}
 				}
 			}
 			}
 		}
 		}