sinkMeta.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package plugins
  import (
  	"fmt"
  	"io/ioutil"
  	"path"
  	"reflect"
  	"sort"
  	"strings"

  	"github.com/emqx/kuiper/common"
  	"github.com/emqx/kuiper/xstream/api"
  )
  // Names shared by the sink metadata code in this file.
  const (
  	// baseProperty is the base name (without ".json") of the metadata file
  	// that setBasePropertry attaches to every plugin.
  	baseProperty = "properties"
  	// baseOption is the base name (without ".json") of the metadata file
  	// that setBaseOption exposes as the rule options.
  	baseOption = "options"
  	// sink is passed as the second argument to getMsg when building the
  	// error messages returned by the uiSinks methods.
  	sink = "sink"
  	// source is declared alongside sink; it is not referenced in this part
  	// of the file.
  	source = "source"
  )
  // author identifies who published a plugin. The same struct is used in the
  // on-disk form (fileAbout) and in the UI form (about).
  type author struct {
  	Name    string `json:"name"`
  	Email   string `json:"email"`
  	Company string `json:"company"`
  	Website string `json:"website"`
  }

  // fileLanguage is a localized string as written in a metadata file, keyed
  // by "en_US" and "zh_CN".
  type fileLanguage struct {
  	English string `json:"en_US"`
  	Chinese string `json:"zh_CN"`
  }

  // fileField is one entry of the "properties" array of a metadata file.
  type fileField struct {
  	Name     string        `json:"name"`
  	Default  interface{}   `json:"default"`
  	Control  string        `json:"control"`
  	Optional bool          `json:"optional"`
  	Type     string        `json:"type"`
  	Hint     *fileLanguage `json:"hint"`
  	Label    *fileLanguage `json:"label"`
  	Values   interface{}   `json:"values"`
  }

  // fileAbout is the "about" object of a metadata file.
  type fileAbout struct {
  	Trial       bool          `json:"trial"`
  	Installed   bool          `json:"installed"`
  	Author      *author       `json:"author"`
  	HelpUrl     *fileLanguage `json:"helpUrl"`
  	Description *fileLanguage `json:"description"`
  }

  // fileSink is the top-level shape a sink metadata file is unmarshalled into.
  type fileSink struct {
  	About  *fileAbout   `json:"about"`
  	Libs   []string     `json:"libs"`
  	Fields []*fileField `json:"properties"`
  }

  // language is the UI-facing counterpart of fileLanguage; it serializes with
  // the shorter keys "en" and "zh".
  type language struct {
  	English string `json:"en"`
  	Chinese string `json:"zh"`
  }

  // about is the UI-facing counterpart of fileAbout.
  type about struct {
  	Trial       bool      `json:"trial"`
  	Installed   bool      `json:"installed"`
  	Author      *author   `json:"author"`
  	HelpUrl     *language `json:"helpUrl"`
  	Description *language `json:"description"`
  }

  // field is the UI-facing counterpart of fileField. Exist has no counterpart
  // in fileField and is not assigned anywhere in this file.
  type field struct {
  	Exist    bool        `json:"exist"`
  	Name     string      `json:"name"`
  	Default  interface{} `json:"default"`
  	Type     string      `json:"type"`
  	Control  string      `json:"control"`
  	Optional bool        `json:"optional"`
  	Values   interface{} `json:"values"`
  	Hint     *language   `json:"hint"`
  	Label    *language   `json:"label"`
  }

  // uiSink is the UI-facing counterpart of fileSink.
  type uiSink struct {
  	About  *about   `json:"about"`
  	Libs   []string `json:"libs"`
  	Fields []*field `json:"properties"`
  }

  // uiSinks is the result assembled by GetSinkMeta. CustomProperty and
  // BaseProperty are keyed by plugin name. The unexported language field is
  // the language handed to getMsg for error messages and is not serialized.
  type uiSinks struct {
  	CustomProperty map[string]*uiSink `json:"customProperty"`
  	BaseProperty   map[string]*uiSink `json:"baseProperty"`
  	BaseOption     *uiSink            `json:"baseOption"`
  	language       string
  }
  // isInternalSink reports whether fiName is one of the five metadata file
  // names that readSinkMetaFile marks as installed without consulting the
  // plugin registry. The comparison is exact and case-sensitive and expects
  // the ".json" suffix to be present.
  func isInternalSink(fiName string) bool {
  	switch fiName {
  	case `edgex.json`, `log.json`, `mqtt.json`, `nop.json`, `rest.json`:
  		return true
  	default:
  		return false
  	}
  }
  93. func newLanguage(fi *fileLanguage) *language {
  94. if nil == fi {
  95. return nil
  96. }
  97. ui := new(language)
  98. ui.English = fi.English
  99. ui.Chinese = fi.Chinese
  100. return ui
  101. }
  102. func newField(fis []*fileField) (uis []*field, err error) {
  103. for _, fi := range fis {
  104. if nil == fi {
  105. continue
  106. }
  107. ui := new(field)
  108. uis = append(uis, ui)
  109. ui.Name = fi.Name
  110. ui.Type = fi.Type
  111. ui.Control = fi.Control
  112. ui.Optional = fi.Optional
  113. ui.Values = fi.Values
  114. ui.Hint = newLanguage(fi.Hint)
  115. ui.Label = newLanguage(fi.Label)
  116. switch t := fi.Default.(type) {
  117. case []map[string]interface{}:
  118. var auxFi []*fileField
  119. if err = common.MapToStruct(t, &auxFi); nil != err {
  120. return nil, err
  121. }
  122. if ui.Default, err = newField(auxFi); nil != err {
  123. return nil, err
  124. }
  125. default:
  126. ui.Default = fi.Default
  127. }
  128. }
  129. return uis, err
  130. }
  131. func newAbout(fi *fileAbout) *about {
  132. if nil == fi {
  133. return nil
  134. }
  135. ui := new(about)
  136. ui.Trial = fi.Trial
  137. ui.Installed = fi.Installed
  138. ui.Author = fi.Author
  139. ui.HelpUrl = newLanguage(fi.HelpUrl)
  140. ui.Description = newLanguage(fi.Description)
  141. return ui
  142. }
  143. func newUiSink(fi *fileSink) (*uiSink, error) {
  144. if nil == fi {
  145. return nil, nil
  146. }
  147. var err error
  148. ui := new(uiSink)
  149. ui.Libs = fi.Libs
  150. ui.About = newAbout(fi.About)
  151. ui.Fields, err = newField(fi.Fields)
  152. return ui, err
  153. }
  154. var g_sinkMetadata map[string]*uiSink //map[fileName]
  155. func (m *Manager) readSinkMetaDir() error {
  156. g_sinkMetadata = make(map[string]*uiSink)
  157. confDir, err := common.GetConfLoc()
  158. if nil != err {
  159. return err
  160. }
  161. dir := path.Join(confDir, "sinks")
  162. files, err := ioutil.ReadDir(dir)
  163. if nil != err {
  164. return err
  165. }
  166. for _, file := range files {
  167. fname := file.Name()
  168. if !strings.HasSuffix(fname, ".json") {
  169. continue
  170. }
  171. filePath := path.Join(dir, fname)
  172. if err := m.readSinkMetaFile(filePath); nil != err {
  173. return err
  174. }
  175. }
  176. return nil
  177. }
  178. func (m *Manager) uninstalSink(name string) {
  179. if ui, ok := g_sinkMetadata[name+".json"]; ok {
  180. if nil != ui.About {
  181. ui.About.Installed = false
  182. }
  183. }
  184. }
  185. func (m *Manager) readSinkMetaFile(filePath string) error {
  186. finame := path.Base(filePath)
  187. pluginName := strings.TrimSuffix(finame, `.json`)
  188. metadata := new(fileSink)
  189. err := common.ReadJsonUnmarshal(filePath, metadata)
  190. if nil != err {
  191. return fmt.Errorf("filePath:%s err:%v", filePath, err)
  192. }
  193. if pluginName != baseProperty && pluginName != baseOption {
  194. if nil == metadata.About {
  195. return fmt.Errorf("not found about of %s", finame)
  196. } else if isInternalSink(finame) {
  197. metadata.About.Installed = true
  198. } else {
  199. _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
  200. }
  201. }
  202. g_sinkMetadata[finame], err = newUiSink(metadata)
  203. if nil != err {
  204. return err
  205. }
  206. common.Log.Infof("Loading metadata file for sink: %s", finame)
  207. return nil
  208. }
  209. func (this *uiSinks) setCustomProperty(pluginName string) error {
  210. fileName := pluginName + `.json`
  211. sinkMetadata := g_sinkMetadata
  212. data := sinkMetadata[fileName]
  213. if nil == data {
  214. return fmt.Errorf(`%s%s`, getMsg(this.language, sink, "not_found_plugin"), pluginName)
  215. }
  216. if 0 == len(this.CustomProperty) {
  217. this.CustomProperty = make(map[string]*uiSink)
  218. }
  219. this.CustomProperty[pluginName] = data
  220. return nil
  221. }
  222. func (this *uiSinks) setBasePropertry(pluginName string) error {
  223. sinkMetadata := g_sinkMetadata
  224. data := sinkMetadata[baseProperty+".json"]
  225. if nil == data {
  226. return fmt.Errorf(`%s%s`, getMsg(this.language, sink, "not_found_plugin"), baseProperty)
  227. }
  228. if 0 == len(this.BaseProperty) {
  229. this.BaseProperty = make(map[string]*uiSink)
  230. }
  231. this.BaseProperty[pluginName] = data
  232. return nil
  233. }
  234. func (this *uiSinks) setBaseOption() error {
  235. sinkMetadata := g_sinkMetadata
  236. data := sinkMetadata[baseOption+".json"]
  237. if nil == data {
  238. return fmt.Errorf(`%s%s`, getMsg(this.language, sink, "not_found_plugin"), baseOption)
  239. }
  240. this.BaseOption = data
  241. return nil
  242. }
  243. func (this *uiSinks) hintWhenNewSink(pluginName string) (err error) {
  244. err = this.setCustomProperty(pluginName)
  245. if nil != err {
  246. return err
  247. }
  248. err = this.setBasePropertry(pluginName)
  249. if nil != err {
  250. return err
  251. }
  252. err = this.setBaseOption()
  253. return err
  254. }
  255. func (this *uiSinks) modifyCustom(uiFields []*field, ruleFields map[string]interface{}) (err error) {
  256. for i, ui := range uiFields {
  257. ruleVal := ruleFields[ui.Name]
  258. if nil == ruleVal {
  259. continue
  260. }
  261. if reflect.Map == reflect.TypeOf(ruleVal).Kind() && "object" != ui.Type {
  262. var auxRuleFields map[string]interface{}
  263. if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
  264. return fmt.Errorf(`%s%v %s`, getMsg(this.language, sink, "type_conversion_fail"), err, ui.Name)
  265. }
  266. var auxUiFields []*field
  267. if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
  268. return fmt.Errorf(`%s%v %s`, getMsg(this.language, sink, "type_conversion_fail"), err, ui.Name)
  269. }
  270. uiFields[i].Default = auxUiFields
  271. if err := this.modifyCustom(auxUiFields, auxRuleFields); nil != err {
  272. return err
  273. }
  274. } else {
  275. uiFields[i].Default = ruleVal
  276. }
  277. }
  278. return nil
  279. }
  280. func (this *uiSink) modifyBase(mapFields map[string]interface{}) {
  281. for i, field := range this.Fields {
  282. fieldVal := mapFields[field.Name]
  283. if nil != fieldVal {
  284. this.Fields[i].Default = fieldVal
  285. }
  286. }
  287. }
  288. func (this *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
  289. custom := this.CustomProperty[pluginName]
  290. if nil == custom {
  291. return fmt.Errorf(`%s%s`, getMsg(this.language, sink, "not_found_plugin"), pluginName)
  292. }
  293. if err = this.modifyCustom(custom.Fields, mapFields); nil != err {
  294. return err
  295. }
  296. base := this.BaseProperty[pluginName]
  297. if nil == base {
  298. return fmt.Errorf(`%s%s`, getMsg(this.language, sink, "not_found_plugin"), pluginName)
  299. }
  300. base.modifyBase(mapFields)
  301. return nil
  302. }
  303. func (this *uiSinks) modifyOption(option *api.RuleOption) {
  304. baseOption := this.BaseOption
  305. if nil == baseOption {
  306. return
  307. }
  308. for i, field := range baseOption.Fields {
  309. switch field.Name {
  310. case `isEventTime`:
  311. baseOption.Fields[i].Default = option.IsEventTime
  312. case `lateTol`:
  313. baseOption.Fields[i].Default = option.LateTol
  314. case `concurrency`:
  315. baseOption.Fields[i].Default = option.Concurrency
  316. case `bufferLength`:
  317. baseOption.Fields[i].Default = option.BufferLength
  318. case `sendMetaToSink`:
  319. baseOption.Fields[i].Default = option.SendMetaToSink
  320. case `qos`:
  321. baseOption.Fields[i].Default = option.Qos
  322. case `checkpointInterval`:
  323. baseOption.Fields[i].Default = option.CheckpointInterval
  324. }
  325. }
  326. }
  327. func (this *uiSinks) hintWhenModifySink(rule *api.Rule) (err error) {
  328. for _, m := range rule.Actions {
  329. for pluginName, sink := range m {
  330. mapFields, _ := sink.(map[string]interface{})
  331. err = this.hintWhenNewSink(pluginName)
  332. if nil != err {
  333. return err
  334. }
  335. if err := this.modifyProperty(pluginName, mapFields); nil != err {
  336. return err
  337. }
  338. }
  339. }
  340. this.modifyOption(rule.Options)
  341. return nil
  342. }
  343. func GetSinkMeta(pluginName, language string, rule *api.Rule) (ptrSinkProperty *uiSinks, err error) {
  344. ptrSinkProperty = new(uiSinks)
  345. ptrSinkProperty.language = language
  346. if nil == rule {
  347. err = ptrSinkProperty.hintWhenNewSink(pluginName)
  348. } else {
  349. err = ptrSinkProperty.hintWhenModifySink(rule)
  350. }
  351. return ptrSinkProperty, err
  352. }
  353. type pluginfo struct {
  354. Name string `json:"name"`
  355. About *about `json:"about"`
  356. }
  357. func GetSinks() (sinks []*pluginfo) {
  358. sinkMeta := g_sinkMetadata
  359. for fileName, v := range sinkMeta {
  360. if fileName == baseProperty+".json" || fileName == baseOption+".json" {
  361. continue
  362. }
  363. node := new(pluginfo)
  364. node.Name = strings.TrimSuffix(fileName, `.json`)
  365. node.About = v.About
  366. i := 0
  367. for ; i < len(sinks); i++ {
  368. if node.Name <= sinks[i].Name {
  369. sinks = append(sinks, node)
  370. copy(sinks[i+1:], sinks[i:])
  371. sinks[i] = node
  372. break
  373. }
  374. }
  375. if len(sinks) == i {
  376. sinks = append(sinks, node)
  377. }
  378. }
  379. return sinks
  380. }