yamlConfigMeta.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "sync"
  20. )
  21. type configManager struct {
  22. lock sync.RWMutex
  23. cfgOperators map[string]ConfigOperator
  24. }
  25. //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  26. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  27. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  28. var ConfigManager = configManager{
  29. lock: sync.RWMutex{},
  30. cfgOperators: make(map[string]ConfigOperator),
  31. }
  // Format templates for the keys of ConfigManager.cfgOperators. The single %s
  // verb is filled in with the plugin name.
  const (
  	// SourceCfgOperatorKeyTemplate builds the key of a source plugin.
  	SourceCfgOperatorKeyTemplate = "sources.%s"
  	// ConnectionCfgOperatorKeyTemplate builds the key of a connection plugin.
  	ConnectionCfgOperatorKeyTemplate = "connections.%s"
  )
  34. // loadConfigOperatorForSource
  35. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml
  36. // If plugin xxx not exist in /etc/sources/xxx.yaml, no error response
  37. func loadConfigOperatorForSource(pluginName string) {
  38. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  39. if cfg, _ := NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  40. ConfigManager.lock.Lock()
  41. ConfigManager.cfgOperators[yamlKey] = cfg
  42. ConfigManager.lock.Unlock()
  43. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  44. }
  45. }
  46. // loadConfigOperatorForConnection
  47. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml
  48. // If plugin not exist in /etc/connections/connection.yaml, no error response
  49. func loadConfigOperatorForConnection(pluginName string) {
  50. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  51. if cfg, _ := NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  52. ConfigManager.lock.Lock()
  53. ConfigManager.cfgOperators[yamlKey] = cfg
  54. ConfigManager.lock.Unlock()
  55. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  56. }
  57. }
  58. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  59. ConfigManager.lock.RLock()
  60. defer ConfigManager.lock.RUnlock()
  61. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  62. if !ok {
  63. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  64. }
  65. cf := cfgOps.CopyConfContent()
  66. if b, err = json.Marshal(cf); nil != err {
  67. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  68. } else {
  69. return b, err
  70. }
  71. }
  72. func DelSourceConfKey(plgName, confKey, language string) error {
  73. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  74. return delConfKey(configOperatorKey, confKey, language)
  75. }
  76. func DelConnectionConfKey(plgName, confKey, language string) error {
  77. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  78. return delConfKey(configOperatorKey, confKey, language)
  79. }
  80. func delConfKey(configOperatorKey, confKey, language string) error {
  81. ConfigManager.lock.Lock()
  82. defer ConfigManager.lock.Unlock()
  83. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  84. if !ok {
  85. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  86. }
  87. cfgOps.DeleteConfKey(confKey)
  88. err := cfgOps.SaveCfgToFile()
  89. if err != nil {
  90. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  91. }
  92. return nil
  93. }
  94. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  95. ConfigManager.lock.Lock()
  96. defer ConfigManager.lock.Unlock()
  97. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  98. reqField := make(map[string]interface{})
  99. err := json.Unmarshal(content, &reqField)
  100. if nil != err {
  101. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  102. }
  103. var cfgOps ConfigOperator
  104. var found bool
  105. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  106. if !found {
  107. cfgOps = &SourceConfigKeysOps{
  108. ConfigKeys: &ConfigKeys{
  109. lock: sync.RWMutex{},
  110. pluginName: plgName,
  111. cf: map[string]map[string]interface{}{},
  112. },
  113. }
  114. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  115. }
  116. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  117. return err
  118. }
  119. err = cfgOps.SaveCfgToFile()
  120. if err != nil {
  121. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  122. }
  123. return nil
  124. }
  125. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  126. ConfigManager.lock.Lock()
  127. defer ConfigManager.lock.Unlock()
  128. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  129. reqField := make(map[string]interface{})
  130. err := json.Unmarshal(content, &reqField)
  131. if nil != err {
  132. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  133. }
  134. var cfgOps ConfigOperator
  135. var found bool
  136. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  137. if !found {
  138. cfgOps = &ConnectionConfigKeysOps{
  139. ConfigKeys: &ConfigKeys{
  140. lock: sync.RWMutex{},
  141. pluginName: plgName,
  142. cf: map[string]map[string]interface{}{},
  143. },
  144. }
  145. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  146. }
  147. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  148. return err
  149. }
  150. err = cfgOps.SaveCfgToFile()
  151. if err != nil {
  152. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  153. }
  154. return nil
  155. }