|
@@ -3,18 +3,14 @@ package plugins
|
|
|
import (
|
|
|
"fmt"
|
|
|
"github.com/emqx/kuiper/common"
|
|
|
- "github.com/emqx/kuiper/xstream/api"
|
|
|
"io/ioutil"
|
|
|
"path"
|
|
|
- "reflect"
|
|
|
"strings"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- baseProperty = `properties`
|
|
|
- baseOption = `options`
|
|
|
- sink = `sink`
|
|
|
- source = `source`
|
|
|
+ sink = `sink`
|
|
|
+ source = `source`
|
|
|
)
|
|
|
|
|
|
type (
|
|
@@ -76,12 +72,10 @@ type (
|
|
|
uiSink struct {
|
|
|
About *about `json:"about"`
|
|
|
Libs []string `json:"libs"`
|
|
|
- Fields []field `json:"properties"` // mutable, so each sink must have its own copy
|
|
|
+ Fields []field `json:"properties"`
|
|
|
}
|
|
|
uiSinks struct {
|
|
|
- CustomProperty map[string]uiSink `json:"customProperty"`
|
|
|
- BaseProperty map[string]uiSink `json:"baseProperty"`
|
|
|
- BaseOption uiSink `json:"baseOption"`
|
|
|
+ CustomProperty map[string]*uiSink `json:"customProperty"`
|
|
|
language string
|
|
|
}
|
|
|
)
|
|
@@ -158,7 +152,7 @@ func newUiSink(fi *fileSink) (*uiSink, error) {
|
|
|
return ui, err
|
|
|
}
|
|
|
|
|
|
-var g_sinkMetadata map[string]*uiSink //map[fileName]
|
|
|
+var g_sinkMetadata map[string]*uiSink //immutable
|
|
|
func (m *Manager) readSinkMetaDir() error {
|
|
|
g_sinkMetadata = make(map[string]*uiSink)
|
|
|
confDir, err := common.GetConfLoc()
|
|
@@ -200,14 +194,12 @@ func (m *Manager) readSinkMetaFile(filePath string) error {
|
|
|
if nil != err {
|
|
|
return fmt.Errorf("filePath:%s err:%v", filePath, err)
|
|
|
}
|
|
|
- if pluginName != baseProperty && pluginName != baseOption {
|
|
|
- if nil == metadata.About {
|
|
|
- return fmt.Errorf("not found about of %s", finame)
|
|
|
- } else if isInternalSink(finame) {
|
|
|
- metadata.About.Installed = true
|
|
|
- } else {
|
|
|
- _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
|
|
|
- }
|
|
|
+ if nil == metadata.About {
|
|
|
+ return fmt.Errorf("not found about of %s", finame)
|
|
|
+ } else if isInternalSink(finame) {
|
|
|
+ metadata.About.Installed = true
|
|
|
+ } else {
|
|
|
+ _, metadata.About.Installed = m.registry.Get(SINK, pluginName)
|
|
|
}
|
|
|
g_sinkMetadata[finame], err = newUiSink(metadata)
|
|
|
if nil != err {
|
|
@@ -225,130 +217,14 @@ func (us *uiSinks) setCustomProperty(pluginName string) error {
|
|
|
return fmt.Errorf(`%s%s`, getMsg(us.language, sink, "not_found_plugin"), pluginName)
|
|
|
}
|
|
|
if 0 == len(us.CustomProperty) {
|
|
|
- us.CustomProperty = make(map[string]uiSink)
|
|
|
- }
|
|
|
- us.CustomProperty[pluginName] = data.clone()
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (us *uiSinks) setBasePropertry(pluginName string) error {
|
|
|
- sinkMetadata := g_sinkMetadata
|
|
|
- data := sinkMetadata[baseProperty+".json"]
|
|
|
- if nil == data {
|
|
|
- return fmt.Errorf(`%s%s`, getMsg(us.language, sink, "not_found_plugin"), baseProperty)
|
|
|
- }
|
|
|
- if 0 == len(us.BaseProperty) {
|
|
|
- us.BaseProperty = make(map[string]uiSink)
|
|
|
+ us.CustomProperty = make(map[string]*uiSink)
|
|
|
}
|
|
|
- us.BaseProperty[pluginName] = data.clone()
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (us *uiSinks) setBaseOption() error {
|
|
|
- sinkMetadata := g_sinkMetadata
|
|
|
- data := sinkMetadata[baseOption+".json"]
|
|
|
- if nil == data {
|
|
|
- return fmt.Errorf(`%s%s`, getMsg(us.language, sink, "not_found_plugin"), baseOption)
|
|
|
- }
|
|
|
- us.BaseOption = data.clone()
|
|
|
+ us.CustomProperty[pluginName] = data
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func (us *uiSinks) hintWhenNewSink(pluginName string) (err error) {
|
|
|
- err = us.setCustomProperty(pluginName)
|
|
|
- if nil != err {
|
|
|
- return err
|
|
|
- }
|
|
|
- err = us.setBasePropertry(pluginName)
|
|
|
- if nil != err {
|
|
|
- return err
|
|
|
- }
|
|
|
- err = us.setBaseOption()
|
|
|
- return err
|
|
|
-}
|
|
|
-
|
|
|
-func (us *uiSinks) modifyCustom(uiFields []field, ruleFields map[string]interface{}) (err error) {
|
|
|
- for i, ui := range uiFields {
|
|
|
- ruleVal := ruleFields[ui.Name]
|
|
|
- if nil == ruleVal {
|
|
|
- continue
|
|
|
- }
|
|
|
- if reflect.Map == reflect.TypeOf(ruleVal).Kind() && "object" != ui.Type {
|
|
|
- var auxRuleFields map[string]interface{}
|
|
|
- if err := common.MapToStruct(ruleVal, &auxRuleFields); nil != err {
|
|
|
- return fmt.Errorf(`%s%v %s`, getMsg(us.language, sink, "type_conversion_fail"), err, ui.Name)
|
|
|
- }
|
|
|
- var auxUiFields []field
|
|
|
- if err := common.MapToStruct(ui.Default, &auxUiFields); nil != err {
|
|
|
- return fmt.Errorf(`%s%v %s`, getMsg(us.language, sink, "type_conversion_fail"), err, ui.Name)
|
|
|
- }
|
|
|
- uiFields[i].Default = auxUiFields
|
|
|
- if err := us.modifyCustom(auxUiFields, auxRuleFields); nil != err {
|
|
|
- return err
|
|
|
- }
|
|
|
- } else {
|
|
|
- uiFields[i].Default = ruleVal
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (u *uiSink) clone() (c uiSink) {
|
|
|
- c.About = u.About
|
|
|
- c.Libs = u.Libs
|
|
|
- c.Fields = make([]field, len(u.Fields))
|
|
|
- for i, f := range u.Fields {
|
|
|
- c.Fields[i] = f
|
|
|
- }
|
|
|
- return
|
|
|
-}
|
|
|
-
|
|
|
-func (u *uiSink) modifyBase(mapFields map[string]interface{}) {
|
|
|
- for i, field := range u.Fields {
|
|
|
- fieldVal := mapFields[field.Name]
|
|
|
- if nil != fieldVal {
|
|
|
- u.Fields[i].Default = fieldVal
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (us *uiSinks) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
|
|
|
- custom, ok := us.CustomProperty[pluginName]
|
|
|
- if !ok {
|
|
|
- return fmt.Errorf(`%s%s`, getMsg(us.language, sink, "not_found_plugin"), pluginName)
|
|
|
- }
|
|
|
- if err = us.modifyCustom(custom.Fields, mapFields); nil != err {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- base, ok := us.BaseProperty[pluginName]
|
|
|
- if !ok {
|
|
|
- return fmt.Errorf(`%s%s`, getMsg(us.language, sink, "not_found_plugin"), pluginName)
|
|
|
- }
|
|
|
- base.modifyBase(mapFields)
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (us *uiSinks) modifyOption(option *api.RuleOption) {
|
|
|
- baseOption := us.BaseOption
|
|
|
- for i, field := range baseOption.Fields {
|
|
|
- switch field.Name {
|
|
|
- case `isEventTime`:
|
|
|
- baseOption.Fields[i].Default = option.IsEventTime
|
|
|
- case `lateTol`:
|
|
|
- baseOption.Fields[i].Default = option.LateTol
|
|
|
- case `concurrency`:
|
|
|
- baseOption.Fields[i].Default = option.Concurrency
|
|
|
- case `bufferLength`:
|
|
|
- baseOption.Fields[i].Default = option.BufferLength
|
|
|
- case `sendMetaToSink`:
|
|
|
- baseOption.Fields[i].Default = option.SendMetaToSink
|
|
|
- case `qos`:
|
|
|
- baseOption.Fields[i].Default = option.Qos
|
|
|
- case `checkpointInterval`:
|
|
|
- baseOption.Fields[i].Default = option.CheckpointInterval
|
|
|
- }
|
|
|
- }
|
|
|
+ return us.setCustomProperty(pluginName)
|
|
|
}
|
|
|
|
|
|
func GetSinkMeta(pluginName, language string) (ptrSinkProperty *uiSinks, err error) {
|
|
@@ -366,9 +242,6 @@ type pluginfo struct {
|
|
|
func GetSinks() (sinks []*pluginfo) {
|
|
|
sinkMeta := g_sinkMetadata
|
|
|
for fileName, v := range sinkMeta {
|
|
|
- if fileName == baseProperty+".json" || fileName == baseOption+".json" {
|
|
|
- continue
|
|
|
- }
|
|
|
node := new(pluginfo)
|
|
|
node.Name = strings.TrimSuffix(fileName, `.json`)
|
|
|
node.About = v.About
|