yamlConfigMeta.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "strings"
  20. "sync"
  21. )
  22. type configManager struct {
  23. lock sync.RWMutex
  24. cfgOperators map[string]conf.ConfigOperator
  25. }
  26. //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  27. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  28. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  29. var ConfigManager = configManager{
  30. lock: sync.RWMutex{},
  31. cfgOperators: make(map[string]conf.ConfigOperator),
  32. }
  33. const SourceCfgOperatorKeyTemplate = "sources.%s"
  34. const SourceCfgOperatorKeyPrefix = "sources."
  35. const SinkCfgOperatorKeyTemplate = "sinks.%s"
  36. const SinkCfgOperatorKeyPrefix = "sinks."
  37. const ConnectionCfgOperatorKeyTemplate = "connections.%s"
  38. const ConnectionCfgOperatorKeyPrefix = "connections."
  39. // loadConfigOperatorForSource
  40. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml /data/sources/xxx.yaml
  41. // If plugin xxx not exist, no error response
  42. func loadConfigOperatorForSource(pluginName string) {
  43. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  44. if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  45. ConfigManager.lock.Lock()
  46. ConfigManager.cfgOperators[yamlKey] = cfg
  47. ConfigManager.lock.Unlock()
  48. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  49. }
  50. }
  51. // loadConfigOperatorForSink
  52. // Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
  53. // If plugin xxx not exist, no error response
  54. func loadConfigOperatorForSink(pluginName string) {
  55. yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
  56. if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
  57. ConfigManager.lock.Lock()
  58. ConfigManager.cfgOperators[yamlKey] = cfg
  59. ConfigManager.lock.Unlock()
  60. conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
  61. }
  62. }
  63. // loadConfigOperatorForConnection
  64. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
  65. // If plugin not exist in /etc/connections/connection.yaml, no error response
  66. func loadConfigOperatorForConnection(pluginName string) {
  67. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  68. if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  69. ConfigManager.lock.Lock()
  70. ConfigManager.cfgOperators[yamlKey] = cfg
  71. ConfigManager.lock.Unlock()
  72. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  73. }
  74. }
  75. func delConfKey(configOperatorKey, confKey, language string) error {
  76. ConfigManager.lock.Lock()
  77. defer ConfigManager.lock.Unlock()
  78. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  79. if !ok {
  80. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  81. }
  82. cfgOps.DeleteConfKey(confKey)
  83. err := cfgOps.SaveCfgToFile()
  84. if err != nil {
  85. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  86. }
  87. return nil
  88. }
  89. func DelSourceConfKey(plgName, confKey, language string) error {
  90. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  91. return delConfKey(configOperatorKey, confKey, language)
  92. }
  93. func DelSinkConfKey(plgName, confKey, language string) error {
  94. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  95. return delConfKey(configOperatorKey, confKey, language)
  96. }
  97. func DelConnectionConfKey(plgName, confKey, language string) error {
  98. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  99. return delConfKey(configOperatorKey, confKey, language)
  100. }
  101. func delYamlConf(configOperatorKey string) {
  102. ConfigManager.lock.Lock()
  103. defer ConfigManager.lock.Unlock()
  104. _, ok := ConfigManager.cfgOperators[configOperatorKey]
  105. if ok {
  106. delete(ConfigManager.cfgOperators, configOperatorKey)
  107. }
  108. }
  109. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  110. ConfigManager.lock.RLock()
  111. defer ConfigManager.lock.RUnlock()
  112. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  113. if !ok {
  114. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  115. }
  116. cf := cfgOps.CopyConfContent()
  117. if b, err = json.Marshal(cf); nil != err {
  118. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  119. } else {
  120. return b, err
  121. }
  122. }
  123. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  124. ConfigManager.lock.Lock()
  125. defer ConfigManager.lock.Unlock()
  126. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  127. reqField := make(map[string]interface{})
  128. err := json.Unmarshal(content, &reqField)
  129. if nil != err {
  130. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  131. }
  132. var cfgOps conf.ConfigOperator
  133. var found bool
  134. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  135. if !found {
  136. cfgOps = conf.NewConfigOperatorForSource(plgName)
  137. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  138. }
  139. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  140. return err
  141. }
  142. err = cfgOps.SaveCfgToFile()
  143. if err != nil {
  144. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  145. }
  146. return nil
  147. }
  148. func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
  149. ConfigManager.lock.Lock()
  150. defer ConfigManager.lock.Unlock()
  151. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  152. reqField := make(map[string]interface{})
  153. err := json.Unmarshal(content, &reqField)
  154. if nil != err {
  155. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
  156. }
  157. var cfgOps conf.ConfigOperator
  158. var found bool
  159. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  160. if !found {
  161. cfgOps = conf.NewConfigOperatorForSink(plgName)
  162. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  163. }
  164. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  165. return err
  166. }
  167. err = cfgOps.SaveCfgToFile()
  168. if err != nil {
  169. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
  170. }
  171. return nil
  172. }
  173. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  174. ConfigManager.lock.Lock()
  175. defer ConfigManager.lock.Unlock()
  176. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  177. reqField := make(map[string]interface{})
  178. err := json.Unmarshal(content, &reqField)
  179. if nil != err {
  180. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  181. }
  182. var cfgOps conf.ConfigOperator
  183. var found bool
  184. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  185. if !found {
  186. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  187. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  188. }
  189. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  190. return err
  191. }
  192. err = cfgOps.SaveCfgToFile()
  193. if err != nil {
  194. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  195. }
  196. return nil
  197. }
  198. func GetResources(language string) (b []byte, err error) {
  199. ConfigManager.lock.RLock()
  200. defer ConfigManager.lock.RUnlock()
  201. var srcResources []map[string]string
  202. var sinkResources []map[string]string
  203. for key, ops := range ConfigManager.cfgOperators {
  204. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  205. continue
  206. }
  207. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  208. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  209. resourceIds := ops.GetUpdatableConfKeys()
  210. if len(resourceIds) > 0 {
  211. item := map[string]string{}
  212. for _, v := range resourceIds {
  213. item[v] = plugin
  214. }
  215. srcResources = append(srcResources, item)
  216. }
  217. continue
  218. }
  219. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  220. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  221. resourceIds := ops.GetUpdatableConfKeys()
  222. if len(resourceIds) > 0 {
  223. item := map[string]string{}
  224. for _, v := range resourceIds {
  225. item[v] = plugin
  226. }
  227. sinkResources = append(sinkResources, item)
  228. }
  229. continue
  230. }
  231. }
  232. result := map[string]interface{}{}
  233. result["sources"] = srcResources
  234. result["sinks"] = sinkResources
  235. if b, err = json.Marshal(result); nil != err {
  236. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
  237. } else {
  238. return b, err
  239. }
  240. }