yaml_config_ops.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package conf
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. "path"
  21. "reflect"
  22. "sync"
  23. )
  24. //ConfKeysOperator define interface to query/add/update/delete the configs in memory
  25. type ConfKeysOperator interface {
  26. GetPluginName() string
  27. GetConfContentByte() ([]byte, error)
  28. CopyConfContent() map[string]map[string]interface{}
  29. CopyReadOnlyConfContent() map[string]map[string]interface{}
  30. CopyUpdatableConfContent() map[string]map[string]interface{}
  31. LoadConfContent(cf map[string]map[string]interface{})
  32. GetConfKeys() (keys []string)
  33. GetReadOnlyConfKeys() (keys []string)
  34. GetUpdatableConfKeys() (keys []string)
  35. DeleteConfKey(confKey string)
  36. DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
  37. AddConfKey(confKey string, reqField map[string]interface{}) error
  38. AddConfKeyField(confKey string, reqField map[string]interface{}) error
  39. ClearConfKeys()
  40. }
  41. //ConfigOperator define interface to query/add/update/delete the configs in disk
  42. type ConfigOperator interface {
  43. ConfKeysOperator
  44. SaveCfgToFile() error
  45. }
  46. // ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml
  47. // Hold the connection configs for each connection type in etcCfg field
  48. // Provide method to query/add/update/delete the configs
  49. type ConfigKeys struct {
  50. lock sync.RWMutex
  51. pluginName string // source type, can be mqtt/edgex/httppull
  52. etcCfg map[string]map[string]interface{} // configs defined in etc/sources/yaml
  53. dataCfg map[string]map[string]interface{} // configs defined in etc/sources/
  54. }
  55. func (c *ConfigKeys) GetPluginName() string {
  56. return c.pluginName
  57. }
  58. func (c *ConfigKeys) GetConfContentByte() ([]byte, error) {
  59. cf := make(map[string]map[string]interface{})
  60. c.lock.RLock()
  61. defer c.lock.RUnlock()
  62. for key, kvs := range c.etcCfg {
  63. aux := make(map[string]interface{})
  64. for k, v := range kvs {
  65. aux[k] = v
  66. }
  67. cf[key] = aux
  68. }
  69. for key, kvs := range c.dataCfg {
  70. aux := make(map[string]interface{})
  71. for k, v := range kvs {
  72. aux[k] = v
  73. }
  74. cf[key] = aux
  75. }
  76. return json.Marshal(cf)
  77. }
  78. func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
  79. cf := make(map[string]map[string]interface{})
  80. c.lock.RLock()
  81. defer c.lock.RUnlock()
  82. for key, kvs := range c.etcCfg {
  83. aux := make(map[string]interface{})
  84. for k, v := range kvs {
  85. aux[k] = v
  86. }
  87. cf[key] = aux
  88. }
  89. //note: config keys in data directory will overwrite those in etc directory with same name
  90. for key, kvs := range c.dataCfg {
  91. aux := make(map[string]interface{})
  92. for k, v := range kvs {
  93. aux[k] = v
  94. }
  95. cf[key] = aux
  96. }
  97. return cf
  98. }
  99. func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{}) {
  100. c.lock.Lock()
  101. defer c.lock.Unlock()
  102. for key, kvs := range cf {
  103. aux := make(map[string]interface{})
  104. for k, v := range kvs {
  105. aux[k] = v
  106. }
  107. c.dataCfg[key] = aux
  108. }
  109. }
  110. func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{} {
  111. cf := make(map[string]map[string]interface{})
  112. c.lock.RLock()
  113. defer c.lock.RUnlock()
  114. for key, kvs := range c.etcCfg {
  115. aux := make(map[string]interface{})
  116. for k, v := range kvs {
  117. aux[k] = v
  118. }
  119. cf[key] = aux
  120. }
  121. return cf
  122. }
  123. func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{} {
  124. cf := make(map[string]map[string]interface{})
  125. c.lock.RLock()
  126. defer c.lock.RUnlock()
  127. for key, kvs := range c.dataCfg {
  128. aux := make(map[string]interface{})
  129. for k, v := range kvs {
  130. aux[k] = v
  131. }
  132. cf[key] = aux
  133. }
  134. return cf
  135. }
  136. func (c *ConfigKeys) GetConfKeys() (keys []string) {
  137. ro := c.GetReadOnlyConfKeys()
  138. keys = append(keys, ro...)
  139. up := c.GetUpdatableConfKeys()
  140. keys = append(keys, up...)
  141. return keys
  142. }
  143. func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string) {
  144. c.lock.RLock()
  145. defer c.lock.RUnlock()
  146. for k := range c.etcCfg {
  147. keys = append(keys, k)
  148. }
  149. return keys
  150. }
  151. func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string) {
  152. c.lock.RLock()
  153. defer c.lock.RUnlock()
  154. for k := range c.dataCfg {
  155. keys = append(keys, k)
  156. }
  157. return keys
  158. }
  159. func (c *ConfigKeys) DeleteConfKey(confKey string) {
  160. c.lock.Lock()
  161. defer c.lock.Unlock()
  162. delete(c.dataCfg, confKey)
  163. }
  164. func (c *ConfigKeys) ClearConfKeys() {
  165. keys := c.GetUpdatableConfKeys()
  166. for _, key := range keys {
  167. c.DeleteConfKey(key)
  168. }
  169. }
  170. func recursionDelMap(cf, fields map[string]interface{}) error {
  171. for k, v := range fields {
  172. if nil == v {
  173. delete(cf, k)
  174. continue
  175. }
  176. if delKey, ok := v.(string); ok {
  177. if 0 == len(delKey) {
  178. delete(cf, k)
  179. continue
  180. }
  181. var auxCf map[string]interface{}
  182. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  183. return fmt.Errorf(`%s%s.%s`, "type_conversion_fail", k, delKey)
  184. }
  185. cf[k] = auxCf
  186. delete(auxCf, delKey)
  187. continue
  188. }
  189. if reflect.TypeOf(v) != nil && reflect.Map == reflect.TypeOf(v).Kind() {
  190. var auxCf, auxFields map[string]interface{}
  191. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  192. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  193. }
  194. cf[k] = auxCf
  195. if err := cast.MapToStruct(v, &auxFields); nil != err {
  196. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  197. }
  198. if err := recursionDelMap(auxCf, auxFields); nil != err {
  199. return err
  200. }
  201. }
  202. }
  203. return nil
  204. }
  205. func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error {
  206. c.lock.Lock()
  207. defer c.lock.Unlock()
  208. err := recursionDelMap(c.dataCfg[confKey], reqField)
  209. if nil != err {
  210. return err
  211. }
  212. return nil
  213. }
  214. func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error {
  215. c.lock.Lock()
  216. defer c.lock.Unlock()
  217. c.dataCfg[confKey] = reqField
  218. return nil
  219. }
  220. func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error {
  221. c.lock.Lock()
  222. defer c.lock.Unlock()
  223. if nil == c.dataCfg[confKey] {
  224. return fmt.Errorf(`%s`, "not_found_confkey")
  225. }
  226. for k, v := range reqField {
  227. c.dataCfg[confKey][k] = v
  228. }
  229. return nil
  230. }
  231. // SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
  232. type SourceConfigKeysOps struct {
  233. *ConfigKeys
  234. }
  235. func (c *SourceConfigKeysOps) SaveCfgToFile() error {
  236. pluginName := c.pluginName
  237. confDir, err := GetDataLoc()
  238. if nil != err {
  239. return err
  240. }
  241. dir := path.Join(confDir, "sources")
  242. filePath := path.Join(dir, pluginName+".yaml")
  243. cfg := c.CopyUpdatableConfContent()
  244. err = filex.WriteYamlMarshal(filePath, cfg)
  245. if nil != err {
  246. return err
  247. }
  248. return nil
  249. }
  250. // SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
  251. type SinkConfigKeysOps struct {
  252. *ConfigKeys
  253. }
  254. func (c *SinkConfigKeysOps) SaveCfgToFile() error {
  255. pluginName := c.pluginName
  256. confDir, err := GetDataLoc()
  257. if nil != err {
  258. return err
  259. }
  260. dir := path.Join(confDir, "sinks")
  261. filePath := path.Join(dir, pluginName+".yaml")
  262. cfg := c.CopyUpdatableConfContent()
  263. err = filex.WriteYamlMarshal(filePath, cfg)
  264. if nil != err {
  265. return err
  266. }
  267. return nil
  268. }
  269. // ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
  270. type ConnectionConfigKeysOps struct {
  271. *ConfigKeys
  272. }
  273. func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
  274. pluginName := p.pluginName
  275. confDir, err := GetDataLoc()
  276. if nil != err {
  277. return err
  278. }
  279. cfg := p.CopyUpdatableConfContent()
  280. yamlPath := path.Join(confDir, "connections/connection.yaml")
  281. yamlData := make(map[string]interface{})
  282. err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
  283. if nil != err {
  284. return err
  285. }
  286. yamlData[pluginName] = cfg
  287. return filex.WriteYamlMarshal(yamlPath, yamlData)
  288. }
  289. // NewConfigOperatorForSource construct function
  290. func NewConfigOperatorForSource(pluginName string) ConfigOperator {
  291. c := &SourceConfigKeysOps{
  292. &ConfigKeys{
  293. lock: sync.RWMutex{},
  294. pluginName: pluginName,
  295. etcCfg: map[string]map[string]interface{}{},
  296. dataCfg: map[string]map[string]interface{}{},
  297. },
  298. }
  299. return c
  300. }
  301. // NewConfigOperatorFromSourceYaml construct function, Load the configs from etc/sources/xx.yaml
  302. func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error) {
  303. c := &SourceConfigKeysOps{
  304. &ConfigKeys{
  305. lock: sync.RWMutex{},
  306. pluginName: pluginName,
  307. etcCfg: map[string]map[string]interface{}{},
  308. dataCfg: map[string]map[string]interface{}{},
  309. },
  310. }
  311. confDir, err := GetConfLoc()
  312. if nil != err {
  313. return nil, err
  314. }
  315. dir := path.Join(confDir, "sources")
  316. fileName := pluginName
  317. if "mqtt" == pluginName {
  318. fileName = "mqtt_source"
  319. dir = confDir
  320. }
  321. filePath := path.Join(dir, fileName+`.yaml`)
  322. // Just ignore error if yaml not found
  323. _ = LoadConfigFromPath(filePath, &c.etcCfg)
  324. dataDir, err := GetDataLoc()
  325. if nil != err {
  326. return nil, err
  327. }
  328. dir = path.Join(dataDir, "sources")
  329. fileName = pluginName
  330. filePath = path.Join(dir, fileName+`.yaml`)
  331. _ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
  332. return c, nil
  333. }
  334. // NewConfigOperatorForSink construct function
  335. func NewConfigOperatorForSink(pluginName string) ConfigOperator {
  336. c := &SinkConfigKeysOps{
  337. &ConfigKeys{
  338. lock: sync.RWMutex{},
  339. pluginName: pluginName,
  340. etcCfg: map[string]map[string]interface{}{},
  341. dataCfg: map[string]map[string]interface{}{},
  342. },
  343. }
  344. return c
  345. }
  346. // NewConfigOperatorFromSinkYaml construct function, Load the configs from etc/sources/xx.yaml
  347. func NewConfigOperatorFromSinkYaml(pluginName string) (ConfigOperator, error) {
  348. c := &SinkConfigKeysOps{
  349. &ConfigKeys{
  350. lock: sync.RWMutex{},
  351. pluginName: pluginName,
  352. etcCfg: map[string]map[string]interface{}{},
  353. dataCfg: map[string]map[string]interface{}{},
  354. },
  355. }
  356. dataDir, err := GetDataLoc()
  357. if nil != err {
  358. return nil, err
  359. }
  360. dir := path.Join(dataDir, "sinks")
  361. filePath := path.Join(dir, pluginName+`.yaml`)
  362. _ = filex.ReadYamlUnmarshal(filePath, &c.dataCfg)
  363. return c, nil
  364. }
  365. // NewConfigOperatorForConnection construct function
  366. func NewConfigOperatorForConnection(pluginName string) ConfigOperator {
  367. c := &ConnectionConfigKeysOps{
  368. &ConfigKeys{
  369. lock: sync.RWMutex{},
  370. pluginName: pluginName,
  371. etcCfg: map[string]map[string]interface{}{},
  372. dataCfg: map[string]map[string]interface{}{},
  373. },
  374. }
  375. return c
  376. }
  377. // NewConfigOperatorFromConnectionYaml construct function, Load the configs from et/connections/connection.yaml
  378. func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, error) {
  379. c := &ConnectionConfigKeysOps{
  380. &ConfigKeys{
  381. lock: sync.RWMutex{},
  382. pluginName: pluginName,
  383. etcCfg: map[string]map[string]interface{}{},
  384. dataCfg: map[string]map[string]interface{}{},
  385. },
  386. }
  387. confDir, err := GetConfLoc()
  388. if nil != err {
  389. return nil, err
  390. }
  391. yamlPath := path.Join(confDir, "connections/connection.yaml")
  392. yamlData := make(map[string]interface{})
  393. err = LoadConfigFromPath(yamlPath, &yamlData)
  394. if nil != err {
  395. return nil, err
  396. }
  397. if plgCnfs, ok := yamlData[pluginName]; ok {
  398. if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
  399. for confKey, confVal := range cf {
  400. if conf, ok := confVal.(map[string]interface{}); ok {
  401. c.etcCfg[confKey] = conf
  402. } else {
  403. return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
  404. }
  405. }
  406. } else {
  407. return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
  408. }
  409. } else {
  410. return nil, fmt.Errorf("not find the target connection type: %s", c.pluginName)
  411. }
  412. confDir, err = GetDataLoc()
  413. if nil != err {
  414. return nil, err
  415. }
  416. yamlPath = path.Join(confDir, "connections/connection.yaml")
  417. yamlData = make(map[string]interface{})
  418. _ = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
  419. if plgCnfs, ok := yamlData[pluginName]; ok {
  420. if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
  421. for confKey, confVal := range cf {
  422. if conf, ok := confVal.(map[string]interface{}); ok {
  423. c.dataCfg[confKey] = conf
  424. } else {
  425. return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
  426. }
  427. }
  428. } else {
  429. return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
  430. }
  431. }
  432. return c, nil
  433. }