yamlConfigMeta.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "strings"
  20. "sync"
  21. )
  22. type configManager struct {
  23. lock sync.RWMutex
  24. cfgOperators map[string]conf.ConfigOperator
  25. }
  26. //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  27. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  28. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  29. var ConfigManager = configManager{
  30. lock: sync.RWMutex{},
  31. cfgOperators: make(map[string]conf.ConfigOperator),
  32. }
  33. const SourceCfgOperatorKeyTemplate = "sources.%s"
  34. const SourceCfgOperatorKeyPrefix = "sources."
  35. const SinkCfgOperatorKeyTemplate = "sinks.%s"
  36. const SinkCfgOperatorKeyPrefix = "sinks."
  37. const ConnectionCfgOperatorKeyTemplate = "connections.%s"
  38. const ConnectionCfgOperatorKeyPrefix = "connections."
  39. // loadConfigOperatorForSource
  40. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml /data/sources/xxx.yaml
  41. // If plugin xxx not exist, no error response
  42. func loadConfigOperatorForSource(pluginName string) {
  43. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  44. if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  45. ConfigManager.lock.Lock()
  46. ConfigManager.cfgOperators[yamlKey] = cfg
  47. ConfigManager.lock.Unlock()
  48. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  49. }
  50. }
  51. // loadConfigOperatorForSink
  52. // Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
  53. // If plugin xxx not exist, no error response
  54. func loadConfigOperatorForSink(pluginName string) {
  55. yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
  56. if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
  57. ConfigManager.lock.Lock()
  58. ConfigManager.cfgOperators[yamlKey] = cfg
  59. ConfigManager.lock.Unlock()
  60. conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
  61. }
  62. }
  63. // loadConfigOperatorForConnection
  64. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
  65. // If plugin not exist in /etc/connections/connection.yaml, no error response
  66. func loadConfigOperatorForConnection(pluginName string) {
  67. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  68. if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  69. ConfigManager.lock.Lock()
  70. ConfigManager.cfgOperators[yamlKey] = cfg
  71. ConfigManager.lock.Unlock()
  72. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  73. }
  74. }
  75. func delConfKey(configOperatorKey, confKey, language string) error {
  76. ConfigManager.lock.Lock()
  77. defer ConfigManager.lock.Unlock()
  78. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  79. if !ok {
  80. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  81. }
  82. cfgOps.DeleteConfKey(confKey)
  83. err := cfgOps.SaveCfgToFile()
  84. if err != nil {
  85. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  86. }
  87. return nil
  88. }
  89. func DelSourceConfKey(plgName, confKey, language string) error {
  90. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  91. return delConfKey(configOperatorKey, confKey, language)
  92. }
  93. func DelSinkConfKey(plgName, confKey, language string) error {
  94. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  95. return delConfKey(configOperatorKey, confKey, language)
  96. }
  97. func DelConnectionConfKey(plgName, confKey, language string) error {
  98. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  99. return delConfKey(configOperatorKey, confKey, language)
  100. }
  101. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  102. ConfigManager.lock.RLock()
  103. defer ConfigManager.lock.RUnlock()
  104. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  105. if !ok {
  106. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  107. }
  108. cf := cfgOps.CopyConfContent()
  109. if b, err = json.Marshal(cf); nil != err {
  110. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  111. } else {
  112. return b, err
  113. }
  114. }
  115. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  116. ConfigManager.lock.Lock()
  117. defer ConfigManager.lock.Unlock()
  118. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  119. reqField := make(map[string]interface{})
  120. err := json.Unmarshal(content, &reqField)
  121. if nil != err {
  122. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  123. }
  124. var cfgOps conf.ConfigOperator
  125. var found bool
  126. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  127. if !found {
  128. cfgOps = conf.NewConfigOperatorForSource(plgName)
  129. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  130. }
  131. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  132. return err
  133. }
  134. err = cfgOps.SaveCfgToFile()
  135. if err != nil {
  136. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  137. }
  138. return nil
  139. }
  140. func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
  141. ConfigManager.lock.Lock()
  142. defer ConfigManager.lock.Unlock()
  143. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  144. reqField := make(map[string]interface{})
  145. err := json.Unmarshal(content, &reqField)
  146. if nil != err {
  147. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
  148. }
  149. var cfgOps conf.ConfigOperator
  150. var found bool
  151. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  152. if !found {
  153. cfgOps = conf.NewConfigOperatorForSink(plgName)
  154. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  155. }
  156. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  157. return err
  158. }
  159. err = cfgOps.SaveCfgToFile()
  160. if err != nil {
  161. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
  162. }
  163. return nil
  164. }
  165. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  166. ConfigManager.lock.Lock()
  167. defer ConfigManager.lock.Unlock()
  168. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  169. reqField := make(map[string]interface{})
  170. err := json.Unmarshal(content, &reqField)
  171. if nil != err {
  172. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  173. }
  174. var cfgOps conf.ConfigOperator
  175. var found bool
  176. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  177. if !found {
  178. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  179. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  180. }
  181. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  182. return err
  183. }
  184. err = cfgOps.SaveCfgToFile()
  185. if err != nil {
  186. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  187. }
  188. return nil
  189. }
  190. func GetResources(language string) (b []byte, err error) {
  191. ConfigManager.lock.RLock()
  192. defer ConfigManager.lock.RUnlock()
  193. var srcResources []map[string]string
  194. var sinkResources []map[string]string
  195. for key, ops := range ConfigManager.cfgOperators {
  196. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  197. continue
  198. }
  199. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  200. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  201. resourceIds := ops.GetUpdatableConfKeys()
  202. if len(resourceIds) > 0 {
  203. item := map[string]string{}
  204. for _, v := range resourceIds {
  205. item[v] = plugin
  206. }
  207. srcResources = append(srcResources, item)
  208. }
  209. }
  210. if strings.HasSuffix(key, SinkCfgOperatorKeyPrefix) {
  211. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  212. resourceIds := ops.GetUpdatableConfKeys()
  213. item := map[string]string{}
  214. for _, v := range resourceIds {
  215. item[plugin] = v
  216. }
  217. sinkResources = append(sinkResources, item)
  218. }
  219. }
  220. result := map[string]interface{}{}
  221. result["sources"] = srcResources
  222. result["sinks"] = sinkResources
  223. if b, err = json.Marshal(result); nil != err {
  224. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
  225. } else {
  226. return b, err
  227. }
  228. }