yaml_config_ops.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "path"
  22. "reflect"
  23. "sync"
  24. )
  // ConfKeysOperator is the set of operations used to query, add, update and
  // delete configuration entries that are held in memory.
  //
  // The configuration is addressed on two levels: a configuration key selects
  // a group of settings, and each group maps field names to arbitrary values.
  type ConfKeysOperator interface {
  	// GetPluginName reports the plugin name the configuration belongs to.
  	GetPluginName() string

  	// GetConfContentByte returns the configuration encoded as bytes, or the
  	// error raised while encoding it.
  	GetConfContentByte() ([]byte, error)

  	// CopyConfContent returns the configuration as a map of configuration
  	// key to field map.
  	CopyConfContent() map[string]map[string]interface{}

  	// GetConfKeys lists the configuration keys.
  	GetConfKeys() []string

  	// DeleteConfKey removes the group stored under confKey.
  	DeleteConfKey(confKey string)

  	// DeleteConfKeyField removes the fields described by reqField from the
  	// group stored under confKey.
  	DeleteConfKeyField(confKey string, reqField map[string]interface{}) error

  	// AddConfKey stores reqField as the group for confKey.
  	AddConfKey(confKey string, reqField map[string]interface{}) error

  	// AddConfKeyField adds the fields in reqField to the group stored under
  	// confKey.
  	AddConfKeyField(confKey string, reqField map[string]interface{}) error
  }
  36. //ConfigOperator define interface to query/add/update/delete the configs in disk
  37. type ConfigOperator interface {
  38. ConfKeysOperator
  39. IsSource() bool
  40. SaveCfgToFile() error
  41. }
  // ConfigKeys is the in-memory implementation of ConfKeysOperator.
  //
  // It holds the configuration of one plugin, as loaded from a source YAML
  // file or from connections/connection.yaml under the configuration
  // directory, and provides the methods to query, add, update and delete
  // entries of that configuration.
  type ConfigKeys struct {
  	// lock serializes access to cf: readers take the read lock, mutating
  	// methods take the write lock.
  	lock sync.RWMutex
  	// pluginName is the source type, e.g. mqtt, edgex or httppull.
  	pluginName string
  	// cf is the configuration defined in YAML: configuration key -> field
  	// name -> value.
  	cf map[string]map[string]interface{}
  }
  50. func (c *ConfigKeys) GetPluginName() string {
  51. return c.pluginName
  52. }
  53. func (c *ConfigKeys) GetConfContentByte() ([]byte, error) {
  54. cf := make(map[string]map[string]interface{})
  55. c.lock.RLock()
  56. defer c.lock.RUnlock()
  57. for key, kvs := range c.cf {
  58. aux := make(map[string]interface{})
  59. for k, v := range kvs {
  60. aux[k] = v
  61. }
  62. cf[key] = aux
  63. }
  64. return json.Marshal(cf)
  65. }
  66. func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
  67. cf := make(map[string]map[string]interface{})
  68. c.lock.RLock()
  69. defer c.lock.RUnlock()
  70. for key, kvs := range c.cf {
  71. aux := make(map[string]interface{})
  72. for k, v := range kvs {
  73. aux[k] = v
  74. }
  75. cf[key] = aux
  76. }
  77. return cf
  78. }
  79. func (c *ConfigKeys) GetConfKeys() (keys []string) {
  80. c.lock.RLock()
  81. defer c.lock.RUnlock()
  82. for k := range c.cf {
  83. keys = append(keys, k)
  84. }
  85. return keys
  86. }
  87. func (c *ConfigKeys) DeleteConfKey(confKey string) {
  88. c.lock.Lock()
  89. defer c.lock.Unlock()
  90. delete(c.cf, confKey)
  91. }
  92. func recursionDelMap(cf, fields map[string]interface{}) error {
  93. for k, v := range fields {
  94. if nil == v {
  95. delete(cf, k)
  96. continue
  97. }
  98. if delKey, ok := v.(string); ok {
  99. if 0 == len(delKey) {
  100. delete(cf, k)
  101. continue
  102. }
  103. var auxCf map[string]interface{}
  104. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  105. return fmt.Errorf(`%s%s.%s`, "type_conversion_fail", k, delKey)
  106. }
  107. cf[k] = auxCf
  108. delete(auxCf, delKey)
  109. continue
  110. }
  111. if reflect.TypeOf(v) != nil && reflect.Map == reflect.TypeOf(v).Kind() {
  112. var auxCf, auxFields map[string]interface{}
  113. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  114. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  115. }
  116. cf[k] = auxCf
  117. if err := cast.MapToStruct(v, &auxFields); nil != err {
  118. return fmt.Errorf(`%s%s.%v`, "type_conversion_fail", k, v)
  119. }
  120. if err := recursionDelMap(auxCf, auxFields); nil != err {
  121. return err
  122. }
  123. }
  124. }
  125. return nil
  126. }
  127. func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error {
  128. c.lock.Lock()
  129. defer c.lock.Unlock()
  130. err := recursionDelMap(c.cf[confKey], reqField)
  131. if nil != err {
  132. return err
  133. }
  134. return nil
  135. }
  136. func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error {
  137. c.lock.Lock()
  138. defer c.lock.Unlock()
  139. c.cf[confKey] = reqField
  140. return nil
  141. }
  142. func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error {
  143. c.lock.Lock()
  144. defer c.lock.Unlock()
  145. if nil == c.cf[confKey] {
  146. return fmt.Errorf(`%s`, "not_found_confkey")
  147. }
  148. for k, v := range reqField {
  149. c.cf[confKey][k] = v
  150. }
  151. return nil
  152. }
  153. // SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
  154. type SourceConfigKeysOps struct {
  155. *ConfigKeys
  156. }
  157. func (c *SourceConfigKeysOps) IsSource() bool {
  158. return true
  159. }
  160. func (c *SourceConfigKeysOps) SaveCfgToFile() error {
  161. pluginName := c.pluginName
  162. confDir, err := conf.GetConfLoc()
  163. if nil != err {
  164. return err
  165. }
  166. dir := path.Join(confDir, "sources")
  167. if "mqtt" == pluginName {
  168. pluginName = "mqtt_source"
  169. dir = confDir
  170. }
  171. filePath := path.Join(dir, pluginName+".yaml")
  172. cfg := c.CopyConfContent()
  173. err = filex.WriteYamlMarshal(filePath, cfg)
  174. if nil != err {
  175. return err
  176. }
  177. return nil
  178. }
  179. // ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
  180. type ConnectionConfigKeysOps struct {
  181. *ConfigKeys
  182. }
  183. func (p *ConnectionConfigKeysOps) IsSource() bool {
  184. return false
  185. }
  186. func (p *ConnectionConfigKeysOps) SaveCfgToFile() error {
  187. pluginName := p.pluginName
  188. confDir, err := conf.GetConfLoc()
  189. if nil != err {
  190. return err
  191. }
  192. cfg := p.CopyConfContent()
  193. yamlPath := path.Join(confDir, "connections/connection.yaml")
  194. yamlData := make(map[string]interface{})
  195. err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
  196. if nil != err {
  197. return err
  198. }
  199. yamlData[pluginName] = cfg
  200. return filex.WriteYamlMarshal(yamlPath, yamlData)
  201. }
  202. // NewConfigOperatorFromSourceYaml construct function, Load the configs from etc/sources/xx.yaml
  203. func NewConfigOperatorFromSourceYaml(pluginName string) (ConfigOperator, error) {
  204. confDir, err := conf.GetConfLoc()
  205. if nil != err {
  206. return nil, err
  207. }
  208. c := &SourceConfigKeysOps{
  209. &ConfigKeys{
  210. lock: sync.RWMutex{},
  211. pluginName: pluginName,
  212. cf: map[string]map[string]interface{}{},
  213. },
  214. }
  215. dir := path.Join(confDir, "sources")
  216. fileName := pluginName
  217. if "mqtt" == pluginName {
  218. fileName = "mqtt_source"
  219. dir = confDir
  220. }
  221. filePath := path.Join(dir, fileName+`.yaml`)
  222. err = filex.ReadYamlUnmarshal(filePath, &c.cf)
  223. if nil != err {
  224. return nil, err
  225. }
  226. return c, nil
  227. }
  228. // NewConfigOperatorFromConnectionYaml construct function, Load the configs from et/connections/connection.yaml
  229. func NewConfigOperatorFromConnectionYaml(pluginName string) (ConfigOperator, error) {
  230. confDir, err := conf.GetConfLoc()
  231. if nil != err {
  232. return nil, err
  233. }
  234. yamlPath := path.Join(confDir, "connections/connection.yaml")
  235. yamlData := make(map[string]interface{})
  236. err = filex.ReadYamlUnmarshal(yamlPath, &yamlData)
  237. if nil != err {
  238. return nil, err
  239. }
  240. c := &ConnectionConfigKeysOps{
  241. &ConfigKeys{
  242. lock: sync.RWMutex{},
  243. pluginName: pluginName,
  244. cf: map[string]map[string]interface{}{},
  245. },
  246. }
  247. if plgCnfs, ok := yamlData[pluginName]; ok {
  248. if cf, ok1 := plgCnfs.(map[string]interface{}); ok1 {
  249. for confKey, confVal := range cf {
  250. if conf, ok := confVal.(map[string]interface{}); ok {
  251. c.cf[confKey] = conf
  252. } else {
  253. return nil, fmt.Errorf("file content is not right: %s.%v", confKey, confVal)
  254. }
  255. }
  256. } else {
  257. return nil, fmt.Errorf("file content is not right: %v", plgCnfs)
  258. }
  259. } else {
  260. return nil, fmt.Errorf("not find the target connection type: %s", c.pluginName)
  261. }
  262. return c, nil
  263. }