sourceMeta.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "strings"
  9. )
  10. type (
  11. sourceMeta struct {
  12. Author *author `json:"author"`
  13. HelpUrl *language `json:"helpUrl"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*field `json:"properties"`
  16. }
  17. sourceProperty struct {
  18. cf map[string]map[string]interface{}
  19. meta *sourceMeta
  20. }
  21. )
  22. var g_sourceProperty map[string]*sourceProperty
  23. func (this *Manager) readSourceMetaFile(filePath string) (*sourceProperty, error) {
  24. ptrMeta := new(sourceMeta)
  25. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  26. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  27. return nil, fmt.Errorf("file:%s err:%v", filePath, err)
  28. }
  29. if 0 == len(ptrMeta.ConfKeys["default"]) {
  30. return nil, fmt.Errorf("not found default confKey %s", filePath)
  31. }
  32. yamlData := make(map[string]map[string]interface{})
  33. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  34. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  35. if nil != err {
  36. return nil, fmt.Errorf("file:%s err:%v", filePath, err)
  37. }
  38. if 0 == len(yamlData["default"]) {
  39. return nil, fmt.Errorf("not found default confKey from %s", filePath)
  40. }
  41. property := new(sourceProperty)
  42. property.cf = yamlData
  43. property.meta = ptrMeta
  44. return property, err
  45. }
  46. func (this *Manager) readSourceMetaDir() error {
  47. confDir, err := common.GetConfLoc()
  48. if nil != err {
  49. return err
  50. }
  51. dir := path.Join(confDir, "sources")
  52. infos, err := ioutil.ReadDir(dir)
  53. if nil != err {
  54. return err
  55. }
  56. tmpMap := make(map[string]*sourceProperty)
  57. tmpMap["mqtt_source.json"], err = this.readSourceMetaFile(path.Join(confDir, "mqtt_source.json"))
  58. if nil != err {
  59. return err
  60. }
  61. for _, info := range infos {
  62. fileName := info.Name()
  63. if strings.HasSuffix(fileName, ".json") {
  64. filePath := path.Join(dir, fileName)
  65. tmpMap[fileName], err = this.readSourceMetaFile(filePath)
  66. if nil != err {
  67. return err
  68. }
  69. common.Log.Infof("sourceMeta file : %s", fileName)
  70. }
  71. }
  72. g_sourceProperty = tmpMap
  73. return nil
  74. }
  75. func (this *Manager) GetSourceMeta(pluginName string) (ptrSourceProperty *sourceMeta, err error) {
  76. property, ok := g_sourceProperty[pluginName+".json"]
  77. if ok {
  78. property.cfToMeta()
  79. return property.meta, nil
  80. }
  81. return nil, fmt.Errorf("not found plugin %s", pluginName)
  82. }
  83. func (this *Manager) GetSources() (sources []string) {
  84. for fileName, _ := range g_sourceProperty {
  85. sources = append(sources, strings.TrimSuffix(fileName, `.json`))
  86. }
  87. return sources
  88. }
  89. func (this *Manager) GetSourceConfKeys(pluginName string) (keys []string) {
  90. property := g_sourceProperty[pluginName+".json"]
  91. if nil == property {
  92. return keys
  93. }
  94. for k, _ := range property.cf {
  95. keys = append(keys, k)
  96. }
  97. return keys
  98. }
  99. func (this *Manager) DelSourceConfKey(pluginName, confKey string) error {
  100. property := g_sourceProperty[pluginName+".json"]
  101. if nil == property {
  102. return fmt.Errorf("not found plugin %s", pluginName)
  103. }
  104. if nil == property.cf {
  105. return fmt.Errorf("not found confKey %s", confKey)
  106. }
  107. delete(property.cf, confKey)
  108. g_sourceProperty[pluginName+".json"] = property
  109. return property.saveCf(pluginName)
  110. }
  111. func (this *Manager) AddSourceConfKey(pluginName, confKey, content string) error {
  112. reqField := make(map[string]interface{})
  113. err := json.Unmarshal([]byte(content), &reqField)
  114. if nil != err {
  115. return err
  116. }
  117. property := g_sourceProperty[pluginName+".json"]
  118. if nil == property {
  119. return fmt.Errorf("not found plugin %s", pluginName)
  120. }
  121. if nil == property.cf {
  122. return fmt.Errorf("not found confKey %s", confKey)
  123. }
  124. if 0 != len(property.cf[confKey]) {
  125. return fmt.Errorf("exist confKey %s", confKey)
  126. }
  127. property.cf[confKey] = reqField
  128. g_sourceProperty[pluginName+".json"] = property
  129. return property.saveCf(pluginName)
  130. }
  131. func (this *Manager) UpdateSourceConfKey(pluginName, confKey, content string) error {
  132. reqField := make(map[string]interface{})
  133. err := json.Unmarshal([]byte(content), &reqField)
  134. if nil != err {
  135. return err
  136. }
  137. property := g_sourceProperty[pluginName+".json"]
  138. if nil == property {
  139. return fmt.Errorf("not found plugin %s", pluginName)
  140. }
  141. if nil == property.cf {
  142. return fmt.Errorf("not found confKey %s", confKey)
  143. }
  144. if 0 == len(property.cf[confKey]) {
  145. return fmt.Errorf("not found confKey %s", confKey)
  146. }
  147. for k, v := range reqField {
  148. property.cf[confKey][k] = v
  149. }
  150. g_sourceProperty[pluginName+".json"] = property
  151. return property.saveCf(pluginName)
  152. }
  153. func (this *sourceProperty) newFields(fields []*field, m map[string]interface{}, sli *[]*field) error {
  154. for k, v := range m {
  155. p := new(field)
  156. for _, fd := range fields {
  157. if fd.Name == k {
  158. *p = *fd
  159. *sli = append(*sli, p)
  160. switch t := v.(type) {
  161. case map[interface{}]interface{}:
  162. tt := common.ConvertMap(t)
  163. var tmpSli, tmpFields []*field
  164. p.Default = &tmpSli
  165. b, err := json.Marshal(fd.Default)
  166. if nil != err {
  167. return err
  168. }
  169. err = json.Unmarshal(b, &tmpFields)
  170. if nil != err {
  171. return err
  172. }
  173. this.newFields(tmpFields, tt, &tmpSli)
  174. case map[string]interface{}:
  175. var tmpSli []*field
  176. p.Default = &tmpSli
  177. this.newFields(fields, t, &tmpSli)
  178. default:
  179. p.Default = v
  180. }
  181. break
  182. }
  183. }
  184. }
  185. return nil
  186. }
  187. func (this *sourceProperty) cfToMeta() {
  188. fields := this.meta.ConfKeys["default"]
  189. ret := make(map[string][]*field)
  190. for k, kvs := range this.cf {
  191. var sli []*field
  192. this.newFields(fields, kvs, &sli)
  193. ret[k] = sli
  194. }
  195. this.meta.ConfKeys = ret
  196. }
  197. func (this *sourceProperty) saveCf(pluginName string) error {
  198. confDir, err := common.GetConfLoc()
  199. if nil != err {
  200. return err
  201. }
  202. dir := path.Join(confDir, "sources")
  203. if "mqtt_source" == pluginName {
  204. dir = confDir
  205. }
  206. filePath := path.Join(dir, pluginName+".yaml")
  207. for key, kvs := range this.cf {
  208. for k, v := range kvs {
  209. switch t := v.(type) {
  210. case map[interface{}]interface{}:
  211. kvs[k] = common.ConvertMap(t)
  212. this.cf[key] = kvs
  213. }
  214. }
  215. }
  216. return common.WriteYamlMarshal(filePath, this.cf)
  217. }