Procházet zdrojové kódy

fix(fea): support sink config by resource id

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran před 2 roky
rodič
revize
312429aafa

+ 5 - 0
etc/multilingual/en_US.ini

@@ -1,6 +1,11 @@
 [sink]
 [sink]
 not_found_plugin=This plugin was not found:
 not_found_plugin=This plugin was not found:
+not_found_confkey=This confkey was not found:
+confkey_already_exist=This confkey already exists:
+json_marshal_fail=Failed to format json data:
 type_conversion_fail=Type conversion failed:
 type_conversion_fail=Type conversion failed:
+not_found_file=Can't find this file:
+write_data_fail=Failed to write data to file:
 [source]
 [source]
 not_found_plugin=This plugin was not found:
 not_found_plugin=This plugin was not found:
 not_found_confkey=This confkey was not found:
 not_found_confkey=This confkey was not found:

+ 5 - 0
etc/multilingual/zh_CN.ini

@@ -1,6 +1,11 @@
 [sink]
 [sink]
 not_found_plugin=没有找到这个插件:
 not_found_plugin=没有找到这个插件:
+not_found_confkey=没有找到这个配置项:
+confkey_already_exist=这个配置项已经存在:
+json_marshal_fail=格式化 json 错误:
 type_conversion_fail=类型转换错误:
 type_conversion_fail=类型转换错误:
+not_found_file=找不到这个文件:
+write_data_fail=数据写入文件失败:
 [source]
 [source]
 not_found_plugin=没有找到这个插件:
 not_found_plugin=没有找到这个插件:
 not_found_confkey=没有找到这个配置项:
 not_found_confkey=没有找到这个配置项:

+ 79 - 3
internal/conf/yaml_config_ops.go

@@ -230,12 +230,11 @@ func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{})
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 
 
+	c.dataCfg[confKey] = reqField
 	if _, ok := c.etcCfg[confKey]; ok {
 	if _, ok := c.etcCfg[confKey]; ok {
-		return fmt.Errorf("duplicate key %s, already exist in etc folder", confKey)
+		delete(c.etcCfg, confKey)
 	}
 	}
 
 
-	c.dataCfg[confKey] = reqField
-
 	return nil
 	return nil
 }
 }
 
 
@@ -279,6 +278,32 @@ func (c *SourceConfigKeysOps) SaveCfgToFile() error {
 	return nil
 	return nil
 }
 }
 
 
+// SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
+type SinkConfigKeysOps struct {
+	*ConfigKeys
+}
+
+func (c *SinkConfigKeysOps) IsSource() bool {
+	return false
+}
+
+func (c *SinkConfigKeysOps) SaveCfgToFile() error {
+	pluginName := c.pluginName
+	confDir, err := GetDataLoc()
+	if nil != err {
+		return err
+	}
+
+	dir := path.Join(confDir, "sinks")
+	filePath := path.Join(dir, pluginName+".yaml")
+	cfg := c.CopyUpdatableConfContent()
+	err = filex.WriteYamlMarshal(filePath, cfg)
+	if nil != err {
+		return err
+	}
+	return nil
+}
+
 // ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
 // ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
 type ConnectionConfigKeysOps struct {
 type ConnectionConfigKeysOps struct {
 	*ConfigKeys
 	*ConfigKeys
@@ -358,6 +383,49 @@ func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error)
 	filePath = path.Join(dir, fileName+`.yaml`)
 	filePath = path.Join(dir, fileName+`.yaml`)
 	_ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
 	_ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
 
 
+	//delete the etc config keys that exist in data
+	for k, _ := range c.dataCfg {
+		if _, found := c.etcCfg[k]; found {
+			delete(c.etcCfg, k)
+		}
+	}
+
+	return c, nil
+}
+
+// NewConfigOperatorForSink construct function
+func NewConfigOperatorForSink(pluginName string) ConfigOperator {
+	c := &SinkConfigKeysOps{
+		&ConfigKeys{
+			lock:       sync.RWMutex{},
+			pluginName: pluginName,
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
+		},
+	}
+	return c
+}
+
+// NewConfigOperatorFromSinkYaml construct function, Load the configs from etc/sources/xx.yaml
+func NewConfigOperatorFromSinkYaml(pluginName string) (ConfigOperator, error) {
+	c := &SinkConfigKeysOps{
+		&ConfigKeys{
+			lock:       sync.RWMutex{},
+			pluginName: pluginName,
+			etcCfg:     map[string]map[string]interface{}{},
+			dataCfg:    map[string]map[string]interface{}{},
+		},
+	}
+
+	dataDir, err := GetDataLoc()
+	if nil != err {
+		return nil, err
+	}
+	dir := path.Join(dataDir, "sinks")
+
+	filePath := path.Join(dir, pluginName+`.yaml`)
+	_ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
+
 	return c, nil
 	return c, nil
 }
 }
 
 
@@ -432,5 +500,13 @@ func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, err
 			return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
 			return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
 		}
 		}
 	}
 	}
+
+	//delete the etc config keys that exist in data
+	for k, _ := range c.dataCfg {
+		if _, found := c.etcCfg[k]; found {
+			delete(c.etcCfg, k)
+		}
+	}
+
 	return c, nil
 	return c, nil
 }
 }

+ 1 - 0
internal/meta/sinkMeta.go

@@ -254,6 +254,7 @@ func ReadSinkMetaFile(filePath string, installed bool) error {
 	if nil != err {
 	if nil != err {
 		return err
 		return err
 	}
 	}
+	loadConfigOperatorForSink(strings.TrimSuffix(finame, `.json`))
 	conf.Log.Infof("Loading metadata file for sink: %s", finame)
 	conf.Log.Infof("Loading metadata file for sink: %s", finame)
 	return nil
 	return nil
 }
 }

+ 117 - 23
internal/meta/yamlConfigMeta.go

@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"strings"
 	"sync"
 	"sync"
 )
 )
 
 
@@ -35,11 +36,12 @@ var ConfigManager = configManager{
 }
 }
 
 
 const SourceCfgOperatorKeyTemplate = "sources.%s"
 const SourceCfgOperatorKeyTemplate = "sources.%s"
+const SinkCfgOperatorKeyTemplate = "sinks.%s"
 const ConnectionCfgOperatorKeyTemplate = "connections.%s"
 const ConnectionCfgOperatorKeyTemplate = "connections.%s"
 
 
 // loadConfigOperatorForSource
 // loadConfigOperatorForSource
-// Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml
-// If plugin xxx not exist in /etc/sources/xxx.yaml, no error response
+// Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml  /data/sources/xxx.yaml
+// If plugin xxx not exist, no error response
 func loadConfigOperatorForSource(pluginName string) {
 func loadConfigOperatorForSource(pluginName string) {
 	yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
 	yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
 
 
@@ -51,8 +53,22 @@ func loadConfigOperatorForSource(pluginName string) {
 	}
 	}
 }
 }
 
 
+// loadConfigOperatorForSink
+// Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
+// If plugin xxx not exist, no error response
+func loadConfigOperatorForSink(pluginName string) {
+	yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
+
+	if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
+		ConfigManager.lock.Lock()
+		ConfigManager.cfgOperators[yamlKey] = cfg
+		ConfigManager.lock.Unlock()
+		conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
+	}
+}
+
 // loadConfigOperatorForConnection
 // loadConfigOperatorForConnection
-// Try to load ConfigOperator for plugin from /etc/connections/connection.yaml
+// Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
 // If plugin not exist in /etc/connections/connection.yaml, no error response
 // If plugin not exist in /etc/connections/connection.yaml, no error response
 func loadConfigOperatorForConnection(pluginName string) {
 func loadConfigOperatorForConnection(pluginName string) {
 	yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
 	yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
@@ -65,22 +81,22 @@ func loadConfigOperatorForConnection(pluginName string) {
 	}
 	}
 }
 }
 
 
-func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
-
-	ConfigManager.lock.RLock()
-	defer ConfigManager.lock.RUnlock()
+func delConfKey(configOperatorKey, confKey, language string) error {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
 
 
 	cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
 	cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
 	if !ok {
 	if !ok {
-		return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
+		return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
 	}
 	}
 
 
-	cf := cfgOps.CopyConfContent()
-	if b, err = json.Marshal(cf); nil != err {
-		return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
-	} else {
-		return b, err
+	cfgOps.DeleteConfKey(confKey)
+
+	err := cfgOps.SaveCfgToFile()
+	if err != nil {
+		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
 	}
 	}
+	return nil
 }
 }
 
 
 func DelSourceConfKey(plgName, confKey, language string) error {
 func DelSourceConfKey(plgName, confKey, language string) error {
@@ -88,27 +104,32 @@ func DelSourceConfKey(plgName, confKey, language string) error {
 	return delConfKey(configOperatorKey, confKey, language)
 	return delConfKey(configOperatorKey, confKey, language)
 }
 }
 
 
+func DelSinkConfKey(plgName, confKey, language string) error {
+	configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
+	return delConfKey(configOperatorKey, confKey, language)
+}
+
 func DelConnectionConfKey(plgName, confKey, language string) error {
 func DelConnectionConfKey(plgName, confKey, language string) error {
 	configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
 	configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
 	return delConfKey(configOperatorKey, confKey, language)
 	return delConfKey(configOperatorKey, confKey, language)
 }
 }
 
 
-func delConfKey(configOperatorKey, confKey, language string) error {
-	ConfigManager.lock.Lock()
-	defer ConfigManager.lock.Unlock()
+func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
+
+	ConfigManager.lock.RLock()
+	defer ConfigManager.lock.RUnlock()
 
 
 	cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
 	cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
 	if !ok {
 	if !ok {
-		return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
+		return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
 	}
 	}
 
 
-	cfgOps.DeleteConfKey(confKey)
-
-	err := cfgOps.SaveCfgToFile()
-	if err != nil {
-		return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
+	cf := cfgOps.CopyConfContent()
+	if b, err = json.Marshal(cf); nil != err {
+		return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
+	} else {
+		return b, err
 	}
 	}
-	return nil
 }
 }
 
 
 func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
 func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
@@ -143,6 +164,38 @@ func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
 	return nil
 	return nil
 }
 }
 
 
+func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
+	ConfigManager.lock.Lock()
+	defer ConfigManager.lock.Unlock()
+
+	configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
+
+	reqField := make(map[string]interface{})
+	err := json.Unmarshal(content, &reqField)
+	if nil != err {
+		return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
+	}
+
+	var cfgOps conf.ConfigOperator
+	var found bool
+
+	cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
+	if !found {
+		cfgOps = conf.NewConfigOperatorForSink(plgName)
+		ConfigManager.cfgOperators[configOperatorKey] = cfgOps
+	}
+
+	if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
+		return err
+	}
+
+	err = cfgOps.SaveCfgToFile()
+	if err != nil {
+		return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
+	}
+	return nil
+}
+
 func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
 func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
 	ConfigManager.lock.Lock()
 	ConfigManager.lock.Lock()
 	defer ConfigManager.lock.Unlock()
 	defer ConfigManager.lock.Unlock()
@@ -174,3 +227,44 @@ func AddConnectionConfKey(plgName, confKey, language string, content []byte) err
 	}
 	}
 	return nil
 	return nil
 }
 }
+
+func GetResources(language string) (b []byte, err error) {
+	ConfigManager.lock.RLock()
+	defer ConfigManager.lock.RUnlock()
+	var srcResources []map[string]string
+	var sinkResources []map[string]string
+
+	for key, ops := range ConfigManager.cfgOperators {
+		if strings.HasSuffix(key, ConnectionCfgOperatorKeyTemplate) {
+			continue
+		}
+		if strings.HasSuffix(key, SourceCfgOperatorKeyTemplate) {
+			plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyTemplate)
+			resourceIds := ops.GetUpdatableConfKeys()
+			item := map[string]string{}
+			for _, v := range resourceIds {
+				item[plugin] = v
+			}
+			srcResources = append(srcResources, item)
+		}
+		if strings.HasSuffix(key, SinkCfgOperatorKeyTemplate) {
+			plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyTemplate)
+			resourceIds := ops.GetUpdatableConfKeys()
+			item := map[string]string{}
+			for _, v := range resourceIds {
+				item[plugin] = v
+			}
+			sinkResources = append(sinkResources, item)
+		}
+	}
+
+	result := map[string]interface{}{}
+	result["sources"] = srcResources
+	result["sinks"] = sinkResources
+
+	if b, err = json.Marshal(result); nil != err {
+		return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
+	} else {
+		return b, err
+	}
+}

+ 58 - 1
internal/server/meta_init.go

@@ -49,12 +49,15 @@ func (m metaComp) rest(r *mux.Router) {
 	r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
 	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
+	r.HandleFunc("/metadata/sinks/yaml/{name}", sinkConfHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sinks/{name}/confKeys/{confKey}", sinkConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
 
 
 	r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
 	r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
 
 
+	r.HandleFunc("/metadata/resources", resourcesHandler).Methods(http.MethodGet)
 	for _, endpoint := range metaEndpoints {
 	for _, endpoint := range metaEndpoints {
 		endpoint(r)
 		endpoint(r)
 	}
 	}
@@ -181,7 +184,23 @@ func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
 		handleError(w, err, "", logger)
 		handleError(w, err, "", logger)
 		return
 		return
 	} else {
 	} else {
-		w.Write(ret)
+		_, _ = w.Write(ret)
+	}
+}
+
+// Get sink yaml
+func sinkConfHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	language := getLanguage(r)
+	configOperatorKey := fmt.Sprintf(meta.SinkCfgOperatorKeyTemplate, pluginName)
+	ret, err := meta.GetYamlConf(configOperatorKey, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	} else {
+		_, _ = w.Write(ret)
 	}
 	}
 }
 }
 
 
@@ -212,6 +231,31 @@ func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
 }
 }
 
 
 // Add  del confkey
 // Add  del confkey
+func sinkConfKeyHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	var err error
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	confKey := vars["confKey"]
+	language := getLanguage(r)
+	switch r.Method {
+	case http.MethodDelete:
+		err = meta.DelSinkConfKey(pluginName, confKey, language)
+	case http.MethodPut:
+		v, err1 := io.ReadAll(r.Body)
+		if err1 != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+		err = meta.AddSinkConfKey(pluginName, confKey, language, v)
+	}
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+}
+
+// Add  del confkey
 func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
 func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
 
 
 	defer r.Body.Close()
 	defer r.Body.Close()
@@ -237,6 +281,19 @@ func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 }
 }
 
 
+// get updatable resources
+func resourcesHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	language := getLanguage(r)
+	ret, err := meta.GetResources(language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	} else {
+		_, _ = w.Write(ret)
+	}
+}
+
 func getLanguage(r *http.Request) string {
 func getLanguage(r *http.Request) string {
 	language := r.Header.Get("Content-Language")
 	language := r.Header.Get("Content-Language")
 	if 0 == len(language) {
 	if 0 == len(language) {

+ 55 - 0
internal/topo/node/conf/sink.go

@@ -0,0 +1,55 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conf
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+)
+
+const ResourceID = "resource_id"
+
+func GetSinkConf(sinkType string, action map[string]interface{}) map[string]interface{} {
+	resourceId, ok := action[ResourceID].(string)
+	if !ok {
+		return action
+	}
+	delete(action, ResourceID)
+
+	yamlOps, err := conf.NewConfigOperatorFromSinkYaml(sinkType)
+	if err != nil {
+		conf.Log.Warnf("fail to parse yaml for sink %s. Return error %v", sinkType, err)
+		return action
+	}
+	props := make(map[string]interface{})
+	cfg := yamlOps.CopyConfContent()
+	if len(cfg) == 0 {
+		conf.Log.Warnf("fail to parse yaml for sink %s. Return an empty configuration", sinkType)
+		return action
+	} else {
+		def, ok := cfg[resourceId]
+		if !ok {
+			conf.Log.Warnf("resource id %s is not found", resourceId)
+			return action
+		} else {
+			props = def
+			for k, v := range action {
+				props[k] = v
+			}
+		}
+	}
+
+	conf.Log.Debugf("get conf for %s with resource id %s: %v", sinkType, resourceId, printable(props))
+	return props
+}

+ 3 - 1
internal/topo/node/sink_node.go

@@ -20,6 +20,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
 	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
+	nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
@@ -388,7 +389,8 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	)
 	)
 	s, err = io.Sink(name)
 	s, err = io.Sink(name)
 	if s != nil {
 	if s != nil {
-		err = s.Configure(action)
+		newAction := nodeConf.GetSinkConf(name, action)
+		err = s.Configure(newAction)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}