yaml_config_ops.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package conf
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. "path"
  21. "reflect"
  22. "sync"
  23. )
  24. //ConfKeysOperator define interface to query/add/update/delete the configs in memory
  25. type ConfKeysOperator interface {
  26. GetPluginName() string
  27. GetConfContentByte() ([]byte, error)
  28. CopyConfContent() map[string]map[string]interface{}
  29. CopyReadOnlyConfContent() map[string]map[string]interface{}
  30. CopyUpdatableConfContent() map[string]map[string]interface{}
  31. GetConfKeys() (keys []string)
  32. GetReadOnlyConfKeys() (keys []string)
  33. GetUpdatableConfKeys() (keys []string)
  34. DeleteConfKey(confKey string)
  35. DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
  36. AddConfKey(confKey string, reqField map[string]interface{}) error
  37. AddConfKeyField(confKey string, reqField map[string]interface{}) error
  38. }
  39. //ConfigOperator define interface to query/add/update/delete the configs in disk
  40. type ConfigOperator interface {
  41. ConfKeysOperator
  42. SaveCfgToFile() error
  43. }
  44. // ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml
  45. // Hold the connection configs for each connection type in etcCfg field
  46. // Provide method to query/add/update/delete the configs
  47. type ConfigKeys struct {
  48. lock sync.RWMutex
  49. pluginName string // source type, can be mqtt/edgex/httppull
  50. etcCfg map[string]map[string]interface{} // configs defined in etc/sources/yaml
  51. dataCfg map[string]map[string]interface{} // configs defined in etc/sources/
  52. }
  53. func (c *ConfigKeys) GetPluginName() string {
  54. return c.pluginName
  55. }
  56. func (c *ConfigKeys) GetConfContentByte() ([]byte, error) {
  57. cf := make(map[string]map[string]interface{})
  58. c.lock.RLock()
  59. defer c.lock.RUnlock()
  60. for key, kvs := range c.etcCfg {
  61. aux := make(map[string]interface{})
  62. for k, v := range kvs {
  63. aux[k] = v
  64. }
  65. cf[key] = aux
  66. }
  67. for key, kvs := range c.dataCfg {
  68. aux := make(map[string]interface{})
  69. for k, v := range kvs {
  70. aux[k] = v
  71. }
  72. cf[key] = aux
  73. }
  74. return json.Marshal(cf)
  75. }
  76. func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
  77. cf := make(map[string]map[string]interface{})
  78. c.lock.RLock()
  79. defer c.lock.RUnlock()
  80. for key, kvs := range c.etcCfg {
  81. aux := make(map[string]interface{})
  82. for k, v := range kvs {
  83. aux[k] = v
  84. }
  85. cf[key] = aux
  86. }
  87. //note: config keys in data directory will overwrite those in etc directory with same name
  88. for key, kvs := range c.dataCfg {
  89. aux := make(map[string]interface{})
  90. for k, v := range kvs {
  91. aux[k] = v
  92. }
  93. cf[key] = aux
  94. }
  95. return cf
  96. }
  97. func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{} {
  98. cf := make(map[string]map[string]interface{})
  99. c.lock.RLock()
  100. defer c.lock.RUnlock()
  101. for key, kvs := range c.etcCfg {
  102. aux := make(map[string]interface{})
  103. for k, v := range kvs {
  104. aux[k] = v
  105. }
  106. cf[key] = aux
  107. }
  108. return cf
  109. }
  110. func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{} {
  111. cf := make(map[string]map[string]interface{})
  112. c.lock.RLock()
  113. defer c.lock.RUnlock()
  114. for key, kvs := range c.dataCfg {
  115. aux := make(map[string]interface{})
  116. for k, v := range kvs {
  117. aux[k] = v
  118. }
  119. cf[key] = aux
  120. }
  121. return cf
  122. }
  123. func (c *ConfigKeys) GetConfKeys() (keys []string) {
  124. ro := c.GetReadOnlyConfKeys()
  125. keys = append(keys, ro...)
  126. up := c.GetUpdatableConfKeys()
  127. keys = append(keys, up...)
  128. return keys
  129. }
  130. func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string) {
  131. c.lock.RLock()
  132. defer c.lock.RUnlock()
  133. for k := range c.etcCfg {
  134. keys = append(keys, k)
  135. }
  136. return keys
  137. }
  138. func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string) {
  139. c.lock.RLock()
  140. defer c.lock.RUnlock()
  141. for k := range c.dataCfg {
  142. keys = append(keys, k)
  143. }
  144. return keys
  145. }
  146. func (c *ConfigKeys) DeleteConfKey(confKey string) {
  147. c.lock.Lock()
  148. defer c.lock.Unlock()
  149. delete(c.dataCfg, confKey)
  150. }
  151. func recursionDelMap(cf, fields map[string]interface{}) error {
  152. for k, v := range fields {
  153. if nil == v {
  154. delete(cf, k)
  155. continue
  156. }
  157. if delKey, ok := v.(string); ok {
  158. if 0 == len(delKey) {
  159. delete(cf, k)
  160. continue
  161. }
  162. var auxCf map[string]interface{}
  163. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  164. return fmt.Errorf(`%s%s.%s`, "type_conversion_fail", k, delKey)
  165. }
  166. cf[k] = auxCf
  167. delete(auxCf, delKey)
  168. continue
  169. }
  170. if reflect.TypeOf(v) != nil && reflect.Map == reflect.TypeOf(v).Kind() {
  171. var auxCf, auxFields map[string]interface{}
  172. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  173. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  174. }
  175. cf[k] = auxCf
  176. if err := cast.MapToStruct(v, &auxFields); nil != err {
  177. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  178. }
  179. if err := recursionDelMap(auxCf, auxFields); nil != err {
  180. return err
  181. }
  182. }
  183. }
  184. return nil
  185. }
  186. func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error {
  187. c.lock.Lock()
  188. defer c.lock.Unlock()
  189. err := recursionDelMap(c.dataCfg[confKey], reqField)
  190. if nil != err {
  191. return err
  192. }
  193. return nil
  194. }
  195. func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error {
  196. c.lock.Lock()
  197. defer c.lock.Unlock()
  198. c.dataCfg[confKey] = reqField
  199. return nil
  200. }
  201. func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error {
  202. c.lock.Lock()
  203. defer c.lock.Unlock()
  204. if nil == c.dataCfg[confKey] {
  205. return fmt.Errorf(`%s`, "not_found_confkey")
  206. }
  207. for k, v := range reqField {
  208. c.dataCfg[confKey][k] = v
  209. }
  210. return nil
  211. }
  212. // SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
  213. type SourceConfigKeysOps struct {
  214. *ConfigKeys
  215. }
  216. func (c *SourceConfigKeysOps) SaveCfgToFile() error {
  217. pluginName := c.pluginName
  218. confDir, err := GetDataLoc()
  219. if nil != err {
  220. return err
  221. }
  222. dir := path.Join(confDir, "sources")
  223. filePath := path.Join(dir, pluginName+".yaml")
  224. cfg := c.CopyUpdatableConfContent()
  225. err = filex.WriteYamlMarshal(filePath, cfg)
  226. if nil != err {
  227. return err
  228. }
  229. return nil
  230. }
  231. // SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
  232. type SinkConfigKeysOps struct {
  233. *ConfigKeys
  234. }
  235. func (c *SinkConfigKeysOps) SaveCfgToFile() error {
  236. pluginName := c.pluginName
  237. confDir, err := GetDataLoc()
  238. if nil != err {
  239. return err
  240. }
  241. dir := path.Join(confDir, "sinks")
  242. filePath := path.Join(dir, pluginName+".yaml")
  243. cfg := c.CopyUpdatableConfContent()
  244. err = filex.WriteYamlMarshal(filePath, cfg)
  245. if nil != err {
  246. return err
  247. }
  248. return nil
  249. }
  250. // ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
  251. type ConnectionConfigKeysOps struct {
  252. *ConfigKeys
  253. }
  254. func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
  255. pluginName := p.pluginName
  256. confDir, err := GetDataLoc()
  257. if nil != err {
  258. return err
  259. }
  260. cfg := p.CopyUpdatableConfContent()
  261. yamlPath := path.Join(confDir, "connections/connection.yaml")
  262. yamlData := make(map[string]interface{})
  263. err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
  264. if nil != err {
  265. return err
  266. }
  267. yamlData[pluginName] = cfg
  268. return filex.WriteYamlMarshal(yamlPath, yamlData)
  269. }
  270. // NewConfigOperatorForSource construct function
  271. func NewConfigOperatorForSource(pluginName string) ConfigOperator {
  272. c := &SourceConfigKeysOps{
  273. &ConfigKeys{
  274. lock: sync.RWMutex{},
  275. pluginName: pluginName,
  276. etcCfg: map[string]map[string]interface{}{},
  277. dataCfg: map[string]map[string]interface{}{},
  278. },
  279. }
  280. return c
  281. }
  282. // NewConfigOperatorFromSourceYaml construct function, Load the configs from etc/sources/xx.yaml
  283. func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error) {
  284. c := &SourceConfigKeysOps{
  285. &ConfigKeys{
  286. lock: sync.RWMutex{},
  287. pluginName: pluginName,
  288. etcCfg: map[string]map[string]interface{}{},
  289. dataCfg: map[string]map[string]interface{}{},
  290. },
  291. }
  292. confDir, err := GetConfLoc()
  293. if nil != err {
  294. return nil, err
  295. }
  296. dir := path.Join(confDir, "sources")
  297. fileName := pluginName
  298. if "mqtt" == pluginName {
  299. fileName = "mqtt_source"
  300. dir = confDir
  301. }
  302. filePath := path.Join(dir, fileName+`.yaml`)
  303. // Just ignore error if yaml not found
  304. _ = LoadConfigFromPath(filePath, &c.etcCfg)
  305. dataDir, err := GetDataLoc()
  306. if nil != err {
  307. return nil, err
  308. }
  309. dir = path.Join(dataDir, "sources")
  310. fileName = pluginName
  311. filePath = path.Join(dir, fileName+`.yaml`)
  312. _ = LoadConfigFromPath(filePath, &c.dataCfg)
  313. return c, nil
  314. }
  315. // NewConfigOperatorForSink construct function
  316. func NewConfigOperatorForSink(pluginName string) ConfigOperator {
  317. c := &SinkConfigKeysOps{
  318. &ConfigKeys{
  319. lock: sync.RWMutex{},
  320. pluginName: pluginName,
  321. etcCfg: map[string]map[string]interface{}{},
  322. dataCfg: map[string]map[string]interface{}{},
  323. },
  324. }
  325. return c
  326. }
  327. // NewConfigOperatorFromSinkYaml construct function, Load the configs from etc/sources/xx.yaml
  328. func NewConfigOperatorFromSinkYaml(pluginName string) (ConfigOperator, error) {
  329. c := &SinkConfigKeysOps{
  330. &ConfigKeys{
  331. lock: sync.RWMutex{},
  332. pluginName: pluginName,
  333. etcCfg: map[string]map[string]interface{}{},
  334. dataCfg: map[string]map[string]interface{}{},
  335. },
  336. }
  337. dataDir, err := GetDataLoc()
  338. if nil != err {
  339. return nil, err
  340. }
  341. dir := path.Join(dataDir, "sinks")
  342. filePath := path.Join(dir, pluginName+`.yaml`)
  343. _ = LoadConfigFromPath(filePath, &c.dataCfg)
  344. return c, nil
  345. }
  346. // NewConfigOperatorForConnection construct function
  347. func NewConfigOperatorForConnection(pluginName string) ConfigOperator {
  348. c := &ConnectionConfigKeysOps{
  349. &ConfigKeys{
  350. lock: sync.RWMutex{},
  351. pluginName: pluginName,
  352. etcCfg: map[string]map[string]interface{}{},
  353. dataCfg: map[string]map[string]interface{}{},
  354. },
  355. }
  356. return c
  357. }
  358. // NewConfigOperatorFromConnectionYaml construct function, Load the configs from et/connections/connection.yaml
  359. func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, error) {
  360. c := &ConnectionConfigKeysOps{
  361. &ConfigKeys{
  362. lock: sync.RWMutex{},
  363. pluginName: pluginName,
  364. etcCfg: map[string]map[string]interface{}{},
  365. dataCfg: map[string]map[string]interface{}{},
  366. },
  367. }
  368. confDir, err := GetConfLoc()
  369. if nil != err {
  370. return nil, err
  371. }
  372. yamlPath := path.Join(confDir, "connections/connection.yaml")
  373. yamlData := make(map[string]interface{})
  374. err = LoadConfigFromPath(yamlPath, &yamlData)
  375. if nil != err {
  376. return nil, err
  377. }
  378. if plgCnfs, ok := yamlData[pluginName]; ok {
  379. if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
  380. for confKey, confVal := range cf {
  381. if conf, ok := confVal.(map[string]interface{}); ok {
  382. c.etcCfg[confKey] = conf
  383. } else {
  384. return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
  385. }
  386. }
  387. } else {
  388. return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
  389. }
  390. } else {
  391. return nil, fmt.Errorf("not find the target connection type: %s", c.pluginName)
  392. }
  393. confDir, err = GetDataLoc()
  394. if nil != err {
  395. return nil, err
  396. }
  397. yamlPath = path.Join(confDir, "connections/connection.yaml")
  398. yamlData = make(map[string]interface{})
  399. _ = LoadConfigFromPath(yamlPath, &yamlData)
  400. if plgCnfs, ok := yamlData[pluginName]; ok {
  401. if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
  402. for confKey, confVal := range cf {
  403. if conf, ok := confVal.(map[string]interface{}); ok {
  404. c.dataCfg[confKey] = conf
  405. } else {
  406. return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
  407. }
  408. }
  409. } else {
  410. return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
  411. }
  412. }
  413. return c, nil
  414. }