yamlConfigMeta.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/pkg/kv"
  21. "strings"
  22. "sync"
  23. )
  24. type configManager struct {
  25. lock sync.RWMutex
  26. cfgOperators map[string]conf.ConfigOperator
  27. sourceConfigStatusDb kv.KeyValue
  28. sinkConfigStatusDb kv.KeyValue
  29. connectionConfigStatusDb kv.KeyValue
  30. }
  31. //ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  32. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  33. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  34. var ConfigManager *configManager
  35. func InitYamlConfigManager() {
  36. ConfigManager = &configManager{
  37. lock: sync.RWMutex{},
  38. cfgOperators: make(map[string]conf.ConfigOperator),
  39. }
  40. _, ConfigManager.sourceConfigStatusDb = store.GetKV("sourceConfigStatus")
  41. _, ConfigManager.sinkConfigStatusDb = store.GetKV("sinkConfigStatus")
  42. _, ConfigManager.connectionConfigStatusDb = store.GetKV("connectionConfigStatus")
  43. }
  44. const SourceCfgOperatorKeyTemplate = "sources.%s"
  45. const SourceCfgOperatorKeyPrefix = "sources."
  46. const SinkCfgOperatorKeyTemplate = "sinks.%s"
  47. const SinkCfgOperatorKeyPrefix = "sinks."
  48. const ConnectionCfgOperatorKeyTemplate = "connections.%s"
  49. const ConnectionCfgOperatorKeyPrefix = "connections."
  50. // loadConfigOperatorForSource
  51. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml /data/sources/xxx.yaml
  52. // If plugin xxx not exist, no error response
  53. func loadConfigOperatorForSource(pluginName string) {
  54. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  55. if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  56. ConfigManager.lock.Lock()
  57. ConfigManager.cfgOperators[yamlKey] = cfg
  58. ConfigManager.lock.Unlock()
  59. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  60. }
  61. }
  62. // loadConfigOperatorForSink
  63. // Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
  64. // If plugin xxx not exist, no error response
  65. func loadConfigOperatorForSink(pluginName string) {
  66. yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
  67. if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
  68. ConfigManager.lock.Lock()
  69. ConfigManager.cfgOperators[yamlKey] = cfg
  70. ConfigManager.lock.Unlock()
  71. conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
  72. }
  73. }
  74. // loadConfigOperatorForConnection
  75. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
  76. // If plugin not exist in /etc/connections/connection.yaml, no error response
  77. func loadConfigOperatorForConnection(pluginName string) {
  78. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  79. if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  80. ConfigManager.lock.Lock()
  81. ConfigManager.cfgOperators[yamlKey] = cfg
  82. ConfigManager.lock.Unlock()
  83. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  84. }
  85. }
  86. func delConfKey(configOperatorKey, confKey, language string) error {
  87. ConfigManager.lock.Lock()
  88. defer ConfigManager.lock.Unlock()
  89. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  90. if !ok {
  91. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  92. }
  93. cfgOps.DeleteConfKey(confKey)
  94. err := cfgOps.SaveCfgToFile()
  95. if err != nil {
  96. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  97. }
  98. return nil
  99. }
  100. func DelSourceConfKey(plgName, confKey, language string) error {
  101. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  102. return delConfKey(configOperatorKey, confKey, language)
  103. }
  104. func DelSinkConfKey(plgName, confKey, language string) error {
  105. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  106. return delConfKey(configOperatorKey, confKey, language)
  107. }
  108. func DelConnectionConfKey(plgName, confKey, language string) error {
  109. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  110. return delConfKey(configOperatorKey, confKey, language)
  111. }
  112. func delYamlConf(configOperatorKey string) {
  113. ConfigManager.lock.Lock()
  114. defer ConfigManager.lock.Unlock()
  115. _, ok := ConfigManager.cfgOperators[configOperatorKey]
  116. if ok {
  117. delete(ConfigManager.cfgOperators, configOperatorKey)
  118. }
  119. }
  120. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  121. ConfigManager.lock.RLock()
  122. defer ConfigManager.lock.RUnlock()
  123. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  124. if !ok {
  125. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  126. }
  127. cf := cfgOps.CopyConfContent()
  128. if b, err = json.Marshal(cf); nil != err {
  129. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  130. } else {
  131. return b, err
  132. }
  133. }
  134. func addSourceConfKeys(plgName string, configurations YamlConfigurations) (err error) {
  135. ConfigManager.lock.Lock()
  136. defer ConfigManager.lock.Unlock()
  137. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  138. var cfgOps conf.ConfigOperator
  139. var found bool
  140. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  141. if !found {
  142. cfgOps = conf.NewConfigOperatorForSource(plgName)
  143. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  144. }
  145. cfgOps.LoadConfContent(configurations)
  146. err = cfgOps.SaveCfgToFile()
  147. if err != nil {
  148. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  149. }
  150. return nil
  151. }
  152. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  153. ConfigManager.lock.Lock()
  154. defer ConfigManager.lock.Unlock()
  155. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  156. reqField := make(map[string]interface{})
  157. err := json.Unmarshal(content, &reqField)
  158. if nil != err {
  159. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  160. }
  161. var cfgOps conf.ConfigOperator
  162. var found bool
  163. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  164. if !found {
  165. cfgOps = conf.NewConfigOperatorForSource(plgName)
  166. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  167. }
  168. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  169. return err
  170. }
  171. err = cfgOps.SaveCfgToFile()
  172. if err != nil {
  173. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  174. }
  175. return nil
  176. }
  177. func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
  178. ConfigManager.lock.Lock()
  179. defer ConfigManager.lock.Unlock()
  180. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  181. reqField := make(map[string]interface{})
  182. err := json.Unmarshal(content, &reqField)
  183. if nil != err {
  184. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
  185. }
  186. var cfgOps conf.ConfigOperator
  187. var found bool
  188. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  189. if !found {
  190. cfgOps = conf.NewConfigOperatorForSink(plgName)
  191. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  192. }
  193. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  194. return err
  195. }
  196. err = cfgOps.SaveCfgToFile()
  197. if err != nil {
  198. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
  199. }
  200. return nil
  201. }
  202. func addSinkConfKeys(plgName string, cf YamlConfigurations) error {
  203. ConfigManager.lock.Lock()
  204. defer ConfigManager.lock.Unlock()
  205. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  206. var cfgOps conf.ConfigOperator
  207. var found bool
  208. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  209. if !found {
  210. cfgOps = conf.NewConfigOperatorForSink(plgName)
  211. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  212. }
  213. cfgOps.LoadConfContent(cf)
  214. err := cfgOps.SaveCfgToFile()
  215. if err != nil {
  216. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  217. }
  218. return nil
  219. }
  220. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  221. ConfigManager.lock.Lock()
  222. defer ConfigManager.lock.Unlock()
  223. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  224. reqField := make(map[string]interface{})
  225. err := json.Unmarshal(content, &reqField)
  226. if nil != err {
  227. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  228. }
  229. var cfgOps conf.ConfigOperator
  230. var found bool
  231. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  232. if !found {
  233. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  234. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  235. }
  236. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  237. return err
  238. }
  239. err = cfgOps.SaveCfgToFile()
  240. if err != nil {
  241. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  242. }
  243. return nil
  244. }
  245. func addConnectionConfKeys(plgName string, cf YamlConfigurations) error {
  246. ConfigManager.lock.Lock()
  247. defer ConfigManager.lock.Unlock()
  248. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  249. var cfgOps conf.ConfigOperator
  250. var found bool
  251. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  252. if !found {
  253. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  254. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  255. }
  256. cfgOps.LoadConfContent(cf)
  257. err := cfgOps.SaveCfgToFile()
  258. if err != nil {
  259. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  260. }
  261. return nil
  262. }
  263. func GetResources(language string) (b []byte, err error) {
  264. ConfigManager.lock.RLock()
  265. defer ConfigManager.lock.RUnlock()
  266. var srcResources []map[string]string
  267. var sinkResources []map[string]string
  268. for key, ops := range ConfigManager.cfgOperators {
  269. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  270. continue
  271. }
  272. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  273. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  274. resourceIds := ops.GetUpdatableConfKeys()
  275. if len(resourceIds) > 0 {
  276. item := map[string]string{}
  277. for _, v := range resourceIds {
  278. item[v] = plugin
  279. }
  280. srcResources = append(srcResources, item)
  281. }
  282. continue
  283. }
  284. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  285. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  286. resourceIds := ops.GetUpdatableConfKeys()
  287. if len(resourceIds) > 0 {
  288. item := map[string]string{}
  289. for _, v := range resourceIds {
  290. item[v] = plugin
  291. }
  292. sinkResources = append(sinkResources, item)
  293. }
  294. continue
  295. }
  296. }
  297. result := map[string]interface{}{}
  298. result["sources"] = srcResources
  299. result["sinks"] = sinkResources
  300. if b, err = json.Marshal(result); nil != err {
  301. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
  302. } else {
  303. return b, err
  304. }
  305. }
  306. func ResetConfigs() {
  307. ConfigManager.lock.Lock()
  308. defer ConfigManager.lock.Unlock()
  309. for _, ops := range ConfigManager.cfgOperators {
  310. ops.ClearConfKeys()
  311. _ = ops.SaveCfgToFile()
  312. }
  313. }
  314. type YamlConfigurations map[string]map[string]interface{}
  315. type YamlConfigurationSet struct {
  316. Sources map[string]string `json:"sources"`
  317. Sinks map[string]string `json:"sinks"`
  318. Connections map[string]string `json:"connections"`
  319. }
  320. func GetConfigurations() YamlConfigurationSet {
  321. ConfigManager.lock.RLock()
  322. defer ConfigManager.lock.RUnlock()
  323. result := YamlConfigurationSet{
  324. Sources: map[string]string{},
  325. Sinks: map[string]string{},
  326. Connections: map[string]string{},
  327. }
  328. srcResources := map[string]string{}
  329. sinkResources := map[string]string{}
  330. connectionResources := map[string]string{}
  331. for key, ops := range ConfigManager.cfgOperators {
  332. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  333. plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
  334. cfs := ops.CopyUpdatableConfContent()
  335. if len(cfs) > 0 {
  336. jsonByte, _ := json.Marshal(cfs)
  337. connectionResources[plugin] = string(jsonByte)
  338. }
  339. continue
  340. }
  341. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  342. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  343. cfs := ops.CopyUpdatableConfContent()
  344. if len(cfs) > 0 {
  345. jsonByte, _ := json.Marshal(cfs)
  346. srcResources[plugin] = string(jsonByte)
  347. }
  348. continue
  349. }
  350. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  351. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  352. cfs := ops.CopyUpdatableConfContent()
  353. if len(cfs) > 0 {
  354. jsonByte, _ := json.Marshal(cfs)
  355. sinkResources[plugin] = string(jsonByte)
  356. }
  357. continue
  358. }
  359. }
  360. result.Sources = srcResources
  361. result.Sinks = sinkResources
  362. result.Connections = connectionResources
  363. return result
  364. }
  365. func GetConfigurationStatus() YamlConfigurationSet {
  366. result := YamlConfigurationSet{
  367. Sources: map[string]string{},
  368. Sinks: map[string]string{},
  369. Connections: map[string]string{},
  370. }
  371. all, err := ConfigManager.sourceConfigStatusDb.All()
  372. if err == nil {
  373. result.Sources = all
  374. }
  375. all, err = ConfigManager.sinkConfigStatusDb.All()
  376. if err == nil {
  377. result.Sinks = all
  378. }
  379. all, err = ConfigManager.connectionConfigStatusDb.All()
  380. if err == nil {
  381. result.Connections = all
  382. }
  383. return result
  384. }
  385. func LoadConfigurations(configSets YamlConfigurationSet) {
  386. var srcResources = configSets.Sources
  387. var sinkResources = configSets.Sinks
  388. var connectionResources = configSets.Connections
  389. _ = ConfigManager.sourceConfigStatusDb.Clean()
  390. _ = ConfigManager.sinkConfigStatusDb.Clean()
  391. _ = ConfigManager.connectionConfigStatusDb.Clean()
  392. for key, val := range srcResources {
  393. configs := YamlConfigurations{}
  394. err := json.Unmarshal([]byte(val), &configs)
  395. if err != nil {
  396. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  397. continue
  398. }
  399. err = addSourceConfKeys(key, configs)
  400. if err != nil {
  401. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  402. continue
  403. }
  404. }
  405. for key, val := range sinkResources {
  406. configs := YamlConfigurations{}
  407. err := json.Unmarshal([]byte(val), &configs)
  408. if err != nil {
  409. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  410. continue
  411. }
  412. err = addSinkConfKeys(key, configs)
  413. if err != nil {
  414. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  415. continue
  416. }
  417. }
  418. for key, val := range connectionResources {
  419. configs := YamlConfigurations{}
  420. err := json.Unmarshal([]byte(val), &configs)
  421. if err != nil {
  422. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  423. continue
  424. }
  425. err = addConnectionConfKeys(key, configs)
  426. if err != nil {
  427. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  428. continue
  429. }
  430. }
  431. }