// sinkMeta.go

  1. package plugins
import (
	"fmt"
	"io/ioutil"
	"path"
	"reflect"
	"sort"
	"strings"

	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/xstream/api"
)
  11. const (
  12. baseProperty = `properties`
  13. baseOption = `options`
  14. )
  15. type (
  16. author struct {
  17. Name string `json:"name"`
  18. Email string `json:"email"`
  19. Company string `json:"company"`
  20. Website string `json:"website"`
  21. }
  22. fileLanguage struct {
  23. English string `json:"en_US"`
  24. Chinese string `json:"zh_CN"`
  25. }
  26. fileField struct {
  27. Name string `json:"name"`
  28. Default interface{} `json:"default"`
  29. Control string `json:"control"`
  30. Optional bool `json:"optional"`
  31. Type string `json:"type"`
  32. Hint *fileLanguage `json:"hint"`
  33. Label *fileLanguage `json:"label"`
  34. Values interface{} `json:"values"`
  35. }
  36. fileAbout struct {
  37. Trial bool `json:"trial"`
  38. Installed bool `json:"installed"`
  39. Author *author `json:"author"`
  40. HelpUrl *fileLanguage `json:"helpUrl"`
  41. Description *fileLanguage `json:"description"`
  42. }
  43. fileSink struct {
  44. About *fileAbout `json:"about"`
  45. Libs []string `json:"libs"`
  46. Fields []*fileField `json:"properties"`
  47. }
  48. language struct {
  49. English string `json:"en"`
  50. Chinese string `json:"zh"`
  51. }
  52. about struct {
  53. Trial bool `json:"trial"`
  54. Installed bool `json:"installed"`
  55. Author *author `json:"author"`
  56. HelpUrl *language `json:"helpUrl"`
  57. Description *language `json:"description"`
  58. }
  59. field struct {
  60. Exist bool `json:"exist"`
  61. Name string `json:"name"`
  62. Default interface{} `json:"default"`
  63. Type string `json:"type"`
  64. Control string `json:"control"`
  65. Optional bool `json:"optional"`
  66. Values interface{} `json:"values"`
  67. Hint *language `json:"hint"`
  68. Label *language `json:"label"`
  69. }
  70. uiSink struct {
  71. About *about `json:"about"`
  72. Libs []string `json:"libs"`
  73. Fields []*field `json:"properties"`
  74. }
  75. uiSinks struct {
  76. CustomProperty map[string]*uiSink `json:"customProperty"`
  77. BaseProperty map[string]*uiSink `json:"baseProperty"`
  78. BaseOption *uiSink `json:"baseOption"`
  79. }
  80. )
  81. func newLanguage(fi *fileLanguage) *language {
  82. if nil == fi {
  83. return nil
  84. }
  85. ui := new(language)
  86. ui.English = fi.English
  87. ui.Chinese = fi.Chinese
  88. return ui
  89. }
  90. func newField(fis []*fileField) (uis []*field, err error) {
  91. for _, fi := range fis {
  92. if nil == fi {
  93. continue
  94. }
  95. ui := new(field)
  96. uis = append(uis, ui)
  97. ui.Name = fi.Name
  98. ui.Type = fi.Type
  99. ui.Control = fi.Control
  100. ui.Optional = fi.Optional
  101. ui.Values = fi.Values
  102. ui.Hint = newLanguage(fi.Hint)
  103. ui.Label = newLanguage(fi.Label)
  104. switch t := fi.Default.(type) {
  105. case []map[string]interface{}:
  106. var auxFi []*fileField
  107. if err = common.MapToStruct(t, &auxFi); nil != err {
  108. return nil, err
  109. }
  110. if ui.Default, err = newField(auxFi); nil != err {
  111. return nil, err
  112. }
  113. default:
  114. ui.Default = fi.Default
  115. }
  116. }
  117. return uis, err
  118. }
  119. func newAbout(fi *fileAbout) *about {
  120. if nil == fi {
  121. return nil
  122. }
  123. ui := new(about)
  124. ui.Trial = fi.Trial
  125. ui.Installed = fi.Installed
  126. ui.Author = fi.Author
  127. ui.HelpUrl = newLanguage(fi.HelpUrl)
  128. ui.Description = newLanguage(fi.Description)
  129. return ui
  130. }
  131. func newUiSink(fi *fileSink) (*uiSink, error) {
  132. if nil == fi {
  133. return nil, nil
  134. }
  135. var err error
  136. ui := new(uiSink)
  137. ui.Libs = fi.Libs
  138. ui.About = newAbout(fi.About)
  139. ui.Fields, err = newField(fi.Fields)
  140. return ui, err
  141. }
  142. var g_sinkMetadata map[string]*uiSink //map[fileName]
  143. func (m *Manager) readSinkMetaDir() error {
  144. g_sinkMetadata = make(map[string]*uiSink)
  145. confDir, err := common.GetConfLoc()
  146. if nil != err {
  147. return err
  148. }
  149. dir := path.Join(confDir, "sinks")
  150. files, err := ioutil.ReadDir(dir)
  151. if nil != err {
  152. return err
  153. }
  154. for _, file := range files {
  155. fname := file.Name()
  156. if !strings.HasSuffix(fname, ".json") {
  157. continue
  158. }
  159. filePath := path.Join(dir, fname)
  160. if err := m.readSinkMetaFile(filePath); nil != err {
  161. return err
  162. }
  163. }
  164. return nil
  165. }
  166. func (m *Manager) readSinkMetaFile(filePath string) error {
  167. finame := path.Base(filePath)
  168. pluginName := strings.TrimSuffix(finame, `.json`)
  169. metadata := new(fileSink)
  170. err := common.ReadJsonUnmarshal(filePath, metadata)
  171. if nil != err {
  172. return fmt.Errorf("filePath:%s err:%v", filePath, err)
  173. }
  174. if pluginName != baseProperty && pluginName != baseOption {
  175. if nil == metadata.About {
  176. return fmt.Errorf("not found about of %s", finame)
  177. } else {
  178. _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
  179. }
  180. }
  181. g_sinkMetadata[finame], err = newUiSink(metadata)
  182. if nil != err {
  183. return err
  184. }
  185. common.Log.Infof("Loading metadata file for sink: %s", finame)
  186. return nil
  187. }
  188. func (this *uiSinks) setCustomProperty(pluginName string) error {
  189. fileName := pluginName + `.json`
  190. sinkMetadata := g_sinkMetadata
  191. data := sinkMetadata[fileName]
  192. if nil == data {
  193. return fmt.Errorf(`not found pligin:%s`, fileName)
  194. }
  195. if 0 == len(this.CustomProperty) {
  196. this.CustomProperty = make(map[string]*uiSink)
  197. }
  198. this.CustomProperty[pluginName] = data
  199. return nil
  200. }
  201. func (this *uiSinks) setBasePropertry(pluginName string) error {
  202. sinkMetadata := g_sinkMetadata
  203. data := sinkMetadata[baseProperty+".json"]
  204. if nil == data {
  205. return fmt.Errorf(`not found pligin:%s`, baseProperty)
  206. }
  207. if 0 == len(this.BaseProperty) {
  208. this.BaseProperty = make(map[string]*uiSink)
  209. }
  210. this.BaseProperty[pluginName] = data
  211. return nil
  212. }
  213. func (this *uiSinks) setBaseOption() error {
  214. sinkMetadata := g_sinkMetadata
  215. data := sinkMetadata[baseOption+".json"]
  216. if nil == data {
  217. return fmt.Errorf(`not found pligin:%s`, baseOption)
  218. }
  219. this.BaseOption = data
  220. return nil
  221. }
  222. func (this *uiSinks) hintWhenNewSink(pluginName string) (err error) {
  223. err = this.setCustomProperty(pluginName)
  224. if nil != err {
  225. return err
  226. }
  227. err = this.setBasePropertry(pluginName)
  228. if nil != err {
  229. return err
  230. }
  231. err = this.setBaseOption()
  232. return err
  233. }
  234. func modifyCustom(uiFields []*field, ruleFields map[string]interface{}) (err error) {
  235. for i, ui := range uiFields {
  236. ruleVal := ruleFields[ui.Name]
  237. if nil == ruleVal {
  238. continue
  239. }
  240. if reflect.Map == reflect.TypeOf(ruleVal).Kind() {
  241. var auxRuleFields map[string]interface{}
  242. if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
  243. return fmt.Errorf("%s:%v", ui.Name, err)
  244. }
  245. var auxUiFields []*field
  246. if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
  247. return fmt.Errorf("%s:%v", ui.Name, err)
  248. }
  249. uiFields[i].Default = auxUiFields
  250. if err := modifyCustom(auxUiFields, auxRuleFields); nil != err {
  251. return err
  252. }
  253. } else {
  254. uiFields[i].Default = ruleVal
  255. }
  256. }
  257. return nil
  258. }
  259. func (this *uiSink) modifyBase(mapFields map[string]interface{}) (err error) {
  260. for i, field := range this.Fields {
  261. fieldVal := mapFields[field.Name]
  262. if nil != fieldVal {
  263. this.Fields[i].Default = fieldVal
  264. }
  265. }
  266. return nil
  267. }
  268. func (this *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
  269. custom := this.CustomProperty[pluginName]
  270. if nil == custom {
  271. return fmt.Errorf(`not found pligin:%s`, pluginName)
  272. }
  273. if err = modifyCustom(custom.Fields, mapFields); nil != err {
  274. return err
  275. }
  276. base := this.BaseProperty[pluginName]
  277. if nil == base {
  278. return fmt.Errorf(`not found pligin:%s`, pluginName)
  279. }
  280. if err = base.modifyBase(mapFields); nil != err {
  281. return err
  282. }
  283. return nil
  284. }
  285. func (this *uiSinks) modifyOption(option *api.RuleOption) {
  286. baseOption := this.BaseOption
  287. if nil == baseOption {
  288. return
  289. }
  290. for i, field := range baseOption.Fields {
  291. switch field.Name {
  292. case `isEventTime`:
  293. baseOption.Fields[i].Default = option.IsEventTime
  294. case `lateTol`:
  295. baseOption.Fields[i].Default = option.LateTol
  296. case `concurrency`:
  297. baseOption.Fields[i].Default = option.Concurrency
  298. case `bufferLength`:
  299. baseOption.Fields[i].Default = option.BufferLength
  300. case `sendMetaToSink`:
  301. baseOption.Fields[i].Default = option.SendMetaToSink
  302. case `qos`:
  303. baseOption.Fields[i].Default = option.Qos
  304. case `checkpointInterval`:
  305. baseOption.Fields[i].Default = option.CheckpointInterval
  306. }
  307. }
  308. }
  309. func (this *uiSinks) hintWhenModifySink(rule *api.Rule) (err error) {
  310. for _, m := range rule.Actions {
  311. for pluginName, sink := range m {
  312. mapFields, _ := sink.(map[string]interface{})
  313. err = this.hintWhenNewSink(pluginName)
  314. if nil != err {
  315. return err
  316. }
  317. this.modifyProperty(pluginName, mapFields)
  318. }
  319. }
  320. this.modifyOption(rule.Options)
  321. return nil
  322. }
  323. func GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *uiSinks, err error) {
  324. ptrSinkProperty = new(uiSinks)
  325. if nil == rule {
  326. err = ptrSinkProperty.hintWhenNewSink(pluginName)
  327. } else {
  328. err = ptrSinkProperty.hintWhenModifySink(rule)
  329. }
  330. return ptrSinkProperty, err
  331. }
  332. type pluginfo struct {
  333. Name string `json:"name"`
  334. About *about `json:"about"`
  335. }
  336. func GetSinks() (sinks []*pluginfo) {
  337. sinkMeta := g_sinkMetadata
  338. for fileName, v := range sinkMeta {
  339. if fileName == baseProperty+".json" || fileName == baseOption+".json" {
  340. continue
  341. }
  342. node := new(pluginfo)
  343. node.Name = strings.TrimSuffix(fileName, `.json`)
  344. node.About = v.About
  345. i := 0
  346. for ; i < len(sinks); i++ {
  347. if node.Name <= sinks[i].Name {
  348. sinks = append(sinks, node)
  349. copy(sinks[i+1:], sinks[i:])
  350. sinks[i] = node
  351. break
  352. }
  353. }
  354. if len(sinks) == i {
  355. sinks = append(sinks, node)
  356. }
  357. }
  358. return sinks
  359. }