sourceMeta.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]*field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func isInternalSource(fiName string) bool {
  28. internal := []string{`edgex.json`, `httppull.json`, `mqtt_source.json`}
  29. for _, v := range internal {
  30. if v == fiName {
  31. return true
  32. }
  33. }
  34. return false
  35. }
  36. func newUiSource(fi *fileSource) (*uiSource, error) {
  37. if nil == fi {
  38. return nil, nil
  39. }
  40. var err error
  41. ui := new(uiSource)
  42. ui.Libs = fi.Libs
  43. ui.About = newAbout(fi.About)
  44. ui.ConfKeys = make(map[string][]*field)
  45. for k, fields := range fi.ConfKeys {
  46. if ui.ConfKeys[k], err = newField(fields); nil != err {
  47. return nil, err
  48. }
  49. }
  50. return ui, nil
  51. }
  52. var g_sourceProperty map[string]*sourceProperty
  53. func (m *Manager) readSourceMetaFile(filePath string) error {
  54. fileName := path.Base(filePath)
  55. ptrMeta := new(fileSource)
  56. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  57. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  58. return fmt.Errorf("file:%s err:%v", filePath, err)
  59. }
  60. if 0 == len(ptrMeta.ConfKeys["default"]) {
  61. return fmt.Errorf("not found default confKey %s", filePath)
  62. }
  63. if nil == ptrMeta.About {
  64. return fmt.Errorf("not found about of %s", filePath)
  65. } else if isInternalSource(fileName) {
  66. ptrMeta.About.Installed = true
  67. } else {
  68. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  69. }
  70. yamlData := make(map[string]map[string]interface{})
  71. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  72. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  73. if nil != err {
  74. return fmt.Errorf("file:%s err:%v", filePath, err)
  75. }
  76. if 0 == len(yamlData["default"]) {
  77. return fmt.Errorf("not found default confKey from %s", filePath)
  78. }
  79. property := new(sourceProperty)
  80. property.cf = yamlData
  81. property.meta, err = newUiSource(ptrMeta)
  82. if nil != err {
  83. return err
  84. }
  85. g_sourceProperty[fileName] = property
  86. return err
  87. }
  88. func (m *Manager) readSourceMetaDir() error {
  89. g_sourceProperty = make(map[string]*sourceProperty)
  90. confDir, err := common.GetConfLoc()
  91. if nil != err {
  92. return err
  93. }
  94. dir := path.Join(confDir, "sources")
  95. infos, err := ioutil.ReadDir(dir)
  96. if nil != err {
  97. return err
  98. }
  99. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  100. return err
  101. }
  102. for _, info := range infos {
  103. fileName := info.Name()
  104. if strings.HasSuffix(fileName, ".json") {
  105. filePath := path.Join(dir, fileName)
  106. if err = m.readSourceMetaFile(filePath); nil != err {
  107. return err
  108. }
  109. common.Log.Infof("sourceMeta file : %s", fileName)
  110. }
  111. }
  112. return nil
  113. }
  114. func GetSourceConf(pluginName string) (b []byte, err error) {
  115. property, ok := g_sourceProperty[pluginName+".json"]
  116. if ok {
  117. cf := make(map[string]map[string]interface{})
  118. for key, kvs := range property.cf {
  119. aux := make(map[interface{}]interface{})
  120. for k, v := range kvs {
  121. aux[k] = v
  122. }
  123. cf[key] = common.ConvertMap(aux)
  124. }
  125. return json.Marshal(cf)
  126. }
  127. return nil, fmt.Errorf("not found plugin %s", pluginName)
  128. }
  129. func GetSourceMeta(pluginName string) (ptrSourceProperty *uiSource, err error) {
  130. property, ok := g_sourceProperty[pluginName+".json"]
  131. if ok {
  132. return property.cfToMeta()
  133. }
  134. return nil, fmt.Errorf("not found plugin %s", pluginName)
  135. }
  136. func GetSources() (sources []*pluginfo) {
  137. for fileName, v := range g_sourceProperty {
  138. node := new(pluginfo)
  139. node.Name = strings.TrimSuffix(fileName, `.json`)
  140. if nil == v.meta {
  141. continue
  142. }
  143. if nil == v.meta.About {
  144. continue
  145. }
  146. node.About = v.meta.About
  147. i := 0
  148. for ; i < len(sources); i++ {
  149. if node.Name <= sources[i].Name {
  150. sources = append(sources, node)
  151. copy(sources[i+1:], sources[i:])
  152. sources[i] = node
  153. break
  154. }
  155. }
  156. if len(sources) == i {
  157. sources = append(sources, node)
  158. }
  159. }
  160. return sources
  161. }
  162. func GetSourceConfKeys(pluginName string) (keys []string) {
  163. property := g_sourceProperty[pluginName+".json"]
  164. if nil == property {
  165. return keys
  166. }
  167. for k, _ := range property.cf {
  168. keys = append(keys, k)
  169. }
  170. return keys
  171. }
  172. func DelSourceConfKey(pluginName, confKey string) error {
  173. property := g_sourceProperty[pluginName+".json"]
  174. if nil == property {
  175. return fmt.Errorf("not found plugin %s", pluginName)
  176. }
  177. if nil == property.cf {
  178. return fmt.Errorf("not found confKey %s", confKey)
  179. }
  180. delete(property.cf, confKey)
  181. return property.saveCf(pluginName)
  182. }
  183. func AddSourceConfKey(pluginName, confKey string, content []byte) error {
  184. reqField := make(map[string]interface{})
  185. err := json.Unmarshal(content, &reqField)
  186. if nil != err {
  187. return err
  188. }
  189. property := g_sourceProperty[pluginName+".json"]
  190. if nil == property {
  191. return fmt.Errorf("not found plugin %s", pluginName)
  192. }
  193. if nil == property.cf {
  194. property.cf = make(map[string]map[string]interface{})
  195. }
  196. if 0 != len(property.cf[confKey]) {
  197. return fmt.Errorf("exist confKey %s", confKey)
  198. }
  199. property.cf[confKey] = reqField
  200. g_sourceProperty[pluginName+".json"] = property
  201. return property.saveCf(pluginName)
  202. }
  203. func AddSourceConfKeyField(pluginName, confKey string, content []byte) error {
  204. reqField := make(map[string]interface{})
  205. err := json.Unmarshal(content, &reqField)
  206. if nil != err {
  207. return err
  208. }
  209. property := g_sourceProperty[pluginName+".json"]
  210. if nil == property {
  211. return fmt.Errorf("not found plugin %s", pluginName)
  212. }
  213. if nil == property.cf {
  214. return fmt.Errorf("not found confKey %s", confKey)
  215. }
  216. if nil == property.cf[confKey] {
  217. return fmt.Errorf("not found confKey %s", confKey)
  218. }
  219. for k, v := range reqField {
  220. property.cf[confKey][k] = v
  221. }
  222. return property.saveCf(pluginName)
  223. }
  224. func recursionDelMap(cf, fields map[string]interface{}) error {
  225. for k, v := range fields {
  226. if nil == v {
  227. delete(cf, k)
  228. continue
  229. }
  230. if delKey, ok := v.(string); ok {
  231. if 0 == len(delKey) {
  232. delete(cf, k)
  233. continue
  234. }
  235. var auxCf map[string]interface{}
  236. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  237. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  238. }
  239. cf[k] = auxCf
  240. delete(auxCf, delKey)
  241. continue
  242. }
  243. if reflect.Map == reflect.TypeOf(v).Kind() {
  244. var auxCf, auxFields map[string]interface{}
  245. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  246. return fmt.Errorf("not found second key:%s.%v", k, v)
  247. }
  248. cf[k] = auxCf
  249. if err := common.MapToStruct(v, &auxFields); nil != err {
  250. return fmt.Errorf("requestef format err:%s.%v", k, v)
  251. }
  252. if err := recursionDelMap(auxCf, auxFields); nil != err {
  253. return err
  254. }
  255. }
  256. }
  257. return nil
  258. }
  259. func DelSourceConfKeyField(pluginName, confKey string, content []byte) error {
  260. reqField := make(map[string]interface{})
  261. err := json.Unmarshal(content, &reqField)
  262. if nil != err {
  263. return err
  264. }
  265. property := g_sourceProperty[pluginName+".json"]
  266. if nil == property {
  267. return fmt.Errorf("not found plugin %s", pluginName)
  268. }
  269. if nil == property.cf {
  270. return fmt.Errorf("not found confKey %s", confKey)
  271. }
  272. if nil == property.cf[confKey] {
  273. return fmt.Errorf("not found confKey %s", confKey)
  274. }
  275. err = recursionDelMap(property.cf[confKey], reqField)
  276. if nil != err {
  277. return err
  278. }
  279. return property.saveCf(pluginName)
  280. }
  281. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  282. for i := 0; i < len(template); i++ {
  283. p := new(field)
  284. *p = *template[i]
  285. *ret = append(*ret, p)
  286. v, ok := conf[template[i].Name]
  287. if ok {
  288. p.Exist = true
  289. } else {
  290. p.Exist = false
  291. continue
  292. }
  293. var auxRet, auxTemplate []*field
  294. p.Default = &auxRet
  295. if nil == v {
  296. p.Default = v
  297. } else {
  298. if reflect.Map == reflect.TypeOf(v).Kind() {
  299. var nextCf map[string]interface{}
  300. if tmp, ok := v.(map[interface{}]interface{}); ok {
  301. nextCf = common.ConvertMap(tmp)
  302. } else {
  303. if err := common.MapToStruct(v, &nextCf); nil != err {
  304. return err
  305. }
  306. }
  307. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  308. return err
  309. }
  310. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  311. return err
  312. }
  313. } else {
  314. p.Default = v
  315. }
  316. }
  317. }
  318. return nil
  319. }
  320. func (this *sourceProperty) cfToMeta() (*uiSource, error) {
  321. fields := this.meta.ConfKeys["default"]
  322. ret := make(map[string][]*field)
  323. for k, kvs := range this.cf {
  324. var sli []*field
  325. err := recursionNewFields(fields, kvs, &sli)
  326. if nil != err {
  327. return nil, err
  328. }
  329. ret[k] = sli
  330. }
  331. meta := new(uiSource)
  332. *meta = *(this.meta)
  333. meta.ConfKeys = ret
  334. return meta, nil
  335. }
  336. func (this *sourceProperty) saveCf(pluginName string) error {
  337. confDir, err := common.GetConfLoc()
  338. if nil != err {
  339. return err
  340. }
  341. dir := path.Join(confDir, "sources")
  342. if "mqtt_source" == pluginName {
  343. dir = confDir
  344. }
  345. filePath := path.Join(dir, pluginName+".yaml")
  346. return common.WriteYamlMarshal(filePath, this.cf)
  347. }