Ver código fonte

feat():add sink hint (#414)

* feat():add sink hint

* feat():add sink hint

* feat():add metal

* feat():format

* feat():format json

* feat():add metadata log
EMQmyd 4 anos atrás
pai
commit
22e2846851

+ 13 - 1
plugins/manager.go

@@ -212,6 +212,7 @@ func NewPluginManager() (*Manager, error) {
 			etcDir:    etcDir,
 			registry:  registry,
 		}
+		outerErr = singleton.readMetadataDir(path.Join(dir, "sinks"))
 	})
 	return singleton, outerErr
 }
@@ -274,7 +275,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		}
 		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
 	}
-
+	m.readMetadataFile(path.Join(m.pluginDir, PluginTypes[t], name+`.json`))
 	m.registry.Store(t, name, version)
 	return nil
 }
@@ -295,6 +296,11 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
 	if t == SOURCE {
 		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
 	}
+	if t == SINK {
+		//m.delMetadata(name)
+		metadataFile := path.Join(m.pluginDir, "sinks", name+".json")
+		os.Remove(metadataFile)
+	}
 	for _, p := range paths {
 		_, err := os.Stat(p)
 		if err == nil {
@@ -372,6 +378,12 @@ func (m *Manager) install(t PluginType, name string, src string) ([]string, stri
 				return filenames, "", err
 			}
 			filenames = append(filenames, yamlPath)
+		} else if fileName == name+".json" {
+			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
+			err = unzipTo(file, soPath)
+			if err != nil {
+				return filenames, "", err
+			}
 		} else if soPrefix.Match([]byte(fileName)) {
 			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
 			err = unzipTo(file, soPath)

+ 301 - 0
plugins/metadata.go

@@ -0,0 +1,301 @@
+package plugins
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
+	"io/ioutil"
+	"path"
+	"strings"
+)
+
+const (
+	baseProperty = `properies`
+	baseOption   = `options`
+)
+
+type (
+	author struct {
+		Name    string `json:"name"`
+		Email   string `json:"email"`
+		Company string `json:"company"`
+		Website string `json:"website"`
+	}
+	language struct {
+		English string `json:"en_US"`
+		Chinese string `json:"zh_CN"`
+	}
+	field struct {
+		Optional bool        `json:"optional"`
+		Name     string      `json:"name"`
+		Control  string      `json:"control"`
+		Type     string      `json:"type"`
+		Hint     *language   `json:"hint"`
+		Label    *language   `json:"label"`
+		Default  interface{} `json:"default"`
+		Values   interface{} `json:"values"`
+	}
+	metadata struct {
+		Author  *author   `json:"author"`
+		HelpUrl *language `json:"helpUrl"`
+		Fields  []*field  `json:"properties"`
+		Libs    []string  `json:"libs"`
+	}
+)
+
+var g_sinkMetadata map[string]*metadata //map[fileName]
+func (this *Manager) delMetadata(pluginName string) {
+	sinkMetadata := g_sinkMetadata
+	if _, ok := sinkMetadata[pluginName]; !ok {
+		return
+	}
+	tmp := make(map[string]*metadata)
+	fileName := pluginName + `.json`
+	for k, v := range sinkMetadata {
+		if k != fileName {
+			tmp[k] = v
+		}
+	}
+	g_sinkMetadata = tmp
+}
+func (this *Manager) readMetadataDir(dir string) error {
+	tmpMap := make(map[string]*metadata)
+	infos, err := ioutil.ReadDir(dir)
+	if nil != err {
+		return err
+	}
+	//add info log
+	for _, info := range infos {
+		fileName := info.Name()
+		if !strings.HasSuffix(fileName, ".json") {
+			continue
+		}
+
+		filePath := path.Join(dir, fileName)
+		byteContent, err := ioutil.ReadFile(filePath)
+		if nil != err {
+			return err
+		}
+
+		ptrMetadata := new(metadata)
+		err = json.Unmarshal(byteContent, ptrMetadata)
+		if nil != err {
+			return fmt.Errorf("fileName:%s err:%v", fileName, err)
+		}
+		common.Log.Infof("metadata file : %s", fileName)
+		tmpMap[fileName] = ptrMetadata
+	}
+	g_sinkMetadata = tmpMap
+	return nil
+}
+
+func (this *Manager) readMetadataFile(filePath string) error {
+	byteContent, err := ioutil.ReadFile(filePath)
+	if nil != err {
+		return err
+	}
+
+	ptrMetadata := new(metadata)
+	err = json.Unmarshal(byteContent, ptrMetadata)
+	if nil != err {
+		return fmt.Errorf("filePath:%s err:%v", filePath, err)
+	}
+
+	sinkMetadata := g_sinkMetadata
+	tmpMap := make(map[string]*metadata)
+	for k, v := range sinkMetadata {
+		tmpMap[k] = v
+	}
+	fileName := path.Base(filePath)
+	common.Log.Infof("metadata file : %s", fileName)
+	tmpMap[fileName] = ptrMetadata
+	g_sinkMetadata = tmpMap
+
+	return nil
+}
+
+type (
+	sinkLanguage struct {
+		English string `json:"en"`
+		Chinese string `json:"zh"`
+	}
+	sinkField struct {
+		Name     string        `json:"name"`
+		Default  interface{}   `json:"default"`
+		Control  string        `json:"control"`
+		Optional bool          `json:"optional"`
+		Type     string        `json:"type"`
+		Hint     *sinkLanguage `json:"hint"`
+		Label    *sinkLanguage `json:"label"`
+		Values   interface{}   `json:"values"`
+	}
+	sinkPropertyNode struct {
+		Fields  []*sinkField  `json:"properties"`
+		HelpUrl *sinkLanguage `json:"helpUrl"`
+		Libs    []string      `json:"libs"`
+	}
+	sinkProperty struct {
+		CustomProperty map[string]*sinkPropertyNode `json:"customProperty"`
+		BaseProperty   map[string]*sinkPropertyNode `json:"baseProperty"`
+		BaseOption     *sinkPropertyNode            `json:"baseOption"`
+	}
+)
+
+func (this *sinkLanguage) set(l *language) {
+	this.English = l.English
+	this.Chinese = l.Chinese
+}
+func (this *sinkField) setSinkField(v *field) {
+	this.Name = v.Name
+	this.Type = v.Type
+	this.Default = v.Default
+	this.Values = v.Values
+	this.Control = v.Control
+	this.Optional = v.Optional
+	this.Hint = new(sinkLanguage)
+	this.Hint.set(v.Hint)
+	this.Label = new(sinkLanguage)
+	this.Label.set(v.Label)
+}
+
+func (this *sinkPropertyNode) setNodeFromMetal(data *metadata) {
+	this.Libs = data.Libs
+	if nil != data.HelpUrl {
+		this.HelpUrl = new(sinkLanguage)
+		this.HelpUrl.set(data.HelpUrl)
+	}
+	for _, v := range data.Fields {
+		field := new(sinkField)
+		field.setSinkField(v)
+		this.Fields = append(this.Fields, field)
+	}
+}
+
+func (this *sinkProperty) setCustomProperty(pluginName string) error {
+	fileName := pluginName + `.json`
+	sinkMetadata := g_sinkMetadata
+	data := sinkMetadata[fileName]
+	if nil == data {
+		return fmt.Errorf(`not find pligin:%s`, fileName)
+	}
+	node := new(sinkPropertyNode)
+	node.setNodeFromMetal(data)
+	if 0 == len(this.CustomProperty) {
+		this.CustomProperty = make(map[string]*sinkPropertyNode)
+	}
+	this.CustomProperty[pluginName] = node
+	return nil
+}
+
+func (this *sinkProperty) setBasePropertry(pluginName string) error {
+	sinkMetadata := g_sinkMetadata
+	data := sinkMetadata[baseProperty+".json"]
+	if nil == data {
+		return fmt.Errorf(`not find pligin:%s`, baseProperty)
+	}
+	node := new(sinkPropertyNode)
+	node.setNodeFromMetal(data)
+	if 0 == len(this.BaseProperty) {
+		this.BaseProperty = make(map[string]*sinkPropertyNode)
+	}
+	this.BaseProperty[pluginName] = node
+	return nil
+}
+
+func (this *sinkProperty) setBaseOption() error {
+	sinkMetadata := g_sinkMetadata
+	data := sinkMetadata[baseOption+".json"]
+	if nil == data {
+		return fmt.Errorf(`not find pligin:%s`, baseOption)
+	}
+	node := new(sinkPropertyNode)
+	node.setNodeFromMetal(data)
+	this.BaseOption = node
+	return nil
+}
+
+func (this *sinkProperty) hintWhenNewSink(pluginName string) (err error) {
+	err = this.setCustomProperty(pluginName)
+	if nil != err {
+		return err
+	}
+	err = this.setBasePropertry(pluginName)
+	if nil != err {
+		return err
+	}
+	err = this.setBaseOption()
+	return err
+}
+
+func (this *sinkPropertyNode) modifyPropertyNode(mapFields map[string]interface{}) (err error) {
+	for i, field := range this.Fields {
+		fieldVal := mapFields[field.Name]
+		if nil != fieldVal {
+			this.Fields[i].Default = fieldVal
+		}
+	}
+	return nil
+}
+func (this *sinkProperty) modifyProperty(pluginName string, mapFields map[string]interface{}) (err error) {
+	customProperty := this.CustomProperty[pluginName]
+	if nil != customProperty {
+		customProperty.modifyPropertyNode(mapFields)
+	}
+
+	baseProperty := this.BaseProperty[pluginName]
+	if nil != baseProperty {
+		baseProperty.modifyPropertyNode(mapFields)
+	}
+
+	return nil
+}
+
+func (this *sinkProperty) modifyOption(option *api.RuleOption) {
+	baseOption := this.BaseOption
+	if nil == baseOption {
+		return
+	}
+	for i, field := range baseOption.Fields {
+		switch field.Name {
+		case `isEventTime`:
+			baseOption.Fields[i].Default = option.IsEventTime
+		case `lateTol`:
+			baseOption.Fields[i].Default = option.LateTol
+		case `concurrency`:
+			baseOption.Fields[i].Default = option.Concurrency
+		case `bufferLength`:
+			baseOption.Fields[i].Default = option.BufferLength
+		case `sendMetaToSink`:
+			baseOption.Fields[i].Default = option.SendMetaToSink
+		case `qos`:
+			baseOption.Fields[i].Default = option.Qos
+		case `checkpointInterval`:
+			baseOption.Fields[i].Default = option.CheckpointInterval
+		}
+	}
+}
+func (this *sinkProperty) hintWhenModifySink(rule *api.Rule) (err error) {
+	for _, m := range rule.Actions {
+		for pluginName, sink := range m {
+			mapFields, _ := sink.(map[string]interface{})
+			err = this.hintWhenNewSink(pluginName)
+			if nil != err {
+				return err
+			}
+			this.modifyProperty(pluginName, mapFields)
+		}
+	}
+	this.modifyOption(rule.Options)
+	return nil
+}
+
+func (this *Manager) Metadata(pluginName string, rule *api.Rule) (ptrSinkProperty *sinkProperty, err error) {
+	ptrSinkProperty = new(sinkProperty)
+	if nil == rule {
+		err = ptrSinkProperty.hintWhenNewSink(pluginName)
+	} else {
+		err = ptrSinkProperty.hintWhenModifySink(rule)
+	}
+	return ptrSinkProperty, err
+}

+ 41 - 0
plugins/sinks/file.json

@@ -0,0 +1,41 @@
+{
+	"author": {
+		"name": "",
+		"email": "",
+		"company": "EMQ Technologies Co., Ltd",
+		"website": "https://www.emqx.io"
+	},
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/en_US/plugins/sinks/file.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/zh_CN/plugins/sinks/file.md"
+	},
+	"properties": [{
+		"name": "path",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The file path for saving the result",
+			"zh_CN": "保存结果的文件路径"
+		},
+		"label": {
+			"en_US": "Path of file",
+			"zh_CN": "文件路径"
+		}
+	}, {
+		"name": "interval",
+		"default": 1000,
+		"optional": true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "The time interval (ms) for writing the analysis result.",
+			"zh_CN": "写入分析结果的时间间隔(毫秒)。"
+		},
+		"label": {
+			"en_US": "Intervals",
+			"zh_CN": "间隔时间"
+		}
+	}]
+}

+ 125 - 0
plugins/sinks/influxdb.json

@@ -0,0 +1,125 @@
+{
+	"author": {
+		"name": "",
+		"email": "",
+		"company": "EMQ Technologies Co., Ltd",
+		"website": "https://www.emqx.io"
+	},
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/en_US/plugins/sinks/influxdb.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/zh_CN/plugins/sinks/influxdb.md"
+	},
+	"properties": [{
+		"name": "addr",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The addr of the InfluxDB",
+			"zh_CN": "InfluxDB的地址"
+		},
+		"label": {
+			"en_US": "Addr",
+			"zh_CN": "地址"
+		}
+	}, {
+		"name": "measurement",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The measurement of the InfluxDb",
+			"zh_CN": "InfluxDb的测量"
+		},
+		"label": {
+			"en_US": "measurement",
+			"zh_CN": "测量"
+		}
+	}, {
+		"name": "username",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The InfluxDB login username",
+			"zh_CN": "InfluxDB登陆用户名"
+		},
+		"label": {
+			"en_US": "username",
+			"zh_CN": "用户名"
+		}
+	}, {
+		"name": "password",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The InfluxDB login password",
+			"zh_CN": "InfluxDB登陆密码"
+		},
+		"label": {
+			"en_US": "password",
+			"zh_CN": "密码"
+		}
+	}, {
+		"name": "databasename",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The database of the InfluxDB",
+			"zh_CN": "InfluxDB的数据库"
+		},
+		"label": {
+			"en_US": "DBbase name",
+			"zh_CN": "数据库名"
+		}
+	}, {
+		"name": "tagkey",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The tag key of the InfluxDB",
+			"zh_CN": "InfluxDB的标签键"
+		},
+		"label": {
+			"en_US": "Tag",
+			"zh_CN": "标签键"
+		}
+	}, {
+		"name": "tagvalue",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The tag value of the InfluxDB",
+			"zh_CN": "InfluxDB的标签值"
+		},
+		"label": {
+			"en_US": "Tag value",
+			"zh_CN": "标签值"
+		}
+	}, {
+		"name": "fields",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The column of the InfluxDB",
+			"zh_CN": "InfluxDB的列名"
+		},
+		"label": {
+			"en_US": "Column",
+			"zh_CN": "列名"
+		}
+	}]
+}

+ 106 - 0
plugins/sinks/options.json

@@ -0,0 +1,106 @@
+{
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/master/docs/en/rules/overview.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/master/docs/zh/rules/overview.md"
+	},
+	"properties": [{
+		"name": "isEventTime",
+		"default":false, 
+		"optional": true,
+		"control": "radio",
+		"type": "bool",
+		"hint": {
+			"en_US": "Whether to use event time or processing time as the timestamp for an event. If event time is used, the timestamp will be extracted from the payload. The timestamp filed must be specified by the stream definition.",
+			"zh_CN": "使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 stream 定义指定时间戳记。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否使用时间事件"
+		}
+	}, {
+		"name": "lateTolerance",
+		"default": 0,
+		"optional": true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "When working with event-time windowing, it can happen that elements arrive late. LateTolerance can specify by how much time(unit is millisecond) elements can be late before they are dropped. By default, the value is 0 which means late elements are dropped.",
+			"zh_CN": "在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "延迟多少毫秒"
+		}
+	}, {
+		"name": "concurrency",
+		"default": 1,
+		"optional": true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "A rule is processed by several phases of plans according to the sql statement. This option will specify how many instances will be run for each plan. If the value is bigger than 1, the order of the messages may not be retained.",
+			"zh_CN": "一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "线程数"
+		}
+	}, {
+		"name": "bufferLength",
+		"default":1024,
+		"optional": true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "Specify how many messages can be buffered in memory for each plan. If the buffered messages exceed the limit, the plan will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. A bigger value will accommodate more throughput but will also take up more memory footprint.",
+			"zh_CN": "指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "缓存大小"
+		}
+	}, {
+		"name": "sendMetaToSink",
+		"default":false,
+		"optional": true,
+		"control": "radio",
+		"type": "bool",
+		"hint": {
+			"en_US": "Specify whether the meta data of an event will be sent to the sink. If true, the sink can get te meta data information.",
+			"zh_CN": "指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否发送元数据"
+		}
+	}, {
+		"name": "qos",
+		"default":0,
+		"optional": true,
+		"control": "select",
+		"type": "list_int",
+    "values":[0,1,2],
+		"hint": {
+			"en_US": "Specify the qos of the stream. The options are 0: At most once; 1: At least once and 2: Exactly once. If qos is bigger than 0, the checkpoint mechanism will be activated to save states periodically so that the rule can be resumed from errors.",
+			"zh_CN": "指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "流的 qos"
+		}
+	}, {
+		"name": "checkpointInterval",
+		"default":300000,
+		"optional": true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "Specify the time interval in milliseconds to trigger a checkpoint. This is only effective when qos is bigger than 0.",
+			"zh_CN": "指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "检查点间隔毫秒数"
+		}
+	}]
+}

+ 133 - 0
plugins/sinks/properties.json

@@ -0,0 +1,133 @@
+{
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/master/docs/en/rules/overview.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/master/docs/zh/rules/overview.md"
+	},
+	"properties": [{
+		"name": "concurrency",
+		"default":1,
+		"optional":true,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "Specify how many instances of the sink will be run. If the value is bigger than 1, the order of the messages may not be retained.",
+			"zh_CN": "设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "线程数"
+		}
+	}, {
+		"name": "bufferLength",
+		"default": 1024,
+		"optional":true,
+		"type": "int",
+		"control": "text",
+		"hint": {
+			"en_US": "Specify how many messages can be buffered in memory. If the buffered messages exceed the limit, the sink will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit.",
+			"zh_CN": "设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "缓存大小"
+		}
+	}, {
+		"name": "runAsync",
+		"default": false,
+		"optional":true,
+		"type": "bool",
+		"control": "radio",
+		"hint": {
+			"en_US": "Whether the sink will run asynchronously for better performance. If it is true, the sink result order is not promised.",
+			"zh_CN": "设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否异步运行"
+		}
+	}, {
+		"name": "retryInterval",
+		"default": 1000,
+		"optional":true,
+		"type": "int",
+		"control": "text",
+		"hint": {
+			"en_US": "Specify how many milliseconds will the sink retry to send data out if the previous send failed. If the specified value <= 0, then it will not retry.",
+			"zh_CN": "设置信息发送失败后重试等待时间,单位为毫秒。如果该值的设置 <= 0,那么不会尝试重新发送。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "失败重试间隔毫秒"
+		}
+	}, {
+		"name": "cacheLength",
+		"default": 1024,
+		"optional":true,
+		"type": "int",
+		"control": "text",
+		"hint": {
+			"en_US": "Specify how many messages can be cached. The cached messages will be resent to external system until the data sent out successfully. The cached message will be sent in order except in runAsync or concurrent mode. The cached message will be saved to disk in fixed intervals.",
+			"zh_CN": "设置最大消息缓存数量。缓存的消息会一直保留直到消息发送成功。缓存消息将按顺序发送,除非运行在异步或者并发模式下。缓存消息会定期存储到磁盘中。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "最大消息缓存数量"
+		}
+	}, {
+		"name": "cacheSaveInterval",
+		"default": 1024,
+		"optional":true,
+		"type": "int",
+		"control": "text",
+		"hint": {
+			"en_US": "Specify the interval to save cached message to the disk. Notice that, if the rule is closed in plan, all the cached messages will be saved at close. A larger value can reduce the saving overhead but may lose more cache messages when the system is interrupted in error.",
+			"zh_CN": "设置缓存存储间隔时间,单位为毫秒。需要注意的是,当规则关闭时,缓存会自动存储。该值越大,则缓存保存开销越小,但系统意外退出时缓存丢失的风险变大。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "缓存间隔毫秒"
+		}
+	}, {
+		"name": "omitIfEmpty",
+		"default":false, 
+		"optional": true,
+		"type": "bool",
+		"control": "radio",
+		"hint": {
+			"en_US": "If the configuration item is set to true, when SELECT result is empty, then the result will not feed to sink operator.",
+			"zh_CN": "如果选择结果为空,则忽略输出。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否忽略输出"
+		}
+	}, {
+		"name": "sendSingle",
+		"default":true, 
+		"optional": true,
+		"type": "bool",
+		"control": "radio",
+		"hint": {
+			"en_US": "The output messages are received as an array. This is indicate whether to send the results one by one. If false, the output message will be {\"result\":\"${the string of received message}\"}. For example, {\"result\":\"[{\"count\":30},\"\"count\":20}]\"}. Otherwise, the result message will be sent one by one with the actual field name. For the same example as above, it will send {\"count\":30}, then send {\"count\":20} to the RESTful endpoint.Default to false.",
+			"zh_CN": "输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为false,则输出消息将为{\"result\":\"${the string of received message}\"}。 例如,{\"result\":\"[{\"count\":30},\"\"count\":20}]\"}。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 {\"count\":30},然后发送{\"count\":20} 到 RESTful 端点。默认为 false。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否指定接收数据格式"
+		}
+	}, {
+		"name": "dataTemplate",
+		"default":true, 
+		"optional": true,
+		"type": "bool",
+		"control": "radio",
+		"hint": {
+			"en_US": "The golang template format string to specify the output data format. The input of the template is the sink message which is always an array of map. If no data template is specified, the raw input will be the data.",
+			"zh_CN": "golang 模板格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是映射数组。 如果未指定数据模板,则将数据作为原始输入。"
+		},
+		"label": {
+			"en_US": "",
+			"zh_CN": "是否指定输出数据格式"
+		}
+	}]
+}

+ 113 - 24
plugins/sinks/taos.json

@@ -1,25 +1,114 @@
 {
-  "libs": [
-    "github.com/taosdata/driver-go@master"
-  ],
-  "helpUrl": {
-    "en_US": "https://github.com/emqx/kuiper/blob/master/docs/en_US/plugins/sinks/taos.md",
-    "zh_CN": "https://github.com/emqx/kuiper/blob/master/docs/zh_CN/plugins/sinks/taos.md"
-  },
-  "properties":[
-    {
-      "name" : "Database",
-      "default": "http://192.168.100.245:8086",
-      "optional": "false",
-      "control": "text",
-      "hint": {
-        "en_US": "Address of Taosdb",
-        "zh_CN": "Taosdb 地址"
-      },
-      "label": {
-        "en_US": "DB address",
-        "zh_CN": "数据库地址"
-      }
-    }
-  ]
-}
+	"author": {
+		"name": "Yuedong Ma",
+		"email": "mayuedong@emqx.io",
+		"company": "EMQ Technologies Co., Ltd",
+		"website": "https://www.emqx.io"
+	},
+	"libs": [
+		"github.com/taosdata/driver-go@master"
+	],
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/master/docs/en/plugins/sinks/taos.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/master/docs/zh/plugins/sinks/taos.md"
+	},
+	"properties": [{
+		"name": "port",
+		"default": 0,
+		"optional": false,
+		"control": "text",
+		"type": "int",
+		"hint": {
+			"en_US": "Port of Taosdb",
+			"zh_CN": "Taosdb 端口号"
+		},
+		"label": {
+			"en_US": "DB port",
+			"zh_CN": "数据库端口"
+		}
+	}, {
+		"name": "ip",
+		"default": "127.0.0.1",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "IP address of Taosdb",
+			"zh_CN": "Taosdb IP 地址"
+		},
+		"label": {
+			"en_US": "DB IP address",
+			"zh_CN": "数据库IP 地址"
+		}
+	}, {
+		"name": "user",
+		"default": "root",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "User of Taosdb",
+			"zh_CN": "Taosdb 用户名"
+		},
+		"label": {
+			"en_US": "DB user",
+			"zh_CN": "数据库用户名"
+		}
+	}, {
+		"name": "password",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "Password of Taosdb",
+			"zh_CN": "数据库密码"
+		},
+		"label": {
+			"en_US": "DB password",
+			"zh_CN": "数据库密码"
+		}
+	}, {
+		"name": "database",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "name of database",
+			"zh_CN": "库名称"
+		},
+		"label": {
+			"en_US": "Database name",
+			"zh_CN": "库名称"
+		}
+	}, {
+		"name": "table",
+		"default": "",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "Name of table",
+			"zh_CN": "表名"
+		},
+		"label": {
+			"en_US": "Name of table",
+			"zh_CN": "表名"
+		}
+	}, {
+		"name": "fields",
+		"default": [],
+		"optional": false,
+		"control": "list",
+		"type": "list_string",
+		"hint": {
+			"en_US": "Field of table",
+			"zh_CN": "表字段"
+		},
+		"label": {
+			"en_US": "table field",
+			"zh_CN": "表字段"
+		}
+	}]
+}

+ 41 - 0
plugins/sinks/zmq.json

@@ -0,0 +1,41 @@
+{
+	"author": {
+		"name": "",
+		"email": "",
+		"company": "EMQ Technologies Co., Ltd",
+		"website": "https://www.emqx.io"
+	},
+	"helpUrl": {
+		"en_US": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/en_US/plugins/sinks/zmq.md",
+		"zh_CN": "https://github.com/emqx/kuiper/blob/dev/0.9.1/docs/zh_CN/plugins/sinks/zmq.md"
+	},
+	"properties": [{
+		"name": "server",
+		"default": "127.0.0.1:5536",
+		"optional": false,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The url of the Zero Mq server",
+			"zh_CN": "Zero Mq 服务器的 URL"
+		},
+		"label": {
+			"en_US": "server address",
+			"zh_CN": "服务器地址"
+		}
+	}, {
+		"name": "topic",
+		"default": "",
+		"optional": true,
+		"control": "text",
+		"type": "string",
+		"hint": {
+			"en_US": "The topic to publish to.",
+			"zh_CN": "订阅主题"
+		},
+		"label": {
+			"en_US": "topic",
+			"zh_CN": "主题"
+		}
+	}]
+}

+ 39 - 0
xstream/server/server/rest.go

@@ -11,7 +11,9 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
+	"net/url"
 	"runtime"
+	"strings"
 	"time"
 )
 
@@ -87,6 +89,8 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
 
+	r.HandleFunc("/metadata", metadataHandler).Methods(http.MethodGet)
+
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
@@ -384,3 +388,38 @@ func functionsHandler(w http.ResponseWriter, r *http.Request) {
 func functionHandler(w http.ResponseWriter, r *http.Request) {
 	pluginHandler(w, r, plugins.FUNCTION)
 }
+
+func parseRequest(req string) map[string]string {
+	mapQuery := make(map[string]string)
+	for _, kv := range strings.Split(req, "&") {
+		pos := strings.Index(kv, "=")
+		if 0 < pos && pos+1 < len(kv) {
+			mapQuery[kv[:pos]], _ = url.QueryUnescape(kv[pos+1:])
+		}
+	}
+	return mapQuery
+}
+
+func metadataHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	mapQuery := parseRequest(r.URL.RawQuery)
+	ruleid := mapQuery["rule"]
+	pluginName := mapQuery["name"]
+
+	var rule *api.Rule
+	var err error
+	if 0 != len(ruleid) {
+		rule, err = ruleProcessor.GetRuleByName(ruleid)
+		if err != nil {
+			handleError(w, err, "describe rule error", logger)
+			return
+		}
+	}
+
+	ptrMetadata, err := pluginManager.Metadata(pluginName, rule)
+	if err != nil {
+		handleError(w, err, "metadata error", logger)
+		return
+	}
+	jsonResponse(ptrMetadata, w, logger)
+}