sourceMeta.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package meta
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "io/ioutil"
  22. "path"
  23. "reflect"
  24. "strings"
  25. )
  26. type (
  27. fileSource struct {
  28. About *fileAbout `json:"about"`
  29. Libs []string `json:"libs"`
  30. ConfKeys map[string][]*fileField `json:"properties"`
  31. }
  32. uiSource struct {
  33. About *about `json:"about"`
  34. Libs []string `json:"libs"`
  35. ConfKeys map[string][]field `json:"properties"`
  36. }
  37. sourceProperty struct {
  38. cf map[string]map[string]interface{}
  39. meta *uiSource
  40. }
  41. )
  42. func newUiSource(fi *fileSource) (*uiSource, error) {
  43. if nil == fi {
  44. return nil, nil
  45. }
  46. var err error
  47. ui := new(uiSource)
  48. ui.Libs = fi.Libs
  49. ui.About = newAbout(fi.About)
  50. ui.ConfKeys = make(map[string][]field)
  51. for k, fields := range fi.ConfKeys {
  52. if ui.ConfKeys[k], err = newField(fields); nil != err {
  53. return nil, err
  54. }
  55. }
  56. return ui, nil
  57. }
  58. var gSourceproperty = make(map[string]*sourceProperty)
  59. func UninstallSource(name string) {
  60. if v, ok := gSourceproperty[name+".json"]; ok {
  61. if ui := v.meta; nil != ui {
  62. if nil != ui.About {
  63. ui.About.Installed = false
  64. }
  65. }
  66. }
  67. }
  68. func ReadSourceMetaFile(filePath string, installed bool) error {
  69. fileName := path.Base(filePath)
  70. if "mqtt_source.json" == fileName {
  71. fileName = "mqtt.json"
  72. }
  73. ptrMeta := new(fileSource)
  74. err := filex.ReadJsonUnmarshal(filePath, ptrMeta)
  75. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  76. return fmt.Errorf("file:%s err:%v", filePath, err)
  77. }
  78. if 0 == len(ptrMeta.ConfKeys["default"]) {
  79. return fmt.Errorf("not found default confKey %s", filePath)
  80. }
  81. if nil == ptrMeta.About {
  82. return fmt.Errorf("not found about of %s", filePath)
  83. } else {
  84. ptrMeta.About.Installed = installed
  85. }
  86. yamlData := make(map[string]map[string]interface{})
  87. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  88. err = filex.ReadYamlUnmarshal(filePath, &yamlData)
  89. if nil != err {
  90. return fmt.Errorf("file:%s err:%v", filePath, err)
  91. }
  92. if 0 == len(yamlData["default"]) {
  93. return fmt.Errorf("not found default confKey from %s", filePath)
  94. }
  95. property := new(sourceProperty)
  96. property.cf = yamlData
  97. property.meta, err = newUiSource(ptrMeta)
  98. if nil != err {
  99. return err
  100. }
  101. gSourceproperty[fileName] = property
  102. return err
  103. }
  104. func ReadSourceMetaDir(checker InstallChecker) error {
  105. confDir, err := conf.GetConfLoc()
  106. if nil != err {
  107. return err
  108. }
  109. dir := path.Join(confDir, "sources")
  110. infos, err := ioutil.ReadDir(dir)
  111. if nil != err {
  112. return err
  113. }
  114. if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), false); nil != err {
  115. return err
  116. }
  117. for _, info := range infos {
  118. fileName := info.Name()
  119. if strings.HasSuffix(fileName, ".json") {
  120. filePath := path.Join(dir, fileName)
  121. if err = ReadSourceMetaFile(filePath, checker(strings.TrimSuffix(fileName, ".json"))); nil != err {
  122. return err
  123. }
  124. conf.Log.Infof("sourceMeta file : %s", fileName)
  125. }
  126. }
  127. return nil
  128. }
  129. func GetSourceConf(pluginName, language string) (b []byte, err error) {
  130. property, ok := gSourceproperty[pluginName+".json"]
  131. if ok {
  132. cf := make(map[string]map[string]interface{})
  133. for key, kvs := range property.cf {
  134. aux := make(map[string]interface{})
  135. for k, v := range kvs {
  136. aux[k] = v
  137. }
  138. cf[key] = aux
  139. }
  140. if b, err = json.Marshal(cf); nil != err {
  141. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  142. } else {
  143. return b, err
  144. }
  145. }
  146. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  147. }
  148. func GetSourceMeta(pluginName, language string) (ptrSourceProperty *uiSource, err error) {
  149. property, ok := gSourceproperty[pluginName+".json"]
  150. if ok {
  151. return property.cfToMeta(language)
  152. }
  153. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  154. }
  155. func GetSources() (sources []*pluginfo) {
  156. for fileName, v := range gSourceproperty {
  157. node := new(pluginfo)
  158. node.Name = strings.TrimSuffix(fileName, `.json`)
  159. if nil == v.meta {
  160. continue
  161. }
  162. if nil == v.meta.About {
  163. continue
  164. }
  165. node.About = v.meta.About
  166. i := 0
  167. for ; i < len(sources); i++ {
  168. if node.Name <= sources[i].Name {
  169. sources = append(sources, node)
  170. copy(sources[i+1:], sources[i:])
  171. sources[i] = node
  172. break
  173. }
  174. }
  175. if len(sources) == i {
  176. sources = append(sources, node)
  177. }
  178. }
  179. return sources
  180. }
  181. func GetSourceConfKeys(pluginName string) (keys []string) {
  182. property := gSourceproperty[pluginName+".json"]
  183. if nil == property {
  184. return keys
  185. }
  186. for k := range property.cf {
  187. keys = append(keys, k)
  188. }
  189. return keys
  190. }
  191. func DelSourceConfKey(pluginName, confKey, language string) error {
  192. property := gSourceproperty[pluginName+".json"]
  193. if nil == property {
  194. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  195. }
  196. if nil == property.cf {
  197. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  198. }
  199. delete(property.cf, confKey)
  200. return property.saveCf(pluginName, language)
  201. }
  202. func AddSourceConfKey(pluginName, confKey, language string, content []byte) error {
  203. reqField := make(map[string]interface{})
  204. err := json.Unmarshal(content, &reqField)
  205. if nil != err {
  206. return err
  207. }
  208. property := gSourceproperty[pluginName+".json"]
  209. if nil == property {
  210. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  211. }
  212. if nil == property.cf {
  213. property.cf = make(map[string]map[string]interface{})
  214. }
  215. if 0 != len(property.cf[confKey]) {
  216. return fmt.Errorf(`%s%s`, getMsg(language, source, "confkey_already_exist"), pluginName)
  217. }
  218. property.cf[confKey] = reqField
  219. gSourceproperty[pluginName+".json"] = property
  220. return property.saveCf(pluginName, language)
  221. }
  222. func AddSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  223. reqField := make(map[string]interface{})
  224. err := json.Unmarshal(content, &reqField)
  225. if nil != err {
  226. return err
  227. }
  228. property := gSourceproperty[pluginName+".json"]
  229. if nil == property {
  230. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  231. }
  232. if nil == property.cf {
  233. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  234. }
  235. if nil == property.cf[confKey] {
  236. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  237. }
  238. for k, v := range reqField {
  239. property.cf[confKey][k] = v
  240. }
  241. return property.saveCf(pluginName, language)
  242. }
  243. func recursionDelMap(cf, fields map[string]interface{}, language string) error {
  244. for k, v := range fields {
  245. if nil == v {
  246. delete(cf, k)
  247. continue
  248. }
  249. if delKey, ok := v.(string); ok {
  250. if 0 == len(delKey) {
  251. delete(cf, k)
  252. continue
  253. }
  254. var auxCf map[string]interface{}
  255. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  256. return fmt.Errorf(`%s%s.%s`, getMsg(language, source, "type_conversion_fail"), k, delKey)
  257. }
  258. cf[k] = auxCf
  259. delete(auxCf, delKey)
  260. continue
  261. }
  262. if reflect.TypeOf(v) != nil && reflect.Map == reflect.TypeOf(v).Kind() {
  263. var auxCf, auxFields map[string]interface{}
  264. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  265. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  266. }
  267. cf[k] = auxCf
  268. if err := cast.MapToStruct(v, &auxFields); nil != err {
  269. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  270. }
  271. if err := recursionDelMap(auxCf, auxFields, language); nil != err {
  272. return err
  273. }
  274. }
  275. }
  276. return nil
  277. }
  278. func DelSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  279. reqField := make(map[string]interface{})
  280. err := json.Unmarshal(content, &reqField)
  281. if nil != err {
  282. return err
  283. }
  284. property := gSourceproperty[pluginName+".json"]
  285. if nil == property {
  286. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  287. }
  288. if nil == property.cf {
  289. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  290. }
  291. if nil == property.cf[confKey] {
  292. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  293. }
  294. err = recursionDelMap(property.cf[confKey], reqField, language)
  295. if nil != err {
  296. return err
  297. }
  298. return property.saveCf(pluginName, language)
  299. }
  300. func recursionNewFields(template []field, conf map[string]interface{}, ret *[]field) error {
  301. for i := 0; i < len(template); i++ {
  302. p := template[i]
  303. v, ok := conf[template[i].Name]
  304. if ok {
  305. p.Exist = true
  306. } else {
  307. p.Exist = false
  308. continue
  309. }
  310. var auxRet, auxTemplate []field
  311. p.Default = &auxRet
  312. if nil == v {
  313. p.Default = v
  314. } else {
  315. if reflect.TypeOf(v) != nil && reflect.Map == reflect.TypeOf(v).Kind() {
  316. var nextCf map[string]interface{}
  317. if tmp, ok := v.(map[interface{}]interface{}); ok {
  318. nextCf = cast.ConvertMap(tmp)
  319. } else {
  320. if err := cast.MapToStruct(v, &nextCf); nil != err {
  321. return err
  322. }
  323. }
  324. if err := cast.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  325. return err
  326. }
  327. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  328. return err
  329. }
  330. } else {
  331. p.Default = v
  332. }
  333. }
  334. *ret = append(*ret, p)
  335. }
  336. return nil
  337. }
  338. func (p *sourceProperty) cfToMeta(language string) (*uiSource, error) {
  339. fields := p.meta.ConfKeys["default"]
  340. ret := make(map[string][]field)
  341. for k, kvs := range p.cf {
  342. var sli []field
  343. err := recursionNewFields(fields, kvs, &sli)
  344. if nil != err {
  345. return nil, fmt.Errorf(`%s%v`, getMsg(language, "source", "type_conversion_fail"), err)
  346. }
  347. ret[k] = sli
  348. }
  349. meta := new(uiSource)
  350. *meta = *(p.meta)
  351. meta.ConfKeys = ret
  352. return meta, nil
  353. }
  354. func (p *sourceProperty) saveCf(pluginName, language string) error {
  355. confDir, err := conf.GetConfLoc()
  356. if nil != err {
  357. return fmt.Errorf(`%s%v`, getMsg(language, source, "not_found_file"), err)
  358. }
  359. dir := path.Join(confDir, "sources")
  360. if "mqtt" == pluginName {
  361. pluginName = "mqtt_source"
  362. dir = confDir
  363. }
  364. filePath := path.Join(dir, pluginName+".yaml")
  365. err = filex.WriteYamlMarshal(filePath, p.cf)
  366. if nil != err {
  367. return fmt.Errorf(`%s%v`, getMsg(language, "source", "write_data_fail"), err)
  368. }
  369. return nil
  370. }