sourceMeta.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]*field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func newUiSource(fi *fileSource) (*uiSource, error) {
  28. if nil == fi {
  29. return nil, nil
  30. }
  31. var err error
  32. ui := new(uiSource)
  33. ui.Libs = fi.Libs
  34. ui.About = newAbout(fi.About)
  35. ui.ConfKeys = make(map[string][]*field)
  36. for k, fields := range fi.ConfKeys {
  37. if ui.ConfKeys[k], err = newField(fields); nil != err {
  38. return nil, err
  39. }
  40. }
  41. return ui, nil
  42. }
// g_sourceProperty holds the loaded configuration and metadata of every
// source, keyed by the base name of its metadata file (plugin name plus
// ".json"). It is recreated by readSourceMetaDir and filled by
// readSourceMetaFile.
var g_sourceProperty map[string]*sourceProperty
  44. func (m *Manager) readSourceMetaFile(filePath string) error {
  45. fileName := path.Base(filePath)
  46. ptrMeta := new(fileSource)
  47. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  48. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  49. return fmt.Errorf("file:%s err:%v", filePath, err)
  50. }
  51. if 0 == len(ptrMeta.ConfKeys["default"]) {
  52. return fmt.Errorf("not found default confKey %s", filePath)
  53. }
  54. if nil == ptrMeta.About {
  55. return fmt.Errorf("not found about of %s", filePath)
  56. } else {
  57. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  58. }
  59. yamlData := make(map[string]map[string]interface{})
  60. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  61. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  62. if nil != err {
  63. return fmt.Errorf("file:%s err:%v", filePath, err)
  64. }
  65. if 0 == len(yamlData["default"]) {
  66. return fmt.Errorf("not found default confKey from %s", filePath)
  67. }
  68. property := new(sourceProperty)
  69. property.cf = yamlData
  70. property.meta, err = newUiSource(ptrMeta)
  71. if nil != err {
  72. return err
  73. }
  74. g_sourceProperty[fileName] = property
  75. return err
  76. }
  77. func (m *Manager) readSourceMetaDir() error {
  78. g_sourceProperty = make(map[string]*sourceProperty)
  79. confDir, err := common.GetConfLoc()
  80. if nil != err {
  81. return err
  82. }
  83. dir := path.Join(confDir, "sources")
  84. infos, err := ioutil.ReadDir(dir)
  85. if nil != err {
  86. return err
  87. }
  88. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  89. return err
  90. }
  91. for _, info := range infos {
  92. fileName := info.Name()
  93. if strings.HasSuffix(fileName, ".json") {
  94. filePath := path.Join(dir, fileName)
  95. if err = m.readSourceMetaFile(filePath); nil != err {
  96. return err
  97. }
  98. common.Log.Infof("sourceMeta file : %s", fileName)
  99. }
  100. }
  101. return nil
  102. }
  103. func GetSourceConf(pluginName string) (b []byte, err error) {
  104. property, ok := g_sourceProperty[pluginName+".json"]
  105. if ok {
  106. cf := make(map[string]map[string]interface{})
  107. for key, kvs := range property.cf {
  108. aux := make(map[interface{}]interface{})
  109. for k, v := range kvs {
  110. aux[k] = v
  111. }
  112. cf[key] = common.ConvertMap(aux)
  113. }
  114. return json.Marshal(cf)
  115. }
  116. return nil, fmt.Errorf("not found plugin %s", pluginName)
  117. }
  118. func GetSourceMeta(pluginName string) (ptrSourceProperty *uiSource, err error) {
  119. property, ok := g_sourceProperty[pluginName+".json"]
  120. if ok {
  121. return property.cfToMeta()
  122. }
  123. return nil, fmt.Errorf("not found plugin %s", pluginName)
  124. }
  125. func GetSources() (sources []*pluginfo) {
  126. for fileName, v := range g_sourceProperty {
  127. node := new(pluginfo)
  128. node.Name = strings.TrimSuffix(fileName, `.json`)
  129. if nil == v.meta {
  130. continue
  131. }
  132. if nil == v.meta.About {
  133. continue
  134. }
  135. node.About = v.meta.About
  136. i := 0
  137. for ; i < len(sources); i++ {
  138. if node.Name <= sources[i].Name {
  139. sources = append(sources, node)
  140. copy(sources[i+1:], sources[i:])
  141. sources[i] = node
  142. break
  143. }
  144. }
  145. if len(sources) == i {
  146. sources = append(sources, node)
  147. }
  148. }
  149. return sources
  150. }
  151. func GetSourceConfKeys(pluginName string) (keys []string) {
  152. property := g_sourceProperty[pluginName+".json"]
  153. if nil == property {
  154. return keys
  155. }
  156. for k, _ := range property.cf {
  157. keys = append(keys, k)
  158. }
  159. return keys
  160. }
  161. func DelSourceConfKey(pluginName, confKey string) error {
  162. property := g_sourceProperty[pluginName+".json"]
  163. if nil == property {
  164. return fmt.Errorf("not found plugin %s", pluginName)
  165. }
  166. if nil == property.cf {
  167. return fmt.Errorf("not found confKey %s", confKey)
  168. }
  169. delete(property.cf, confKey)
  170. return property.saveCf(pluginName)
  171. }
  172. func AddSourceConfKey(pluginName, confKey string, content []byte) error {
  173. reqField := make(map[string]interface{})
  174. err := json.Unmarshal(content, &reqField)
  175. if nil != err {
  176. return err
  177. }
  178. property := g_sourceProperty[pluginName+".json"]
  179. if nil == property {
  180. return fmt.Errorf("not found plugin %s", pluginName)
  181. }
  182. if nil == property.cf {
  183. property.cf = make(map[string]map[string]interface{})
  184. }
  185. if 0 != len(property.cf[confKey]) {
  186. return fmt.Errorf("exist confKey %s", confKey)
  187. }
  188. property.cf[confKey] = reqField
  189. g_sourceProperty[pluginName+".json"] = property
  190. return property.saveCf(pluginName)
  191. }
  192. func AddSourceConfKeyField(pluginName, confKey string, content []byte) error {
  193. reqField := make(map[string]interface{})
  194. err := json.Unmarshal(content, &reqField)
  195. if nil != err {
  196. return err
  197. }
  198. property := g_sourceProperty[pluginName+".json"]
  199. if nil == property {
  200. return fmt.Errorf("not found plugin %s", pluginName)
  201. }
  202. if nil == property.cf {
  203. return fmt.Errorf("not found confKey %s", confKey)
  204. }
  205. if nil == property.cf[confKey] {
  206. return fmt.Errorf("not found confKey %s", confKey)
  207. }
  208. for k, v := range reqField {
  209. property.cf[confKey][k] = v
  210. }
  211. return property.saveCf(pluginName)
  212. }
  213. func recursionDelMap(cf, fields map[string]interface{}) error {
  214. for k, v := range fields {
  215. if nil == v {
  216. delete(cf, k)
  217. continue
  218. }
  219. if delKey, ok := v.(string); ok {
  220. if 0 == len(delKey) {
  221. delete(cf, k)
  222. continue
  223. }
  224. var auxCf map[string]interface{}
  225. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  226. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  227. }
  228. cf[k] = auxCf
  229. delete(auxCf, delKey)
  230. continue
  231. }
  232. if reflect.Map == reflect.TypeOf(v).Kind() {
  233. var auxCf, auxFields map[string]interface{}
  234. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  235. return fmt.Errorf("not found second key:%s.%v", k, v)
  236. }
  237. cf[k] = auxCf
  238. if err := common.MapToStruct(v, &auxFields); nil != err {
  239. return fmt.Errorf("requestef format err:%s.%v", k, v)
  240. }
  241. if err := recursionDelMap(auxCf, auxFields); nil != err {
  242. return err
  243. }
  244. }
  245. }
  246. return nil
  247. }
  248. func DelSourceConfKeyField(pluginName, confKey string, content []byte) error {
  249. reqField := make(map[string]interface{})
  250. err := json.Unmarshal(content, &reqField)
  251. if nil != err {
  252. return err
  253. }
  254. property := g_sourceProperty[pluginName+".json"]
  255. if nil == property {
  256. return fmt.Errorf("not found plugin %s", pluginName)
  257. }
  258. if nil == property.cf {
  259. return fmt.Errorf("not found confKey %s", confKey)
  260. }
  261. if nil == property.cf[confKey] {
  262. return fmt.Errorf("not found confKey %s", confKey)
  263. }
  264. err = recursionDelMap(property.cf[confKey], reqField)
  265. if nil != err {
  266. return err
  267. }
  268. return property.saveCf(pluginName)
  269. }
  270. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  271. for i := 0; i < len(template); i++ {
  272. p := new(field)
  273. *p = *template[i]
  274. *ret = append(*ret, p)
  275. v, ok := conf[template[i].Name]
  276. if ok {
  277. p.Exist = true
  278. } else {
  279. p.Exist = false
  280. continue
  281. }
  282. var auxRet, auxTemplate []*field
  283. p.Default = &auxRet
  284. if nil == v {
  285. p.Default = v
  286. } else {
  287. if reflect.Map == reflect.TypeOf(v).Kind() {
  288. var nextCf map[string]interface{}
  289. if tmp, ok := v.(map[interface{}]interface{}); ok {
  290. nextCf = common.ConvertMap(tmp)
  291. } else {
  292. if err := common.MapToStruct(v, &nextCf); nil != err {
  293. return err
  294. }
  295. }
  296. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  297. return err
  298. }
  299. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  300. return err
  301. }
  302. } else {
  303. p.Default = v
  304. }
  305. }
  306. }
  307. return nil
  308. }
  309. func (this *sourceProperty) cfToMeta() (*uiSource, error) {
  310. fields := this.meta.ConfKeys["default"]
  311. ret := make(map[string][]*field)
  312. for k, kvs := range this.cf {
  313. var sli []*field
  314. err := recursionNewFields(fields, kvs, &sli)
  315. if nil != err {
  316. return nil, err
  317. }
  318. ret[k] = sli
  319. }
  320. meta := new(uiSource)
  321. *meta = *(this.meta)
  322. meta.ConfKeys = ret
  323. return meta, nil
  324. }
  325. func (this *sourceProperty) saveCf(pluginName string) error {
  326. confDir, err := common.GetConfLoc()
  327. if nil != err {
  328. return err
  329. }
  330. dir := path.Join(confDir, "sources")
  331. if "mqtt_source" == pluginName {
  332. dir = confDir
  333. }
  334. filePath := path.Join(dir, pluginName+".yaml")
  335. return common.WriteYamlMarshal(filePath, this.cf)
  336. }