sourceMeta.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func isInternalSource(fiName string) bool {
  28. internal := []string{`edgex.json`, `httppull.json`, `mqtt.json`}
  29. for _, v := range internal {
  30. if v == fiName {
  31. return true
  32. }
  33. }
  34. return false
  35. }
  36. func newUiSource(fi *fileSource) (*uiSource, error) {
  37. if nil == fi {
  38. return nil, nil
  39. }
  40. var err error
  41. ui := new(uiSource)
  42. ui.Libs = fi.Libs
  43. ui.About = newAbout(fi.About)
  44. ui.ConfKeys = make(map[string][]field)
  45. for k, fields := range fi.ConfKeys {
  46. if ui.ConfKeys[k], err = newField(fields); nil != err {
  47. return nil, err
  48. }
  49. }
  50. return ui, nil
  51. }
  52. var g_sourceProperty map[string]*sourceProperty
  53. func (m *Manager) uninstalSource(name string) {
  54. if v, ok := g_sourceProperty[name+".json"]; ok {
  55. if ui := v.meta; nil != ui {
  56. if nil != ui.About {
  57. ui.About.Installed = false
  58. }
  59. }
  60. }
  61. }
  62. func (m *Manager) readSourceMetaFile(filePath string) error {
  63. fileName := path.Base(filePath)
  64. if "mqtt_source.json" == fileName {
  65. fileName = "mqtt.json"
  66. }
  67. ptrMeta := new(fileSource)
  68. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  69. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  70. return fmt.Errorf("file:%s err:%v", filePath, err)
  71. }
  72. if 0 == len(ptrMeta.ConfKeys["default"]) {
  73. return fmt.Errorf("not found default confKey %s", filePath)
  74. }
  75. if nil == ptrMeta.About {
  76. return fmt.Errorf("not found about of %s", filePath)
  77. } else if isInternalSource(fileName) {
  78. ptrMeta.About.Installed = true
  79. } else {
  80. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  81. }
  82. yamlData := make(map[string]map[string]interface{})
  83. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  84. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  85. if nil != err {
  86. return fmt.Errorf("file:%s err:%v", filePath, err)
  87. }
  88. if 0 == len(yamlData["default"]) {
  89. return fmt.Errorf("not found default confKey from %s", filePath)
  90. }
  91. property := new(sourceProperty)
  92. property.cf = yamlData
  93. property.meta, err = newUiSource(ptrMeta)
  94. if nil != err {
  95. return err
  96. }
  97. g_sourceProperty[fileName] = property
  98. return err
  99. }
  100. func (m *Manager) readSourceMetaDir() error {
  101. g_sourceProperty = make(map[string]*sourceProperty)
  102. confDir, err := common.GetConfLoc()
  103. if nil != err {
  104. return err
  105. }
  106. dir := path.Join(confDir, "sources")
  107. infos, err := ioutil.ReadDir(dir)
  108. if nil != err {
  109. return err
  110. }
  111. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  112. return err
  113. }
  114. for _, info := range infos {
  115. fileName := info.Name()
  116. if strings.HasSuffix(fileName, ".json") {
  117. filePath := path.Join(dir, fileName)
  118. if err = m.readSourceMetaFile(filePath); nil != err {
  119. return err
  120. }
  121. common.Log.Infof("sourceMeta file : %s", fileName)
  122. }
  123. }
  124. return nil
  125. }
  126. func GetSourceConf(pluginName, language string) (b []byte, err error) {
  127. property, ok := g_sourceProperty[pluginName+".json"]
  128. if ok {
  129. cf := make(map[string]map[string]interface{})
  130. for key, kvs := range property.cf {
  131. aux := make(map[interface{}]interface{})
  132. for k, v := range kvs {
  133. aux[k] = v
  134. }
  135. cf[key] = common.ConvertMap(aux)
  136. }
  137. if b, err = json.Marshal(cf); nil != err {
  138. return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
  139. } else {
  140. return b, err
  141. }
  142. }
  143. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  144. }
  145. func GetSourceMeta(pluginName, language string) (ptrSourceProperty *uiSource, err error) {
  146. property, ok := g_sourceProperty[pluginName+".json"]
  147. if ok {
  148. return property.cfToMeta(language)
  149. }
  150. return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  151. }
  152. func GetSources() (sources []*pluginfo) {
  153. for fileName, v := range g_sourceProperty {
  154. node := new(pluginfo)
  155. node.Name = strings.TrimSuffix(fileName, `.json`)
  156. if nil == v.meta {
  157. continue
  158. }
  159. if nil == v.meta.About {
  160. continue
  161. }
  162. node.About = v.meta.About
  163. i := 0
  164. for ; i < len(sources); i++ {
  165. if node.Name <= sources[i].Name {
  166. sources = append(sources, node)
  167. copy(sources[i+1:], sources[i:])
  168. sources[i] = node
  169. break
  170. }
  171. }
  172. if len(sources) == i {
  173. sources = append(sources, node)
  174. }
  175. }
  176. return sources
  177. }
  178. func GetSourceConfKeys(pluginName string) (keys []string) {
  179. property := g_sourceProperty[pluginName+".json"]
  180. if nil == property {
  181. return keys
  182. }
  183. for k := range property.cf {
  184. keys = append(keys, k)
  185. }
  186. return keys
  187. }
  188. func DelSourceConfKey(pluginName, confKey, language string) error {
  189. property := g_sourceProperty[pluginName+".json"]
  190. if nil == property {
  191. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  192. }
  193. if nil == property.cf {
  194. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  195. }
  196. delete(property.cf, confKey)
  197. return property.saveCf(pluginName, language)
  198. }
  199. func AddSourceConfKey(pluginName, confKey, language string, content []byte) error {
  200. reqField := make(map[string]interface{})
  201. err := json.Unmarshal(content, &reqField)
  202. if nil != err {
  203. return err
  204. }
  205. property := g_sourceProperty[pluginName+".json"]
  206. if nil == property {
  207. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  208. }
  209. if nil == property.cf {
  210. property.cf = make(map[string]map[string]interface{})
  211. }
  212. if 0 != len(property.cf[confKey]) {
  213. return fmt.Errorf(`%s%s`, getMsg(language, source, "confkey_already_exist"), pluginName)
  214. }
  215. property.cf[confKey] = reqField
  216. g_sourceProperty[pluginName+".json"] = property
  217. return property.saveCf(pluginName, language)
  218. }
  219. func AddSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  220. reqField := make(map[string]interface{})
  221. err := json.Unmarshal(content, &reqField)
  222. if nil != err {
  223. return err
  224. }
  225. property := g_sourceProperty[pluginName+".json"]
  226. if nil == property {
  227. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  228. }
  229. if nil == property.cf {
  230. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  231. }
  232. if nil == property.cf[confKey] {
  233. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  234. }
  235. for k, v := range reqField {
  236. property.cf[confKey][k] = v
  237. }
  238. return property.saveCf(pluginName, language)
  239. }
  240. func recursionDelMap(cf, fields map[string]interface{}, language string) error {
  241. for k, v := range fields {
  242. if nil == v {
  243. delete(cf, k)
  244. continue
  245. }
  246. if delKey, ok := v.(string); ok {
  247. if 0 == len(delKey) {
  248. delete(cf, k)
  249. continue
  250. }
  251. var auxCf map[string]interface{}
  252. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  253. return fmt.Errorf(`%s%s.%s`, getMsg(language, source, "type_conversion_fail"), k, delKey)
  254. }
  255. cf[k] = auxCf
  256. delete(auxCf, delKey)
  257. continue
  258. }
  259. if reflect.Map == reflect.TypeOf(v).Kind() {
  260. var auxCf, auxFields map[string]interface{}
  261. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  262. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  263. }
  264. cf[k] = auxCf
  265. if err := common.MapToStruct(v, &auxFields); nil != err {
  266. return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
  267. }
  268. if err := recursionDelMap(auxCf, auxFields, language); nil != err {
  269. return err
  270. }
  271. }
  272. }
  273. return nil
  274. }
  275. func DelSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
  276. reqField := make(map[string]interface{})
  277. err := json.Unmarshal(content, &reqField)
  278. if nil != err {
  279. return err
  280. }
  281. property := g_sourceProperty[pluginName+".json"]
  282. if nil == property {
  283. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
  284. }
  285. if nil == property.cf {
  286. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  287. }
  288. if nil == property.cf[confKey] {
  289. return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
  290. }
  291. err = recursionDelMap(property.cf[confKey], reqField, language)
  292. if nil != err {
  293. return err
  294. }
  295. return property.saveCf(pluginName, language)
  296. }
  297. func recursionNewFields(template []field, conf map[string]interface{}, ret *[]field) error {
  298. for i := 0; i < len(template); i++ {
  299. p := template[i]
  300. v, ok := conf[template[i].Name]
  301. if ok {
  302. p.Exist = true
  303. } else {
  304. p.Exist = false
  305. continue
  306. }
  307. var auxRet, auxTemplate []field
  308. p.Default = &auxRet
  309. if nil == v {
  310. p.Default = v
  311. } else {
  312. if reflect.Map == reflect.TypeOf(v).Kind() {
  313. var nextCf map[string]interface{}
  314. if tmp, ok := v.(map[interface{}]interface{}); ok {
  315. nextCf = common.ConvertMap(tmp)
  316. } else {
  317. if err := common.MapToStruct(v, &nextCf); nil != err {
  318. return err
  319. }
  320. }
  321. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  322. return err
  323. }
  324. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  325. return err
  326. }
  327. } else {
  328. p.Default = v
  329. }
  330. }
  331. *ret = append(*ret, p)
  332. }
  333. return nil
  334. }
  335. func (this *sourceProperty) cfToMeta(language string) (*uiSource, error) {
  336. fields := this.meta.ConfKeys["default"]
  337. ret := make(map[string][]field)
  338. for k, kvs := range this.cf {
  339. var sli []field
  340. err := recursionNewFields(fields, kvs, &sli)
  341. if nil != err {
  342. return nil, fmt.Errorf(`%s%v`, getMsg(language, "source", "type_conversion_fail"), err)
  343. }
  344. ret[k] = sli
  345. }
  346. meta := new(uiSource)
  347. *meta = *(this.meta)
  348. meta.ConfKeys = ret
  349. return meta, nil
  350. }
  351. func (this *sourceProperty) saveCf(pluginName, language string) error {
  352. confDir, err := common.GetConfLoc()
  353. if nil != err {
  354. return fmt.Errorf(`%s%v`, getMsg(language, source, "not_found_file"), err)
  355. }
  356. dir := path.Join(confDir, "sources")
  357. if "mqtt" == pluginName {
  358. pluginName = "mqtt_source"
  359. dir = confDir
  360. }
  361. filePath := path.Join(dir, pluginName+".yaml")
  362. err = common.WriteYamlMarshal(filePath, this.cf)
  363. if nil != err {
  364. return fmt.Errorf(`%s%v`, getMsg(language, "source", "write_data_fail"), err)
  365. }
  366. return nil
  367. }