Quellcode durchsuchen

feat(plugin): rest service

ngjaying vor 5 Jahren
Ursprung
Commit
3998bcd1bf
4 geänderte Dateien mit 99 neuen und 10 gelöschten Zeilen
  1. 8 8
      plugins/manager.go
  2. 2 2
      plugins/manager_test.go
  3. 83 0
      xstream/server/server/rest.go
  4. 6 0
      xstream/server/server/server.go

+ 8 - 8
plugins/manager.go

@@ -32,9 +32,9 @@ const (
 )
 
 var (
-	pluginFolders = []string{"sources", "sinks", "functions"}
-	once          sync.Once
-	singleton     *Manager
+	PluginTypes = []string{"sources", "sinks", "functions"}
+	once        sync.Once
+	singleton   *Manager
 )
 
 //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
@@ -110,7 +110,7 @@ func NewPluginManager() (*Manager, error) {
 }
 
 func findAll(t PluginType, pluginDir string) (result []string, err error) {
-	dir := path.Join(pluginDir, pluginFolders[t])
+	dir := path.Join(pluginDir, PluginTypes[t])
 	files, err := ioutil.ReadDir(dir)
 	if err != nil {
 		return
@@ -183,10 +183,10 @@ func (m *Manager) Delete(t PluginType, name string) (result error) {
 	}
 	var results []string
 	paths := []string{
-		path.Join(m.pluginDir, pluginFolders[t], ucFirst(name)+".so"),
+		path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
 	}
 	if t == SOURCE {
-		paths = append(paths, path.Join(m.etcDir, pluginFolders[t], name+".yaml"))
+		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
 	}
 	for _, p := range paths {
 		_, err := os.Stat(p)
@@ -219,11 +219,11 @@ func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string,
 		ucFirst(name) + ".so",
 	}
 	paths := []string{
-		path.Join(m.pluginDir, pluginFolders[t], files[0]),
+		path.Join(m.pluginDir, PluginTypes[t], files[0]),
 	}
 	if t == SOURCE {
 		files = append(files, name+".yaml")
-		paths = append(paths, path.Join(m.etcDir, pluginFolders[t], files[1]))
+		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
 	}
 	for i, d := range files {
 		var z *zip.File

+ 2 - 2
plugins/manager_test.go

@@ -117,13 +117,13 @@ func TestManager_Delete(t *testing.T) {
 }
 
 func checkFile(pluginDir string, etcDir string, t PluginType, name string) error {
-	soPath := path.Join(pluginDir, pluginFolders[t], ucFirst(name)+".so")
+	soPath := path.Join(pluginDir, PluginTypes[t], ucFirst(name)+".so")
 	_, err := os.Stat(soPath)
 	if err != nil {
 		return err
 	}
 	if t == SOURCE {
-		etcPath := path.Join(etcDir, pluginFolders[t], name+".yaml")
+		etcPath := path.Join(etcDir, PluginTypes[t], name+".yaml")
 		_, err = os.Stat(etcPath)
 		if err != nil {
 			return err

+ 83 - 0
xstream/server/server/rest.go

@@ -3,6 +3,7 @@ package server
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/gorilla/mux"
 	"io"
@@ -60,6 +61,13 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 
+	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete)
+
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
@@ -248,3 +256,78 @@ func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
 }
+
+func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		content, err := pluginManager.List(t)
+		if err != nil {
+			handleError(w, fmt.Errorf("%s plugins list command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		jsonResponse(content, w, logger)
+	case http.MethodPost:
+		sd := plugins.Plugin{}
+		err := json.NewDecoder(r.Body).Decode(&sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, fmt.Errorf("Invalid body: Error decoding the %s plugin json: %v", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		err = pluginManager.Register(t, &sd)
+		if err != nil {
+			handleError(w, fmt.Errorf("%s plugins create command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.Name)))
+	}
+}
+
+func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+
+	switch r.Method {
+	case http.MethodDelete:
+		err := pluginManager.Delete(t, name)
+		if err != nil {
+			handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)))
+	}
+}
+
+//list or create source plugin
+func sourcesHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.SOURCE)
+}
+
+//delete a source plugin
+func sourceHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.SOURCE)
+}
+
+//list or create sink plugin
+func sinksHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.SINK)
+}
+
+//delete a sink plugin
+func sinkHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.SINK)
+}
+
+//list or create function plugin
+func functionsHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.FUNCTION)
+}
+
+//delete a function plugin
+func functionHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.FUNCTION)
+}

+ 6 - 0
xstream/server/server/server.go

@@ -3,6 +3,7 @@ package server
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xsql/processors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"net"
@@ -17,6 +18,7 @@ var (
 
 	ruleProcessor   *processors.RuleProcessor
 	streamProcessor *processors.StreamProcessor
+	pluginManager   *plugins.Manager
 )
 
 func StartUp(Version string) {
@@ -31,6 +33,10 @@ func StartUp(Version string) {
 	}
 	ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
 	streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
+	pluginManager, err = plugins.NewPluginManager()
+	if err != nil {
+		logger.Panic(err)
+	}
 
 	registry = &RuleRegistry{internal: make(map[string]*RuleState)}