Selaa lähdekoodia

Source (#434)

* feat():add source test

* feat():source metadata

* feat():save source yaml file

* feat():API format

* feat():source metadata
EMQmyd 4 vuotta sitten
vanhempi
commit
110837b7a4
7 muutettua tiedostoa jossa 601 lisäystä ja 116 poistoa
  1. 1 1
      common/util.go
  2. 3 3
      plugins/manager.go
  3. 14 4
      plugins/sinkMeta.go
  4. 113 0
      plugins/sinkMeta_test.go
  5. 119 56
      plugins/sourceMeta.go
  6. 238 0
      plugins/sourceMeta_test.go
  7. 113 52
      xstream/server/server/rest.go

+ 1 - 1
common/util.go

@@ -310,7 +310,7 @@ func ToInt(input interface{}) (int, error) {
 *   Convert a map into a struct. The output parameter must be a pointer to a struct
 *   The struct can have the json meta data
  */
-func MapToStruct(input map[string]interface{}, output interface{}) error {
+func MapToStruct(input, output interface{}) error {
 	// convert map to json
 	jsonString, err := json.Marshal(input)
 	if err != nil {

+ 3 - 3
plugins/manager.go

@@ -212,9 +212,9 @@ func NewPluginManager() (*Manager, error) {
 			etcDir:    etcDir,
 			registry:  registry,
 		}
-		outerErr = singleton.readSourceMetaDir()
+		outerErr = readSourceMetaDir()
 		if nil == err {
-			outerErr = singleton.readSinkMetaDir()
+			outerErr = readSinkMetaDir()
 		}
 	})
 	return singleton, outerErr
@@ -278,7 +278,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		}
 		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
 	}
-	m.readSinkMetaFile(path.Join(m.pluginDir, PluginTypes[t], name+`.json`))
+	readSinkMetaFile(path.Join(m.pluginDir, PluginTypes[t], name+`.json`))
 	m.registry.Store(t, name, version)
 	return nil
 }

+ 14 - 4
plugins/sinkMeta.go

@@ -26,6 +26,7 @@ type (
 		Chinese string `json:"zh_CN"`
 	}
 	field struct {
+		Exist    bool        `json:"exist"`
 		Name     string      `json:"name"`
 		Default  interface{} `json:"default"`
 		Type     string      `json:"type"`
@@ -44,7 +45,7 @@ type (
 )
 
 var g_sinkMetadata map[string]*sinkMeta //map[fileName]
-func (this *Manager) readSinkMetaDir() error {
+func readSinkMetaDir() error {
 	confDir, err := common.GetLoc("/plugins")
 	if nil != err {
 		return err
@@ -76,7 +77,7 @@ func (this *Manager) readSinkMetaDir() error {
 	return nil
 }
 
-func (this *Manager) readSinkMetaFile(filePath string) error {
+func readSinkMetaFile(filePath string) error {
 	ptrMetadata := new(sinkMeta)
 	err := common.ReadJsonUnmarshal(filePath, ptrMetadata)
 	if nil != err {
@@ -123,10 +124,16 @@ type (
 )
 
 func (this *hintLanguage) set(l *language) {
+	if nil == l {
+		return
+	}
 	this.English = l.English
 	this.Chinese = l.Chinese
 }
 func (this *hintField) setSinkField(v *field) {
+	if nil == v {
+		return
+	}
 	this.Name = v.Name
 	this.Type = v.Type
 	this.Default = v.Default
@@ -140,6 +147,9 @@ func (this *hintField) setSinkField(v *field) {
 }
 
 func (this *sinkPropertyNode) setNodeFromMetal(data *sinkMeta) {
+	if nil == data {
+		return
+	}
 	this.Libs = data.Libs
 	if nil != data.HelpUrl {
 		this.HelpUrl = new(hintLanguage)
@@ -271,7 +281,7 @@ func (this *sinkProperty) hintWhenModifySink(rule *api.Rule) (err error) {
 	return nil
 }
 
-func (this *Manager) GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *sinkProperty, err error) {
+func GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProperty *sinkProperty, err error) {
 	ptrSinkProperty = new(sinkProperty)
 	if nil == rule {
 		err = ptrSinkProperty.hintWhenNewSink(pluginName)
@@ -281,7 +291,7 @@ func (this *Manager) GetSinkMeta(pluginName string, rule *api.Rule) (ptrSinkProp
 	return ptrSinkProperty, err
 }
 
-func (this *Manager) GetSinks() (sinks []string) {
+func GetSinks() (sinks []string) {
 	sinkMeta := g_sinkMetadata
 	for fileName, _ := range sinkMeta {
 		if fileName == baseProperty+".json" || fileName == baseOption+".json" {

+ 113 - 0
plugins/sinkMeta_test.go

@@ -0,0 +1,113 @@
+package plugins
+
+import (
+	"github.com/emqx/kuiper/xstream/api"
+	"testing"
+)
+
+func TestHintWhenModifySink(t *testing.T) {
+	taosMeta := &sinkMeta{
+		Fields: []*field{
+			{
+				Name:    "ip",
+				Default: "911.911.911.911",
+			},
+		},
+	}
+	opMeta := &sinkMeta{
+		Fields: []*field{
+			{
+				Name:    "isEventTime",
+				Default: false,
+			},
+		},
+	}
+	baseMeta := &sinkMeta{
+		Fields: []*field{
+			{
+				Name:    "bufferLength",
+				Default: 911,
+			},
+		},
+	}
+
+	g_sinkMetadata = make(map[string]*sinkMeta)
+	g_sinkMetadata["taos.json"] = taosMeta
+	g_sinkMetadata["properties.json"] = baseMeta
+	g_sinkMetadata["options.json"] = opMeta
+
+	newSink := &sinkProperty{
+		CustomProperty: map[string]*sinkPropertyNode{
+			"taos": &sinkPropertyNode{
+				Fields: []*hintField{
+					{
+						Name:    "ip",
+						Default: "114.114.114.114",
+					},
+				},
+			},
+		},
+		BaseProperty: map[string]*sinkPropertyNode{
+			"taos": &sinkPropertyNode{
+				Fields: []*hintField{
+					{
+						Name:    "bufferLength",
+						Default: 1024,
+					},
+				},
+			},
+		},
+		BaseOption: &sinkPropertyNode{
+			Fields: []*hintField{
+				{
+					Name:    "isEventTime",
+					Default: true,
+				},
+			},
+		},
+	}
+
+	rule := &api.Rule{
+		Actions: []map[string]interface{}{
+			{
+				"taos": map[string]interface{}{
+					"ip":           "114.114.114.114",
+					"bufferLength": 1024,
+				},
+			},
+		},
+		Options: &api.RuleOption{
+			IsEventTime: true,
+		},
+	}
+
+	oldSink := new(sinkProperty)
+	err := oldSink.hintWhenNewSink("taos")
+	if nil != err {
+		t.Error(err)
+	}
+
+	if false != oldSink.BaseOption.Fields[0].Default {
+		t.Errorf("fail")
+	}
+	if 911 != oldSink.BaseProperty["taos"].Fields[0].Default {
+		t.Errorf("fail")
+	}
+	if "911.911.911.911" != oldSink.CustomProperty["taos"].Fields[0].Default {
+		t.Errorf("fail")
+	}
+	err = oldSink.hintWhenModifySink(rule)
+	if nil != err {
+		t.Error(err)
+	}
+
+	if oldSink.BaseOption.Fields[0].Default != newSink.BaseOption.Fields[0].Default {
+		t.Errorf("fail")
+	}
+	if oldSink.BaseProperty["taos"].Fields[0].Default != newSink.BaseProperty["taos"].Fields[0].Default {
+		t.Errorf("fail")
+	}
+	if oldSink.CustomProperty["taos"].Fields[0].Default != newSink.CustomProperty["taos"].Fields[0].Default {
+		t.Errorf("fail")
+	}
+}

+ 119 - 56
plugins/sourceMeta.go

@@ -6,6 +6,7 @@ import (
 	"github.com/emqx/kuiper/common"
 	"io/ioutil"
 	"path"
+	"reflect"
 	"strings"
 )
 
@@ -24,7 +25,7 @@ type (
 
 var g_sourceProperty map[string]*sourceProperty
 
-func (this *Manager) readSourceMetaFile(filePath string) (*sourceProperty, error) {
+func readSourceMetaFile(filePath string) (*sourceProperty, error) {
 	ptrMeta := new(sourceMeta)
 	err := common.ReadJsonUnmarshal(filePath, ptrMeta)
 	if nil != err || 0 == len(ptrMeta.ConfKeys) {
@@ -51,7 +52,7 @@ func (this *Manager) readSourceMetaFile(filePath string) (*sourceProperty, error
 	return property, err
 }
 
-func (this *Manager) readSourceMetaDir() error {
+func readSourceMetaDir() error {
 	confDir, err := common.GetConfLoc()
 	if nil != err {
 		return err
@@ -64,7 +65,7 @@ func (this *Manager) readSourceMetaDir() error {
 	}
 
 	tmpMap := make(map[string]*sourceProperty)
-	tmpMap["mqtt_source.json"], err = this.readSourceMetaFile(path.Join(confDir, "mqtt_source.json"))
+	tmpMap["mqtt_source.json"], err = readSourceMetaFile(path.Join(confDir, "mqtt_source.json"))
 	if nil != err {
 		return err
 	}
@@ -73,7 +74,7 @@ func (this *Manager) readSourceMetaDir() error {
 		fileName := info.Name()
 		if strings.HasSuffix(fileName, ".json") {
 			filePath := path.Join(dir, fileName)
-			tmpMap[fileName], err = this.readSourceMetaFile(filePath)
+			tmpMap[fileName], err = readSourceMetaFile(filePath)
 			if nil != err {
 				return err
 			}
@@ -84,23 +85,22 @@ func (this *Manager) readSourceMetaDir() error {
 	return nil
 }
 
-func (this *Manager) GetSourceMeta(pluginName string) (ptrSourceProperty *sourceMeta, err error) {
+func GetSourceMeta(pluginName string) (ptrSourceProperty *sourceMeta, err error) {
 	property, ok := g_sourceProperty[pluginName+".json"]
 	if ok {
-		property.cfToMeta()
-		return property.meta, nil
+		return property.cfToMeta()
 	}
 	return nil, fmt.Errorf("not found plugin %s", pluginName)
 }
 
-func (this *Manager) GetSources() (sources []string) {
+func GetSources() (sources []string) {
 	for fileName, _ := range g_sourceProperty {
 		sources = append(sources, strings.TrimSuffix(fileName, `.json`))
 	}
 	return sources
 }
 
-func (this *Manager) GetSourceConfKeys(pluginName string) (keys []string) {
+func GetSourceConfKeys(pluginName string) (keys []string) {
 	property := g_sourceProperty[pluginName+".json"]
 	if nil == property {
 		return keys
@@ -111,7 +111,7 @@ func (this *Manager) GetSourceConfKeys(pluginName string) (keys []string) {
 	return keys
 }
 
-func (this *Manager) DelSourceConfKey(pluginName, confKey string) error {
+func DelSourceConfKey(pluginName, confKey string) error {
 	property := g_sourceProperty[pluginName+".json"]
 	if nil == property {
 		return fmt.Errorf("not found plugin %s", pluginName)
@@ -120,11 +120,10 @@ func (this *Manager) DelSourceConfKey(pluginName, confKey string) error {
 		return fmt.Errorf("not found confKey %s", confKey)
 	}
 	delete(property.cf, confKey)
-	g_sourceProperty[pluginName+".json"] = property
 	return property.saveCf(pluginName)
 }
 
-func (this *Manager) AddSourceConfKey(pluginName, confKey, content string) error {
+func AddSourceConfKey(pluginName, confKey, content string) error {
 	reqField := make(map[string]interface{})
 	err := json.Unmarshal([]byte(content), &reqField)
 	if nil != err {
@@ -137,7 +136,7 @@ func (this *Manager) AddSourceConfKey(pluginName, confKey, content string) error
 	}
 
 	if nil == property.cf {
-		return fmt.Errorf("not found confKey %s", confKey)
+		property.cf = make(map[string]map[string]interface{})
 	}
 
 	if 0 != len(property.cf[confKey]) {
@@ -149,7 +148,7 @@ func (this *Manager) AddSourceConfKey(pluginName, confKey, content string) error
 	return property.saveCf(pluginName)
 }
 
-func (this *Manager) UpdateSourceConfKey(pluginName, confKey, content string) error {
+func AddSourceConfKeyField(pluginName, confKey, content string) error {
 	reqField := make(map[string]interface{})
 	err := json.Unmarshal([]byte(content), &reqField)
 	if nil != err {
@@ -165,62 +164,137 @@ func (this *Manager) UpdateSourceConfKey(pluginName, confKey, content string) er
 		return fmt.Errorf("not found confKey %s", confKey)
 	}
 
-	if 0 == len(property.cf[confKey]) {
+	if nil == property.cf[confKey] {
 		return fmt.Errorf("not found confKey %s", confKey)
 	}
 
 	for k, v := range reqField {
 		property.cf[confKey][k] = v
 	}
-	g_sourceProperty[pluginName+".json"] = property
 	return property.saveCf(pluginName)
 }
 
-func (this *sourceProperty) newFields(fields []*field, m map[string]interface{}, sli *[]*field) error {
-	for k, v := range m {
+func recursionDelMap(cf, fields map[string]interface{}) error {
+	for k, v := range fields {
+		if nil == v {
+			delete(cf, k)
+			continue
+		}
+
+		if delKey, ok := v.(string); ok {
+			if 0 == len(delKey) {
+				delete(cf, k)
+				continue
+			}
+
+			var auxCf map[string]interface{}
+			if err := common.MapToStruct(cf[k], &auxCf); nil != err {
+				return fmt.Errorf("not found second key:%s.%s", k, delKey)
+			}
+			cf[k] = auxCf
+			delete(auxCf, delKey)
+			continue
+		}
+		if reflect.Map == reflect.TypeOf(v).Kind() {
+			var auxCf, auxFields map[string]interface{}
+			if err := common.MapToStruct(cf[k], &auxCf); nil != err {
+				return fmt.Errorf("not found second key:%s.%v", k, v)
+			}
+			cf[k] = auxCf
+			if err := common.MapToStruct(v, &auxFields); nil != err {
+				return fmt.Errorf("requestef format err:%s.%v", k, v)
+			}
+			if err := recursionDelMap(auxCf, auxFields); nil != err {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func DelSourceConfKeyField(pluginName, confKey, content string) error {
+	reqField := make(map[string]interface{})
+	err := json.Unmarshal([]byte(content), &reqField)
+	if nil != err {
+		return err
+	}
+
+	property := g_sourceProperty[pluginName+".json"]
+	if nil == property {
+		return fmt.Errorf("not found plugin %s", pluginName)
+	}
+
+	if nil == property.cf {
+		return fmt.Errorf("not found confKey %s", confKey)
+	}
+
+	if nil == property.cf[confKey] {
+		return fmt.Errorf("not found confKey %s", confKey)
+	}
+
+	err = recursionDelMap(property.cf[confKey], reqField)
+	if nil != err {
+		return err
+	}
+	return property.saveCf(pluginName)
+}
+
+func recursionNewFields(template []*field, conf map[string]interface{}, ret *[]*field) error {
+	for i := 0; i < len(template); i++ {
 		p := new(field)
-		for _, fd := range fields {
-			if fd.Name == k {
-				*p = *fd
-				*sli = append(*sli, p)
-
-				switch t := v.(type) {
-				case map[interface{}]interface{}:
-					tt := common.ConvertMap(t)
-					var tmpSli, tmpFields []*field
-					p.Default = &tmpSli
-					b, err := json.Marshal(fd.Default)
-					if nil != err {
-						return err
-					}
-					err = json.Unmarshal(b, &tmpFields)
-					if nil != err {
+		*p = *template[i]
+		*ret = append(*ret, p)
+		v, ok := conf[template[i].Name]
+		if ok {
+			p.Exist = true
+		} else {
+			p.Exist = false
+			continue
+		}
+
+		var auxRet, auxTemplate []*field
+		p.Default = &auxRet
+		if nil == v {
+			p.Default = v
+		} else {
+			if reflect.Map == reflect.TypeOf(v).Kind() {
+				var nextCf map[string]interface{}
+				if tmp, ok := v.(map[interface{}]interface{}); ok {
+					nextCf = common.ConvertMap(tmp)
+				} else {
+					if err := common.MapToStruct(v, &nextCf); nil != err {
 						return err
 					}
-					this.newFields(tmpFields, tt, &tmpSli)
-				case map[string]interface{}:
-					var tmpSli []*field
-					p.Default = &tmpSli
-					this.newFields(fields, t, &tmpSli)
-				default:
-					p.Default = v
 				}
-				break
+				if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
+					return err
+				}
+				if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
+					return err
+				}
+			} else {
+				p.Default = v
 			}
 		}
 	}
 	return nil
 }
 
-func (this *sourceProperty) cfToMeta() {
+func (this *sourceProperty) cfToMeta() (*sourceMeta, error) {
 	fields := this.meta.ConfKeys["default"]
 	ret := make(map[string][]*field)
 	for k, kvs := range this.cf {
 		var sli []*field
-		this.newFields(fields, kvs, &sli)
+		err := recursionNewFields(fields, kvs, &sli)
+		if nil != err {
+			return nil, err
+		}
 		ret[k] = sli
 	}
-	this.meta.ConfKeys = ret
+	meta := new(sourceMeta)
+	*meta = *(this.meta)
+	meta.ConfKeys = ret
+	return meta, nil
 }
 
 func (this *sourceProperty) saveCf(pluginName string) error {
@@ -234,16 +308,5 @@ func (this *sourceProperty) saveCf(pluginName string) error {
 		dir = confDir
 	}
 	filePath := path.Join(dir, pluginName+".yaml")
-	for key, kvs := range this.cf {
-		for k, v := range kvs {
-			switch t := v.(type) {
-			case map[interface{}]interface{}:
-				kvs[k] = common.ConvertMap(t)
-				this.cf[key] = kvs
-			}
-		}
-	}
-
 	return common.WriteYamlMarshal(filePath, this.cf)
-
 }

Tiedoston diff-näkymää rajattu, sillä se on liian suuri
+ 238 - 0
plugins/sourceMeta_test.go


+ 113 - 52
xstream/server/server/rest.go

@@ -11,9 +11,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"net/url"
 	"runtime"
-	"strings"
 	"time"
 )
 
@@ -90,8 +88,15 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
 
-	r.HandleFunc("/metadata/sinks", sinkMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sources", sourceMetaHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPost)
+	r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sinks/{name}/{ruleId}", showSinkMetaHandler).Methods(http.MethodGet)
+
+	r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
+	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
 
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
@@ -406,77 +411,134 @@ func functionHandler(w http.ResponseWriter, r *http.Request) {
 	pluginHandler(w, r, plugins.FUNCTION)
 }
 
-func parseRequest(r *http.Request) map[string]string {
-	mapQuery := make(map[string]string)
-	strQuery := ""
-	if http.MethodGet == r.Method {
-		strQuery = r.URL.RawQuery
-	} else {
-		var byteQuery []byte
-		r.Body.Read(byteQuery)
-		strQuery = string(byteQuery)
+//list sink plugin
+func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	sinks := plugins.GetSinks()
+	jsonResponse(sinks, w, logger)
+	return
+}
+
+//Get sink metadata when creating rules
+func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+
+	ptrMetadata, err := plugins.GetSinkMeta(pluginName, nil)
+	if err != nil {
+		handleError(w, err, "metadata error", logger)
+		return
 	}
+	jsonResponse(ptrMetadata, w, logger)
+}
 
-	for _, kv := range strings.Split(strQuery, "&") {
-		pos := strings.Index(kv, "=")
-		if 0 < pos && pos+1 < len(kv) {
-			mapQuery[kv[:pos]], _ = url.QueryUnescape(kv[pos+1:])
-		}
+//Get sink metadata when displaying rules
+func showSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	ruleid := vars["ruleId"]
+
+	rule, err := ruleProcessor.GetRuleByName(ruleid)
+	if err != nil {
+		handleError(w, err, "describe rule error", logger)
+		return
 	}
-	return mapQuery
+
+	ptrMetadata, err := plugins.GetSinkMeta(pluginName, rule)
+	if err != nil {
+		handleError(w, err, "metadata error", logger)
+		return
+	}
+	jsonResponse(ptrMetadata, w, logger)
 }
 
-func sinkMetaHandler(w http.ResponseWriter, r *http.Request) {
+//list source plugin
+func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
-	///metadata/sinks
-	if 0 == len(r.URL.RawQuery) {
-		sinks := pluginManager.GetSinks()
-		jsonResponse(sinks, w, logger)
+	ret := plugins.GetSources()
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
+
+//Get source metadata when creating stream
+func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	ret, err := plugins.GetSourceMeta(pluginName)
+	if err != nil {
+		handleError(w, err, "metadata error", logger)
 		return
 	}
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
 
-	mapQuery := parseRequest(r)
-	ruleid := mapQuery["rule"]
-	pluginName := mapQuery["name"]
+//Get confKeys of the source metadata
+func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	ret := plugins.GetSourceConfKeys(pluginName)
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
 
-	var rule *api.Rule
+//Add  del confkey
+func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	var ret interface{}
 	var err error
-	///metadata/sinks?name=taos&rule=demo
-	if 0 != len(ruleid) {
-		rule, err = ruleProcessor.GetRuleByName(ruleid)
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	confKey := vars["confKey"]
+	switch r.Method {
+	case http.MethodDelete:
+		err = plugins.DelSourceConfKey(pluginName, confKey)
+	case http.MethodPost:
+		v, err := decodeStatementDescriptor(r.Body)
 		if err != nil {
-			handleError(w, err, "describe rule error", logger)
+			handleError(w, err, "Invalid body", logger)
 			return
 		}
+		err = plugins.AddSourceConfKey(pluginName, confKey, v.Sql)
 	}
-
-	///metadata/sinks?name=taos
-	ptrMetadata, err := pluginManager.GetSinkMeta(pluginName, rule)
 	if err != nil {
 		handleError(w, err, "metadata error", logger)
 		return
 	}
-	jsonResponse(ptrMetadata, w, logger)
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
 }
 
-func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
+//Del and Update field of confkey
+func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
-	mapQuery := parseRequest(r)
 	var ret interface{}
 	var err error
-	switch mapQuery["cmd"] {
-	case "plugins":
-		ret = pluginManager.GetSources()
-	case "confKeys":
-		ret = pluginManager.GetSourceConfKeys(mapQuery["pluginName"])
-	case "getPlugin":
-		ret, err = pluginManager.GetSourceMeta(mapQuery["pluginName"])
-	case "addConfKey":
-		err = pluginManager.AddSourceConfKey(mapQuery["pluginName"], mapQuery["confKey"], mapQuery["confData"])
-	case "delConfKey":
-		err = pluginManager.DelSourceConfKey(mapQuery["pluginName"], mapQuery["confKey"])
-	case "updateConfKey":
-		err = pluginManager.UpdateSourceConfKey(mapQuery["pluginName"], mapQuery["confKey"], mapQuery["confData"])
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	confKey := vars["confKey"]
+	v, err := decodeStatementDescriptor(r.Body)
+	if err != nil {
+		handleError(w, err, "Invalid body", logger)
+		return
+	}
+	switch r.Method {
+	case http.MethodDelete:
+		err = plugins.DelSourceConfKeyField(pluginName, confKey, v.Sql)
+	case http.MethodPost:
+		err = plugins.AddSourceConfKeyField(pluginName, confKey, v.Sql)
 	}
 	if err != nil {
 		handleError(w, err, "metadata error", logger)
@@ -486,5 +548,4 @@ func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
 		jsonResponse(ret, w, logger)
 		return
 	}
-	w.Write([]byte("successful"))
 }