sinkMeta.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package plugins
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. const (
  12. baseProperty = `properties`
  13. baseOption = `options`
  14. )
  15. type (
  16. author struct {
  17. Name string `json:"name"`
  18. Email string `json:"email"`
  19. Company string `json:"company"`
  20. Website string `json:"website"`
  21. }
  22. fileLanguage struct {
  23. English string `json:"en_US"`
  24. Chinese string `json:"zh_CN"`
  25. }
  26. fileField struct {
  27. Name string `json:"name"`
  28. Default interface{} `json:"default"`
  29. Control string `json:"control"`
  30. Optional bool `json:"optional"`
  31. Type string `json:"type"`
  32. Hint *fileLanguage `json:"hint"`
  33. Label *fileLanguage `json:"label"`
  34. Values interface{} `json:"values"`
  35. }
  36. fileAbout struct {
  37. Trial bool `json:"trial"`
  38. Installed bool `json:"installed"`
  39. Author *author `json:"author"`
  40. HelpUrl *fileLanguage `json:"helpUrl"`
  41. Description *fileLanguage `json:"description"`
  42. }
  43. fileSink struct {
  44. About *fileAbout `json:"about"`
  45. Libs []string `json:"libs"`
  46. Fields []*fileField `json:"properties"`
  47. }
  48. language struct {
  49. English string `json:"en"`
  50. Chinese string `json:"zh"`
  51. }
  52. about struct {
  53. Trial bool `json:"trial"`
  54. Installed bool `json:"installed"`
  55. Author *author `json:"author"`
  56. HelpUrl *language `json:"helpUrl"`
  57. Description *language `json:"description"`
  58. }
  59. field struct {
  60. Exist bool `json:"exist"`
  61. Name string `json:"name"`
  62. Default interface{} `json:"default"`
  63. Type string `json:"type"`
  64. Control string `json:"control"`
  65. Optional bool `json:"optional"`
  66. Values interface{} `json:"values"`
  67. Hint *language `json:"hint"`
  68. Label *language `json:"label"`
  69. }
  70. uiSink struct {
  71. About *about `json:"about"`
  72. Libs []string `json:"libs"`
  73. Fields []*field `json:"properties"`
  74. }
  75. uiSinks struct {
  76. CustomProperty map[string]*uiSink `json:"customProperty"`
  77. BaseProperty map[string]*uiSink `json:"baseProperty"`
  78. BaseOption *uiSink `json:"baseOption"`
  79. }
  80. )
  81. func newLanguage(fi *fileLanguage) *language {
  82. if nil == fi {
  83. return nil
  84. }
  85. ui := new(language)
  86. ui.English = fi.English
  87. ui.Chinese = fi.Chinese
  88. return ui
  89. }
  90. func newField(fis []*fileField) (uis []*field, err error) {
  91. for _, fi := range fis {
  92. if nil == fi {
  93. continue
  94. }
  95. ui := new(field)
  96. uis = append(uis, ui)
  97. ui.Name = fi.Name
  98. ui.Type = fi.Type
  99. ui.Control = fi.Control
  100. ui.Optional = fi.Optional
  101. ui.Values = fi.Values
  102. ui.Hint = newLanguage(fi.Hint)
  103. ui.Label = newLanguage(fi.Label)
  104. switch t := fi.Default.(type) {
  105. case []map[string]interface{}:
  106. var auxFi []*fileField
  107. if err = common.MapToStruct(t, &auxFi); nil != err {
  108. return nil, err
  109. }
  110. if ui.Default, err = newField(auxFi); nil != err {
  111. return nil, err
  112. }
  113. default:
  114. ui.Default = fi.Default
  115. }
  116. }
  117. return uis, err
  118. }
  119. func newAbout(fi *fileAbout) *about {
  120. if nil == fi {
  121. return nil
  122. }
  123. ui := new(about)
  124. ui.Trial = fi.Trial
  125. ui.Author = fi.Author
  126. ui.HelpUrl = newLanguage(fi.HelpUrl)
  127. ui.Description = newLanguage(fi.Description)
  128. return ui
  129. }
  130. func newUiSink(fi *fileSink) (*uiSink, error) {
  131. if nil == fi {
  132. return nil, nil
  133. }
  134. var err error
  135. ui := new(uiSink)
  136. ui.Libs = fi.Libs
  137. ui.About = newAbout(fi.About)
  138. ui.Fields, err = newField(fi.Fields)
  139. return ui, err
  140. }
  141. var g_sinkMetadata map[string]*uiSink //map[fileName]
  142. func (m *Manager) readSinkMetaDir() error {
  143. g_sinkMetadata = make(map[string]*uiSink)
  144. confDir, err := common.GetConfLoc()
  145. if nil != err {
  146. return err
  147. }
  148. dir := path.Join(confDir, "sinks")
  149. files, err := ioutil.ReadDir(dir)
  150. if nil != err {
  151. return err
  152. }
  153. for _, file := range files {
  154. fname := file.Name()
  155. if !strings.HasSuffix(fname, ".json") {
  156. continue
  157. }
  158. filePath := path.Join(dir, fname)
  159. if err := m.readSinkMetaFile(filePath); nil != err {
  160. return err
  161. }
  162. }
  163. return nil
  164. }
  165. func (m *Manager) readSinkMetaFile(filePath string) error {
  166. finame := path.Base(filePath)
  167. pluginName := strings.TrimSuffix(finame, `.json`)
  168. metadata := new(fileSink)
  169. err := common.ReadJsonUnmarshal(filePath, metadata)
  170. if nil != err {
  171. return fmt.Errorf("filePath:%s err:%v", filePath, err)
  172. }
  173. if pluginName != baseProperty && pluginName != baseOption {
  174. if nil == metadata.About {
  175. return fmt.Errorf("not found about of %s", finame)
  176. } else {
  177. _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
  178. }
  179. }
  180. g_sinkMetadata[finame], err = newUiSink(metadata)
  181. if nil != err {
  182. return err
  183. }
  184. common.Log.Infof("Loading metadata file for sink: %s", finame)
  185. return nil
  186. }
  187. func (this *uiSinks) setCustomProperty(pluginName string) error {
  188. fileName := pluginName + `.json`
  189. sinkMetadata := g_sinkMetadata
  190. data := sinkMetadata[fileName]
  191. if nil == data {
  192. return fmt.Errorf(`not found pligin:%s`, fileName)
  193. }
  194. if 0 == len(this.CustomProperty) {
  195. this.CustomProperty = make(map[string]*uiSink)
  196. }
  197. this.CustomProperty[pluginName] = data
  198. return nil
  199. }
  200. func (this *uiSinks) setBasePropertry(pluginName string) error {
  201. sinkMetadata := g_sinkMetadata
  202. data := sinkMetadata[baseProperty+".json"]
  203. if nil == data {
  204. return fmt.Errorf(`not found pligin:%s`, baseProperty)
  205. }
  206. if 0 == len(this.BaseProperty) {
  207. this.BaseProperty = make(map[string]*uiSink)
  208. }
  209. this.BaseProperty[pluginName] = data
  210. return nil
  211. }
  212. func (this *uiSinks) setBaseOption() error {
  213. sinkMetadata := g_sinkMetadata
  214. data := sinkMetadata[baseOption+".json"]
  215. if nil == data {
  216. return fmt.Errorf(`not found pligin:%s`, baseOption)
  217. }
  218. this.BaseOption = data
  219. return nil
  220. }
  221. func (this *uiSinks) hintWhenNewSink(pluginName string) (err error) {
  222. err = this.setCustomProperty(pluginName)
  223. if nil != err {
  224. return err
  225. }
  226. err = this.setBasePropertry(pluginName)
  227. if nil != err {
  228. return err
  229. }
  230. err = this.setBaseOption()
  231. return err
  232. }
  233. func modifyCustom(uiFields []*field, ruleFields map[string]interface{}) (err error) {
  234. for i, ui := range uiFields {
  235. ruleVal := ruleFields[ui.Name]
  236. if nil == ruleVal {
  237. continue
  238. }
  239. if reflect.Map == reflect.TypeOf(ruleVal).Kind() {
  240. var auxRuleFields map[string]interface{}
  241. if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
  242. return fmt.Errorf("%s:%v", ui.Name, err)
  243. }
  244. var auxUiFields []*field
  245. if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
  246. return fmt.Errorf("%s:%v", ui.Name, err)
  247. }
  248. uiFields[i].Default = auxUiFields
  249. if err := modifyCustom(auxUiFields, auxRuleFields); nil != err {
  250. return err
  251. }
  252. } else {
  253. uiFields[i].Default = ruleVal
  254. }
  255. }
  256. return nil
  257. }
  258. func (this *uiSink) modifyBase(mapFields map[string]interface{}) (err error) {
  259. for i, field := range this.Fields {
  260. fieldVal := mapFields[field.Name]
  261. if nil != fieldVal {
  262. this.Fields[i].Default = fieldVal
  263. }
  264. }
  265. return nil
  266. }
  267. func (this *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
  268. custom := this.CustomProperty[pluginName]
  269. if nil == custom {
  270. return fmt.Errorf(`not found pligin:%s`, pluginName)
  271. }
  272. if err = modifyCustom(custom.Fields, mapFields); nil != err {
  273. return err
  274. }
  275. base := this.BaseProperty[pluginName]
  276. if nil == base {
  277. return fmt.Errorf(`not found pligin:%s`, pluginName)
  278. }
  279. if err = base.modifyBase(mapFields); nil != err {
  280. return err
  281. }
  282. return nil
  283. }
  284. func (this *uiSinks) modifyOption(option *api.RuleOption) {
  285. baseOption := this.BaseOption
  286. if nil == baseOption {
  287. return
  288. }
  289. for i, field := range baseOption.Fields {
  290. switch field.Name {
  291. case `isEventTime`:
  292. baseOption.Fields[i].Default = option.IsEventTime
  293. case `lateTol`:
  294. baseOption.Fields[i].Default = option.LateTol
  295. case `concurrency`:
  296. baseOption.Fields[i].Default = option.Concurrency
  297. case `bufferLength`:
  298. baseOption.Fields[i].Default = option.BufferLength
  299. case `sendMetaToSink`:
  300. baseOption.Fields[i].Default = option.SendMetaToSink
  301. case `qos`:
  302. baseOption.Fields[i].Default = option.Qos
  303. case `checkpointInterval`:
  304. baseOption.Fields[i].Default = option.CheckpointInterval
  305. }
  306. }
  307. }
  308. func (this *uiSinks) hintWhenModifySink(rule *api.Rule) (err error) {
  309. for _, m := range rule.Actions {
  310. for pluginName, sink := range m {
  311. mapFields, _ := sink.(map[string]interface{})
  312. err = this.hintWhenNewSink(pluginName)
  313. if nil != err {
  314. return err
  315. }
  316. this.modifyProperty(pluginName, mapFields)
  317. }
  318. }
  319. this.modifyOption(rule.Options)
  320. return nil
  321. }
  322. func GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *uiSinks, err error) {
  323. ptrSinkProperty = new(uiSinks)
  324. if nil == rule {
  325. err = ptrSinkProperty.hintWhenNewSink(pluginName)
  326. } else {
  327. err = ptrSinkProperty.hintWhenModifySink(rule)
  328. }
  329. return ptrSinkProperty, err
  330. }
  331. type pluginfo struct {
  332. Name string `json:"name"`
  333. About *about `json:"about"`
  334. }
  335. func GetSinks() (sinks []*pluginfo) {
  336. sinkMeta := g_sinkMetadata
  337. for fileName, v := range sinkMeta {
  338. if fileName == baseProperty+".json" || fileName == baseOption+".json" {
  339. continue
  340. }
  341. node := new(pluginfo)
  342. node.Name = strings.TrimSuffix(fileName, `.json`)
  343. node.About = v.About
  344. i := 0
  345. for ; i < len(sinks); i++ {
  346. if node.Name <= sinks[i].Name {
  347. sinks = append(sinks, node)
  348. copy(sinks[i+1:], sinks[i:])
  349. sinks[i] = node
  350. break
  351. }
  352. }
  353. if len(sinks) == i {
  354. sinks = append(sinks, node)
  355. }
  356. }
  357. return sinks
  358. }