sourceMeta.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package plugin
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "io/ioutil"
  22. "path"
  23. "reflect"
  24. "strings"
  25. )
  26. type (
  27. fileSource struct {
  28. About *fileAbout `json:"about"`
  29. Libs []string `json:"libs"`
  30. ConfKeys map[string][]*fileField `json:"properties"`
  31. }
  32. uiSource struct {
  33. About *about `json:"about"`
  34. Libs []string `json:"libs"`
  35. ConfKeys map[string][]field `json:"properties"`
  36. }
  37. sourceProperty struct {
  38. cf map[string]map[string]interface{}
  39. meta *uiSource
  40. }
  41. )
  42. func isInternalSource(fiName string) bool {
  43. internal := []string{`edgex.json`, `httppull.json`, `mqtt.json`}
  44. for _, v := range internal {
  45. if v == fiName {
  46. return true
  47. }
  48. }
  49. return false
  50. }
  51. func newUiSource(fi *fileSource) (*uiSource, error) {
  52. if nil == fi {
  53. return nil, nil
  54. }
  55. var err error
  56. ui := new(uiSource)
  57. ui.Libs = fi.Libs
  58. ui.About = newAbout(fi.About)
  59. ui.ConfKeys = make(map[string][]field)
  60. for k, fields := range fi.ConfKeys {
  61. if ui.ConfKeys[k], err = newField(fields); nil != err {
  62. return nil, err
  63. }
  64. }
  65. return ui, nil
  66. }
  67. var gSourceproperty map[string]*sourceProperty
  68. func (m *Manager) uninstalSource(name string) {
  69. if v, ok := gSourceproperty[name+".json"]; ok {
  70. if ui := v.meta; nil != ui {
  71. if nil != ui.About {
  72. ui.About.Installed = false
  73. }
  74. }
  75. }
  76. }
  77. func (m *Manager) readSourceMetaFile(filePath string) error {
  78. fileName := path.Base(filePath)
  79. if "mqtt_source.json" == fileName {
  80. fileName = "mqtt.json"
  81. }
  82. ptrMeta := new(fileSource)
  83. err := filex.ReadJsonUnmarshal(filePath, ptrMeta)
  84. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  85. return fmt.Errorf("file:%s err:%v", filePath, err)
  86. }
  87. if 0 == len(ptrMeta.ConfKeys["default"]) {
  88. return fmt.Errorf("not found default confKey %s", filePath)
  89. }
  90. if nil == ptrMeta.About {
  91. return fmt.Errorf("not found about of %s", filePath)
  92. } else if isInternalSource(fileName) {
  93. ptrMeta.About.Installed = true
  94. } else {
  95. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  96. }
  97. yamlData := make(map[string]map[string]interface{})
  98. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  99. err = filex.ReadYamlUnmarshal(filePath, &yamlData)
  100. if nil != err {
  101. return fmt.Errorf("file:%s err:%v", filePath, err)
  102. }
  103. if 0 == len(yamlData["default"]) {
  104. return fmt.Errorf("not found default confKey from %s", filePath)
  105. }
  106. property := new(sourceProperty)
  107. property.cf = yamlData
  108. property.meta, err = newUiSource(ptrMeta)
  109. if nil != err {
  110. return err
  111. }
  112. gSourceproperty[fileName] = property
  113. return err
  114. }
  115. func (m *Manager) readSourceMetaDir() error {
  116. gSourceproperty = make(map[string]*sourceProperty)
  117. confDir, err := conf.GetConfLoc()
  118. if nil != err {
  119. return err
  120. }
  121. dir := path.Join(confDir, "sources")
  122. infos, err := ioutil.ReadDir(dir)
  123. if nil != err {
  124. return err
  125. }
  126. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  127. return err
  128. }
  129. for _, info := range infos {
  130. fileName := info.Name()
  131. if strings.HasSuffix(fileName, ".json") {
  132. filePath := path.Join(dir, fileName)
  133. if err = m.readSourceMetaFile(filePath); nil != err {
  134. return err
  135. }
  136. conf.Log.Infof("sourceMeta file : %s", fileName)
  137. }
  138. }
  139. return nil
  140. }
  141. func GetSourceConf(pluginName, language string) (b []byte, err error) {
  142. property, ok := gSourceproperty[pluginName+".json"]
  143. if ok {
  144. cf := make(map[string]map[string]interface{})
  145. for key, kvs := range property.cf {
  146. aux := make(map[string]interface{})
  147. for k, v := range kvs {
  148. aux[k] = v
  149. }
  150. cf[key] = aux
  151. }
  152. if b, err = json.Marshal(cf); nil != err {
  153. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  154. } else {
  155. return b, err
  156. }
  157. }
  158. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  159. }
  160. func GetSourceMeta(pluginName, language string) (ptrSourceProperty *uiSource, err error) {
  161. property, ok := gSourceproperty[pluginName+".json"]
  162. if ok {
  163. return property.cfToMeta(language)
  164. }
  165. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  166. }
  167. func GetSources() (sources []*pluginfo) {
  168. for fileName, v := range gSourceproperty {
  169. node := new(pluginfo)
  170. node.Name = strings.TrimSuffix(fileName, `.json`)
  171. if nil == v.meta {
  172. continue
  173. }
  174. if nil == v.meta.About {
  175. continue
  176. }
  177. node.About = v.meta.About
  178. i := 0
  179. for ; i < len(sources); i++ {
  180. if node.Name <= sources[i].Name {
  181. sources = append(sources, node)
  182. copy(sources[i+1:], sources[i:])
  183. sources[i] = node
  184. break
  185. }
  186. }
  187. if len(sources) == i {
  188. sources = append(sources, node)
  189. }
  190. }
  191. return sources
  192. }
  193. func GetSourceConfKeys(pluginName string) (keys []string) {
  194. property := gSourceproperty[pluginName+".json"]
  195. if nil == property {
  196. return keys
  197. }
  198. for k := range property.cf {
  199. keys = append(keys, k)
  200. }
  201. return keys
  202. }
  203. func DelSourceConfKey(pluginName, confKey, language string) error {
  204. property := gSourceproperty[pluginName+".json"]
  205. if nil == property {
  206. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  207. }
  208. if nil == property.cf {
  209. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  210. }
  211. delete(property.cf, confKey)
  212. return property.saveCf(pluginName, language)
  213. }
  214. func AddSourceConfKey(pluginName, confKey, language string, content []byte) error {
  215. reqField := make(map[string]interface{})
  216. err := json.Unmarshal(content, &reqField)
  217. if nil != err {
  218. return err
  219. }
  220. property := gSourceproperty[pluginName+".json"]
  221. if nil == property {
  222. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  223. }
  224. if nil == property.cf {
  225. property.cf = make(map[string]map[string]interface{})
  226. }
  227. if 0 != len(property.cf[confKey]) {
  228. return fmt.Errorf(`%s%s`, getMsg(language, source, "confkey_already_exist"), pluginName)
  229. }
  230. property.cf[confKey] = reqField
  231. gSourceproperty[pluginName+".json"] = property
  232. return property.saveCf(pluginName, language)
  233. }
  234. func AddSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  235. reqField := make(map[string]interface{})
  236. err := json.Unmarshal(content, &reqField)
  237. if nil != err {
  238. return err
  239. }
  240. property := gSourceproperty[pluginName+".json"]
  241. if nil == property {
  242. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  243. }
  244. if nil == property.cf {
  245. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  246. }
  247. if nil == property.cf[confKey] {
  248. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  249. }
  250. for k, v := range reqField {
  251. property.cf[confKey][k] = v
  252. }
  253. return property.saveCf(pluginName, language)
  254. }
  255. func recursionDelMap(cf, fields map[string]interface{}, language string) error {
  256. for k, v := range fields {
  257. if nil == v {
  258. delete(cf, k)
  259. continue
  260. }
  261. if delKey, ok := v.(string); ok {
  262. if 0 == len(delKey) {
  263. delete(cf, k)
  264. continue
  265. }
  266. var auxCf map[string]interface{}
  267. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  268. return fmt.Errorf(`%s%s.%s`, getMsg(language, source, "type_conversion_fail"), k, delKey)
  269. }
  270. cf[k] = auxCf
  271. delete(auxCf, delKey)
  272. continue
  273. }
  274. if reflect.Map == reflect.TypeOf(v).Kind() {
  275. var auxCf, auxFields map[string]interface{}
  276. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  277. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  278. }
  279. cf[k] = auxCf
  280. if err := cast.MapToStruct(v, &auxFields); nil != err {
  281. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  282. }
  283. if err := recursionDelMap(auxCf, auxFields, language); nil != err {
  284. return err
  285. }
  286. }
  287. }
  288. return nil
  289. }
  290. func DelSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  291. reqField := make(map[string]interface{})
  292. err := json.Unmarshal(content, &reqField)
  293. if nil != err {
  294. return err
  295. }
  296. property := gSourceproperty[pluginName+".json"]
  297. if nil == property {
  298. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  299. }
  300. if nil == property.cf {
  301. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  302. }
  303. if nil == property.cf[confKey] {
  304. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  305. }
  306. err = recursionDelMap(property.cf[confKey], reqField, language)
  307. if nil != err {
  308. return err
  309. }
  310. return property.saveCf(pluginName, language)
  311. }
  312. func recursionNewFields(template []field, conf map[string]interface{}, ret *[]field) error {
  313. for i := 0; i < len(template); i++ {
  314. p := template[i]
  315. v, ok := conf[template[i].Name]
  316. if ok {
  317. p.Exist = true
  318. } else {
  319. p.Exist = false
  320. continue
  321. }
  322. var auxRet, auxTemplate []field
  323. p.Default = &auxRet
  324. if nil == v {
  325. p.Default = v
  326. } else {
  327. if reflect.Map == reflect.TypeOf(v).Kind() {
  328. var nextCf map[string]interface{}
  329. if tmp, ok := v.(map[interface{}]interface{}); ok {
  330. nextCf = cast.ConvertMap(tmp)
  331. } else {
  332. if err := cast.MapToStruct(v, &nextCf); nil != err {
  333. return err
  334. }
  335. }
  336. if err := cast.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  337. return err
  338. }
  339. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  340. return err
  341. }
  342. } else {
  343. p.Default = v
  344. }
  345. }
  346. *ret = append(*ret, p)
  347. }
  348. return nil
  349. }
  350. func (p *sourceProperty) cfToMeta(language string) (*uiSource, error) {
  351. fields := p.meta.ConfKeys["default"]
  352. ret := make(map[string][]field)
  353. for k, kvs := range p.cf {
  354. var sli []field
  355. err := recursionNewFields(fields, kvs, &sli)
  356. if nil != err {
  357. return nil, fmt.Errorf(`%s%v`, getMsg(language, "source", "type_conversion_fail"), err)
  358. }
  359. ret[k] = sli
  360. }
  361. meta := new(uiSource)
  362. *meta = *(p.meta)
  363. meta.ConfKeys = ret
  364. return meta, nil
  365. }
  366. func (p *sourceProperty) saveCf(pluginName, language string) error {
  367. confDir, err := conf.GetConfLoc()
  368. if nil != err {
  369. return fmt.Errorf(`%s%v`, getMsg(language, source, "not_found_file"), err)
  370. }
  371. dir := path.Join(confDir, "sources")
  372. if "mqtt" == pluginName {
  373. pluginName = "mqtt_source"
  374. dir = confDir
  375. }
  376. filePath := path.Join(dir, pluginName+".yaml")
  377. err = filex.WriteYamlMarshal(filePath, p.cf)
  378. if nil != err {
  379. return fmt.Errorf(`%s%v`, getMsg(language, "source", "write_data_fail"), err)
  380. }
  381. return nil
  382. }