yamlConfigMeta.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/pkg/kv"
  21. "strings"
  22. "sync"
  23. )
  24. type configManager struct {
  25. lock sync.RWMutex
  26. cfgOperators map[string]conf.ConfigOperator
  27. sourceConfigStatusDb kv.KeyValue
  28. sinkConfigStatusDb kv.KeyValue
  29. connectionConfigStatusDb kv.KeyValue
  30. }
  31. // ConfigManager Hold the ConfigOperator for yaml configs defined in etc/sources/xxx.yaml and etc/connections/connection.yaml
  32. // for configs in etc/sources/xxx.yaml, the map key is sources.xxx format, xxx will be mqtt/httppull and so on
  33. // for configs in etc/connections/connection.yaml, the map key is connections.xxx format, xxx will be mqtt/edgex
  34. var ConfigManager *configManager
  35. func InitYamlConfigManager() {
  36. ConfigManager = &configManager{
  37. lock: sync.RWMutex{},
  38. cfgOperators: make(map[string]conf.ConfigOperator),
  39. }
  40. ConfigManager.sourceConfigStatusDb, _ = store.GetKV("sourceConfigStatus")
  41. ConfigManager.sinkConfigStatusDb, _ = store.GetKV("sinkConfigStatus")
  42. ConfigManager.connectionConfigStatusDb, _ = store.GetKV("connectionConfigStatus")
  43. }
  44. const SourceCfgOperatorKeyTemplate = "sources.%s"
  45. const SourceCfgOperatorKeyPrefix = "sources."
  46. const SinkCfgOperatorKeyTemplate = "sinks.%s"
  47. const SinkCfgOperatorKeyPrefix = "sinks."
  48. const ConnectionCfgOperatorKeyTemplate = "connections.%s"
  49. const ConnectionCfgOperatorKeyPrefix = "connections."
  50. // loadConfigOperatorForSource
  51. // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml /data/sources/xxx.yaml
  52. // If plugin xxx not exist, no error response
  53. func loadConfigOperatorForSource(pluginName string) {
  54. yamlKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, pluginName)
  55. if cfg, _ := conf.NewConfigOperatorFromSourceYaml(pluginName); cfg != nil {
  56. ConfigManager.lock.Lock()
  57. ConfigManager.cfgOperators[yamlKey] = cfg
  58. ConfigManager.lock.Unlock()
  59. conf.Log.Infof("Loading yaml file for source: %s", pluginName)
  60. }
  61. }
  62. // loadConfigOperatorForSink
  63. // Try to load ConfigOperator for plugin xxx from /data/sinks/xxx.yaml
  64. // If plugin xxx not exist, no error response
  65. func loadConfigOperatorForSink(pluginName string) {
  66. yamlKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, pluginName)
  67. if cfg, _ := conf.NewConfigOperatorFromSinkYaml(pluginName); cfg != nil {
  68. ConfigManager.lock.Lock()
  69. ConfigManager.cfgOperators[yamlKey] = cfg
  70. ConfigManager.lock.Unlock()
  71. conf.Log.Infof("Loading yaml file for sink: %s", pluginName)
  72. }
  73. }
  74. // loadConfigOperatorForConnection
  75. // Try to load ConfigOperator for plugin from /etc/connections/connection.yaml /data/connections/connection.yaml
  76. // If plugin not exist in /etc/connections/connection.yaml, no error response
  77. func loadConfigOperatorForConnection(pluginName string) {
  78. yamlKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, pluginName)
  79. if cfg, _ := conf.NewConfigOperatorFromConnectionYaml(pluginName); cfg != nil {
  80. ConfigManager.lock.Lock()
  81. ConfigManager.cfgOperators[yamlKey] = cfg
  82. ConfigManager.lock.Unlock()
  83. conf.Log.Infof("Loading yaml file for connection: %s", pluginName)
  84. }
  85. }
  86. func delConfKey(configOperatorKey, confKey, language string) error {
  87. ConfigManager.lock.Lock()
  88. defer ConfigManager.lock.Unlock()
  89. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  90. if !ok {
  91. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  92. }
  93. cfgOps.DeleteConfKey(confKey)
  94. err := cfgOps.SaveCfgToFile()
  95. if err != nil {
  96. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  97. }
  98. return nil
  99. }
  100. func DelSourceConfKey(plgName, confKey, language string) error {
  101. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  102. return delConfKey(configOperatorKey, confKey, language)
  103. }
  104. func DelSinkConfKey(plgName, confKey, language string) error {
  105. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  106. return delConfKey(configOperatorKey, confKey, language)
  107. }
  108. func DelConnectionConfKey(plgName, confKey, language string) error {
  109. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  110. return delConfKey(configOperatorKey, confKey, language)
  111. }
  112. func delYamlConf(configOperatorKey string) {
  113. ConfigManager.lock.Lock()
  114. defer ConfigManager.lock.Unlock()
  115. _, ok := ConfigManager.cfgOperators[configOperatorKey]
  116. if ok {
  117. delete(ConfigManager.cfgOperators, configOperatorKey)
  118. }
  119. }
  120. func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
  121. ConfigManager.lock.RLock()
  122. defer ConfigManager.lock.RUnlock()
  123. cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
  124. if !ok {
  125. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), configOperatorKey)
  126. }
  127. cf := cfgOps.CopyConfContent()
  128. if b, err = json.Marshal(cf); nil != err {
  129. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  130. } else {
  131. return b, err
  132. }
  133. }
  134. func addSourceConfKeys(plgName string, configurations YamlConfigurations) (err error) {
  135. ConfigManager.lock.Lock()
  136. defer ConfigManager.lock.Unlock()
  137. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  138. var cfgOps conf.ConfigOperator
  139. var found bool
  140. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  141. if !found {
  142. cfgOps = conf.NewConfigOperatorForSource(plgName)
  143. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  144. }
  145. cfgOps.LoadConfContent(configurations)
  146. err = cfgOps.SaveCfgToFile()
  147. if err != nil {
  148. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  149. }
  150. return nil
  151. }
  152. func AddSourceConfKey(plgName, confKey, language string, content []byte) error {
  153. ConfigManager.lock.Lock()
  154. defer ConfigManager.lock.Unlock()
  155. configOperatorKey := fmt.Sprintf(SourceCfgOperatorKeyTemplate, plgName)
  156. reqField := make(map[string]interface{})
  157. err := json.Unmarshal(content, &reqField)
  158. if nil != err {
  159. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  160. }
  161. var cfgOps conf.ConfigOperator
  162. var found bool
  163. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  164. if !found {
  165. cfgOps = conf.NewConfigOperatorForSource(plgName)
  166. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  167. }
  168. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  169. return err
  170. }
  171. err = cfgOps.SaveCfgToFile()
  172. if err != nil {
  173. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  174. }
  175. return nil
  176. }
  177. func AddSinkConfKey(plgName, confKey, language string, content []byte) error {
  178. ConfigManager.lock.Lock()
  179. defer ConfigManager.lock.Unlock()
  180. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  181. reqField := make(map[string]interface{})
  182. err := json.Unmarshal(content, &reqField)
  183. if nil != err {
  184. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "type_conversion_fail"), plgName, err)
  185. }
  186. var cfgOps conf.ConfigOperator
  187. var found bool
  188. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  189. if !found {
  190. cfgOps = conf.NewConfigOperatorForSink(plgName)
  191. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  192. }
  193. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  194. return err
  195. }
  196. err = cfgOps.SaveCfgToFile()
  197. if err != nil {
  198. return fmt.Errorf(`%s%s.%v`, getMsg(language, sink, "write_data_fail"), configOperatorKey, err)
  199. }
  200. return nil
  201. }
  202. func addSinkConfKeys(plgName string, cf YamlConfigurations) error {
  203. ConfigManager.lock.Lock()
  204. defer ConfigManager.lock.Unlock()
  205. configOperatorKey := fmt.Sprintf(SinkCfgOperatorKeyTemplate, plgName)
  206. var cfgOps conf.ConfigOperator
  207. var found bool
  208. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  209. if !found {
  210. cfgOps = conf.NewConfigOperatorForSink(plgName)
  211. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  212. }
  213. cfgOps.LoadConfContent(cf)
  214. err := cfgOps.SaveCfgToFile()
  215. if err != nil {
  216. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  217. }
  218. return nil
  219. }
  220. func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
  221. ConfigManager.lock.Lock()
  222. defer ConfigManager.lock.Unlock()
  223. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  224. reqField := make(map[string]interface{})
  225. err := json.Unmarshal(content, &reqField)
  226. if nil != err {
  227. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
  228. }
  229. var cfgOps conf.ConfigOperator
  230. var found bool
  231. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  232. if !found {
  233. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  234. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  235. }
  236. if err := cfgOps.AddConfKey(confKey, reqField); err != nil {
  237. return err
  238. }
  239. err = cfgOps.SaveCfgToFile()
  240. if err != nil {
  241. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
  242. }
  243. return nil
  244. }
  245. func addConnectionConfKeys(plgName string, cf YamlConfigurations) error {
  246. ConfigManager.lock.Lock()
  247. defer ConfigManager.lock.Unlock()
  248. configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)
  249. var cfgOps conf.ConfigOperator
  250. var found bool
  251. cfgOps, found = ConfigManager.cfgOperators[configOperatorKey]
  252. if !found {
  253. cfgOps = conf.NewConfigOperatorForConnection(plgName)
  254. ConfigManager.cfgOperators[configOperatorKey] = cfgOps
  255. }
  256. cfgOps.LoadConfContent(cf)
  257. err := cfgOps.SaveCfgToFile()
  258. if err != nil {
  259. return fmt.Errorf(`%s.%v`, configOperatorKey, err)
  260. }
  261. return nil
  262. }
  263. func GetResources(language string) (b []byte, err error) {
  264. ConfigManager.lock.RLock()
  265. defer ConfigManager.lock.RUnlock()
  266. var srcResources []map[string]string
  267. var sinkResources []map[string]string
  268. for key, ops := range ConfigManager.cfgOperators {
  269. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  270. continue
  271. }
  272. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  273. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  274. resourceIds := ops.GetUpdatableConfKeys()
  275. if len(resourceIds) > 0 {
  276. item := map[string]string{}
  277. for _, v := range resourceIds {
  278. item[v] = plugin
  279. }
  280. srcResources = append(srcResources, item)
  281. }
  282. continue
  283. }
  284. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  285. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  286. resourceIds := ops.GetUpdatableConfKeys()
  287. if len(resourceIds) > 0 {
  288. item := map[string]string{}
  289. for _, v := range resourceIds {
  290. item[v] = plugin
  291. }
  292. sinkResources = append(sinkResources, item)
  293. }
  294. continue
  295. }
  296. }
  297. result := map[string]interface{}{}
  298. result["sources"] = srcResources
  299. result["sinks"] = sinkResources
  300. if b, err = json.Marshal(result); nil != err {
  301. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), result)
  302. } else {
  303. return b, err
  304. }
  305. }
  306. func ResetConfigs() {
  307. ConfigManager.lock.Lock()
  308. defer ConfigManager.lock.Unlock()
  309. for _, ops := range ConfigManager.cfgOperators {
  310. ops.ClearConfKeys()
  311. _ = ops.SaveCfgToFile()
  312. }
  313. }
  314. type YamlConfigurations map[string]map[string]interface{}
  315. type YamlConfigurationSet struct {
  316. Sources map[string]string `json:"sources"`
  317. Sinks map[string]string `json:"sinks"`
  318. Connections map[string]string `json:"connections"`
  319. }
  320. func GetConfigurations() YamlConfigurationSet {
  321. ConfigManager.lock.RLock()
  322. defer ConfigManager.lock.RUnlock()
  323. result := YamlConfigurationSet{
  324. Sources: map[string]string{},
  325. Sinks: map[string]string{},
  326. Connections: map[string]string{},
  327. }
  328. srcResources := map[string]string{}
  329. sinkResources := map[string]string{}
  330. connectionResources := map[string]string{}
  331. for key, ops := range ConfigManager.cfgOperators {
  332. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  333. plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
  334. cfs := ops.CopyUpdatableConfContent()
  335. if len(cfs) > 0 {
  336. jsonByte, _ := json.Marshal(cfs)
  337. connectionResources[plugin] = string(jsonByte)
  338. }
  339. continue
  340. }
  341. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  342. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  343. cfs := ops.CopyUpdatableConfContent()
  344. if len(cfs) > 0 {
  345. jsonByte, _ := json.Marshal(cfs)
  346. srcResources[plugin] = string(jsonByte)
  347. }
  348. continue
  349. }
  350. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  351. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  352. cfs := ops.CopyUpdatableConfContent()
  353. if len(cfs) > 0 {
  354. jsonByte, _ := json.Marshal(cfs)
  355. sinkResources[plugin] = string(jsonByte)
  356. }
  357. continue
  358. }
  359. }
  360. result.Sources = srcResources
  361. result.Sinks = sinkResources
  362. result.Connections = connectionResources
  363. return result
  364. }
  365. type YamlConfigurationKeys struct {
  366. Sources map[string][]string
  367. Sinks map[string][]string
  368. }
  369. func GetConfigurationsFor(yaml YamlConfigurationKeys) YamlConfigurationSet {
  370. ConfigManager.lock.RLock()
  371. defer ConfigManager.lock.RUnlock()
  372. sourcesConfigKeys := yaml.Sources
  373. sinksConfigKeys := yaml.Sinks
  374. result := YamlConfigurationSet{
  375. Sources: map[string]string{},
  376. Sinks: map[string]string{},
  377. Connections: map[string]string{},
  378. }
  379. srcResources := map[string]string{}
  380. sinkResources := map[string]string{}
  381. connectionResources := map[string]string{}
  382. for key, ops := range ConfigManager.cfgOperators {
  383. if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
  384. plugin := strings.TrimPrefix(key, ConnectionCfgOperatorKeyPrefix)
  385. cfs := ops.CopyUpdatableConfContent()
  386. if len(cfs) > 0 {
  387. jsonByte, _ := json.Marshal(cfs)
  388. connectionResources[plugin] = string(jsonByte)
  389. }
  390. continue
  391. }
  392. if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
  393. plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
  394. keys, ok := sourcesConfigKeys[plugin]
  395. if ok {
  396. cfs := ops.CopyUpdatableConfContentFor(keys)
  397. if len(cfs) > 0 {
  398. jsonByte, _ := json.Marshal(cfs)
  399. srcResources[plugin] = string(jsonByte)
  400. }
  401. }
  402. continue
  403. }
  404. if strings.HasPrefix(key, SinkCfgOperatorKeyPrefix) {
  405. plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
  406. keys, ok := sinksConfigKeys[plugin]
  407. if ok {
  408. cfs := ops.CopyUpdatableConfContentFor(keys)
  409. if len(cfs) > 0 {
  410. jsonByte, _ := json.Marshal(cfs)
  411. sinkResources[plugin] = string(jsonByte)
  412. }
  413. }
  414. continue
  415. }
  416. }
  417. result.Sources = srcResources
  418. result.Sinks = sinkResources
  419. result.Connections = connectionResources
  420. return result
  421. }
  422. func GetConfigurationStatus() YamlConfigurationSet {
  423. result := YamlConfigurationSet{
  424. Sources: map[string]string{},
  425. Sinks: map[string]string{},
  426. Connections: map[string]string{},
  427. }
  428. all, err := ConfigManager.sourceConfigStatusDb.All()
  429. if err == nil {
  430. result.Sources = all
  431. }
  432. all, err = ConfigManager.sinkConfigStatusDb.All()
  433. if err == nil {
  434. result.Sinks = all
  435. }
  436. all, err = ConfigManager.connectionConfigStatusDb.All()
  437. if err == nil {
  438. result.Connections = all
  439. }
  440. return result
  441. }
  442. func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
  443. configResponse := YamlConfigurationSet{
  444. Sources: map[string]string{},
  445. Sinks: map[string]string{},
  446. Connections: map[string]string{},
  447. }
  448. var srcResources = configSets.Sources
  449. var sinkResources = configSets.Sinks
  450. var connectionResources = configSets.Connections
  451. _ = ConfigManager.sourceConfigStatusDb.Clean()
  452. _ = ConfigManager.sinkConfigStatusDb.Clean()
  453. _ = ConfigManager.connectionConfigStatusDb.Clean()
  454. for key, val := range srcResources {
  455. configs := YamlConfigurations{}
  456. err := json.Unmarshal([]byte(val), &configs)
  457. if err != nil {
  458. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  459. configResponse.Sources[key] = err.Error()
  460. continue
  461. }
  462. err = addSourceConfKeys(key, configs)
  463. if err != nil {
  464. _ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
  465. configResponse.Sources[key] = err.Error()
  466. continue
  467. }
  468. }
  469. for key, val := range sinkResources {
  470. configs := YamlConfigurations{}
  471. err := json.Unmarshal([]byte(val), &configs)
  472. if err != nil {
  473. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  474. configResponse.Sinks[key] = err.Error()
  475. continue
  476. }
  477. err = addSinkConfKeys(key, configs)
  478. if err != nil {
  479. _ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
  480. configResponse.Sinks[key] = err.Error()
  481. continue
  482. }
  483. }
  484. for key, val := range connectionResources {
  485. configs := YamlConfigurations{}
  486. err := json.Unmarshal([]byte(val), &configs)
  487. if err != nil {
  488. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  489. configResponse.Connections[key] = err.Error()
  490. continue
  491. }
  492. err = addConnectionConfKeys(key, configs)
  493. if err != nil {
  494. _ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
  495. configResponse.Connections[key] = err.Error()
  496. continue
  497. }
  498. }
  499. return configResponse
  500. }
  501. func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfigurationSet {
  502. configResponse := YamlConfigurationSet{
  503. Sources: map[string]string{},
  504. Sinks: map[string]string{},
  505. Connections: map[string]string{},
  506. }
  507. var srcResources = configSets.Sources
  508. var sinkResources = configSets.Sinks
  509. var connectionResources = configSets.Connections
  510. for key, val := range srcResources {
  511. configs := YamlConfigurations{}
  512. err := json.Unmarshal([]byte(val), &configs)
  513. if err != nil {
  514. configResponse.Sources[key] = err.Error()
  515. continue
  516. }
  517. err = addSourceConfKeys(key, configs)
  518. if err != nil {
  519. configResponse.Sources[key] = err.Error()
  520. continue
  521. }
  522. }
  523. for key, val := range sinkResources {
  524. configs := YamlConfigurations{}
  525. err := json.Unmarshal([]byte(val), &configs)
  526. if err != nil {
  527. configResponse.Sinks[key] = err.Error()
  528. continue
  529. }
  530. err = addSinkConfKeys(key, configs)
  531. if err != nil {
  532. configResponse.Sinks[key] = err.Error()
  533. continue
  534. }
  535. }
  536. for key, val := range connectionResources {
  537. configs := YamlConfigurations{}
  538. err := json.Unmarshal([]byte(val), &configs)
  539. if err != nil {
  540. configResponse.Connections[key] = err.Error()
  541. continue
  542. }
  543. err = addConnectionConfKeys(key, configs)
  544. if err != nil {
  545. configResponse.Connections[key] = err.Error()
  546. continue
  547. }
  548. }
  549. return configResponse
  550. }