sourceMeta.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. sourceMeta struct {
  13. Author *author `json:"author"`
  14. HelpUrl *language `json:"helpUrl"`
  15. Libs []string `json:"libs"`
  16. ConfKeys map[string][]*field `json:"properties"`
  17. }
  18. sourceProperty struct {
  19. cf map[string]map[string]interface{}
  20. meta *sourceMeta
  21. }
  22. )
  23. var g_sourceProperty map[string]*sourceProperty
  24. func readSourceMetaFile(filePath string) (*sourceProperty, error) {
  25. ptrMeta := new(sourceMeta)
  26. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  27. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  28. return nil, fmt.Errorf("file:%s err:%v", filePath, err)
  29. }
  30. if 0 == len(ptrMeta.ConfKeys["default"]) {
  31. return nil, fmt.Errorf("not found default confKey %s", filePath)
  32. }
  33. yamlData := make(map[string]map[string]interface{})
  34. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  35. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  36. if nil != err {
  37. return nil, fmt.Errorf("file:%s err:%v", filePath, err)
  38. }
  39. if 0 == len(yamlData["default"]) {
  40. return nil, fmt.Errorf("not found default confKey from %s", filePath)
  41. }
  42. property := new(sourceProperty)
  43. property.cf = yamlData
  44. property.meta = ptrMeta
  45. return property, err
  46. }
  47. func readSourceMetaDir() error {
  48. confDir, err := common.GetConfLoc()
  49. if nil != err {
  50. return err
  51. }
  52. dir := path.Join(confDir, "sources")
  53. infos, err := ioutil.ReadDir(dir)
  54. if nil != err {
  55. return err
  56. }
  57. tmpMap := make(map[string]*sourceProperty)
  58. tmpMap["mqtt_source.json"], err = readSourceMetaFile(path.Join(confDir, "mqtt_source.json"))
  59. if nil != err {
  60. return err
  61. }
  62. for _, info := range infos {
  63. fileName := info.Name()
  64. if strings.HasSuffix(fileName, ".json") {
  65. filePath := path.Join(dir, fileName)
  66. tmpMap[fileName], err = readSourceMetaFile(filePath)
  67. if nil != err {
  68. return err
  69. }
  70. common.Log.Infof("sourceMeta file : %s", fileName)
  71. }
  72. }
  73. g_sourceProperty = tmpMap
  74. return nil
  75. }
  76. func GetSourceMeta(pluginName string) (ptrSourceProperty *sourceMeta, err error) {
  77. property, ok := g_sourceProperty[pluginName+".json"]
  78. if ok {
  79. return property.cfToMeta()
  80. }
  81. return nil, fmt.Errorf("not found plugin %s", pluginName)
  82. }
  83. func GetSources() (sources []string) {
  84. for fileName, _ := range g_sourceProperty {
  85. sources = append(sources, strings.TrimSuffix(fileName, `.json`))
  86. }
  87. return sources
  88. }
  89. func GetSourceConfKeys(pluginName string) (keys []string) {
  90. property := g_sourceProperty[pluginName+".json"]
  91. if nil == property {
  92. return keys
  93. }
  94. for k, _ := range property.cf {
  95. keys = append(keys, k)
  96. }
  97. return keys
  98. }
  99. func DelSourceConfKey(pluginName, confKey string) error {
  100. property := g_sourceProperty[pluginName+".json"]
  101. if nil == property {
  102. return fmt.Errorf("not found plugin %s", pluginName)
  103. }
  104. if nil == property.cf {
  105. return fmt.Errorf("not found confKey %s", confKey)
  106. }
  107. delete(property.cf, confKey)
  108. return property.saveCf(pluginName)
  109. }
  110. func AddSourceConfKey(pluginName, confKey, content string) error {
  111. reqField := make(map[string]interface{})
  112. err := json.Unmarshal([]byte(content), &reqField)
  113. if nil != err {
  114. return err
  115. }
  116. property := g_sourceProperty[pluginName+".json"]
  117. if nil == property {
  118. return fmt.Errorf("not found plugin %s", pluginName)
  119. }
  120. if nil == property.cf {
  121. property.cf = make(map[string]map[string]interface{})
  122. }
  123. if 0 != len(property.cf[confKey]) {
  124. return fmt.Errorf("exist confKey %s", confKey)
  125. }
  126. property.cf[confKey] = reqField
  127. g_sourceProperty[pluginName+".json"] = property
  128. return property.saveCf(pluginName)
  129. }
  130. func AddSourceConfKeyField(pluginName, confKey, content string) error {
  131. reqField := make(map[string]interface{})
  132. err := json.Unmarshal([]byte(content), &reqField)
  133. if nil != err {
  134. return err
  135. }
  136. property := g_sourceProperty[pluginName+".json"]
  137. if nil == property {
  138. return fmt.Errorf("not found plugin %s", pluginName)
  139. }
  140. if nil == property.cf {
  141. return fmt.Errorf("not found confKey %s", confKey)
  142. }
  143. if nil == property.cf[confKey] {
  144. return fmt.Errorf("not found confKey %s", confKey)
  145. }
  146. for k, v := range reqField {
  147. property.cf[confKey][k] = v
  148. }
  149. return property.saveCf(pluginName)
  150. }
  151. func recursionDelMap(cf, fields map[string]interface{}) error {
  152. for k, v := range fields {
  153. if nil == v {
  154. delete(cf, k)
  155. continue
  156. }
  157. if delKey, ok := v.(string); ok {
  158. if 0 == len(delKey) {
  159. delete(cf, k)
  160. continue
  161. }
  162. var auxCf map[string]interface{}
  163. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  164. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  165. }
  166. cf[k] = auxCf
  167. delete(auxCf, delKey)
  168. continue
  169. }
  170. if reflect.Map == reflect.TypeOf(v).Kind() {
  171. var auxCf, auxFields map[string]interface{}
  172. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  173. return fmt.Errorf("not found second key:%s.%v", k, v)
  174. }
  175. cf[k] = auxCf
  176. if err := common.MapToStruct(v, &auxFields); nil != err {
  177. return fmt.Errorf("requestef format err:%s.%v", k, v)
  178. }
  179. if err := recursionDelMap(auxCf, auxFields); nil != err {
  180. return err
  181. }
  182. }
  183. }
  184. return nil
  185. }
  186. func DelSourceConfKeyField(pluginName, confKey, content string) error {
  187. reqField := make(map[string]interface{})
  188. err := json.Unmarshal([]byte(content), &reqField)
  189. if nil != err {
  190. return err
  191. }
  192. property := g_sourceProperty[pluginName+".json"]
  193. if nil == property {
  194. return fmt.Errorf("not found plugin %s", pluginName)
  195. }
  196. if nil == property.cf {
  197. return fmt.Errorf("not found confKey %s", confKey)
  198. }
  199. if nil == property.cf[confKey] {
  200. return fmt.Errorf("not found confKey %s", confKey)
  201. }
  202. err = recursionDelMap(property.cf[confKey], reqField)
  203. if nil != err {
  204. return err
  205. }
  206. return property.saveCf(pluginName)
  207. }
  208. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  209. for i := 0; i < len(template); i++ {
  210. p := new(field)
  211. *p = *template[i]
  212. *ret = append(*ret, p)
  213. v, ok := conf[template[i].Name]
  214. if ok {
  215. p.Exist = true
  216. } else {
  217. p.Exist = false
  218. continue
  219. }
  220. var auxRet, auxTemplate []*field
  221. p.Default = &auxRet
  222. if nil == v {
  223. p.Default = v
  224. } else {
  225. if reflect.Map == reflect.TypeOf(v).Kind() {
  226. var nextCf map[string]interface{}
  227. if tmp, ok := v.(map[interface{}]interface{}); ok {
  228. nextCf = common.ConvertMap(tmp)
  229. } else {
  230. if err := common.MapToStruct(v, &nextCf); nil != err {
  231. return err
  232. }
  233. }
  234. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  235. return err
  236. }
  237. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  238. return err
  239. }
  240. } else {
  241. p.Default = v
  242. }
  243. }
  244. }
  245. return nil
  246. }
  247. func (this *sourceProperty) cfToMeta() (*sourceMeta, error) {
  248. fields := this.meta.ConfKeys["default"]
  249. ret := make(map[string][]*field)
  250. for k, kvs := range this.cf {
  251. var sli []*field
  252. err := recursionNewFields(fields, kvs, &sli)
  253. if nil != err {
  254. return nil, err
  255. }
  256. ret[k] = sli
  257. }
  258. meta := new(sourceMeta)
  259. *meta = *(this.meta)
  260. meta.ConfKeys = ret
  261. return meta, nil
  262. }
  263. func (this *sourceProperty) saveCf(pluginName string) error {
  264. confDir, err := common.GetConfLoc()
  265. if nil != err {
  266. return err
  267. }
  268. dir := path.Join(confDir, "sources")
  269. if "mqtt_source" == pluginName {
  270. dir = confDir
  271. }
  272. filePath := path.Join(dir, pluginName+".yaml")
  273. return common.WriteYamlMarshal(filePath, this.cf)
  274. }