yamlConfigMeta.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "sync"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/pkg/store"
  22. "github.com/lf-edge/ekuiper/pkg/kv"
  23. )
  24. type configManager struct {
  25. lock sync.RWMutex
  26. cfgOperators map[string]conf.ConfigOperator
  27. sourceConfigStatusDb kv.KeyValue
  28. sinkConfigStatusDb kv.KeyValue
  29. connectionConfigStatusDb kv.KeyValue
  30. }
  31. // ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  32. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  33. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  34. var ConfigManager *configManager
  35. func InitYamlConfigManager() {
  36. ConfigManager = &configManager{
  37. lock: sync.RWMutex{},
  38. cfgOperators: make(map[string]conf.ConfigOperator),
  39. }
  40. ConfigManager.sourceConfigStatusDb, _ = store.GetKV("sourceConfigStatus")
  41. ConfigManager.sinkConfigStatusDb, _ = store.GetKV("sinkConfigStatus")
  42. ConfigManager.connectionConfigStatusDb, _ = store.GetKV("connectionConfigStatus")
  43. }
  44. const (
  45. SourceCfgOperatorKeyTemplate = "sources.%s"
  46. SourceCfgOperatorKeyPrefix = "sources."
  47. SinkCfgOperatorKeyTemplate = "sinks.%s"
  48. SinkCfgOperatorKeyPrefix = "sinks."
  49. ConnectionCfgOperatorKeyTemplate = "connections.%s"
  50. ConnectionCfgOperatorKeyPrefix = "connections."
  51. )
  52. // loadConfigOperatorForSource
  53. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml /data/sources/xxx.yaml
  54. // If plugin xxx not exist, no error response
  55. func loadConfigOperatorForSource(pluginName string) {
  56. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  57. if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  58. ConfigManager.lock.Lock()
  59. ConfigManager.cfgOperators[yamlKey] = cfg
  60. ConfigManager.lock.Unlock()
  61. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  62. }
  63. }
  64. // loadConfigOperatorForSink
  65. // Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
  66. // If plugin xxx not exist, no error response
  67. func loadConfigOperatorForSink(pluginName string) {
  68. yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
  69. if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
  70. ConfigManager.lock.Lock()
  71. ConfigManager.cfgOperators[yamlKey] = cfg
  72. ConfigManager.lock.Unlock()
  73. conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
  74. }
  75. }
  76. // loadConfigOperatorForConnection
  77. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
  78. // If plugin not exist in /etc/connections/connection.yaml, no error response
  79. func loadConfigOperatorForConnection(pluginName string) {
  80. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  81. if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  82. ConfigManager.lock.Lock()
  83. ConfigManager.cfgOperators[yamlKey] = cfg
  84. ConfigManager.lock.Unlock()
  85. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  86. }
  87. }
  88. func delConfKey(configOperatorKey, confKey, language string) error {
  89. ConfigManager.lock.Lock()
  90. defer ConfigManager.lock.Unlock()
  91. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  92. if !ok {
  93. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  94. }
  95. cfgOps.DeleteConfKey(confKey)
  96. err := cfgOps.SaveCfgToFile()
  97. if err != nil {
  98. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  99. }
  100. return nil
  101. }
  102. func DelSourceConfKey(plgName, confKey, language string) error {
  103. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  104. return delConfKey(configOperatorKey, confKey, language)
  105. }
  106. func DelSinkConfKey(plgName, confKey, language string) error {
  107. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  108. return delConfKey(configOperatorKey, confKey, language)
  109. }
  110. func DelConnectionConfKey(plgName, confKey, language string) error {
  111. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  112. return delConfKey(configOperatorKey, confKey, language)
  113. }
  114. func delYamlConf(configOperatorKey string) {
  115. ConfigManager.lock.Lock()
  116. defer ConfigManager.lock.Unlock()
  117. _, ok := ConfigManager.cfgOperators[configOperatorKey]
  118. if ok {
  119. delete(ConfigManager.cfgOperators, configOperatorKey)
  120. }
  121. }
  122. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  123. ConfigManager.lock.RLock()
  124. defer ConfigManager.lock.RUnlock()
  125. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  126. if !ok {
  127. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  128. }
  129. cf := cfgOps.CopyConfContent()
  130. if b, err = json.Marshal(cf); nil != err {
  131. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  132. } else {
  133. return b, err
  134. }
  135. }
  136. func addSourceConfKeys(plgName string, configurations YamlConfigurations) (err error) {
  137. ConfigManager.lock.Lock()
  138. defer ConfigManager.lock.Unlock()
  139. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  140. var cfgOps conf.ConfigOperator
  141. var found bool
  142. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  143. if !found {
  144. cfgOps = conf.NewConfigOperatorForSource(plgName)
  145. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  146. }
  147. cfgOps.LoadConfContent(configurations)
  148. err = cfgOps.SaveCfgToFile()
  149. if err != nil {
  150. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  151. }
  152. return nil
  153. }
  154. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  155. ConfigManager.lock.Lock()
  156. defer ConfigManager.lock.Unlock()
  157. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  158. reqField := make(map[string]interface{})
  159. err := json.Unmarshal(content, &reqField)
  160. if nil != err {
  161. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  162. }
  163. var cfgOps conf.ConfigOperator
  164. var found bool
  165. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  166. if !found {
  167. cfgOps = conf.NewConfigOperatorForSource(plgName)
  168. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  169. }
  170. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  171. return err
  172. }
  173. err = cfgOps.SaveCfgToFile()
  174. if err != nil {
  175. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  176. }
  177. return nil
  178. }
  179. func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
  180. ConfigManager.lock.Lock()
  181. defer ConfigManager.lock.Unlock()
  182. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  183. reqField := make(map[string]interface{})
  184. err := json.Unmarshal(content, &reqField)
  185. if nil != err {
  186. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
  187. }
  188. var cfgOps conf.ConfigOperator
  189. var found bool
  190. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  191. if !found {
  192. cfgOps = conf.NewConfigOperatorForSink(plgName)
  193. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  194. }
  195. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  196. return err
  197. }
  198. err = cfgOps.SaveCfgToFile()
  199. if err != nil {
  200. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
  201. }
  202. return nil
  203. }
  204. func addSinkConfKeys(plgName string, cf YamlConfigurations) error {
  205. ConfigManager.lock.Lock()
  206. defer ConfigManager.lock.Unlock()
  207. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  208. var cfgOps conf.ConfigOperator
  209. var found bool
  210. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  211. if !found {
  212. cfgOps = conf.NewConfigOperatorForSink(plgName)
  213. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  214. }
  215. cfgOps.LoadConfContent(cf)
  216. err := cfgOps.SaveCfgToFile()
  217. if err != nil {
  218. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  219. }
  220. return nil
  221. }
  222. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  223. ConfigManager.lock.Lock()
  224. defer ConfigManager.lock.Unlock()
  225. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  226. reqField := make(map[string]interface{})
  227. err := json.Unmarshal(content, &reqField)
  228. if nil != err {
  229. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  230. }
  231. var cfgOps conf.ConfigOperator
  232. var found bool
  233. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  234. if !found {
  235. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  236. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  237. }
  238. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  239. return err
  240. }
  241. err = cfgOps.SaveCfgToFile()
  242. if err != nil {
  243. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  244. }
  245. return nil
  246. }
  247. func addConnectionConfKeys(plgName string, cf YamlConfigurations) error {
  248. ConfigManager.lock.Lock()
  249. defer ConfigManager.lock.Unlock()
  250. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  251. var cfgOps conf.ConfigOperator
  252. var found bool
  253. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  254. if !found {
  255. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  256. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  257. }
  258. cfgOps.LoadConfContent(cf)
  259. err := cfgOps.SaveCfgToFile()
  260. if err != nil {
  261. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  262. }
  263. return nil
  264. }
  265. func GetResources(language string) (b []byte, err error) {
  266. ConfigManager.lock.RLock()
  267. defer ConfigManager.lock.RUnlock()
  268. var srcResources []map[string]string
  269. var sinkResources []map[string]string
  270. for key, ops := range ConfigManager.cfgOperators {
  271. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  272. continue
  273. }
  274. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  275. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  276. resourceIds := ops.GetUpdatableConfKeys()
  277. if len(resourceIds) > 0 {
  278. item := map[string]string{}
  279. for _, v := range resourceIds {
  280. item[v] = plugin
  281. }
  282. srcResources = append(srcResources, item)
  283. }
  284. continue
  285. }
  286. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  287. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  288. resourceIds := ops.GetUpdatableConfKeys()
  289. if len(resourceIds) > 0 {
  290. item := map[string]string{}
  291. for _, v := range resourceIds {
  292. item[v] = plugin
  293. }
  294. sinkResources = append(sinkResources, item)
  295. }
  296. continue
  297. }
  298. }
  299. result := map[string]interface{}{}
  300. result["sources"] = srcResources
  301. result["sinks"] = sinkResources
  302. if b, err = json.Marshal(result); nil != err {
  303. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
  304. } else {
  305. return b, err
  306. }
  307. }
  308. func ResetConfigs() {
  309. ConfigManager.lock.Lock()
  310. defer ConfigManager.lock.Unlock()
  311. for _, ops := range ConfigManager.cfgOperators {
  312. ops.ClearConfKeys()
  313. _ = ops.SaveCfgToFile()
  314. }
  315. }
  316. type YamlConfigurations map[string]map[string]interface{}
  317. type YamlConfigurationSet struct {
  318. Sources map[string]string `json:"sources"`
  319. Sinks map[string]string `json:"sinks"`
  320. Connections map[string]string `json:"connections"`
  321. }
  322. func GetConfigurations() YamlConfigurationSet {
  323. ConfigManager.lock.RLock()
  324. defer ConfigManager.lock.RUnlock()
  325. result := YamlConfigurationSet{
  326. Sources: map[string]string{},
  327. Sinks: map[string]string{},
  328. Connections: map[string]string{},
  329. }
  330. srcResources := map[string]string{}
  331. sinkResources := map[string]string{}
  332. connectionResources := map[string]string{}
  333. for key, ops := range ConfigManager.cfgOperators {
  334. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  335. plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
  336. cfs := ops.CopyUpdatableConfContent()
  337. if len(cfs) > 0 {
  338. jsonByte, _ := json.Marshal(cfs)
  339. connectionResources[plugin] = string(jsonByte)
  340. }
  341. continue
  342. }
  343. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  344. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  345. cfs := ops.CopyUpdatableConfContent()
  346. if len(cfs) > 0 {
  347. jsonByte, _ := json.Marshal(cfs)
  348. srcResources[plugin] = string(jsonByte)
  349. }
  350. continue
  351. }
  352. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  353. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  354. cfs := ops.CopyUpdatableConfContent()
  355. if len(cfs) > 0 {
  356. jsonByte, _ := json.Marshal(cfs)
  357. sinkResources[plugin] = string(jsonByte)
  358. }
  359. continue
  360. }
  361. }
  362. result.Sources = srcResources
  363. result.Sinks = sinkResources
  364. result.Connections = connectionResources
  365. return result
  366. }
  367. type YamlConfigurationKeys struct {
  368. Sources map[string][]string
  369. Sinks map[string][]string
  370. }
  371. func GetConfigurationsFor(yaml YamlConfigurationKeys) YamlConfigurationSet {
  372. ConfigManager.lock.RLock()
  373. defer ConfigManager.lock.RUnlock()
  374. sourcesConfigKeys := yaml.Sources
  375. sinksConfigKeys := yaml.Sinks
  376. result := YamlConfigurationSet{
  377. Sources: map[string]string{},
  378. Sinks: map[string]string{},
  379. Connections: map[string]string{},
  380. }
  381. srcResources := map[string]string{}
  382. sinkResources := map[string]string{}
  383. connectionResources := map[string]string{}
  384. for key, ops := range ConfigManager.cfgOperators {
  385. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  386. plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
  387. cfs := ops.CopyUpdatableConfContent()
  388. if len(cfs) > 0 {
  389. jsonByte, _ := json.Marshal(cfs)
  390. connectionResources[plugin] = string(jsonByte)
  391. }
  392. continue
  393. }
  394. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  395. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  396. keys, ok := sourcesConfigKeys[plugin]
  397. if ok {
  398. cfs := ops.CopyUpdatableConfContentFor(keys)
  399. if len(cfs) > 0 {
  400. jsonByte, _ := json.Marshal(cfs)
  401. srcResources[plugin] = string(jsonByte)
  402. }
  403. }
  404. continue
  405. }
  406. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  407. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  408. keys, ok := sinksConfigKeys[plugin]
  409. if ok {
  410. cfs := ops.CopyUpdatableConfContentFor(keys)
  411. if len(cfs) > 0 {
  412. jsonByte, _ := json.Marshal(cfs)
  413. sinkResources[plugin] = string(jsonByte)
  414. }
  415. }
  416. continue
  417. }
  418. }
  419. result.Sources = srcResources
  420. result.Sinks = sinkResources
  421. result.Connections = connectionResources
  422. return result
  423. }
  424. func GetConfigurationStatus() YamlConfigurationSet {
  425. result := YamlConfigurationSet{
  426. Sources: map[string]string{},
  427. Sinks: map[string]string{},
  428. Connections: map[string]string{},
  429. }
  430. all, err := ConfigManager.sourceConfigStatusDb.All()
  431. if err == nil {
  432. result.Sources = all
  433. }
  434. all, err = ConfigManager.sinkConfigStatusDb.All()
  435. if err == nil {
  436. result.Sinks = all
  437. }
  438. all, err = ConfigManager.connectionConfigStatusDb.All()
  439. if err == nil {
  440. result.Connections = all
  441. }
  442. return result
  443. }
  444. func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
  445. configResponse := YamlConfigurationSet{
  446. Sources: map[string]string{},
  447. Sinks: map[string]string{},
  448. Connections: map[string]string{},
  449. }
  450. srcResources := configSets.Sources
  451. sinkResources := configSets.Sinks
  452. connectionResources := configSets.Connections
  453. _ = ConfigManager.sourceConfigStatusDb.Clean()
  454. _ = ConfigManager.sinkConfigStatusDb.Clean()
  455. _ = ConfigManager.connectionConfigStatusDb.Clean()
  456. for key, val := range srcResources {
  457. configs := YamlConfigurations{}
  458. err := json.Unmarshal([]byte(val), &configs)
  459. if err != nil {
  460. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  461. configResponse.Sources[key] = err.Error()
  462. continue
  463. }
  464. err = addSourceConfKeys(key, configs)
  465. if err != nil {
  466. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  467. configResponse.Sources[key] = err.Error()
  468. continue
  469. }
  470. }
  471. for key, val := range sinkResources {
  472. configs := YamlConfigurations{}
  473. err := json.Unmarshal([]byte(val), &configs)
  474. if err != nil {
  475. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  476. configResponse.Sinks[key] = err.Error()
  477. continue
  478. }
  479. err = addSinkConfKeys(key, configs)
  480. if err != nil {
  481. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  482. configResponse.Sinks[key] = err.Error()
  483. continue
  484. }
  485. }
  486. for key, val := range connectionResources {
  487. configs := YamlConfigurations{}
  488. err := json.Unmarshal([]byte(val), &configs)
  489. if err != nil {
  490. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  491. configResponse.Connections[key] = err.Error()
  492. continue
  493. }
  494. err = addConnectionConfKeys(key, configs)
  495. if err != nil {
  496. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  497. configResponse.Connections[key] = err.Error()
  498. continue
  499. }
  500. }
  501. return configResponse
  502. }
  503. func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfigurationSet {
  504. configResponse := YamlConfigurationSet{
  505. Sources: map[string]string{},
  506. Sinks: map[string]string{},
  507. Connections: map[string]string{},
  508. }
  509. srcResources := configSets.Sources
  510. sinkResources := configSets.Sinks
  511. connectionResources := configSets.Connections
  512. for key, val := range srcResources {
  513. configs := YamlConfigurations{}
  514. err := json.Unmarshal([]byte(val), &configs)
  515. if err != nil {
  516. configResponse.Sources[key] = err.Error()
  517. continue
  518. }
  519. err = addSourceConfKeys(key, configs)
  520. if err != nil {
  521. configResponse.Sources[key] = err.Error()
  522. continue
  523. }
  524. }
  525. for key, val := range sinkResources {
  526. configs := YamlConfigurations{}
  527. err := json.Unmarshal([]byte(val), &configs)
  528. if err != nil {
  529. configResponse.Sinks[key] = err.Error()
  530. continue
  531. }
  532. err = addSinkConfKeys(key, configs)
  533. if err != nil {
  534. configResponse.Sinks[key] = err.Error()
  535. continue
  536. }
  537. }
  538. for key, val := range connectionResources {
  539. configs := YamlConfigurations{}
  540. err := json.Unmarshal([]byte(val), &configs)
  541. if err != nil {
  542. configResponse.Connections[key] = err.Error()
  543. continue
  544. }
  545. err = addConnectionConfKeys(key, configs)
  546. if err != nil {
  547. configResponse.Connections[key] = err.Error()
  548. continue
  549. }
  550. }
  551. return configResponse
  552. }