sinkMeta.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package plugins
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "io/ioutil"
  7. "path"
  8. "reflect"
  9. "strings"
  10. )
  11. const (
  12. baseProperty = `properties`
  13. baseOption = `options`
  14. )
  15. type (
  16. author struct {
  17. Name string `json:"name"`
  18. Email string `json:"email"`
  19. Company string `json:"company"`
  20. Website string `json:"website"`
  21. }
  22. fileLanguage struct {
  23. English string `json:"en_US"`
  24. Chinese string `json:"zh_CN"`
  25. }
  26. fileField struct {
  27. Name string `json:"name"`
  28. Default interface{} `json:"default"`
  29. Control string `json:"control"`
  30. Optional bool `json:"optional"`
  31. Type string `json:"type"`
  32. Hint *fileLanguage `json:"hint"`
  33. Label *fileLanguage `json:"label"`
  34. Values interface{} `json:"values"`
  35. }
  36. fileAbout struct {
  37. Trial bool `json:"trial"`
  38. Installed bool `json:"installed"`
  39. Author *author `json:"author"`
  40. HelpUrl *fileLanguage `json:"helpUrl"`
  41. Description *fileLanguage `json:"description"`
  42. }
  43. fileSink struct {
  44. About *fileAbout `json:"about"`
  45. Libs []string `json:"libs"`
  46. Fields []*fileField `json:"properties"`
  47. }
  48. language struct {
  49. English string `json:"en"`
  50. Chinese string `json:"zh"`
  51. }
  52. about struct {
  53. Trial bool `json:"trial"`
  54. Installed bool `json:"installed"`
  55. Author *author `json:"author"`
  56. HelpUrl *language `json:"helpUrl"`
  57. Description *language `json:"description"`
  58. }
  59. field struct {
  60. Exist bool `json:"exist"`
  61. Name string `json:"name"`
  62. Default interface{} `json:"default"`
  63. Type string `json:"type"`
  64. Control string `json:"control"`
  65. Optional bool `json:"optional"`
  66. Values interface{} `json:"values"`
  67. Hint *language `json:"hint"`
  68. Label *language `json:"label"`
  69. }
  70. uiSink struct {
  71. About *about `json:"about"`
  72. Libs []string `json:"libs"`
  73. Fields []*field `json:"properties"`
  74. }
  75. uiSinks struct {
  76. CustomProperty map[string]*uiSink `json:"customProperty"`
  77. BaseProperty map[string]*uiSink `json:"baseProperty"`
  78. BaseOption *uiSink `json:"baseOption"`
  79. }
  80. )
  81. func isInternalSink(fiName string) bool {
  82. internal := []string{`edgex.json`, `log.json`, `mqtt.json`, `nop.json`, `rest.json`}
  83. for _, v := range internal {
  84. if v == fiName {
  85. return true
  86. }
  87. }
  88. return false
  89. }
  90. func newLanguage(fi *fileLanguage) *language {
  91. if nil == fi {
  92. return nil
  93. }
  94. ui := new(language)
  95. ui.English = fi.English
  96. ui.Chinese = fi.Chinese
  97. return ui
  98. }
  99. func newField(fis []*fileField) (uis []*field, err error) {
  100. for _, fi := range fis {
  101. if nil == fi {
  102. continue
  103. }
  104. ui := new(field)
  105. uis = append(uis, ui)
  106. ui.Name = fi.Name
  107. ui.Type = fi.Type
  108. ui.Control = fi.Control
  109. ui.Optional = fi.Optional
  110. ui.Values = fi.Values
  111. ui.Hint = newLanguage(fi.Hint)
  112. ui.Label = newLanguage(fi.Label)
  113. switch t := fi.Default.(type) {
  114. case []map[string]interface{}:
  115. var auxFi []*fileField
  116. if err = common.MapToStruct(t, &auxFi); nil != err {
  117. return nil, err
  118. }
  119. if ui.Default, err = newField(auxFi); nil != err {
  120. return nil, err
  121. }
  122. default:
  123. ui.Default = fi.Default
  124. }
  125. }
  126. return uis, err
  127. }
  128. func newAbout(fi *fileAbout) *about {
  129. if nil == fi {
  130. return nil
  131. }
  132. ui := new(about)
  133. ui.Trial = fi.Trial
  134. ui.Installed = fi.Installed
  135. ui.Author = fi.Author
  136. ui.HelpUrl = newLanguage(fi.HelpUrl)
  137. ui.Description = newLanguage(fi.Description)
  138. return ui
  139. }
  140. func newUiSink(fi *fileSink) (*uiSink, error) {
  141. if nil == fi {
  142. return nil, nil
  143. }
  144. var err error
  145. ui := new(uiSink)
  146. ui.Libs = fi.Libs
  147. ui.About = newAbout(fi.About)
  148. ui.Fields, err = newField(fi.Fields)
  149. return ui, err
  150. }
  151. var g_sinkMetadata map[string]*uiSink //map[fileName]
  152. func (m *Manager) readSinkMetaDir() error {
  153. g_sinkMetadata = make(map[string]*uiSink)
  154. confDir, err := common.GetConfLoc()
  155. if nil != err {
  156. return err
  157. }
  158. dir := path.Join(confDir, "sinks")
  159. files, err := ioutil.ReadDir(dir)
  160. if nil != err {
  161. return err
  162. }
  163. for _, file := range files {
  164. fname := file.Name()
  165. if !strings.HasSuffix(fname, ".json") {
  166. continue
  167. }
  168. filePath := path.Join(dir, fname)
  169. if err := m.readSinkMetaFile(filePath); nil != err {
  170. return err
  171. }
  172. }
  173. return nil
  174. }
  175. func (m *Manager) readSinkMetaFile(filePath string) error {
  176. finame := path.Base(filePath)
  177. pluginName := strings.TrimSuffix(finame, `.json`)
  178. metadata := new(fileSink)
  179. err := common.ReadJsonUnmarshal(filePath, metadata)
  180. if nil != err {
  181. return fmt.Errorf("filePath:%s err:%v", filePath, err)
  182. }
  183. if pluginName != baseProperty && pluginName != baseOption {
  184. if nil == metadata.About {
  185. return fmt.Errorf("not found about of %s", finame)
  186. } else if isInternalSink(finame) {
  187. metadata.About.Installed = true
  188. } else {
  189. _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
  190. }
  191. }
  192. g_sinkMetadata[finame], err = newUiSink(metadata)
  193. if nil != err {
  194. return err
  195. }
  196. common.Log.Infof("Loading metadata file for sink: %s", finame)
  197. return nil
  198. }
  199. func (this *uiSinks) setCustomProperty(pluginName string) error {
  200. fileName := pluginName + `.json`
  201. sinkMetadata := g_sinkMetadata
  202. data := sinkMetadata[fileName]
  203. if nil == data {
  204. return fmt.Errorf(`not found pligin:%s`, fileName)
  205. }
  206. if 0 == len(this.CustomProperty) {
  207. this.CustomProperty = make(map[string]*uiSink)
  208. }
  209. this.CustomProperty[pluginName] = data
  210. return nil
  211. }
  212. func (this *uiSinks) setBasePropertry(pluginName string) error {
  213. sinkMetadata := g_sinkMetadata
  214. data := sinkMetadata[baseProperty+".json"]
  215. if nil == data {
  216. return fmt.Errorf(`not found pligin:%s`, baseProperty)
  217. }
  218. if 0 == len(this.BaseProperty) {
  219. this.BaseProperty = make(map[string]*uiSink)
  220. }
  221. this.BaseProperty[pluginName] = data
  222. return nil
  223. }
  224. func (this *uiSinks) setBaseOption() error {
  225. sinkMetadata := g_sinkMetadata
  226. data := sinkMetadata[baseOption+".json"]
  227. if nil == data {
  228. return fmt.Errorf(`not found pligin:%s`, baseOption)
  229. }
  230. this.BaseOption = data
  231. return nil
  232. }
  233. func (this *uiSinks) hintWhenNewSink(pluginName string) (err error) {
  234. err = this.setCustomProperty(pluginName)
  235. if nil != err {
  236. return err
  237. }
  238. err = this.setBasePropertry(pluginName)
  239. if nil != err {
  240. return err
  241. }
  242. err = this.setBaseOption()
  243. return err
  244. }
  245. func modifyCustom(uiFields []*field, ruleFields map[string]interface{}) (err error) {
  246. for i, ui := range uiFields {
  247. ruleVal := ruleFields[ui.Name]
  248. if nil == ruleVal {
  249. continue
  250. }
  251. if reflect.Map == reflect.TypeOf(ruleVal).Kind() {
  252. var auxRuleFields map[string]interface{}
  253. if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
  254. return fmt.Errorf("%s:%v", ui.Name, err)
  255. }
  256. var auxUiFields []*field
  257. if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
  258. return fmt.Errorf("%s:%v", ui.Name, err)
  259. }
  260. uiFields[i].Default = auxUiFields
  261. if err := modifyCustom(auxUiFields, auxRuleFields); nil != err {
  262. return err
  263. }
  264. } else {
  265. uiFields[i].Default = ruleVal
  266. }
  267. }
  268. return nil
  269. }
  270. func (this *uiSink) modifyBase(mapFields map[string]interface{}) (err error) {
  271. for i, field := range this.Fields {
  272. fieldVal := mapFields[field.Name]
  273. if nil != fieldVal {
  274. this.Fields[i].Default = fieldVal
  275. }
  276. }
  277. return nil
  278. }
  279. func (this *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
  280. custom := this.CustomProperty[pluginName]
  281. if nil == custom {
  282. return fmt.Errorf(`not found pligin:%s`, pluginName)
  283. }
  284. if err = modifyCustom(custom.Fields, mapFields); nil != err {
  285. return err
  286. }
  287. base := this.BaseProperty[pluginName]
  288. if nil == base {
  289. return fmt.Errorf(`not found pligin:%s`, pluginName)
  290. }
  291. if err = base.modifyBase(mapFields); nil != err {
  292. return err
  293. }
  294. return nil
  295. }
  296. func (this *uiSinks) modifyOption(option *api.RuleOption) {
  297. baseOption := this.BaseOption
  298. if nil == baseOption {
  299. return
  300. }
  301. for i, field := range baseOption.Fields {
  302. switch field.Name {
  303. case `isEventTime`:
  304. baseOption.Fields[i].Default = option.IsEventTime
  305. case `lateTol`:
  306. baseOption.Fields[i].Default = option.LateTol
  307. case `concurrency`:
  308. baseOption.Fields[i].Default = option.Concurrency
  309. case `bufferLength`:
  310. baseOption.Fields[i].Default = option.BufferLength
  311. case `sendMetaToSink`:
  312. baseOption.Fields[i].Default = option.SendMetaToSink
  313. case `qos`:
  314. baseOption.Fields[i].Default = option.Qos
  315. case `checkpointInterval`:
  316. baseOption.Fields[i].Default = option.CheckpointInterval
  317. }
  318. }
  319. }
  320. func (this *uiSinks) hintWhenModifySink(rule *api.Rule) (err error) {
  321. for _, m := range rule.Actions {
  322. for pluginName, sink := range m {
  323. mapFields, _ := sink.(map[string]interface{})
  324. err = this.hintWhenNewSink(pluginName)
  325. if nil != err {
  326. return err
  327. }
  328. this.modifyProperty(pluginName, mapFields)
  329. }
  330. }
  331. this.modifyOption(rule.Options)
  332. return nil
  333. }
  334. func GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *uiSinks, err error) {
  335. ptrSinkProperty = new(uiSinks)
  336. if nil == rule {
  337. err = ptrSinkProperty.hintWhenNewSink(pluginName)
  338. } else {
  339. err = ptrSinkProperty.hintWhenModifySink(rule)
  340. }
  341. return ptrSinkProperty, err
  342. }
  343. type pluginfo struct {
  344. Name string `json:"name"`
  345. About *about `json:"about"`
  346. }
  347. func GetSinks() (sinks []*pluginfo) {
  348. sinkMeta := g_sinkMetadata
  349. for fileName, v := range sinkMeta {
  350. if fileName == baseProperty+".json" || fileName == baseOption+".json" {
  351. continue
  352. }
  353. node := new(pluginfo)
  354. node.Name = strings.TrimSuffix(fileName, `.json`)
  355. node.About = v.About
  356. i := 0
  357. for ; i < len(sinks); i++ {
  358. if node.Name <= sinks[i].Name {
  359. sinks = append(sinks, node)
  360. copy(sinks[i+1:], sinks[i:])
  361. sinks[i] = node
  362. break
  363. }
  364. }
  365. if len(sinks) == i {
  366. sinks = append(sinks, node)
  367. }
  368. }
  369. return sinks
  370. }