sourceMeta.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]*field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func newUiSource(fi *fileSource) (*uiSource, error) {
  28. if nil == fi {
  29. return nil, nil
  30. }
  31. var err error
  32. ui := new(uiSource)
  33. ui.Libs = fi.Libs
  34. ui.About = newAbout(fi.About)
  35. ui.ConfKeys = make(map[string][]*field)
  36. for k, fields := range fi.ConfKeys {
  37. if ui.ConfKeys[k], err = newField(fields); nil != err {
  38. return nil, err
  39. }
  40. }
  41. return ui, nil
  42. }
  43. var g_sourceProperty map[string]*sourceProperty
  44. func readSourceMetaFile(filePath string) error {
  45. fileName := path.Base(filePath)
  46. ptrMeta := new(fileSource)
  47. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  48. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  49. return fmt.Errorf("file:%s err:%v", filePath, err)
  50. }
  51. if 0 == len(ptrMeta.ConfKeys["default"]) {
  52. return fmt.Errorf("not found default confKey %s", filePath)
  53. }
  54. yamlData := make(map[string]map[string]interface{})
  55. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  56. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  57. if nil != err {
  58. return fmt.Errorf("file:%s err:%v", filePath, err)
  59. }
  60. if 0 == len(yamlData["default"]) {
  61. return fmt.Errorf("not found default confKey from %s", filePath)
  62. }
  63. property := new(sourceProperty)
  64. property.cf = yamlData
  65. property.meta, err = newUiSource(ptrMeta)
  66. if nil != err {
  67. return err
  68. }
  69. g_sourceProperty[fileName] = property
  70. return err
  71. }
  72. func readSourceMetaDir() error {
  73. g_sourceProperty = make(map[string]*sourceProperty)
  74. confDir, err := common.GetConfLoc()
  75. if nil != err {
  76. return err
  77. }
  78. dir := path.Join(confDir, "sources")
  79. infos, err := ioutil.ReadDir(dir)
  80. if nil != err {
  81. return err
  82. }
  83. if err = readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  84. return err
  85. }
  86. for _, info := range infos {
  87. fileName := info.Name()
  88. if strings.HasSuffix(fileName, ".json") {
  89. filePath := path.Join(dir, fileName)
  90. if err = readSourceMetaFile(filePath); nil != err {
  91. return err
  92. }
  93. common.Log.Infof("sourceMeta file : %s", fileName)
  94. }
  95. }
  96. return nil
  97. }
  98. func GetSourceMeta(pluginName string) (ptrSourceProperty *uiSource, err error) {
  99. property, ok := g_sourceProperty[pluginName+".json"]
  100. if ok {
  101. return property.cfToMeta()
  102. }
  103. return nil, fmt.Errorf("not found plugin %s", pluginName)
  104. }
  105. func GetSources() (sources []*pluginfo) {
  106. for fileName, v := range g_sourceProperty {
  107. node := new(pluginfo)
  108. node.Name = strings.TrimSuffix(fileName, `.json`)
  109. if nil == v.meta {
  110. continue
  111. }
  112. if nil == v.meta.About {
  113. continue
  114. }
  115. node.About = v.meta.About
  116. sources = append(sources, node)
  117. }
  118. return sources
  119. }
  120. func GetSourceConfKeys(pluginName string) (keys []string) {
  121. property := g_sourceProperty[pluginName+".json"]
  122. if nil == property {
  123. return keys
  124. }
  125. for k, _ := range property.cf {
  126. keys = append(keys, k)
  127. }
  128. return keys
  129. }
  130. func DelSourceConfKey(pluginName, confKey string) error {
  131. property := g_sourceProperty[pluginName+".json"]
  132. if nil == property {
  133. return fmt.Errorf("not found plugin %s", pluginName)
  134. }
  135. if nil == property.cf {
  136. return fmt.Errorf("not found confKey %s", confKey)
  137. }
  138. delete(property.cf, confKey)
  139. return property.saveCf(pluginName)
  140. }
  141. func AddSourceConfKey(pluginName, confKey string, content []byte) error {
  142. reqField := make(map[string]interface{})
  143. err := json.Unmarshal(content, &reqField)
  144. if nil != err {
  145. return err
  146. }
  147. property := g_sourceProperty[pluginName+".json"]
  148. if nil == property {
  149. return fmt.Errorf("not found plugin %s", pluginName)
  150. }
  151. if nil == property.cf {
  152. property.cf = make(map[string]map[string]interface{})
  153. }
  154. if 0 != len(property.cf[confKey]) {
  155. return fmt.Errorf("exist confKey %s", confKey)
  156. }
  157. property.cf[confKey] = reqField
  158. g_sourceProperty[pluginName+".json"] = property
  159. return property.saveCf(pluginName)
  160. }
  161. func AddSourceConfKeyField(pluginName, confKey string, content []byte) error {
  162. reqField := make(map[string]interface{})
  163. err := json.Unmarshal(content, &reqField)
  164. if nil != err {
  165. return err
  166. }
  167. property := g_sourceProperty[pluginName+".json"]
  168. if nil == property {
  169. return fmt.Errorf("not found plugin %s", pluginName)
  170. }
  171. if nil == property.cf {
  172. return fmt.Errorf("not found confKey %s", confKey)
  173. }
  174. if nil == property.cf[confKey] {
  175. return fmt.Errorf("not found confKey %s", confKey)
  176. }
  177. for k, v := range reqField {
  178. property.cf[confKey][k] = v
  179. }
  180. return property.saveCf(pluginName)
  181. }
  182. func recursionDelMap(cf, fields map[string]interface{}) error {
  183. for k, v := range fields {
  184. if nil == v {
  185. delete(cf, k)
  186. continue
  187. }
  188. if delKey, ok := v.(string); ok {
  189. if 0 == len(delKey) {
  190. delete(cf, k)
  191. continue
  192. }
  193. var auxCf map[string]interface{}
  194. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  195. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  196. }
  197. cf[k] = auxCf
  198. delete(auxCf, delKey)
  199. continue
  200. }
  201. if reflect.Map == reflect.TypeOf(v).Kind() {
  202. var auxCf, auxFields map[string]interface{}
  203. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  204. return fmt.Errorf("not found second key:%s.%v", k, v)
  205. }
  206. cf[k] = auxCf
  207. if err := common.MapToStruct(v, &auxFields); nil != err {
  208. return fmt.Errorf("requestef format err:%s.%v", k, v)
  209. }
  210. if err := recursionDelMap(auxCf, auxFields); nil != err {
  211. return err
  212. }
  213. }
  214. }
  215. return nil
  216. }
  217. func DelSourceConfKeyField(pluginName, confKey string, content []byte) error {
  218. reqField := make(map[string]interface{})
  219. err := json.Unmarshal(content, &reqField)
  220. if nil != err {
  221. return err
  222. }
  223. property := g_sourceProperty[pluginName+".json"]
  224. if nil == property {
  225. return fmt.Errorf("not found plugin %s", pluginName)
  226. }
  227. if nil == property.cf {
  228. return fmt.Errorf("not found confKey %s", confKey)
  229. }
  230. if nil == property.cf[confKey] {
  231. return fmt.Errorf("not found confKey %s", confKey)
  232. }
  233. err = recursionDelMap(property.cf[confKey], reqField)
  234. if nil != err {
  235. return err
  236. }
  237. return property.saveCf(pluginName)
  238. }
  239. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  240. for i := 0; i < len(template); i++ {
  241. p := new(field)
  242. *p = *template[i]
  243. *ret = append(*ret, p)
  244. v, ok := conf[template[i].Name]
  245. if ok {
  246. p.Exist = true
  247. } else {
  248. p.Exist = false
  249. continue
  250. }
  251. var auxRet, auxTemplate []*field
  252. p.Default = &auxRet
  253. if nil == v {
  254. p.Default = v
  255. } else {
  256. if reflect.Map == reflect.TypeOf(v).Kind() {
  257. var nextCf map[string]interface{}
  258. if tmp, ok := v.(map[interface{}]interface{}); ok {
  259. nextCf = common.ConvertMap(tmp)
  260. } else {
  261. if err := common.MapToStruct(v, &nextCf); nil != err {
  262. return err
  263. }
  264. }
  265. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  266. return err
  267. }
  268. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  269. return err
  270. }
  271. } else {
  272. p.Default = v
  273. }
  274. }
  275. }
  276. return nil
  277. }
  278. func (this *sourceProperty) cfToMeta() (*uiSource, error) {
  279. fields := this.meta.ConfKeys["default"]
  280. ret := make(map[string][]*field)
  281. for k, kvs := range this.cf {
  282. var sli []*field
  283. err := recursionNewFields(fields, kvs, &sli)
  284. if nil != err {
  285. return nil, err
  286. }
  287. ret[k] = sli
  288. }
  289. meta := new(uiSource)
  290. *meta = *(this.meta)
  291. meta.ConfKeys = ret
  292. return meta, nil
  293. }
  294. func (this *sourceProperty) saveCf(pluginName string) error {
  295. confDir, err := common.GetConfLoc()
  296. if nil != err {
  297. return err
  298. }
  299. dir := path.Join(confDir, "sources")
  300. if "mqtt_source" == pluginName {
  301. dir = confDir
  302. }
  303. filePath := path.Join(dir, pluginName+".yaml")
  304. return common.WriteYamlMarshal(filePath, this.cf)
  305. }