ソースを参照

Merge pull request #196 from emqx/plugins

Plugins update
jinfahua 5 年 前
コミット
8b867afbeb

+ 3 - 3
.github/workflows/run_test_case.yaml

@@ -22,10 +22,10 @@ jobs:
         - name: run test case
         - name: run test case
           run: |
           run: |
             mkdir -p data
             mkdir -p data
-            go build --buildmode=plugin -o plugins/sources/Random.so plugins/sources/random.go
-            go build --buildmode=plugin -o plugins/sinks/File.so plugins/sinks/file.go
+            go build --buildmode=plugin -o plugins/sources/Random@v1.0.0.so plugins/sources/random.go
+            go build --buildmode=plugin -o plugins/sinks/File@v1.0.0.so plugins/sinks/file.go
             go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo.go
             go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo.go
-            go build --buildmode=plugin -o plugins/functions/CountPlusOne.so plugins/functions/countPlusOne.go
+            go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so plugins/functions/countPlusOne.go
             go test ./...
             go test ./...
             go test --tags=edgex ./...
             go test --tags=edgex ./...
     
     

+ 1 - 0
common/data.go

@@ -7,4 +7,5 @@ type RuleDesc struct {
 type PluginDesc struct {
 type PluginDesc struct {
 	RuleDesc
 	RuleDesc
 	Type int
 	Type int
+	Stop bool
 }
 }

+ 22 - 12
docs/en_US/cli/plugins.md

@@ -20,7 +20,7 @@ The plugin can be created with two ways.
 Sample:
 Sample:
 
 
 ```shell
 ```shell
-# bin/cli create plugin source random {"file":"http://127.0.0.1/plugins/sources/random.zip", "callback":"http://mycallback.com"}
+# bin/cli create plugin source random {"file":"http://127.0.0.1/plugins/sources/random.zip"}
 ```
 ```
 
 
 The command create a source plugin named ``random``. 
 The command create a source plugin named ``random``. 
@@ -33,19 +33,17 @@ Sample:
 # bin/cli create plugin sink plugin1 -f /tmp/plugin1.txt
 # bin/cli create plugin sink plugin1 -f /tmp/plugin1.txt
 ```
 ```
 
 
-Below is the contents of ``rule.txt``.
+Below is the contents of ``plugin1.txt``.
 
 
 ```json
 ```json
 {
 {
-  "file":"http://127.0.0.1/plugins/sources/random.zip",
-  "callback":"http://mycallback.com"
+  "file":"http://127.0.0.1/plugins/sources/random.zip"
 }
 }
 ```
 ```
 ### parameters
 ### parameters
 1. plugin_type: the type of the plugin. Available values are `["source", "sink", "functions"]`
 1. plugin_type: the type of the plugin. Available values are `["source", "sink", "functions"]`
 2. plugin_name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
 2. plugin_name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
 3. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
 3. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
-4. callback: optional parameter to specify the url to call once the plugin is created. We will issue a GET request to the callback url.
 
 
 ## show plugins
 ## show plugins
 
 
@@ -63,22 +61,34 @@ function1
 function2
 function2
 ```
 ```
 
 
+## describe a plugin
+The command is used to print out the detailed definition of a plugin.
+
+```shell
+describe plugin $plugin_type $plugin_name
+```
+
+Sample: 
+
+```shell
+# bin/cli describe plugin source plugin1
+{
+  "name": "plugin1",
+  "version": "1.0.0"
+}
+```
+
 ## drop a plugin
 ## drop a plugin
 
 
 The command is used for drop the plugin.
 The command is used for drop the plugin.
 
 
 ```shell
 ```shell
-drop plugin $plugin_type $plugin_name $plugin_json 
+drop plugin $plugin_type $plugin_name -s $stop 
 ```
 ```
-In which, `$plugin_json` is optional. Currently, only `callback` parameter is supported in the `plugin_json`
+In which, `-s $stop` is an optional boolean parameter. If it is set to true, the Kuiper server will be stopped for the delete to take effect. The user will need to restart it manually.
 Sample:
 Sample:
 
 
 ```shell
 ```shell
 # bin/cli drop plugin source random
 # bin/cli drop plugin source random
 Plugin random is dropped.
 Plugin random is dropped.
-```
-
-```shell
-# bin/cli drop plugin sink plugin1 {"callback":"http://mycallback.com"}
-Plugin plugin1 is dropped.
 ```
 ```

+ 12 - 1
docs/en_US/extension/overview.md

@@ -10,11 +10,22 @@ Kuiper extensions are based on golang plugin system. The general steps to make e
 1. Create the plugin package that implements required source, sink or function interface.
 1. Create the plugin package that implements required source, sink or function interface.
 2. Compile the plugin into a _.so_ file, and put it into sources or sinks or functions folder under _plugins_ folder.
 2. Compile the plugin into a _.so_ file, and put it into sources or sinks or functions folder under _plugins_ folder.
 
 
+Currently golang plugins are only supported on Linux and macOS which poses the same limitation for Kuiper extensions.
+
+## Naming
+
 Notice that, there are some restrictions for the names:
 Notice that, there are some restrictions for the names:
 1. The name of _.so_ file must be camel case with an upper case first letter. For example, _MySource.so_ or _MySink.so_.
 1. The name of _.so_ file must be camel case with an upper case first letter. For example, _MySource.so_ or _MySink.so_.
 2. The name of the export symbol of the plugin must be camel case with an upper case first letter.
 2. The name of the export symbol of the plugin must be camel case with an upper case first letter.
 
 
-Currently golang plugins are only supported on Linux and macOS which poses the same limitation for Kuiper extensions.
+### Version
+
+The user can **optionally** add a version string to the name of _.so_ to help identify the version of the plugin. The version can be then retrieved through describe CLI command or REST API. The naming convention is to add a version string to the name after _@v_. The version can be any string. Below are some typical examples.
+
+- _MySource@v1.0.0.so_ : version is 1.0.0
+- _MySource@v20200331.so_:  version is 20200331
+
+If multiple versions of plugins with the same name in place, only the latest version(ordered by the version string) will be taken effect.
 
 
 ## Setup the plugin developing environment
 ## Setup the plugin developing environment
 It is required to build the plugin with exactly the same version of dependencies. And the plugin must implement interfaces exported by Kuiper, so the Kuiper project must be in the gopath. 
 It is required to build the plugin with exactly the same version of dependencies. And the plugin must implement interfaces exported by Kuiper, so the Kuiper project must be in the gopath. 

+ 28 - 9
docs/en_US/restapi/plugins.md

@@ -18,8 +18,7 @@ Request Sample
 ```json
 ```json
 {
 {
   "name":"random",
   "name":"random",
-  "file":"http://127.0.0.1/plugins/sources/random.zip",
-  "callback":"http://mycallback.com"
+  "file":"http://127.0.0.1/plugins/sources/random.zip"
 }
 }
 ```
 ```
 
 
@@ -27,7 +26,6 @@ Request Sample
 
 
 1. name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
 1. name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
 2. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
 2. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
-3. callback: optional parameter to specify the url to call once the plugin is created. We will issue a GET request to the callback url.
 
 
 
 
 ## show plugins
 ## show plugins
@@ -46,16 +44,37 @@ Response Sample:
 ["plugin1","plugin2"]
 ["plugin1","plugin2"]
 ```
 ```
 
 
-## drop a rule
+## describe a plugin
+
+The API is used to print out the detailed definition of a plugin.
+
+```shell
+GET http://localhost:9081/plugins/sources/{name}
+GET http://localhost:9081/plugins/sinks/{name}
+GET http://localhost:9081/plugins/functions/{name}
+```
+
+Path parameter `name` is the name of the plugin.
+
+Response Sample: 
+
+```json
+{
+  "name": "plugin1",
+  "version": "1.0.0"
+}
+```
+
+## drop a plugin
 
 
 The API is used for drop the plugin. The kuiper server needs to be restarted to take effect.
 The API is used for drop the plugin. The kuiper server needs to be restarted to take effect.
 
 
 ```shell
 ```shell
-DELETE http://localhost:8080/rules/sources/{name}
-DELETE http://localhost:8080/rules/sinks/{name}
-DELETE http://localhost:8080/rules/functions/{name}
+DELETE http://localhost:8080/plugins/sources/{name}
+DELETE http://localhost:8080/plugins/sinks/{name}
+DELETE http://localhost:8080/plugins/functions/{name}
 ```
 ```
-The user can pass a query parameter for a callback url. Kuiper will issue a GET request to the callback url after deleting the plugin.
+The user can pass a query parameter to decide if Kuiper should be stopped after a delete in order to make the deletion take effect. The parameter is `restart` and only when the value is `1` will the Kuiper be stopped. The user has to manually restart it.
 ```shell
 ```shell
-DELETE http://localhost:8080/rules/sources/{name}?callback=http%3A%2F%2Fwww.mycallback.com%2Fcallback
+DELETE http://localhost:8080/plugins/sources/{name}?restart=1
 ```
 ```

+ 116 - 71
plugins/manager.go

@@ -13,15 +13,16 @@ import (
 	"path"
 	"path"
 	"path/filepath"
 	"path/filepath"
 	"plugin"
 	"plugin"
+	"regexp"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
+	"time"
 	"unicode"
 	"unicode"
 )
 )
 
 
 type Plugin struct {
 type Plugin struct {
-	Name     string `json:"name"`
-	File     string `json:"file"`
-	Callback string `json:"callback"`
+	Name string `json:"name"`
+	File string `json:"file"`
 }
 }
 
 
 type PluginType int
 type PluginType int
@@ -41,20 +42,32 @@ var (
 //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
 //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
 type Registry struct {
 type Registry struct {
 	sync.RWMutex
 	sync.RWMutex
-	internal [][]string
+	internal []map[string]string
 }
 }
 
 
-func (rr *Registry) Store(t PluginType, value string) {
+func (rr *Registry) Store(t PluginType, name string, version string) {
 	rr.Lock()
 	rr.Lock()
-	rr.internal[t] = append(rr.internal[t], value)
+	rr.internal[t][name] = version
 	rr.Unlock()
 	rr.Unlock()
 }
 }
 
 
-func (rr *Registry) List(t PluginType) (values []string) {
+func (rr *Registry) List(t PluginType) []string {
 	rr.RLock()
 	rr.RLock()
 	result := rr.internal[t]
 	result := rr.internal[t]
 	rr.RUnlock()
 	rr.RUnlock()
-	return result
+	keys := make([]string, 0, len(result))
+	for k := range result {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
+func (rr *Registry) Get(t PluginType, name string) (string, bool) {
+	rr.RLock()
+	result := rr.internal[t]
+	rr.RUnlock()
+	r, ok := result[name]
+	return r, ok
 }
 }
 
 
 //func (rr *Registry) Delete(t PluginType, value string) {
 //func (rr *Registry) Delete(t PluginType, value string) {
@@ -72,8 +85,9 @@ func (rr *Registry) List(t PluginType) (values []string) {
 
 
 var symbolRegistry = make(map[string]plugin.Symbol)
 var symbolRegistry = make(map[string]plugin.Symbol)
 
 
-func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
-	t = ucFirst(t)
+func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
+	ut := ucFirst(t)
+	ptype := PluginTypes[pt]
 	key := ptype + "/" + t
 	key := ptype + "/" + t
 	var nf plugin.Symbol
 	var nf plugin.Symbol
 	nf, ok := symbolRegistry[key]
 	nf, ok := symbolRegistry[key]
@@ -82,12 +96,20 @@ func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
 		if err != nil {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find the plugins folder")
 			return nil, fmt.Errorf("cannot find the plugins folder")
 		}
 		}
-		mod := path.Join(loc, ptype, t+".so")
+		m, err := NewPluginManager()
+		if err != nil {
+			return nil, fmt.Errorf("fail to initialize the plugin manager")
+		}
+		soFile, err := getSoFileName(m, pt, t)
+		if err != nil {
+			return nil, fmt.Errorf("cannot get the plugin file name: %v", err)
+		}
+		mod := path.Join(loc, ptype, soFile)
 		plug, err := plugin.Open(mod)
 		plug, err := plugin.Open(mod)
 		if err != nil {
 		if err != nil {
 			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
 			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
 		}
 		}
-		nf, err = plug.Lookup(t)
+		nf, err = plug.Lookup(ut)
 		if err != nil {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
 			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
 		}
 		}
@@ -116,7 +138,7 @@ func NewPluginManager() (*Manager, error) {
 			return
 			return
 		}
 		}
 
 
-		plugins := make([][]string, 3)
+		plugins := make([]map[string]string, 3)
 		for i := 0; i < 3; i++ {
 		for i := 0; i < 3; i++ {
 			names, err := findAll(PluginType(i), dir)
 			names, err := findAll(PluginType(i), dir)
 			if err != nil {
 			if err != nil {
@@ -136,7 +158,8 @@ func NewPluginManager() (*Manager, error) {
 	return singleton, err
 	return singleton, err
 }
 }
 
 
-func findAll(t PluginType, pluginDir string) (result []string, err error) {
+func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
+	result = make(map[string]string)
 	dir := path.Join(pluginDir, PluginTypes[t])
 	dir := path.Join(pluginDir, PluginTypes[t])
 	files, err := ioutil.ReadDir(dir)
 	files, err := ioutil.ReadDir(dir)
 	if err != nil {
 	if err != nil {
@@ -146,7 +169,8 @@ func findAll(t PluginType, pluginDir string) (result []string, err error) {
 	for _, file := range files {
 	for _, file := range files {
 		baseName := filepath.Base(file.Name())
 		baseName := filepath.Base(file.Name())
 		if strings.HasSuffix(baseName, ".so") {
 		if strings.HasSuffix(baseName, ".so") {
-			result = append(result, lcFirst(baseName[0:len(baseName)-3]))
+			n, v := parseName(baseName)
+			result[n] = v
 		}
 		}
 	}
 	}
 	return
 	return
@@ -157,7 +181,7 @@ func (m *Manager) List(t PluginType) (result []string, err error) {
 }
 }
 
 
 func (m *Manager) Register(t PluginType, j *Plugin) error {
 func (m *Manager) Register(t PluginType, j *Plugin) error {
-	name, uri, cb := j.Name, j.File, j.Callback
+	name, uri := j.Name, j.File
 	//Validation
 	//Validation
 	name = strings.Trim(name, " ")
 	name = strings.Trim(name, " ")
 	if name == "" {
 	if name == "" {
@@ -182,7 +206,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 	}
 	}
 	//unzip and copy to destination
 	//unzip and copy to destination
-	unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
+	unzipFiles, version, err := m.unzipAndCopy(t, name, zipPath)
 	if err != nil {
 	if err != nil {
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 			os.Remove(unzipFiles[0])
 			os.Remove(unzipFiles[0])
@@ -190,27 +214,22 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
 		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
 	}
 	}
 
 
-	m.registry.Store(t, name)
-	return callback(cb)
+	m.registry.Store(t, name, version)
+	return nil
 }
 }
 
 
-func (m *Manager) Delete(t PluginType, name string, cb string) error {
+func (m *Manager) Delete(t PluginType, name string, stop bool) error {
 	name = strings.Trim(name, " ")
 	name = strings.Trim(name, " ")
 	if name == "" {
 	if name == "" {
 		return fmt.Errorf("invalid name %s: should not be empty", name)
 		return fmt.Errorf("invalid name %s: should not be empty", name)
 	}
 	}
-	found := false
-	for _, n := range m.registry.List(t) {
-		if n == name {
-			found = true
-		}
-	}
-	if !found {
-		return fmt.Errorf("invalid name %s: not exist", name)
+	soFile, err := getSoFileName(m, t, name)
+	if err != nil {
+		return err
 	}
 	}
 	var results []string
 	var results []string
 	paths := []string{
 	paths := []string{
-		path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
+		path.Join(m.pluginDir, PluginTypes[t], soFile),
 	}
 	}
 	if t == SOURCE {
 	if t == SOURCE {
 		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
 		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
@@ -230,47 +249,89 @@ func (m *Manager) Delete(t PluginType, name string, cb string) error {
 	if len(results) > 0 {
 	if len(results) > 0 {
 		return errors.New(strings.Join(results, "\n"))
 		return errors.New(strings.Join(results, "\n"))
 	} else {
 	} else {
-		return callback(cb)
+		if stop {
+			go func() {
+				time.Sleep(1 * time.Second)
+				os.Exit(100)
+			}()
+		}
+		return nil
 	}
 	}
 }
 }
+func (m *Manager) Get(t PluginType, name string) (map[string]string, bool) {
+	v, ok := m.registry.Get(t, name)
+	if ok {
+		m := map[string]string{
+			"name":    name,
+			"version": v,
+		}
+		return m, ok
+	}
+	return nil, false
+}
+
+func getSoFileName(m *Manager, t PluginType, name string) (string, error) {
+	v, ok := m.registry.Get(t, name)
+	if !ok {
+		return "", fmt.Errorf("invalid name %s: not exist", name)
+	}
 
 
-func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
+	soFile := ucFirst(name) + ".so"
+	if v != "" {
+		soFile = fmt.Sprintf("%s@v%s.so", ucFirst(name), v)
+	}
+	return soFile, nil
+}
+
+func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, string, error) {
 	var filenames []string
 	var filenames []string
 	r, err := zip.OpenReader(src)
 	r, err := zip.OpenReader(src)
 	if err != nil {
 	if err != nil {
-		return filenames, err
+		return filenames, "", err
 	}
 	}
 	defer r.Close()
 	defer r.Close()
 
 
-	files := []string{
-		ucFirst(name) + ".so",
-	}
-	paths := []string{
-		path.Join(m.pluginDir, PluginTypes[t], files[0]),
-	}
+	soPrefix := regexp.MustCompile(fmt.Sprintf(`^%s(@v.*)?\.so$`, ucFirst(name)))
+	var yamlFile, yamlPath, version string
+	expFiles := 1
 	if t == SOURCE {
 	if t == SOURCE {
-		files = append(files, name+".yaml")
-		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
-	}
-	for i, d := range files {
-		var z *zip.File
-		for _, file := range r.File {
-			fileName := file.Name
-			if fileName == d {
-				z = file
+		yamlFile = name + ".yaml"
+		yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
+		expFiles = 2
+	}
+	for _, file := range r.File {
+		fileName := file.Name
+		if yamlFile == fileName {
+			err = unzipTo(file, yamlPath)
+			if err != nil {
+				return filenames, "", err
 			}
 			}
+			filenames = append(filenames, yamlPath)
 		}
 		}
-		if z == nil {
-			return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
+		if soPrefix.Match([]byte(fileName)) {
+			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
+			err = unzipTo(file, soPath)
+			if err != nil {
+				return filenames, "", err
+			}
+			filenames = append(filenames, soPath)
+			_, version = parseName(fileName)
 		}
 		}
+	}
+	if len(filenames) != expFiles {
+		return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
+	}
+	return filenames, version, nil
+}
 
 
-		err = unzipTo(z, paths[i])
-		if err != nil {
-			return filenames, err
-		}
-		filenames = append(filenames, paths[i])
+func parseName(n string) (string, string) {
+	result := strings.Split(n, ".so")
+	result = strings.Split(result[0], "@v")
+	name := lcFirst(result[0])
+	if len(result) > 1 {
+		return name, result[1]
 	}
 	}
-	return filenames, nil
+	return name, ""
 }
 }
 
 
 func unzipTo(f *zip.File, fpath string) error {
 func unzipTo(f *zip.File, fpath string) error {
@@ -354,19 +415,3 @@ func lcFirst(str string) string {
 	}
 	}
 	return ""
 	return ""
 }
 }
-
-func callback(u string) error {
-	if strings.Trim(u, " ") == "" {
-		return nil
-	} else {
-		resp, err := http.Get(u)
-		if err != nil {
-			return fmt.Errorf("action succeded but callback failed: %v", err)
-		} else {
-			if resp.StatusCode < 200 || resp.StatusCode > 299 {
-				return fmt.Errorf("action succeeded but callback failed: status %s", resp.Status)
-			}
-		}
-	}
-	return nil
-}

+ 101 - 59
plugins/manager_test.go

@@ -8,32 +8,22 @@ import (
 	"os"
 	"os"
 	"path"
 	"path"
 	"reflect"
 	"reflect"
+	"sort"
 	"testing"
 	"testing"
 )
 )
 
 
 func TestManager_Register(t *testing.T) {
 func TestManager_Register(t *testing.T) {
-	//file server
 	s := httptest.NewServer(
 	s := httptest.NewServer(
 		http.FileServer(http.Dir("testzips")),
 		http.FileServer(http.Dir("testzips")),
 	)
 	)
 	defer s.Close()
 	defer s.Close()
 	endpoint := s.URL
 	endpoint := s.URL
-	//callback server
-	h := http.NewServeMux()
-	h.HandleFunc("/callback/", func(res http.ResponseWriter, req *http.Request) {
-		res.WriteHeader(http.StatusOK)
-	})
-	h.HandleFunc("/callbackE/", func(res http.ResponseWriter, req *http.Request) {
-		http.Error(res, "error", 500)
-	})
-	hs := httptest.NewServer(h)
-	defer hs.Close()
 
 
 	data := []struct {
 	data := []struct {
 		t   PluginType
 		t   PluginType
 		n   string
 		n   string
 		u   string
 		u   string
-		c   string
+		v   string
 		err error
 		err error
 	}{
 	}{
 		{
 		{
@@ -69,18 +59,12 @@ func TestManager_Register(t *testing.T) {
 			t: SOURCE,
 			t: SOURCE,
 			n: "random3",
 			n: "random3",
 			u: endpoint + "/sources/random3.zip",
 			u: endpoint + "/sources/random3.zip",
-			c: hs.URL + "/callback",
+			v: "1.0.0",
 		}, {
 		}, {
 			t: SINK,
 			t: SINK,
 			n: "file2",
 			n: "file2",
 			u: endpoint + "/sinks/file2.zip",
 			u: endpoint + "/sinks/file2.zip",
 		}, {
 		}, {
-			t:   SINK,
-			n:   "file3",
-			u:   endpoint + "/sinks/file3.zip",
-			c:   hs.URL + "/callbackE",
-			err: errors.New("action succeeded but callback failed: status 500 Internal Server Error"),
-		}, {
 			t: FUNCTION,
 			t: FUNCTION,
 			n: "echo2",
 			n: "echo2",
 			u: endpoint + "/functions/echo2.zip",
 			u: endpoint + "/functions/echo2.zip",
@@ -89,12 +73,6 @@ func TestManager_Register(t *testing.T) {
 			n:   "echo2",
 			n:   "echo2",
 			u:   endpoint + "/functions/echo2.zip",
 			u:   endpoint + "/functions/echo2.zip",
 			err: errors.New("invalid name echo2: duplicate"),
 			err: errors.New("invalid name echo2: duplicate"),
-		}, {
-			t:   FUNCTION,
-			n:   "echo3",
-			u:   endpoint + "/functions/echo3.zip",
-			c:   hs.URL + "/nonExist",
-			err: errors.New("action succeeded but callback failed: status 404 Not Found"),
 		},
 		},
 	}
 	}
 	manager, err := NewPluginManager()
 	manager, err := NewPluginManager()
@@ -105,14 +83,13 @@ func TestManager_Register(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(data))
 	fmt.Printf("The test bucket size is %d.\n\n", len(data))
 	for i, tt := range data {
 	for i, tt := range data {
 		err = manager.Register(tt.t, &Plugin{
 		err = manager.Register(tt.t, &Plugin{
-			Name:     tt.n,
-			File:     tt.u,
-			Callback: tt.c,
+			Name: tt.n,
+			File: tt.u,
 		})
 		})
 		if !reflect.DeepEqual(tt.err, err) {
 		if !reflect.DeepEqual(tt.err, err) {
 			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
 			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
 		} else if tt.err == nil {
 		} else if tt.err == nil {
-			err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n)
+			err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n, tt.v)
 			if err != nil {
 			if err != nil {
 				t.Errorf("%d: error : %s\n\n", i, err)
 				t.Errorf("%d: error : %s\n\n", i, err)
 			}
 			}
@@ -121,45 +98,106 @@ func TestManager_Register(t *testing.T) {
 
 
 }
 }
 
 
+func TestManager_List(t *testing.T) {
+	data := []struct {
+		t PluginType
+		r []string
+	}{
+		{
+			t: SOURCE,
+			r: []string{"random", "random2", "random3"},
+		}, {
+			t: SINK,
+			r: []string{"file", "file2"},
+		}, {
+			t: FUNCTION,
+			r: []string{"countPlusOne", "echo", "echo2"},
+		},
+	}
+	manager, err := NewPluginManager()
+	if err != nil {
+		t.Error(err)
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+
+	for i, p := range data {
+		result, err := manager.List(p.t)
+		if err != nil {
+			t.Errorf("%d: list error : %s\n\n", i, err)
+			return
+		}
+		sort.Strings(result)
+		if !reflect.DeepEqual(p.r, result) {
+			t.Errorf("%d: result mismatch:\n  exp=%v\n  got=%v\n\n", i, p.r, result)
+		}
+	}
+}
+
+func TestManager_Desc(t *testing.T) {
+	data := []struct {
+		t PluginType
+		n string
+		r map[string]string
+	}{
+		{
+			t: SOURCE,
+			n: "random2",
+			r: map[string]string{
+				"name":    "random2",
+				"version": "",
+			},
+		}, {
+			t: SOURCE,
+			n: "random3",
+			r: map[string]string{
+				"name":    "random3",
+				"version": "1.0.0",
+			},
+		}, {
+			t: FUNCTION,
+			n: "echo2",
+			r: map[string]string{
+				"name":    "echo2",
+				"version": "",
+			},
+		},
+	}
+	manager, err := NewPluginManager()
+	if err != nil {
+		t.Error(err)
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+
+	for i, p := range data {
+		result, ok := manager.Get(p.t, p.n)
+		if !ok {
+			t.Errorf("%d: get error : not found\n\n", i)
+			return
+		}
+		if !reflect.DeepEqual(p.r, result) {
+			t.Errorf("%d: result mismatch:\n  exp=%v\n  got=%v\n\n", i, p.r, result)
+		}
+	}
+}
+
 func TestManager_Delete(t *testing.T) {
 func TestManager_Delete(t *testing.T) {
-	h := http.NewServeMux()
-	h.HandleFunc("/callback/", func(res http.ResponseWriter, req *http.Request) {
-		res.WriteHeader(http.StatusOK)
-	})
-	h.HandleFunc("/callbackE/", func(res http.ResponseWriter, req *http.Request) {
-		http.Error(res, "error", 500)
-	})
-	s := httptest.NewServer(h)
-	defer s.Close()
 	data := []struct {
 	data := []struct {
 		t   PluginType
 		t   PluginType
 		n   string
 		n   string
-		c   string
 		err error
 		err error
 	}{
 	}{
 		{
 		{
-			t:   SOURCE,
-			n:   "random2",
-			c:   s.URL + "/callbackN",
-			err: errors.New("action succeeded but callback failed: status 404 Not Found"),
+			t: SOURCE,
+			n: "random2",
 		}, {
 		}, {
 			t: SINK,
 			t: SINK,
 			n: "file2",
 			n: "file2",
-			c: s.URL + "/callback",
 		}, {
 		}, {
-			t:   FUNCTION,
-			n:   "echo2",
-			c:   s.URL + "/callbackE",
-			err: errors.New("action succeeded but callback failed: status 500 Internal Server Error"),
+			t: FUNCTION,
+			n: "echo2",
 		}, {
 		}, {
 			t: SOURCE,
 			t: SOURCE,
 			n: "random3",
 			n: "random3",
-		}, {
-			t: SINK,
-			n: "file3",
-		}, {
-			t: FUNCTION,
-			n: "echo3",
 		},
 		},
 	}
 	}
 	manager, err := NewPluginManager()
 	manager, err := NewPluginManager()
@@ -169,15 +207,19 @@ func TestManager_Delete(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(data))
 	fmt.Printf("The test bucket size is %d.\n\n", len(data))
 
 
 	for i, p := range data {
 	for i, p := range data {
-		err = manager.Delete(p.t, p.n, p.c)
-		if !reflect.DeepEqual(p.err, err) {
-			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, p.err, err)
+		err = manager.Delete(p.t, p.n, false)
+		if err != nil {
+			t.Errorf("%d: delete error : %s\n\n", i, err)
 		}
 		}
 	}
 	}
 }
 }
 
 
-func checkFile(pluginDir string, etcDir string, t PluginType, name string) error {
-	soPath := path.Join(pluginDir, PluginTypes[t], ucFirst(name)+".so")
+func checkFile(pluginDir string, etcDir string, t PluginType, name string, version string) error {
+	soName := ucFirst(name) + ".so"
+	if version != "" {
+		soName = fmt.Sprintf("%s@v%s.so", ucFirst(name), version)
+	}
+	soPath := path.Join(pluginDir, PluginTypes[t], soName)
 	_, err := os.Stat(soPath)
 	_, err := os.Stat(soPath)
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 16 - 7
plugins/plugins.http

@@ -3,36 +3,45 @@
 POST http://127.0.0.1:9081/plugins/sources
 POST http://127.0.0.1:9081/plugins/sources
 Content-Type: application/json
 Content-Type: application/json
 
 
-{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+{"name":"random3","file":"http://127.0.0.1/testzips/sources/random3.zip"}
 
 
 ###
 ###
 GET http://127.0.0.1:9081/plugins/sources
 GET http://127.0.0.1:9081/plugins/sources
 
 
 ###
 ###
-DELETE http://127.0.0.1:9081/plugins/sources/random2
+GET http://127.0.0.1:9081/plugins/sources/random3
+
+###
+DELETE http://127.0.0.1:9081/plugins/sources/random3
 
 
 ###
 ###
 POST http://127.0.0.1:9081/plugins/sinks
 POST http://127.0.0.1:9081/plugins/sinks
 Content-Type: application/json
 Content-Type: application/json
 
 
-{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":"http://www.mycallback.com/callback"}
+{"name":"file2","file":"http://127.0.0.1/testzips/sinks/file2.zip"}
 
 
 ###
 ###
 GET http://127.0.0.1:9081/plugins/sinks
 GET http://127.0.0.1:9081/plugins/sinks
 
 
 ###
 ###
-DELETE http://127.0.0.1:9081/plugins/sinks/random2?callback=http%3A%2F%2Fwww.mycallback.com%2Fcallback
+GET http://127.0.0.1:9081/plugins/sinks/file2
+
+###
+DELETE http://127.0.0.1:9081/plugins/sinks/file2?stop=1
 
 
 ###
 ###
 POST http://127.0.0.1:9081/plugins/functions
 POST http://127.0.0.1:9081/plugins/functions
 Content-Type: application/json
 Content-Type: application/json
 
 
-{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+{"name":"echo2","file":"http://127.0.0.1/testzips/functions/echo2.zip"}
 
 
 ###
 ###
 GET http://127.0.0.1:9081/plugins/functions
 GET http://127.0.0.1:9081/plugins/functions
 
 
 ###
 ###
-DELETE http://127.0.0.1:9081/plugins/functions/random2
+GET http://127.0.0.1:9081/plugins/functions/echo2
+
+###
+DELETE http://127.0.0.1:9081/plugins/functions/echo2
 
 
-###2217
+###

BIN
plugins/testzips/functions/echo3.zip


BIN
plugins/testzips/sinks/file3.zip


BIN
plugins/testzips/sources/random3.zip


+ 1 - 1
xsql/ast.go

@@ -1689,7 +1689,7 @@ func isAggFunc(f *Call) bool {
 	} else if _, ok := mathFuncMap[fn]; ok {
 	} else if _, ok := mathFuncMap[fn]; ok {
 		return false
 		return false
 	} else {
 	} else {
-		if nf, err := plugins.GetPlugin(f.Name, "functions"); err == nil {
+		if nf, err := plugins.GetPlugin(f.Name, plugins.FUNCTION); err == nil {
 			if ef, ok := nf.(api.Function); ok && ef.IsAggregate() {
 			if ef, ok := nf.(api.Function); ok && ef.IsAggregate() {
 				return true
 				return true
 			}
 			}

+ 1 - 1
xsql/funcs_aggregate.go

@@ -140,7 +140,7 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		return 0, true
 		return 0, true
 	default:
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
 		common.Log.Debugf("run aggregate func %s", name)
-		if nf, err := plugins.GetPlugin(name, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
 			return nil, false
 			return nil, false
 		} else {
 		} else {
 			f, ok := nf.(api.Function)
 			f, ok := nf.(api.Function)

+ 1 - 1
xsql/funcs_ast_validator.go

@@ -26,7 +26,7 @@ func validateFuncs(funcName string, args []Expr) error {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 		return validateAggFunc(lowerName, args)
 		return validateAggFunc(lowerName, args)
 	} else {
 	} else {
-		if nf, err := plugins.GetPlugin(funcName, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(funcName, plugins.FUNCTION); err != nil {
 			return err
 			return err
 		} else {
 		} else {
 			f, ok := nf.(api.Function)
 			f, ok := nf.(api.Function)

+ 1 - 1
xsql/functions.go

@@ -76,7 +76,7 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 		return nil, false
 		return nil, false
 	} else {
 	} else {
 		common.Log.Debugf("run func %s", name)
 		common.Log.Debugf("run func %s", name)
-		if nf, err := plugins.GetPlugin(name, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
 			return err, false
 			return err, false
 		} else {
 		} else {
 			f, ok := nf.(api.Function)
 			f, ok := nf.(api.Function)

+ 46 - 9
xstream/cli/main.go

@@ -130,7 +130,7 @@ func main() {
 		{
 		{
 			Name:    "create",
 			Name:    "create",
 			Aliases: []string{"create"},
 			Aliases: []string{"create"},
-			Usage:   "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
+			Usage:   "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file",
 
 
 			Subcommands: []cli.Command{
 			Subcommands: []cli.Command{
 				{
 				{
@@ -213,7 +213,7 @@ func main() {
 				},
 				},
 				{
 				{
 					Name:  "plugin",
 					Name:  "plugin",
-					Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f rule_def_file]",
+					Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
 					Flags: []cli.Flag{
 					Flags: []cli.Flag{
 						cli.StringFlag{
 						cli.StringFlag{
 							Name:     "file, f",
 							Name:     "file, f",
@@ -272,7 +272,7 @@ func main() {
 		{
 		{
 			Name:    "describe",
 			Name:    "describe",
 			Aliases: []string{"describe"},
 			Aliases: []string{"describe"},
-			Usage:   "describe stream $stream_name | describe rule $rule_name",
+			Usage:   "describe stream $stream_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name",
 			Subcommands: []cli.Command{
 			Subcommands: []cli.Command{
 				{
 				{
 					Name:  "stream",
 					Name:  "stream",
@@ -302,13 +302,41 @@ func main() {
 						return nil
 						return nil
 					},
 					},
 				},
 				},
+				{
+					Name:  "plugin",
+					Usage: "describe plugin $plugin_type $plugin_name",
+					//Flags: nflag,
+					Action: func(c *cli.Context) error {
+						ptype, err := getPluginType(c.Args()[0])
+						if err != nil {
+							fmt.Printf("%s\n", err)
+							return nil
+						}
+						pname := c.Args()[1]
+						args := &common.PluginDesc{
+							RuleDesc: common.RuleDesc{
+								Name: pname,
+							},
+							Type: ptype,
+						}
+
+						var reply string
+						err = client.Call("Server.DescPlugin", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 			},
 		},
 		},
 
 
 		{
 		{
 			Name:    "drop",
 			Name:    "drop",
 			Aliases: []string{"drop"},
 			Aliases: []string{"drop"},
-			Usage:   "drop stream $stream_name | drop rule $rule_name",
+			Usage:   "drop stream $stream_name | drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop",
 			Subcommands: []cli.Command{
 			Subcommands: []cli.Command{
 				{
 				{
 					Name:  "stream",
 					Name:  "stream",
@@ -341,9 +369,19 @@ func main() {
 				},
 				},
 				{
 				{
 					Name:  "plugin",
 					Name:  "plugin",
-					Usage: "drop plugin $plugin_type $plugin_name $plugin_json",
-					//Flags: nflag,
+					Usage: "drop plugin $plugin_type $plugin_name -s stop",
+					Flags: []cli.Flag{
+						cli.StringFlag{
+							Name:  "stop, s",
+							Usage: "stop kuiper after the action",
+						},
+					},
 					Action: func(c *cli.Context) error {
 					Action: func(c *cli.Context) error {
+						r := c.String("stop")
+						if r != "true" && r != "false" {
+							fmt.Printf("Expect r to be a boolean value.\n")
+							return nil
+						}
 						if len(c.Args()) < 2 || len(c.Args()) > 3 {
 						if len(c.Args()) < 2 || len(c.Args()) > 3 {
 							fmt.Printf("Expect plugin type and name.\n")
 							fmt.Printf("Expect plugin type and name.\n")
 							return nil
 							return nil
@@ -359,10 +397,9 @@ func main() {
 								Name: pname,
 								Name: pname,
 							},
 							},
 							Type: ptype,
 							Type: ptype,
+							Stop: r == "true",
 						}
 						}
-						if len(c.Args()) == 3 {
-							args.Json = c.Args()[2]
-						}
+
 						var reply string
 						var reply string
 						err = client.Call("Server.DropPlugin", args, &reply)
 						err = client.Call("Server.DropPlugin", args, &reply)
 						if err != nil {
 						if err != nil {

+ 1 - 1
xstream/nodes/sink_node.go

@@ -209,7 +209,7 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	case "rest":
 	case "rest":
 		s = &sinks.RestSink{}
 		s = &sinks.RestSink{}
 	default:
 	default:
-		nf, err := plugins.GetPlugin(name, "sinks")
+		nf, err := plugins.GetPlugin(name, plugins.SINK)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}

+ 1 - 1
xstream/nodes/source_node.go

@@ -153,7 +153,7 @@ func doGetSource(t string) (api.Source, error) {
 	case "mqtt":
 	case "mqtt":
 		s = &extensions.MQTTSource{}
 		s = &extensions.MQTTSource{}
 	default:
 	default:
-		nf, err := plugins.GetPlugin(t, "sources")
+		nf, err := plugins.GetPlugin(t, plugins.SOURCE)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}

+ 1 - 1
xstream/server/main.go

@@ -2,7 +2,7 @@ package main
 
 
 import "github.com/emqx/kuiper/xstream/server/server"
 import "github.com/emqx/kuiper/xstream/server/server"
 
 
-var Version string = "unknown"
+var Version = "unknown"
 
 
 func main() {
 func main() {
 	server.StartUp(Version)
 	server.StartUp(Version)

+ 18 - 6
xstream/server/server/rest.go

@@ -63,11 +63,11 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 
 
 	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
 	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
 	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
 
 
 	server := &http.Server{
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
@@ -300,17 +300,29 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
 	defer r.Body.Close()
 	defer r.Body.Close()
 	vars := mux.Vars(r)
 	vars := mux.Vars(r)
 	name := vars["name"]
 	name := vars["name"]
-	cb := r.URL.Query().Get("callback")
+	cb := r.URL.Query().Get("stop")
 
 
 	switch r.Method {
 	switch r.Method {
 	case http.MethodDelete:
 	case http.MethodDelete:
-		err := pluginManager.Delete(t, name, cb)
+		r := cb == "1"
+		err := pluginManager.Delete(t, name, r)
 		if err != nil {
 		if err != nil {
 			handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
 			handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
 			return
 			return
 		}
 		}
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
-		w.Write([]byte(fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)))
+		result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
+		if r {
+			result = fmt.Sprintf("%s and Kuiper will be stopped", result)
+		}
+		w.Write([]byte(result))
+	case http.MethodGet:
+		j, ok := pluginManager.Get(t, name)
+		if !ok {
+			handleError(w, fmt.Errorf("describe %s plugin %s error: not found", plugins.PluginTypes[t], name), http.StatusBadRequest, logger)
+			return
+		}
+		jsonResponse(j, w, logger)
 	}
 	}
 }
 }
 
 

+ 31 - 2
xstream/server/server/rpc.go

@@ -1,6 +1,7 @@
 package server
 package server
 
 
 import (
 import (
+	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
@@ -184,11 +185,16 @@ func (t *Server) DropPlugin(arg *common.PluginDesc, reply *string) error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("Drop plugin error: %s", err)
 		return fmt.Errorf("Drop plugin error: %s", err)
 	}
 	}
-	err = pluginManager.Delete(pt, p.Name, p.Callback)
+	err = pluginManager.Delete(pt, p.Name, arg.Stop)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("Drop plugin error: %s", err)
 		return fmt.Errorf("Drop plugin error: %s", err)
 	} else {
 	} else {
-		*reply = fmt.Sprintf("Plugin %s is dropped.", p.Name)
+		if arg.Stop {
+			*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.Name)
+		} else {
+			*reply = fmt.Sprintf("Plugin %s is dropped.", p.Name)
+		}
+
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -207,6 +213,29 @@ func (t *Server) ShowPlugins(arg int, reply *string) error {
 	return nil
 	return nil
 }
 }
 
 
+func (t *Server) DescPlugin(arg *common.PluginDesc, reply *string) error {
+	pt := plugins.PluginType(arg.Type)
+	p, err := getPluginByJson(arg)
+	if err != nil {
+		return fmt.Errorf("Describe plugin error: %s", err)
+	}
+	m, ok := pluginManager.Get(pt, p.Name)
+	if !ok {
+		return fmt.Errorf("Describe plugin error: not found")
+	} else {
+		s, err := json.Marshal(m)
+		if err != nil {
+			return fmt.Errorf("Describe plugin error: invalid json %v", m)
+		}
+		dst := &bytes.Buffer{}
+		if err := json.Indent(dst, s, "", "  "); err != nil {
+			return fmt.Errorf("Describe plugin error: indent json error %v", err)
+		}
+		*reply = dst.String()
+	}
+	return nil
+}
+
 func getPluginByJson(arg *common.PluginDesc) (*plugins.Plugin, error) {
 func getPluginByJson(arg *common.PluginDesc) (*plugins.Plugin, error) {
 	var p plugins.Plugin
 	var p plugins.Plugin
 	if arg.Json != "" {
 	if arg.Json != "" {