sourceMeta.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package plugins
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. type (
  12. fileSource struct {
  13. About *fileAbout `json:"about"`
  14. Libs []string `json:"libs"`
  15. ConfKeys map[string][]*fileField `json:"properties"`
  16. }
  17. uiSource struct {
  18. About *about `json:"about"`
  19. Libs []string `json:"libs"`
  20. ConfKeys map[string][]*field `json:"properties"`
  21. }
  22. sourceProperty struct {
  23. cf map[string]map[string]interface{}
  24. meta *uiSource
  25. }
  26. )
  27. func isInternalSource(fiName string) bool {
  28. internal := []string{`edgex.json`, `httppull.json`, `mqtt.json`}
  29. for _, v := range internal {
  30. if v == fiName {
  31. return true
  32. }
  33. }
  34. return false
  35. }
  36. func newUiSource(fi *fileSource) (*uiSource, error) {
  37. if nil == fi {
  38. return nil, nil
  39. }
  40. var err error
  41. ui := new(uiSource)
  42. ui.Libs = fi.Libs
  43. ui.About = newAbout(fi.About)
  44. ui.ConfKeys = make(map[string][]*field)
  45. for k, fields := range fi.ConfKeys {
  46. if ui.ConfKeys[k], err = newField(fields); nil != err {
  47. return nil, err
  48. }
  49. }
  50. return ui, nil
  51. }
  52. var g_sourceProperty map[string]*sourceProperty
  53. func (m *Manager) readSourceMetaFile(filePath string) error {
  54. fileName := path.Base(filePath)
  55. if "mqtt_source.json" == fileName {
  56. fileName = "mqtt.json"
  57. }
  58. ptrMeta := new(fileSource)
  59. err := common.ReadJsonUnmarshal(filePath, ptrMeta)
  60. if nil != err || 0 == len(ptrMeta.ConfKeys) {
  61. return fmt.Errorf("file:%s err:%v", filePath, err)
  62. }
  63. if 0 == len(ptrMeta.ConfKeys["default"]) {
  64. return fmt.Errorf("not found default confKey %s", filePath)
  65. }
  66. if nil == ptrMeta.About {
  67. return fmt.Errorf("not found about of %s", filePath)
  68. } else if isInternalSource(fileName) {
  69. ptrMeta.About.Installed = true
  70. } else {
  71. _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
  72. }
  73. yamlData := make(map[string]map[string]interface{})
  74. filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
  75. err = common.ReadYamlUnmarshal(filePath, &yamlData)
  76. if nil != err {
  77. return fmt.Errorf("file:%s err:%v", filePath, err)
  78. }
  79. if 0 == len(yamlData["default"]) {
  80. return fmt.Errorf("not found default confKey from %s", filePath)
  81. }
  82. property := new(sourceProperty)
  83. property.cf = yamlData
  84. property.meta, err = newUiSource(ptrMeta)
  85. if nil != err {
  86. return err
  87. }
  88. g_sourceProperty[fileName] = property
  89. return err
  90. }
  91. func (m *Manager) readSourceMetaDir() error {
  92. g_sourceProperty = make(map[string]*sourceProperty)
  93. confDir, err := common.GetConfLoc()
  94. if nil != err {
  95. return err
  96. }
  97. dir := path.Join(confDir, "sources")
  98. infos, err := ioutil.ReadDir(dir)
  99. if nil != err {
  100. return err
  101. }
  102. if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
  103. return err
  104. }
  105. for _, info := range infos {
  106. fileName := info.Name()
  107. if strings.HasSuffix(fileName, ".json") {
  108. filePath := path.Join(dir, fileName)
  109. if err = m.readSourceMetaFile(filePath); nil != err {
  110. return err
  111. }
  112. common.Log.Infof("sourceMeta file : %s", fileName)
  113. }
  114. }
  115. return nil
  116. }
  117. func GetSourceConf(pluginName string) (b []byte, err error) {
  118. property, ok := g_sourceProperty[pluginName+".json"]
  119. if ok {
  120. cf := make(map[string]map[string]interface{})
  121. for key, kvs := range property.cf {
  122. aux := make(map[interface{}]interface{})
  123. for k, v := range kvs {
  124. aux[k] = v
  125. }
  126. cf[key] = common.ConvertMap(aux)
  127. }
  128. return json.Marshal(cf)
  129. }
  130. return nil, fmt.Errorf("not found plugin %s", pluginName)
  131. }
  132. func GetSourceMeta(pluginName string) (ptrSourceProperty *uiSource, err error) {
  133. property, ok := g_sourceProperty[pluginName+".json"]
  134. if ok {
  135. return property.cfToMeta()
  136. }
  137. return nil, fmt.Errorf("not found plugin %s", pluginName)
  138. }
  139. func GetSources() (sources []*pluginfo) {
  140. for fileName, v := range g_sourceProperty {
  141. node := new(pluginfo)
  142. node.Name = strings.TrimSuffix(fileName, `.json`)
  143. if nil == v.meta {
  144. continue
  145. }
  146. if nil == v.meta.About {
  147. continue
  148. }
  149. node.About = v.meta.About
  150. i := 0
  151. for ; i < len(sources); i++ {
  152. if node.Name <= sources[i].Name {
  153. sources = append(sources, node)
  154. copy(sources[i+1:], sources[i:])
  155. sources[i] = node
  156. break
  157. }
  158. }
  159. if len(sources) == i {
  160. sources = append(sources, node)
  161. }
  162. }
  163. return sources
  164. }
  165. func GetSourceConfKeys(pluginName string) (keys []string) {
  166. property := g_sourceProperty[pluginName+".json"]
  167. if nil == property {
  168. return keys
  169. }
  170. for k, _ := range property.cf {
  171. keys = append(keys, k)
  172. }
  173. return keys
  174. }
  175. func DelSourceConfKey(pluginName, confKey string) error {
  176. property := g_sourceProperty[pluginName+".json"]
  177. if nil == property {
  178. return fmt.Errorf("not found plugin %s", pluginName)
  179. }
  180. if nil == property.cf {
  181. return fmt.Errorf("not found confKey %s", confKey)
  182. }
  183. delete(property.cf, confKey)
  184. return property.saveCf(pluginName)
  185. }
  186. func AddSourceConfKey(pluginName, confKey string, content []byte) error {
  187. reqField := make(map[string]interface{})
  188. err := json.Unmarshal(content, &reqField)
  189. if nil != err {
  190. return err
  191. }
  192. property := g_sourceProperty[pluginName+".json"]
  193. if nil == property {
  194. return fmt.Errorf("not found plugin %s", pluginName)
  195. }
  196. if nil == property.cf {
  197. property.cf = make(map[string]map[string]interface{})
  198. }
  199. if 0 != len(property.cf[confKey]) {
  200. return fmt.Errorf("exist confKey %s", confKey)
  201. }
  202. property.cf[confKey] = reqField
  203. g_sourceProperty[pluginName+".json"] = property
  204. return property.saveCf(pluginName)
  205. }
  206. func AddSourceConfKeyField(pluginName, confKey string, content []byte) error {
  207. reqField := make(map[string]interface{})
  208. err := json.Unmarshal(content, &reqField)
  209. if nil != err {
  210. return err
  211. }
  212. property := g_sourceProperty[pluginName+".json"]
  213. if nil == property {
  214. return fmt.Errorf("not found plugin %s", pluginName)
  215. }
  216. if nil == property.cf {
  217. return fmt.Errorf("not found confKey %s", confKey)
  218. }
  219. if nil == property.cf[confKey] {
  220. return fmt.Errorf("not found confKey %s", confKey)
  221. }
  222. for k, v := range reqField {
  223. property.cf[confKey][k] = v
  224. }
  225. return property.saveCf(pluginName)
  226. }
  227. func recursionDelMap(cf, fields map[string]interface{}) error {
  228. for k, v := range fields {
  229. if nil == v {
  230. delete(cf, k)
  231. continue
  232. }
  233. if delKey, ok := v.(string); ok {
  234. if 0 == len(delKey) {
  235. delete(cf, k)
  236. continue
  237. }
  238. var auxCf map[string]interface{}
  239. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  240. return fmt.Errorf("not found second key:%s.%s", k, delKey)
  241. }
  242. cf[k] = auxCf
  243. delete(auxCf, delKey)
  244. continue
  245. }
  246. if reflect.Map == reflect.TypeOf(v).Kind() {
  247. var auxCf, auxFields map[string]interface{}
  248. if err := common.MapToStruct(cf[k], &auxCf); nil != err {
  249. return fmt.Errorf("not found second key:%s.%v", k, v)
  250. }
  251. cf[k] = auxCf
  252. if err := common.MapToStruct(v, &auxFields); nil != err {
  253. return fmt.Errorf("requestef format err:%s.%v", k, v)
  254. }
  255. if err := recursionDelMap(auxCf, auxFields); nil != err {
  256. return err
  257. }
  258. }
  259. }
  260. return nil
  261. }
  262. func DelSourceConfKeyField(pluginName, confKey string, content []byte) error {
  263. reqField := make(map[string]interface{})
  264. err := json.Unmarshal(content, &reqField)
  265. if nil != err {
  266. return err
  267. }
  268. property := g_sourceProperty[pluginName+".json"]
  269. if nil == property {
  270. return fmt.Errorf("not found plugin %s", pluginName)
  271. }
  272. if nil == property.cf {
  273. return fmt.Errorf("not found confKey %s", confKey)
  274. }
  275. if nil == property.cf[confKey] {
  276. return fmt.Errorf("not found confKey %s", confKey)
  277. }
  278. err = recursionDelMap(property.cf[confKey], reqField)
  279. if nil != err {
  280. return err
  281. }
  282. return property.saveCf(pluginName)
  283. }
  284. func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
  285. for i := 0; i < len(template); i++ {
  286. p := new(field)
  287. *p = *template[i]
  288. *ret = append(*ret, p)
  289. v, ok := conf[template[i].Name]
  290. if ok {
  291. p.Exist = true
  292. } else {
  293. p.Exist = false
  294. continue
  295. }
  296. var auxRet, auxTemplate []*field
  297. p.Default = &auxRet
  298. if nil == v {
  299. p.Default = v
  300. } else {
  301. if reflect.Map == reflect.TypeOf(v).Kind() {
  302. var nextCf map[string]interface{}
  303. if tmp, ok := v.(map[interface{}]interface{}); ok {
  304. nextCf = common.ConvertMap(tmp)
  305. } else {
  306. if err := common.MapToStruct(v, &nextCf); nil != err {
  307. return err
  308. }
  309. }
  310. if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
  311. return err
  312. }
  313. if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
  314. return err
  315. }
  316. } else {
  317. p.Default = v
  318. }
  319. }
  320. }
  321. return nil
  322. }
  323. func (this *sourceProperty) cfToMeta() (*uiSource, error) {
  324. fields := this.meta.ConfKeys["default"]
  325. ret := make(map[string][]*field)
  326. for k, kvs := range this.cf {
  327. var sli []*field
  328. err := recursionNewFields(fields, kvs, &sli)
  329. if nil != err {
  330. return nil, err
  331. }
  332. ret[k] = sli
  333. }
  334. meta := new(uiSource)
  335. *meta = *(this.meta)
  336. meta.ConfKeys = ret
  337. return meta, nil
  338. }
  339. func (this *sourceProperty) saveCf(pluginName string) error {
  340. confDir, err := common.GetConfLoc()
  341. if nil != err {
  342. return err
  343. }
  344. dir := path.Join(confDir, "sources")
  345. if "mqtt" == pluginName {
  346. pluginName = "mqtt_source"
  347. dir = confDir
  348. }
  349. filePath := path.Join(dir, pluginName+".yaml")
  350. return common.WriteYamlMarshal(filePath, this.cf)
  351. }