sourceMeta.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]*field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func newUiSource(fi *fileSource) *uiSource {
  28. if nil == fi {
  29. return nil
  30. }
  31. ui := new(uiSource)
  32. ui.Libs = fi.Libs
  33. ui.About = newAbout(fi.About)
  34. ui.ConfKeys = make(map[string][]*field)
  35. for k, fields := range fi.ConfKeys {
  36. var sliField []*field
  37. for _, v := range fields {
  38. sliField = append(sliField, newField(v))
  39. }
  40. ui.ConfKeys[k] = sliField
  41. }
  42. return ui
  43. }
  44. var g_sourceProperty map[string]*sourceProperty
  45. func readSourceMetaFile(filePath string) error {
  46. fileName := path.Base(filePath)
  47. ptrMeta := new(fileSource)
  48. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  49. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  50. return fmt.Errorf("file:%s err:%v", filePath, err)
  51. }
  52. if 0 == len(ptrMeta.ConfKeys["default"]) {
  53. return fmt.Errorf("not found default confKey %s", filePath)
  54. }
  55. yamlData := make(map[string]map[string]interface{})
  56. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  57. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  58. if nil != err {
  59. return fmt.Errorf("file:%s err:%v", filePath, err)
  60. }
  61. if 0 == len(yamlData["default"]) {
  62. return fmt.Errorf("not found default confKey from %s", filePath)
  63. }
  64. property := new(sourceProperty)
  65. property.cf = yamlData
  66. property.meta = newUiSource(ptrMeta)
  67. g_sourceProperty[fileName] = property
  68. return err
  69. }
  70. func readSourceMetaDir() error {
  71. g_sourceProperty = make(map[string]*sourceProperty)
  72. confDir, err := common.GetConfLoc()
  73. if nil != err {
  74. return err
  75. }
  76. dir := path.Join(confDir, "sources")
  77. infos, err := ioutil.ReadDir(dir)
  78. if nil != err {
  79. return err
  80. }
  81. if err = readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  82. return err
  83. }
  84. for _, info := range infos {
  85. fileName := info.Name()
  86. if strings.HasSuffix(fileName, ".json") {
  87. filePath := path.Join(dir, fileName)
  88. if err = readSourceMetaFile(filePath); nil != err {
  89. return err
  90. }
  91. common.Log.Infof("sourceMeta file : %s", fileName)
  92. }
  93. }
  94. return nil
  95. }
  96. func GetSourceMeta(pluginName string) (ptrSourceProperty *uiSource, err error) {
  97. property, ok := g_sourceProperty[pluginName+".json"]
  98. if ok {
  99. return property.cfToMeta()
  100. }
  101. return nil, fmt.Errorf("not found plugin %s", pluginName)
  102. }
  103. func GetSources() (sources []*pluginfo) {
  104. for fileName, v := range g_sourceProperty {
  105. node := new(pluginfo)
  106. node.Name = strings.TrimSuffix(fileName, `.json`)
  107. if nil == v.meta {
  108. continue
  109. }
  110. if nil == v.meta.About {
  111. continue
  112. }
  113. node.About = v.meta.About
  114. sources = append(sources, node)
  115. }
  116. return sources
  117. }
  118. func GetSourceConfKeys(pluginName string) (keys []string) {
  119. property := g_sourceProperty[pluginName+".json"]
  120. if nil == property {
  121. return keys
  122. }
  123. for k, _ := range property.cf {
  124. keys = append(keys, k)
  125. }
  126. return keys
  127. }
  128. func DelSourceConfKey(pluginName, confKey string) error {
  129. property := g_sourceProperty[pluginName+".json"]
  130. if nil == property {
  131. return fmt.Errorf("not found plugin %s", pluginName)
  132. }
  133. if nil == property.cf {
  134. return fmt.Errorf("not found confKey %s", confKey)
  135. }
  136. delete(property.cf, confKey)
  137. return property.saveCf(pluginName)
  138. }
  139. func AddSourceConfKey(pluginName, confKey string, content []byte) error {
  140. reqField := make(map[string]interface{})
  141. err := json.Unmarshal(content, &reqField)
  142. if nil != err {
  143. return err
  144. }
  145. property := g_sourceProperty[pluginName+".json"]
  146. if nil == property {
  147. return fmt.Errorf("not found plugin %s", pluginName)
  148. }
  149. if nil == property.cf {
  150. property.cf = make(map[string]map[string]interface{})
  151. }
  152. if 0 != len(property.cf[confKey]) {
  153. return fmt.Errorf("exist confKey %s", confKey)
  154. }
  155. property.cf[confKey] = reqField
  156. g_sourceProperty[pluginName+".json"] = property
  157. return property.saveCf(pluginName)
  158. }
  159. func AddSourceConfKeyField(pluginName, confKey string, content []byte) error {
  160. reqField := make(map[string]interface{})
  161. err := json.Unmarshal(content, &reqField)
  162. if nil != err {
  163. return err
  164. }
  165. property := g_sourceProperty[pluginName+".json"]
  166. if nil == property {
  167. return fmt.Errorf("not found plugin %s", pluginName)
  168. }
  169. if nil == property.cf {
  170. return fmt.Errorf("not found confKey %s", confKey)
  171. }
  172. if nil == property.cf[confKey] {
  173. return fmt.Errorf("not found confKey %s", confKey)
  174. }
  175. for k, v := range reqField {
  176. property.cf[confKey][k] = v
  177. }
  178. return property.saveCf(pluginName)
  179. }
  180. func recursionDelMap(cf, fields map[string]interface{}) error {
  181. for k, v := range fields {
  182. if nil == v {
  183. delete(cf, k)
  184. continue
  185. }
  186. if delKey, ok := v.(string); ok {
  187. if 0 == len(delKey) {
  188. delete(cf, k)
  189. continue
  190. }
  191. var auxCf map[string]interface{}
  192. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  193. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  194. }
  195. cf[k] = auxCf
  196. delete(auxCf, delKey)
  197. continue
  198. }
  199. if reflect.Map == reflect.TypeOf(v).Kind() {
  200. var auxCf, auxFields map[string]interface{}
  201. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  202. return fmt.Errorf("not found second key:%s.%v", k, v)
  203. }
  204. cf[k] = auxCf
  205. if err := common.MapToStruct(v, &auxFields); nil != err {
  206. return fmt.Errorf("requestef format err:%s.%v", k, v)
  207. }
  208. if err := recursionDelMap(auxCf, auxFields); nil != err {
  209. return err
  210. }
  211. }
  212. }
  213. return nil
  214. }
  215. func DelSourceConfKeyField(pluginName, confKey string, content []byte) error {
  216. reqField := make(map[string]interface{})
  217. err := json.Unmarshal(content, &reqField)
  218. if nil != err {
  219. return err
  220. }
  221. property := g_sourceProperty[pluginName+".json"]
  222. if nil == property {
  223. return fmt.Errorf("not found plugin %s", pluginName)
  224. }
  225. if nil == property.cf {
  226. return fmt.Errorf("not found confKey %s", confKey)
  227. }
  228. if nil == property.cf[confKey] {
  229. return fmt.Errorf("not found confKey %s", confKey)
  230. }
  231. err = recursionDelMap(property.cf[confKey], reqField)
  232. if nil != err {
  233. return err
  234. }
  235. return property.saveCf(pluginName)
  236. }
  237. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  238. for i := 0; i < len(template); i++ {
  239. p := new(field)
  240. *p = *template[i]
  241. *ret = append(*ret, p)
  242. v, ok := conf[template[i].Name]
  243. if ok {
  244. p.Exist = true
  245. } else {
  246. p.Exist = false
  247. continue
  248. }
  249. var auxRet, auxTemplate []*field
  250. p.Default = &auxRet
  251. if nil == v {
  252. p.Default = v
  253. } else {
  254. if reflect.Map == reflect.TypeOf(v).Kind() {
  255. var nextCf map[string]interface{}
  256. if tmp, ok := v.(map[interface{}]interface{}); ok {
  257. nextCf = common.ConvertMap(tmp)
  258. } else {
  259. if err := common.MapToStruct(v, &nextCf); nil != err {
  260. return err
  261. }
  262. }
  263. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  264. return err
  265. }
  266. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  267. return err
  268. }
  269. } else {
  270. p.Default = v
  271. }
  272. }
  273. }
  274. return nil
  275. }
  276. func (this *sourceProperty) cfToMeta() (*uiSource, error) {
  277. fields := this.meta.ConfKeys["default"]
  278. ret := make(map[string][]*field)
  279. for k, kvs := range this.cf {
  280. var sli []*field
  281. err := recursionNewFields(fields, kvs, &sli)
  282. if nil != err {
  283. return nil, err
  284. }
  285. ret[k] = sli
  286. }
  287. meta := new(uiSource)
  288. *meta = *(this.meta)
  289. meta.ConfKeys = ret
  290. return meta, nil
  291. }
  292. func (this *sourceProperty) saveCf(pluginName string) error {
  293. confDir, err := common.GetConfLoc()
  294. if nil != err {
  295. return err
  296. }
  297. dir := path.Join(confDir, "sources")
  298. if "mqtt_source" == pluginName {
  299. dir = confDir
  300. }
  301. filePath := path.Join(dir, pluginName+".yaml")
  302. return common.WriteYamlMarshal(filePath, this.cf)
  303. }