|
@@ -43,7 +43,6 @@ type ConfKeysOperator interface {
|
|
|
//ConfigOperator define interface to query/add/update/delete the configs in disk
|
|
|
type ConfigOperator interface {
|
|
|
ConfKeysOperator
|
|
|
- IsSource() bool
|
|
|
SaveCfgToFile() error
|
|
|
}
|
|
|
|
|
@@ -255,10 +254,6 @@ type SourceConfigKeysOps struct {
|
|
|
*ConfigKeys
|
|
|
}
|
|
|
|
|
|
-func (c *SourceConfigKeysOps) IsSource() bool {
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
func (c *SourceConfigKeysOps) SaveCfgToFile() error {
|
|
|
pluginName := c.pluginName
|
|
|
confDir, err := GetDataLoc()
|
|
@@ -281,10 +276,6 @@ type SinkConfigKeysOps struct {
|
|
|
*ConfigKeys
|
|
|
}
|
|
|
|
|
|
-func (c *SinkConfigKeysOps) IsSource() bool {
|
|
|
- return false
|
|
|
-}
|
|
|
-
|
|
|
func (c *SinkConfigKeysOps) SaveCfgToFile() error {
|
|
|
pluginName := c.pluginName
|
|
|
confDir, err := GetDataLoc()
|
|
@@ -307,10 +298,6 @@ type ConnectionConfigKeysOps struct {
|
|
|
*ConfigKeys
|
|
|
}
|
|
|
|
|
|
-func (p *ConnectionConfigKeysOps) IsSource() bool {
|
|
|
- return false
|
|
|
-}
|
|
|
-
|
|
|
func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
|
|
|
pluginName := p.pluginName
|
|
|
confDir, err := GetDataLoc()
|
|
@@ -379,7 +366,7 @@ func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error)
|
|
|
fileName = pluginName
|
|
|
|
|
|
filePath = path.Join(dir, fileName+`.yaml`)
|
|
|
- _ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
|
|
|
+ _ = LoadConfigFromPath(filePath, &c.dataCfg)
|
|
|
|
|
|
return c, nil
|
|
|
}
|
|
@@ -415,7 +402,7 @@ func NewConfigOperatorFromSinkYaml(pluginName string) (ConfigOperator, error) {
|
|
|
dir := path.Join(dataDir, "sinks")
|
|
|
|
|
|
filePath := path.Join(dir, pluginName+`.yaml`)
|
|
|
- _ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
|
|
|
+ _ = LoadConfigFromPath(filePath, &c.dataCfg)
|
|
|
|
|
|
return c, nil
|
|
|
}
|
|
@@ -450,7 +437,7 @@ func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, err
|
|
|
}
|
|
|
yamlPath := path.Join(confDir, "connections/connection.yaml")
|
|
|
yamlData := make(map[string]interface{})
|
|
|
- err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
|
|
|
+ err = LoadConfigFromPath(yamlPath, &yamlData)
|
|
|
if nil != err {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -476,7 +463,7 @@ func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, err
|
|
|
}
|
|
|
yamlPath = path.Join(confDir, "connections/connection.yaml")
|
|
|
yamlData = make(map[string]interface{})
|
|
|
- _ = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
|
|
|
+ _ = LoadConfigFromPath(yamlPath, &yamlData)
|
|
|
|
|
|
if plgCnfs, ok := yamlData[pluginName]; ok {
|
|
|
if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
|