Browse Source

Merge pull request #183 from emqx/plugins

Plugins management callback, cli and doc
jinfahua 5 years ago
parent
commit
83077b9cdf

+ 7 - 2
common/data.go

@@ -1,5 +1,10 @@
 package common
 
-type Rule struct {
+type RuleDesc struct {
 	Name, Json string
-}
+}
+
+type PluginDesc struct {
+	RuleDesc
+	Type int
+}

+ 0 - 46
common/plugin_manager/manager.go

@@ -1,46 +0,0 @@
-package plugin_manager
-
-import (
-	"fmt"
-	"github.com/emqx/kuiper/common"
-	"path"
-	"plugin"
-	"unicode"
-)
-
-var registry map[string]plugin.Symbol
-
-func init() {
-	registry = make(map[string]plugin.Symbol)
-}
-
-func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
-	t = ucFirst(t)
-	key := ptype + "/" + t
-	var nf plugin.Symbol
-	nf, ok := registry[key]
-	if !ok {
-		loc, err := common.GetLoc("/plugins/")
-		if err != nil {
-			return nil, fmt.Errorf("cannot find the plugins folder")
-		}
-		mod := path.Join(loc, ptype, t+".so")
-		plug, err := plugin.Open(mod)
-		if err != nil {
-			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
-		}
-		nf, err = plug.Lookup(t)
-		if err != nil {
-			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
-		}
-		registry[key] = nf
-	}
-	return nf, nil
-}
-
-func ucFirst(str string) string {
-	for i, v := range str {
-		return string(unicode.ToUpper(v)) + str[i+1:]
-	}
-	return ""
-}

+ 1 - 0
docs/en_US/cli/overview.md

@@ -7,4 +7,5 @@ The Kuiper CLI acts as a client to the Kuiper server. The Kuiper server runs the
 
 - [Streams](streams.md)
 - [Rules](rules.md)
+- [Plugins](plugins.md)
 

+ 84 - 0
docs/en_US/cli/plugins.md

@@ -0,0 +1,84 @@
+# Plugins management
+
+The Kuiper plugin command line tools allows you to manage plugins, such as create, show and drop plugins. Notice that, drop a plugin will need to restart kuiper to take effect. To update a plugin, do the following:
+1. Drop the plugin.
+2. Restart Kuiper.
+3. Create the plugin with the new configuration.
+
+## create a plugin
+
+The command is used for creating a plugin.  The plugin's definition is specified with JSON format.
+
+```shell
+create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file
+```
+
+The plugin can be created with two ways. 
+
+- Specify the plugin definition in command line.
+
+Sample:
+
+```shell
+# bin/cli create plugin source random {"file":"http://127.0.0.1/plugins/sources/random.zip", "callback":"http://mycallback.com"}
+```
+
+The command create a source plugin named ``random``. 
+
+- Specify the plugin definition in a file. If the plugin is complex, or the plugin is already wrote in text files with well organized formats, you can just specify the plugin definition through ``-f`` option.
+
+Sample:
+
+```shell
+# bin/cli create plugin sink plugin1 -f /tmp/plugin1.txt
+```
+
+Below is the contents of ``rule.txt``.
+
+```json
+{
+  "file":"http://127.0.0.1/plugins/sources/random.zip",
+  "callback":"http://mycallback.com"
+}
+```
+### parameters
+1. plugin_type: the type of the plugin. Available values are `["source", "sink", "functions"]`
+2. plugin_name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
+3. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
+4. callback: optional parameter to specify the url to call once the plugin is created. We will issue a GET request to the callback url.
+
+## show plugins
+
+The command is used for displaying all plugins defined in the server for a plugin type.
+
+```shell
+show plugins function
+```
+
+Sample:
+
+```shell
+# bin/cli show plugins function
+function1
+function2
+```
+
+## drop a plugin
+
+The command is used for drop the plugin.
+
+```shell
+drop plugin $plugin_type $plugin_name $plugin_json 
+```
+In which, `$plugin_json` is optional. Currently, only `callback` parameter is supported in the `plugin_json`
+Sample:
+
+```shell
+# bin/cli drop plugin source random
+Plugin random is dropped.
+```
+
+```shell
+# bin/cli drop plugin sink plugin1 {"callback":"http://mycallback.com"}
+Plugin plugin1 is dropped.
+```

+ 4 - 4
docs/en_US/cli/rules.md

@@ -105,7 +105,7 @@ Sample:
 
 ```shell
 # bin/cli drop rule rule1
-rule rule1 dropped
+Rule rule1 is dropped.
 ```
 
 ## start a rule
@@ -120,7 +120,7 @@ Sample:
 
 ```shell
 # bin/cli start rule rule1
-rule rule1 started
+Rule rule1 was started.
 ```
 
 ## stop a rule
@@ -135,7 +135,7 @@ Sample:
 
 ```shell
 # bin/cli stop rule rule1
-rule rule1 stopped
+Rule rule1 was stopped.
 ```
 
 ## restart a rule
@@ -150,7 +150,7 @@ Sample:
 
 ```shell
 # bin/cli restart rule rule1
-rule rule1 restarted
+Rule rule1 was restarted.
 ```
 
 ## get the status of a rule

+ 1 - 0
docs/en_US/restapi/overview.md

@@ -4,4 +4,5 @@ By default, the REST API are running in port 9081. You can change the port in `/
 
 - [Streams](streams.md)
 - [Rules](rules.md)
+- [Plugins](plugins.md)
 

+ 61 - 0
docs/en_US/restapi/plugins.md

@@ -0,0 +1,61 @@
+# Plugins management
+
+The Kuiper REST api for plugins allows you to manage plugins, such as create, drop and list plugins. Notice that, drop a plugin will need to restart kuiper to take effect. To update a plugin, do the following:
+1. Drop the plugin.
+2. Restart Kuiper.
+3. Create the plugin with the new configuration.
+
+## create a plugin
+
+The API accepts a JSON content to create a new plugin. Each plugin type has a standalone endpoint. The supported types are `["sources", "sinks", "functions"`. The plugin is identified by the name. The name must be unique.
+```shell
+POST http://localhost:9081/plugins/sources
+POST http://localhost:9081/plugins/sinks
+POST http://localhost:9081/plugins/functions
+```
+Request Sample
+
+```json
+{
+  "name":"random",
+  "file":"http://127.0.0.1/plugins/sources/random.zip",
+  "callback":"http://mycallback.com"
+}
+```
+
+### Parameters
+
+1. name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
+2. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
+3. callback: optional parameter to specify the url to call once the plugin is created. We will issue a GET request to the callback url.
+
+
+## show plugins
+
+The API is used for displaying all of plugins defined in the server for a plugin type.
+
+```shell
+GET http://localhost:9081/plugins/sources
+GET http://localhost:9081/plugins/sinks
+GET http://localhost:9081/plugins/functions
+```
+
+Response Sample:
+
+```json
+["plugin1","plugin2"]
+```
+
+## drop a rule
+
+The API is used for drop the plugin. The kuiper server needs to be restarted to take effect.
+
+```shell
+DELETE http://localhost:8080/rules/sources/{name}
+DELETE http://localhost:8080/rules/sinks/{name}
+DELETE http://localhost:8080/rules/functions/{name}
+```
+The user can pass a query parameter for a callback url. Kuiper will issue a GET request to the callback url after deleting the plugin.
+```shell
+DELETE http://localhost:8080/rules/sources/{name}?callback=http%3A%2F%2Fwww.mycallback.com%2Fcallback
+```

+ 42 - 4
plugins/manager.go

@@ -12,6 +12,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"plugin"
 	"strings"
 	"sync"
 	"unicode"
@@ -69,6 +70,32 @@ func (rr *Registry) List(t PluginType) (values []string) {
 //	rr.Unlock()
 //}
 
+var symbolRegistry = make(map[string]plugin.Symbol)
+
+func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
+	t = ucFirst(t)
+	key := ptype + "/" + t
+	var nf plugin.Symbol
+	nf, ok := symbolRegistry[key]
+	if !ok {
+		loc, err := common.GetLoc("/plugins/")
+		if err != nil {
+			return nil, fmt.Errorf("cannot find the plugins folder")
+		}
+		mod := path.Join(loc, ptype, t+".so")
+		plug, err := plugin.Open(mod)
+		if err != nil {
+			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
+		}
+		nf, err = plug.Lookup(t)
+		if err != nil {
+			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
+		}
+		symbolRegistry[key] = nf
+	}
+	return nf, nil
+}
+
 type Manager struct {
 	pluginDir string
 	etcDir    string
@@ -167,7 +194,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 	return callback(cb)
 }
 
-func (m *Manager) Delete(t PluginType, name string) (result error) {
+func (m *Manager) Delete(t PluginType, name string, cb string) error {
 	name = strings.Trim(name, " ")
 	if name == "" {
 		return fmt.Errorf("invalid name %s: should not be empty", name)
@@ -203,7 +230,7 @@ func (m *Manager) Delete(t PluginType, name string) (result error) {
 	if len(results) > 0 {
 		return errors.New(strings.Join(results, "\n"))
 	} else {
-		return nil
+		return callback(cb)
 	}
 }
 
@@ -292,14 +319,13 @@ func isValidUrl(uri string) bool {
 }
 
 func downloadFile(filepath string, url string) error {
-
 	// Get the data
 	resp, err := http.Get(url)
 	if err != nil {
 		return err
 	}
 	if resp.StatusCode != http.StatusOK {
-		return fmt.Errorf("cannot download the file with status: %d %s", resp.StatusCode, resp.Status)
+		return fmt.Errorf("cannot download the file with status: %s", resp.Status)
 	}
 	defer resp.Body.Close()
 
@@ -330,5 +356,17 @@ func lcFirst(str string) string {
 }
 
 func callback(u string) error {
+	if strings.Trim(u, " ") == "" {
+		return nil
+	} else {
+		resp, err := http.Get(u)
+		if err != nil {
+			return fmt.Errorf("action succeded but callback failed: %v", err)
+		} else {
+			if resp.StatusCode < 200 || resp.StatusCode > 299 {
+				return fmt.Errorf("action succeeded but callback failed: status %s", resp.Status)
+			}
+		}
+	}
 	return nil
 }

+ 60 - 7
plugins/manager_test.go

@@ -12,16 +12,28 @@ import (
 )
 
 func TestManager_Register(t *testing.T) {
+	//file server
 	s := httptest.NewServer(
 		http.FileServer(http.Dir("testzips")),
 	)
 	defer s.Close()
 	endpoint := s.URL
+	//callback server
+	h := http.NewServeMux()
+	h.HandleFunc("/callback/", func(res http.ResponseWriter, req *http.Request) {
+		res.WriteHeader(http.StatusOK)
+	})
+	h.HandleFunc("/callbackE/", func(res http.ResponseWriter, req *http.Request) {
+		http.Error(res, "error", 500)
+	})
+	hs := httptest.NewServer(h)
+	defer hs.Close()
 
 	data := []struct {
 		t   PluginType
 		n   string
 		u   string
+		c   string
 		err error
 	}{
 		{
@@ -54,10 +66,21 @@ func TestManager_Register(t *testing.T) {
 			n: "random2",
 			u: endpoint + "/sources/random2.zip",
 		}, {
+			t: SOURCE,
+			n: "random3",
+			u: endpoint + "/sources/random3.zip",
+			c: hs.URL + "/callback",
+		}, {
 			t: SINK,
 			n: "file2",
 			u: endpoint + "/sinks/file2.zip",
 		}, {
+			t:   SINK,
+			n:   "file3",
+			u:   endpoint + "/sinks/file3.zip",
+			c:   hs.URL + "/callbackE",
+			err: errors.New("action succeeded but callback failed: status 500 Internal Server Error"),
+		}, {
 			t: FUNCTION,
 			n: "echo2",
 			u: endpoint + "/functions/echo2.zip",
@@ -66,6 +89,12 @@ func TestManager_Register(t *testing.T) {
 			n:   "echo2",
 			u:   endpoint + "/functions/echo2.zip",
 			err: errors.New("invalid name echo2: duplicate"),
+		}, {
+			t:   FUNCTION,
+			n:   "echo3",
+			u:   endpoint + "/functions/echo3.zip",
+			c:   hs.URL + "/nonExist",
+			err: errors.New("action succeeded but callback failed: status 404 Not Found"),
 		},
 	}
 	manager, err := NewPluginManager()
@@ -78,7 +107,7 @@ func TestManager_Register(t *testing.T) {
 		err = manager.Register(tt.t, &Plugin{
 			Name:     tt.n,
 			File:     tt.u,
-			Callback: "",
+			Callback: tt.c,
 		})
 		if !reflect.DeepEqual(tt.err, err) {
 			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
@@ -93,20 +122,44 @@ func TestManager_Register(t *testing.T) {
 }
 
 func TestManager_Delete(t *testing.T) {
+	h := http.NewServeMux()
+	h.HandleFunc("/callback/", func(res http.ResponseWriter, req *http.Request) {
+		res.WriteHeader(http.StatusOK)
+	})
+	h.HandleFunc("/callbackE/", func(res http.ResponseWriter, req *http.Request) {
+		http.Error(res, "error", 500)
+	})
+	s := httptest.NewServer(h)
+	defer s.Close()
 	data := []struct {
 		t   PluginType
 		n   string
+		c   string
 		err error
 	}{
 		{
-			t: SOURCE,
-			n: "random2",
+			t:   SOURCE,
+			n:   "random2",
+			c:   s.URL + "/callbackN",
+			err: errors.New("action succeeded but callback failed: status 404 Not Found"),
 		}, {
 			t: SINK,
 			n: "file2",
+			c: s.URL + "/callback",
+		}, {
+			t:   FUNCTION,
+			n:   "echo2",
+			c:   s.URL + "/callbackE",
+			err: errors.New("action succeeded but callback failed: status 500 Internal Server Error"),
+		}, {
+			t: SOURCE,
+			n: "random3",
+		}, {
+			t: SINK,
+			n: "file3",
 		}, {
 			t: FUNCTION,
-			n: "echo2",
+			n: "echo3",
 		},
 	}
 	manager, err := NewPluginManager()
@@ -116,9 +169,9 @@ func TestManager_Delete(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(data))
 
 	for i, p := range data {
-		err = manager.Delete(p.t, p.n)
-		if err != nil {
-			t.Errorf("%d: delete error : %s\n\n", i, err)
+		err = manager.Delete(p.t, p.n, p.c)
+		if !reflect.DeepEqual(p.err, err) {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, p.err, err)
 		}
 	}
 }

+ 2 - 2
plugins/plugins.http

@@ -15,13 +15,13 @@ DELETE http://127.0.0.1:9081/plugins/sources/random2
 POST http://127.0.0.1:9081/plugins/sinks
 Content-Type: application/json
 
-{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":"http://www.mycallback.com/callback"}
 
 ###
 GET http://127.0.0.1:9081/plugins/sinks
 
 ###
-DELETE http://127.0.0.1:9081/plugins/sinks/random2
+DELETE http://127.0.0.1:9081/plugins/sinks/random2?callback=http%3A%2F%2Fwww.mycallback.com%2Fcallback
 
 ###
 POST http://127.0.0.1:9081/plugins/functions

BIN
plugins/testzips/functions/echo3.zip


BIN
plugins/testzips/sinks/file3.zip


BIN
plugins/testzips/sources/random3.zip


+ 2 - 2
xsql/ast.go

@@ -3,7 +3,7 @@ package xsql
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"math"
 	"reflect"
@@ -1680,7 +1680,7 @@ func isAggFunc(f *Call) bool {
 	} else if _, ok := mathFuncMap[fn]; ok {
 		return false
 	} else {
-		if nf, err := plugin_manager.GetPlugin(f.Name, "functions"); err == nil {
+		if nf, err := plugins.GetPlugin(f.Name, "functions"); err == nil {
 			if ef, ok := nf.(api.Function); ok && ef.IsAggregate() {
 				return true
 			}

+ 2 - 2
xsql/funcs_aggregate.go

@@ -3,7 +3,7 @@ package xsql
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
@@ -140,7 +140,7 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		return 0, true
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
-		if nf, err := plugin_manager.GetPlugin(name, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(name, "functions"); err != nil {
 			return nil, false
 		} else {
 			f, ok := nf.(api.Function)

+ 2 - 2
xsql/funcs_ast_validator.go

@@ -2,7 +2,7 @@ package xsql
 
 import (
 	"fmt"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
@@ -26,7 +26,7 @@ func validateFuncs(funcName string, args []Expr) error {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 		return validateAggFunc(lowerName, args)
 	} else {
-		if nf, err := plugin_manager.GetPlugin(funcName, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(funcName, "functions"); err != nil {
 			return err
 		} else {
 			f, ok := nf.(api.Function)

+ 2 - 2
xsql/functions.go

@@ -2,7 +2,7 @@ package xsql
 
 import (
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 )
@@ -76,7 +76,7 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 		return nil, false
 	} else {
 		common.Log.Debugf("run func %s", name)
-		if nf, err := plugin_manager.GetPlugin(name, "functions"); err != nil {
+		if nf, err := plugins.GetPlugin(name, "functions"); err != nil {
 			return err, false
 		} else {
 			f, ok := nf.(api.Function)

+ 146 - 17
xstream/cli/main.go

@@ -146,13 +146,8 @@ func main() {
 					Action: func(c *cli.Context) error {
 						sfile := c.String("file")
 						if sfile != "" {
-							if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
-								fmt.Printf("The specified stream defintion file %s does not existed.\n", sfile)
-								return nil
-							}
-							fmt.Printf("Creating a new stream from file %s.\n", sfile)
-							if stream, err := ioutil.ReadFile(sfile); err != nil {
-								fmt.Printf("Failed to read from stream definition file %s.\n", sfile)
+							if stream, err := readDef(sfile, "stream"); err != nil {
+								fmt.Printf("%s", err)
 								return nil
 							} else {
 								args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
@@ -178,13 +173,8 @@ func main() {
 					Action: func(c *cli.Context) error {
 						sfile := c.String("file")
 						if sfile != "" {
-							if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
-								fmt.Printf("The specified rule defenition file %s is not existed.\n", sfile)
-								return nil
-							}
-							fmt.Printf("Creating a new rule from file %s.\n", sfile)
-							if rule, err := ioutil.ReadFile(sfile); err != nil {
-								fmt.Printf("Failed to read from rule definition file %s.\n", sfile)
+							if rule, err := readDef(sfile, "rule"); err != nil {
+								fmt.Printf("%s", err)
 								return nil
 							} else {
 								if len(c.Args()) != 1 {
@@ -193,7 +183,7 @@ func main() {
 								}
 								rname := c.Args()[0]
 								var reply string
-								args := &common.Rule{rname, string(rule)}
+								args := &common.RuleDesc{rname, string(rule)}
 								err = client.Call("Server.CreateRule", args, &reply)
 								if err != nil {
 									fmt.Println(err)
@@ -210,7 +200,7 @@ func main() {
 							rname := c.Args()[0]
 							rjson := c.Args()[1]
 							var reply string
-							args := &common.Rule{rname, rjson}
+							args := &common.RuleDesc{rname, rjson}
 							err = client.Call("Server.CreateRule", args, &reply)
 							if err != nil {
 								fmt.Println(err)
@@ -221,6 +211,62 @@ func main() {
 						}
 					},
 				},
+				{
+					Name:  "plugin",
+					Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f rule_def_file]",
+					Flags: []cli.Flag{
+						cli.StringFlag{
+							Name:     "file, f",
+							Usage:    "the location of plugin definition file",
+							FilePath: "/home/myplugin.txt",
+						},
+					},
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) < 2 {
+							fmt.Printf("Expect plugin type and name.\n")
+							return nil
+						}
+						ptype, err := getPluginType(c.Args()[0])
+						if err != nil {
+							fmt.Printf("%s\n", err)
+							return nil
+						}
+						pname := c.Args()[1]
+						sfile := c.String("file")
+						args := &common.PluginDesc{
+							RuleDesc: common.RuleDesc{
+								Name: pname,
+							},
+							Type: ptype,
+						}
+						if sfile != "" {
+							if len(c.Args()) != 2 {
+								fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
+								return nil
+							}
+							if p, err := readDef(sfile, "plugin"); err != nil {
+								fmt.Printf("%s", err)
+								return nil
+							} else {
+								args.Json = string(p)
+							}
+						} else {
+							if len(c.Args()) != 3 {
+								fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
+								return nil
+							}
+							args.Json = c.Args()[2]
+						}
+						var reply string
+						err = client.Call("Server.CreatePlugin", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 		{
@@ -293,13 +339,47 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "plugin",
+					Usage: "drop plugin $plugin_type $plugin_name $plugin_json",
+					//Flags: nflag,
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) < 2 || len(c.Args()) > 3 {
+							fmt.Printf("Expect plugin type and name.\n")
+							return nil
+						}
+						ptype, err := getPluginType(c.Args()[0])
+						if err != nil {
+							fmt.Printf("%s\n", err)
+							return nil
+						}
+						pname := c.Args()[1]
+						args := &common.PluginDesc{
+							RuleDesc: common.RuleDesc{
+								Name: pname,
+							},
+							Type: ptype,
+						}
+						if len(c.Args()) == 3 {
+							args.Json = c.Args()[2]
+						}
+						var reply string
+						err = client.Call("Server.DropPlugin", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 
 		{
 			Name:    "show",
 			Aliases: []string{"show"},
-			Usage:   "show streams | show rules",
+			Usage:   "show streams | show rules | show plugins $plugin_type",
 
 			Subcommands: []cli.Command{
 				{
@@ -324,6 +404,29 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "plugins",
+					Usage: "show plugins $plugin_type",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) != 1 {
+							fmt.Printf("Expect plugin type.\n")
+							return nil
+						}
+						ptype, err := getPluginType(c.Args()[0])
+						if err != nil {
+							fmt.Printf("%s\n", err)
+							return nil
+						}
+						var reply string
+						err = client.Call("Server.ShowPlugins", ptype, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 
@@ -455,3 +558,29 @@ func main() {
 		fmt.Printf("%v", err)
 	}
 }
+
+func getPluginType(arg string) (ptype int, err error) {
+	switch arg {
+	case "source":
+		ptype = 0
+	case "sink":
+		ptype = 1
+	case "function":
+		ptype = 2
+	default:
+		err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
+	}
+	return
+}
+
+func readDef(sfile string, t string) ([]byte, error) {
+	if _, err := os.Stat(sfile); os.IsNotExist(err) {
+		return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
+	}
+	fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
+	if rule, err := ioutil.ReadFile(sfile); err != nil {
+		return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
+	} else {
+		return rule, nil
+	}
+}

+ 2 - 2
xstream/nodes/sink_node.go

@@ -3,7 +3,7 @@ package nodes
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/sinks"
 	"sync"
@@ -209,7 +209,7 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	case "rest":
 		s = &sinks.RestSink{}
 	default:
-		nf, err := plugin_manager.GetPlugin(name, "sinks")
+		nf, err := plugins.GetPlugin(name, "sinks")
 		if err != nil {
 			return nil, err
 		}

+ 2 - 2
xstream/nodes/source_node.go

@@ -3,7 +3,7 @@ package nodes
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/common/plugin_manager"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/extensions"
@@ -153,7 +153,7 @@ func doGetSource(t string) (api.Source, error) {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
 	default:
-		nf, err := plugin_manager.GetPlugin(t, "sources")
+		nf, err := plugins.GetPlugin(t, "sources")
 		if err != nil {
 			return nil, err
 		}

+ 2 - 1
xstream/server/server/rest.go

@@ -300,10 +300,11 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
 	defer r.Body.Close()
 	vars := mux.Vars(r)
 	name := vars["name"]
+	cb := r.URL.Query().Get("callback")
 
 	switch r.Method {
 	case http.MethodDelete:
-		err := pluginManager.Delete(t, name)
+		err := pluginManager.Delete(t, name, cb)
 		if err != nil {
 			handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
 			return

+ 61 - 1
xstream/server/server/rpc.go

@@ -1,8 +1,10 @@
 package server
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/sinks"
 	"strings"
 	"time"
@@ -72,7 +74,7 @@ func (t *Server) Stream(stream string, reply *string) error {
 	return nil
 }
 
-func (t *Server) CreateRule(rule *common.Rule, reply *string) error {
+func (t *Server) CreateRule(rule *common.RuleDesc, reply *string) error {
 	r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
 	if err != nil {
 		return fmt.Errorf("Create rule error : %s.", err)
@@ -158,6 +160,64 @@ func (t *Server) DropRule(name string, reply *string) error {
 	return nil
 }
 
+func (t *Server) CreatePlugin(arg *common.PluginDesc, reply *string) error {
+	pt := plugins.PluginType(arg.Type)
+	p, err := getPluginByJson(arg)
+	if err != nil {
+		return fmt.Errorf("Create plugin error: %s", err)
+	}
+	if p.File == "" {
+		return fmt.Errorf("Create plugin error: Missing plugin file url.")
+	}
+	err = pluginManager.Register(pt, p)
+	if err != nil {
+		return fmt.Errorf("Create plugin error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Plugin %s is created.", p.Name)
+	}
+	return nil
+}
+
+func (t *Server) DropPlugin(arg *common.PluginDesc, reply *string) error {
+	pt := plugins.PluginType(arg.Type)
+	p, err := getPluginByJson(arg)
+	if err != nil {
+		return fmt.Errorf("Drop plugin error: %s", err)
+	}
+	err = pluginManager.Delete(pt, p.Name, p.Callback)
+	if err != nil {
+		return fmt.Errorf("Drop plugin error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Plugin %s is dropped.", p.Name)
+	}
+	return nil
+}
+
+func (t *Server) ShowPlugins(arg int, reply *string) error {
+	pt := plugins.PluginType(arg)
+	l, err := pluginManager.List(pt)
+	if err != nil {
+		return fmt.Errorf("Drop plugin error: %s", err)
+	} else {
+		if len(l) == 0 {
+			l = append(l, "No plugin is found.")
+		}
+		*reply = strings.Join(l, "\n")
+	}
+	return nil
+}
+
+func getPluginByJson(arg *common.PluginDesc) (*plugins.Plugin, error) {
+	var p plugins.Plugin
+	if arg.Json != "" {
+		if err := json.Unmarshal([]byte(arg.Json), &p); err != nil {
+			return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err)
+		}
+	}
+	p.Name = arg.Name
+	return &p, nil
+}
+
 func init() {
 	ticker := time.NewTicker(time.Second * 5)
 	go func() {