sinkMeta.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package plugins
  import (
  	"fmt"
  	"io/ioutil"
  	"path"
  	"reflect"
  	"sort"
  	"strings"

  	"github.com/emqx/kuiper/common"
  	"github.com/emqx/kuiper/xstream/api"
  )
  // Name stems of the two shared metadata entries. The lookups in this file
  // append ".json" to them to form the key into g_sinkMetadata.
  const (
  	baseProperty = "properties"
  	baseOption   = "options"
  )
  // author is the "author" object of a sink's "about" section. It is used
  // unchanged by both the file and the UI representation.
  type author struct {
  	Name    string `json:"name"`
  	Email   string `json:"email"`
  	Company string `json:"company"`
  	Website string `json:"website"`
  }

  // fileLanguage is a two-language text as keyed in a metadata file
  // ("en_US" / "zh_CN").
  type fileLanguage struct {
  	English string `json:"en_US"`
  	Chinese string `json:"zh_CN"`
  }

  // fileField is one entry of the "properties" list of a metadata file.
  type fileField struct {
  	Name string `json:"name"`
  	// Default is either a plain value or, when it decodes to a slice, a nested
  	// list of field descriptions (see newField).
  	Default  interface{}   `json:"default"`
  	Control  string        `json:"control"`
  	Optional bool          `json:"optional"`
  	Type     string        `json:"type"`
  	Hint     *fileLanguage `json:"hint"`
  	Label    *fileLanguage `json:"label"`
  	Values   interface{}   `json:"values"`
  }

  // fileAbout is the "about" section of a metadata file.
  type fileAbout struct {
  	Trial       bool          `json:"trial"`
  	Author      *author       `json:"author"`
  	HelpUrl     *fileLanguage `json:"helpUrl"`
  	Description *fileLanguage `json:"description"`
  }

  // fileSink is the top-level structure a sink metadata file is decoded into.
  type fileSink struct {
  	About  *fileAbout   `json:"about"`
  	Libs   []string     `json:"libs"`
  	Fields []*fileField `json:"properties"`
  }

  // language is the UI counterpart of fileLanguage; it uses the shorter
  // "en" / "zh" keys.
  type language struct {
  	English string `json:"en"`
  	Chinese string `json:"zh"`
  }

  // about is the UI counterpart of fileAbout.
  type about struct {
  	Trial       bool      `json:"trial"`
  	Author      *author   `json:"author"`
  	HelpUrl     *language `json:"helpUrl"`
  	Description *language `json:"description"`
  }

  // field is the UI counterpart of fileField.
  type field struct {
  	// Exist is not populated by newField.
  	Exist bool   `json:"exist"`
  	Name  string `json:"name"`
  	// Default holds a plain value or a nested []*field.
  	Default  interface{} `json:"default"`
  	Type     string      `json:"type"`
  	Control  string      `json:"control"`
  	Optional bool        `json:"optional"`
  	Values   interface{} `json:"values"`
  	Hint     *language   `json:"hint"`
  	Label    *language   `json:"label"`
  }

  // uiSink is the UI counterpart of fileSink.
  type uiSink struct {
  	About  *about   `json:"about"`
  	Libs   []string `json:"libs"`
  	Fields []*field `json:"properties"`
  }

  // uiSinks is the value returned by GetSinkMeta: per-plugin custom and base
  // properties, both keyed by plugin name, plus the shared rule options.
  type uiSinks struct {
  	CustomProperty map[string]*uiSink `json:"customProperty"`
  	BaseProperty   map[string]*uiSink `json:"baseProperty"`
  	BaseOption     *uiSink            `json:"baseOption"`
  }
  // InternalSinks lists the names of the built-in sinks. readSinkMetaDir loads
  // one "<name>.json" metadata file for each of them.
  var InternalSinks = [5]string{"log", "mqtt", "rest", "nop", "edgex"}
  81. func newLanguage(fi *fileLanguage) *language {
  82. if nil == fi {
  83. return nil
  84. }
  85. ui := new(language)
  86. ui.English = fi.English
  87. ui.Chinese = fi.Chinese
  88. return ui
  89. }
  90. func newField(fis []*fileField) (uis []*field, err error) {
  91. for _, fi := range fis {
  92. if nil == fi {
  93. continue
  94. }
  95. ui := new(field)
  96. uis = append(uis, ui)
  97. ui.Name = fi.Name
  98. ui.Type = fi.Type
  99. ui.Control = fi.Control
  100. ui.Optional = fi.Optional
  101. ui.Values = fi.Values
  102. ui.Hint = newLanguage(fi.Hint)
  103. ui.Label = newLanguage(fi.Label)
  104. if nil != fi.Default && reflect.Slice == reflect.TypeOf(fi.Default).Kind() {
  105. var auxFi []*fileField
  106. if err = common.MapToStruct(fi.Default, &auxFi); nil != err {
  107. return nil, err
  108. }
  109. if ui.Default, err = newField(auxFi); nil != err {
  110. return nil, err
  111. }
  112. } else {
  113. ui.Default = fi.Default
  114. }
  115. }
  116. return uis, err
  117. }
  118. func newAbout(fi *fileAbout) *about {
  119. if nil == fi {
  120. return nil
  121. }
  122. ui := new(about)
  123. ui.Trial = fi.Trial
  124. ui.Author = fi.Author
  125. ui.HelpUrl = newLanguage(fi.HelpUrl)
  126. ui.Description = newLanguage(fi.Description)
  127. return ui
  128. }
  129. func newUiSink(fi *fileSink) (*uiSink, error) {
  130. if nil == fi {
  131. return nil, nil
  132. }
  133. var err error
  134. ui := new(uiSink)
  135. ui.Libs = fi.Libs
  136. ui.About = newAbout(fi.About)
  137. ui.Fields, err = newField(fi.Fields)
  138. return ui, err
  139. }
  140. var g_sinkMetadata map[string]*uiSink //map[fileName]
  141. func readSinkMetaDir() error {
  142. confDir, err := common.GetConfLoc()
  143. if nil != err {
  144. return err
  145. }
  146. dir := path.Join(confDir, "sinks")
  147. tmpMap := make(map[string]*uiSink)
  148. //The internal support sinks
  149. for _, sink := range InternalSinks {
  150. file := path.Join(confDir, "sinks", "internal", sink+".json")
  151. common.Log.Infof("Loading metadata file for sink: %s", file)
  152. meta := new(fileSink)
  153. err := common.ReadJsonUnmarshal(file, meta)
  154. if nil != err {
  155. return fmt.Errorf("Failed to load internal sink plugin:%s with err:%v", file, err)
  156. }
  157. tmpMap[sink+".json"], err = newUiSink(meta)
  158. if nil != err {
  159. return err
  160. }
  161. }
  162. files, err := ioutil.ReadDir(dir)
  163. if nil != err {
  164. return err
  165. }
  166. for _, file := range files {
  167. fname := file.Name()
  168. if !strings.HasSuffix(fname, ".json") {
  169. continue
  170. }
  171. filePath := path.Join(dir, fname)
  172. metadata := new(fileSink)
  173. err = common.ReadJsonUnmarshal(filePath, metadata)
  174. if nil != err {
  175. return fmt.Errorf("fname:%s err:%v", fname, err)
  176. }
  177. common.Log.Infof("sinkMeta file : %s", fname)
  178. tmpMap[fname], err = newUiSink(metadata)
  179. if nil != err {
  180. return err
  181. }
  182. }
  183. g_sinkMetadata = tmpMap
  184. return nil
  185. }
  186. func readSinkMetaFile(filePath string) error {
  187. ptrMetadata := new(fileSink)
  188. err := common.ReadJsonUnmarshal(filePath, ptrMetadata)
  189. if nil != err {
  190. return fmt.Errorf("filePath:%s err:%v", filePath, err)
  191. }
  192. sinkMetadata := g_sinkMetadata
  193. tmpMap := make(map[string]*uiSink)
  194. for k, v := range sinkMetadata {
  195. tmpMap[k] = v
  196. }
  197. fileName := path.Base(filePath)
  198. common.Log.Infof("sinkMeta file : %s", fileName)
  199. tmpMap[fileName], err = newUiSink(ptrMetadata)
  200. if nil != err {
  201. return err
  202. }
  203. g_sinkMetadata = tmpMap
  204. return nil
  205. }
  206. func (this *uiSinks) setCustomProperty(pluginName string) error {
  207. fileName := pluginName + `.json`
  208. sinkMetadata := g_sinkMetadata
  209. data := sinkMetadata[fileName]
  210. if nil == data {
  211. return fmt.Errorf(`not found pligin:%s`, fileName)
  212. }
  213. if 0 == len(this.CustomProperty) {
  214. this.CustomProperty = make(map[string]*uiSink)
  215. }
  216. this.CustomProperty[pluginName] = data
  217. return nil
  218. }
  219. func (this *uiSinks) setBasePropertry(pluginName string) error {
  220. sinkMetadata := g_sinkMetadata
  221. data := sinkMetadata[baseProperty+".json"]
  222. if nil == data {
  223. return fmt.Errorf(`not found pligin:%s`, baseProperty)
  224. }
  225. if 0 == len(this.BaseProperty) {
  226. this.BaseProperty = make(map[string]*uiSink)
  227. }
  228. this.BaseProperty[pluginName] = data
  229. return nil
  230. }
  231. func (this *uiSinks) setBaseOption() error {
  232. sinkMetadata := g_sinkMetadata
  233. data := sinkMetadata[baseOption+".json"]
  234. if nil == data {
  235. return fmt.Errorf(`not found pligin:%s`, baseOption)
  236. }
  237. this.BaseOption = data
  238. return nil
  239. }
  240. func (this *uiSinks) hintWhenNewSink(pluginName string) (err error) {
  241. err = this.setCustomProperty(pluginName)
  242. if nil != err {
  243. return err
  244. }
  245. err = this.setBasePropertry(pluginName)
  246. if nil != err {
  247. return err
  248. }
  249. err = this.setBaseOption()
  250. return err
  251. }
  252. func modifyCustom(uiFields []*field, ruleFields map[string]interface{}) (err error) {
  253. for i, ui := range uiFields {
  254. ruleVal := ruleFields[ui.Name]
  255. if nil == ruleVal {
  256. continue
  257. }
  258. if reflect.Map == reflect.TypeOf(ruleVal).Kind() {
  259. var auxRuleFields map[string]interface{}
  260. if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
  261. return fmt.Errorf("%s:%v", ui.Name, err)
  262. }
  263. var auxUiFields []*field
  264. if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
  265. return fmt.Errorf("%s:%v", ui.Name, err)
  266. }
  267. uiFields[i].Default = auxUiFields
  268. if err := modifyCustom(auxUiFields, auxRuleFields); nil != err {
  269. return err
  270. }
  271. } else {
  272. uiFields[i].Default = ruleVal
  273. }
  274. }
  275. return nil
  276. }
  277. func (this *uiSink) modifyBase(mapFields map[string]interface{}) (err error) {
  278. for i, field := range this.Fields {
  279. fieldVal := mapFields[field.Name]
  280. if nil != fieldVal {
  281. this.Fields[i].Default = fieldVal
  282. }
  283. }
  284. return nil
  285. }
  286. func (this *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
  287. custom := this.CustomProperty[pluginName]
  288. if nil == custom {
  289. return fmt.Errorf(`not found pligin:%s`, pluginName)
  290. }
  291. if err = modifyCustom(custom.Fields, mapFields); nil != err {
  292. return err
  293. }
  294. base := this.BaseProperty[pluginName]
  295. if nil == base {
  296. return fmt.Errorf(`not found pligin:%s`, pluginName)
  297. }
  298. if err = base.modifyBase(mapFields); nil != err {
  299. return err
  300. }
  301. return nil
  302. }
  303. func (this *uiSinks) modifyOption(option *api.RuleOption) {
  304. baseOption := this.BaseOption
  305. if nil == baseOption {
  306. return
  307. }
  308. for i, field := range baseOption.Fields {
  309. switch field.Name {
  310. case `isEventTime`:
  311. baseOption.Fields[i].Default = option.IsEventTime
  312. case `lateTol`:
  313. baseOption.Fields[i].Default = option.LateTol
  314. case `concurrency`:
  315. baseOption.Fields[i].Default = option.Concurrency
  316. case `bufferLength`:
  317. baseOption.Fields[i].Default = option.BufferLength
  318. case `sendMetaToSink`:
  319. baseOption.Fields[i].Default = option.SendMetaToSink
  320. case `qos`:
  321. baseOption.Fields[i].Default = option.Qos
  322. case `checkpointInterval`:
  323. baseOption.Fields[i].Default = option.CheckpointInterval
  324. }
  325. }
  326. }
  327. func (this *uiSinks) hintWhenModifySink(rule *api.Rule) (err error) {
  328. for _, m := range rule.Actions {
  329. for pluginName, sink := range m {
  330. mapFields, _ := sink.(map[string]interface{})
  331. err = this.hintWhenNewSink(pluginName)
  332. if nil != err {
  333. return err
  334. }
  335. this.modifyProperty(pluginName, mapFields)
  336. }
  337. }
  338. this.modifyOption(rule.Options)
  339. return nil
  340. }
  341. func GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *uiSinks, err error) {
  342. ptrSinkProperty = new(uiSinks)
  343. if nil == rule {
  344. err = ptrSinkProperty.hintWhenNewSink(pluginName)
  345. } else {
  346. err = ptrSinkProperty.hintWhenModifySink(rule)
  347. }
  348. return ptrSinkProperty, err
  349. }
  350. type pluginfo struct {
  351. Name string `json:"name"`
  352. About *about `json:"about"`
  353. }
  354. func GetSinks() (sinks []*pluginfo) {
  355. sinkMeta := g_sinkMetadata
  356. for fileName, v := range sinkMeta {
  357. if fileName == baseProperty+".json" || fileName == baseOption+".json" {
  358. continue
  359. }
  360. node := new(pluginfo)
  361. node.Name = strings.TrimSuffix(fileName, `.json`)
  362. node.About = v.About
  363. sinks = append(sinks, node)
  364. }
  365. return sinks
  366. }