123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- // Copyright 2021 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package meta
- import (
- "encoding/json"
- "fmt"
- "github.com/lf-edge/ekuiper/internal/conf"
- "sync"
- )
- type configManager struct {
- lock sync.RWMutex
- cfgOperators map[string]ConfigOperator
- }
- //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
- // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
- // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
- var ConfigManager = configManager{
- lock: sync.RWMutex{},
- cfgOperators: make(map[string]ConfigOperator),
- }
// Format templates for the keys of ConfigManager.cfgOperators. The single %s
// verb is filled with the plugin name.
const (
	// SourceCfgOperatorKeyTemplate builds the key of a source plugin.
	SourceCfgOperatorKeyTemplate = "sources.%s"
	// ConnectionCfgOperatorKeyTemplate builds the key of a connection plugin.
	ConnectionCfgOperatorKeyTemplate = "connections.%s"
)
- // loadConfigOperatorForSource
- // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml
- // If plugin xxx not exist in /etc/sources/xxx.yaml, no error response
- func loadConfigOperatorForSource(pluginName string) {
- yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
- if cfg, _ := NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
- ConfigManager.lock.Lock()
- ConfigManager.cfgOperators[yamlKey] = cfg
- ConfigManager.lock.Unlock()
- conf.Log.Infof("Loading yaml file for source: %s", pluginName)
- }
- }
- // loadConfigOperatorForConnection
- // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml
- // If plugin not exist in /etc/connections/connection.yaml, no error response
- func loadConfigOperatorForConnection(pluginName string) {
- yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
- if cfg, _ := NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
- ConfigManager.lock.Lock()
- ConfigManager.cfgOperators[yamlKey] = cfg
- ConfigManager.lock.Unlock()
- conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
- }
- }
- func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
- ConfigManager.lock.RLock()
- defer ConfigManager.lock.RUnlock()
- cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
- if !ok {
- return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
- }
- cf := cfgOps.CopyConfContent()
- if b, err = json.Marshal(cf); nil != err {
- return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
- } else {
- return b, err
- }
- }
- func DelSourceConfKey(plgName, confKey, language string) error {
- configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
- return delConfKey(configOperatorKey, confKey, language)
- }
- func DelConnectionConfKey(plgName, confKey, language string) error {
- configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
- return delConfKey(configOperatorKey, confKey, language)
- }
- func delConfKey(configOperatorKey, confKey, language string) error {
- ConfigManager.lock.Lock()
- defer ConfigManager.lock.Unlock()
- cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
- if !ok {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
- }
- cfgOps.DeleteConfKey(confKey)
- err := cfgOps.SaveCfgToFile()
- if err != nil {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
- }
- return nil
- }
- func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
- ConfigManager.lock.Lock()
- defer ConfigManager.lock.Unlock()
- configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
- reqField := make(map[string]interface{})
- err := json.Unmarshal(content, &reqField)
- if nil != err {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
- }
- var cfgOps ConfigOperator
- var found bool
- cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
- if !found {
- cfgOps = &SourceConfigKeysOps{
- ConfigKeys: &ConfigKeys{
- lock: sync.RWMutex{},
- pluginName: plgName,
- cf: map[string]map[string]interface{}{},
- },
- }
- ConfigManager.cfgOperators[configOperatorKey] = cfgOps
- }
- if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
- return err
- }
- err = cfgOps.SaveCfgToFile()
- if err != nil {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
- }
- return nil
- }
- func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
- ConfigManager.lock.Lock()
- defer ConfigManager.lock.Unlock()
- configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
- reqField := make(map[string]interface{})
- err := json.Unmarshal(content, &reqField)
- if nil != err {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
- }
- var cfgOps ConfigOperator
- var found bool
- cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
- if !found {
- cfgOps = &ConnectionConfigKeysOps{
- ConfigKeys: &ConfigKeys{
- lock: sync.RWMutex{},
- pluginName: plgName,
- cf: map[string]map[string]interface{}{},
- },
- }
- ConfigManager.cfgOperators[configOperatorKey] = cfgOps
- }
- if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
- return err
- }
- err = cfgOps.SaveCfgToFile()
- if err != nil {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
- }
- return nil
- }
|