sourceMeta.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  7. "github.com/lf-edge/ekuiper/pkg/cast"
  8. "io/ioutil"
  9. "path"
  10. "reflect"
  11. "strings"
  12. )
  13. type (
  14. fileSource struct {
  15. About *fileAbout `json:"about"`
  16. Libs []string `json:"libs"`
  17. ConfKeys map[string][]*fileField `json:"properties"`
  18. }
  19. uiSource struct {
  20. About *about `json:"about"`
  21. Libs []string `json:"libs"`
  22. ConfKeys map[string][]field `json:"properties"`
  23. }
  24. sourceProperty struct {
  25. cf map[string]map[string]interface{}
  26. meta *uiSource
  27. }
  28. )
  29. func isInternalSource(fiName string) bool {
  30. internal := []string{`edgex.json`, `httppull.json`, `mqtt.json`}
  31. for _, v := range internal {
  32. if v == fiName {
  33. return true
  34. }
  35. }
  36. return false
  37. }
  38. func newUiSource(fi *fileSource) (*uiSource, error) {
  39. if nil == fi {
  40. return nil, nil
  41. }
  42. var err error
  43. ui := new(uiSource)
  44. ui.Libs = fi.Libs
  45. ui.About = newAbout(fi.About)
  46. ui.ConfKeys = make(map[string][]field)
  47. for k, fields := range fi.ConfKeys {
  48. if ui.ConfKeys[k], err = newField(fields); nil != err {
  49. return nil, err
  50. }
  51. }
  52. return ui, nil
  53. }
  54. var gSourceproperty map[string]*sourceProperty
  55. func (m *Manager) uninstalSource(name string) {
  56. if v, ok := gSourceproperty[name+".json"]; ok {
  57. if ui := v.meta; nil != ui {
  58. if nil != ui.About {
  59. ui.About.Installed = false
  60. }
  61. }
  62. }
  63. }
  64. func (m *Manager) readSourceMetaFile(filePath string) error {
  65. fileName := path.Base(filePath)
  66. if "mqtt_source.json" == fileName {
  67. fileName = "mqtt.json"
  68. }
  69. ptrMeta := new(fileSource)
  70. err := filex.ReadJsonUnmarshal(filePath, ptrMeta)
  71. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  72. return fmt.Errorf("file:%s err:%v", filePath, err)
  73. }
  74. if 0 == len(ptrMeta.ConfKeys["default"]) {
  75. return fmt.Errorf("not found default confKey %s", filePath)
  76. }
  77. if nil == ptrMeta.About {
  78. return fmt.Errorf("not found about of %s", filePath)
  79. } else if isInternalSource(fileName) {
  80. ptrMeta.About.Installed = true
  81. } else {
  82. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  83. }
  84. yamlData := make(map[string]map[string]interface{})
  85. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  86. err = filex.ReadYamlUnmarshal(filePath, &yamlData)
  87. if nil != err {
  88. return fmt.Errorf("file:%s err:%v", filePath, err)
  89. }
  90. if 0 == len(yamlData["default"]) {
  91. return fmt.Errorf("not found default confKey from %s", filePath)
  92. }
  93. property := new(sourceProperty)
  94. property.cf = yamlData
  95. property.meta, err = newUiSource(ptrMeta)
  96. if nil != err {
  97. return err
  98. }
  99. gSourceproperty[fileName] = property
  100. return err
  101. }
  102. func (m *Manager) readSourceMetaDir() error {
  103. gSourceproperty = make(map[string]*sourceProperty)
  104. confDir, err := conf.GetConfLoc()
  105. if nil != err {
  106. return err
  107. }
  108. dir := path.Join(confDir, "sources")
  109. infos, err := ioutil.ReadDir(dir)
  110. if nil != err {
  111. return err
  112. }
  113. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  114. return err
  115. }
  116. for _, info := range infos {
  117. fileName := info.Name()
  118. if strings.HasSuffix(fileName, ".json") {
  119. filePath := path.Join(dir, fileName)
  120. if err = m.readSourceMetaFile(filePath); nil != err {
  121. return err
  122. }
  123. conf.Log.Infof("sourceMeta file : %s", fileName)
  124. }
  125. }
  126. return nil
  127. }
  128. func GetSourceConf(pluginName, language string) (b []byte, err error) {
  129. property, ok := gSourceproperty[pluginName+".json"]
  130. if ok {
  131. cf := make(map[string]map[string]interface{})
  132. for key, kvs := range property.cf {
  133. aux := make(map[string]interface{})
  134. for k, v := range kvs {
  135. aux[k] = v
  136. }
  137. cf[key] = aux
  138. }
  139. if b, err = json.Marshal(cf); nil != err {
  140. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  141. } else {
  142. return b, err
  143. }
  144. }
  145. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  146. }
  147. func GetSourceMeta(pluginName, language string) (ptrSourceProperty *uiSource, err error) {
  148. property, ok := gSourceproperty[pluginName+".json"]
  149. if ok {
  150. return property.cfToMeta(language)
  151. }
  152. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  153. }
  154. func GetSources() (sources []*pluginfo) {
  155. for fileName, v := range gSourceproperty {
  156. node := new(pluginfo)
  157. node.Name = strings.TrimSuffix(fileName, `.json`)
  158. if nil == v.meta {
  159. continue
  160. }
  161. if nil == v.meta.About {
  162. continue
  163. }
  164. node.About = v.meta.About
  165. i := 0
  166. for ; i < len(sources); i++ {
  167. if node.Name <= sources[i].Name {
  168. sources = append(sources, node)
  169. copy(sources[i+1:], sources[i:])
  170. sources[i] = node
  171. break
  172. }
  173. }
  174. if len(sources) == i {
  175. sources = append(sources, node)
  176. }
  177. }
  178. return sources
  179. }
  180. func GetSourceConfKeys(pluginName string) (keys []string) {
  181. property := gSourceproperty[pluginName+".json"]
  182. if nil == property {
  183. return keys
  184. }
  185. for k := range property.cf {
  186. keys = append(keys, k)
  187. }
  188. return keys
  189. }
  190. func DelSourceConfKey(pluginName, confKey, language string) error {
  191. property := gSourceproperty[pluginName+".json"]
  192. if nil == property {
  193. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  194. }
  195. if nil == property.cf {
  196. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  197. }
  198. delete(property.cf, confKey)
  199. return property.saveCf(pluginName, language)
  200. }
  201. func AddSourceConfKey(pluginName, confKey, language string, content []byte) error {
  202. reqField := make(map[string]interface{})
  203. err := json.Unmarshal(content, &reqField)
  204. if nil != err {
  205. return err
  206. }
  207. property := gSourceproperty[pluginName+".json"]
  208. if nil == property {
  209. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  210. }
  211. if nil == property.cf {
  212. property.cf = make(map[string]map[string]interface{})
  213. }
  214. if 0 != len(property.cf[confKey]) {
  215. return fmt.Errorf(`%s%s`, getMsg(language, source, "confkey_already_exist"), pluginName)
  216. }
  217. property.cf[confKey] = reqField
  218. gSourceproperty[pluginName+".json"] = property
  219. return property.saveCf(pluginName, language)
  220. }
  221. func AddSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  222. reqField := make(map[string]interface{})
  223. err := json.Unmarshal(content, &reqField)
  224. if nil != err {
  225. return err
  226. }
  227. property := gSourceproperty[pluginName+".json"]
  228. if nil == property {
  229. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  230. }
  231. if nil == property.cf {
  232. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  233. }
  234. if nil == property.cf[confKey] {
  235. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  236. }
  237. for k, v := range reqField {
  238. property.cf[confKey][k] = v
  239. }
  240. return property.saveCf(pluginName, language)
  241. }
  242. func recursionDelMap(cf, fields map[string]interface{}, language string) error {
  243. for k, v := range fields {
  244. if nil == v {
  245. delete(cf, k)
  246. continue
  247. }
  248. if delKey, ok := v.(string); ok {
  249. if 0 == len(delKey) {
  250. delete(cf, k)
  251. continue
  252. }
  253. var auxCf map[string]interface{}
  254. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  255. return fmt.Errorf(`%s%s.%s`, getMsg(language, source, "type_conversion_fail"), k, delKey)
  256. }
  257. cf[k] = auxCf
  258. delete(auxCf, delKey)
  259. continue
  260. }
  261. if reflect.Map == reflect.TypeOf(v).Kind() {
  262. var auxCf, auxFields map[string]interface{}
  263. if err := cast.MapToStruct(cf[k], &auxCf); nil != err {
  264. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  265. }
  266. cf[k] = auxCf
  267. if err := cast.MapToStruct(v, &auxFields); nil != err {
  268. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  269. }
  270. if err := recursionDelMap(auxCf, auxFields, language); nil != err {
  271. return err
  272. }
  273. }
  274. }
  275. return nil
  276. }
  277. func DelSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  278. reqField := make(map[string]interface{})
  279. err := json.Unmarshal(content, &reqField)
  280. if nil != err {
  281. return err
  282. }
  283. property := gSourceproperty[pluginName+".json"]
  284. if nil == property {
  285. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  286. }
  287. if nil == property.cf {
  288. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  289. }
  290. if nil == property.cf[confKey] {
  291. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  292. }
  293. err = recursionDelMap(property.cf[confKey], reqField, language)
  294. if nil != err {
  295. return err
  296. }
  297. return property.saveCf(pluginName, language)
  298. }
  299. func recursionNewFields(template []field, conf map[string]interface{}, ret *[]field) error {
  300. for i := 0; i < len(template); i++ {
  301. p := template[i]
  302. v, ok := conf[template[i].Name]
  303. if ok {
  304. p.Exist = true
  305. } else {
  306. p.Exist = false
  307. continue
  308. }
  309. var auxRet, auxTemplate []field
  310. p.Default = &auxRet
  311. if nil == v {
  312. p.Default = v
  313. } else {
  314. if reflect.Map == reflect.TypeOf(v).Kind() {
  315. var nextCf map[string]interface{}
  316. if tmp, ok := v.(map[interface{}]interface{}); ok {
  317. nextCf = cast.ConvertMap(tmp)
  318. } else {
  319. if err := cast.MapToStruct(v, &nextCf); nil != err {
  320. return err
  321. }
  322. }
  323. if err := cast.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  324. return err
  325. }
  326. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  327. return err
  328. }
  329. } else {
  330. p.Default = v
  331. }
  332. }
  333. *ret = append(*ret, p)
  334. }
  335. return nil
  336. }
  337. func (p *sourceProperty) cfToMeta(language string) (*uiSource, error) {
  338. fields := p.meta.ConfKeys["default"]
  339. ret := make(map[string][]field)
  340. for k, kvs := range p.cf {
  341. var sli []field
  342. err := recursionNewFields(fields, kvs, &sli)
  343. if nil != err {
  344. return nil, fmt.Errorf(`%s%v`, getMsg(language, "source", "type_conversion_fail"), err)
  345. }
  346. ret[k] = sli
  347. }
  348. meta := new(uiSource)
  349. *meta = *(p.meta)
  350. meta.ConfKeys = ret
  351. return meta, nil
  352. }
  353. func (p *sourceProperty) saveCf(pluginName, language string) error {
  354. confDir, err := conf.GetConfLoc()
  355. if nil != err {
  356. return fmt.Errorf(`%s%v`, getMsg(language, source, "not_found_file"), err)
  357. }
  358. dir := path.Join(confDir, "sources")
  359. if "mqtt" == pluginName {
  360. pluginName = "mqtt_source"
  361. dir = confDir
  362. }
  363. filePath := path.Join(dir, pluginName+".yaml")
  364. err = filex.WriteYamlMarshal(filePath, p.cf)
  365. if nil != err {
  366. return fmt.Errorf(`%s%v`, getMsg(language, "source", "write_data_fail"), err)
  367. }
  368. return nil
  369. }