Forráskód Böngészése

feat(plugin): plugin manager initialize and register func

ngjaying 5 éve
szülő
commit
a1623811b7
2 módosított fájl, 326 hozzáadás és 0 törlés
  1. 277 0
      plugins/manager.go
  2. 49 0
      plugins/manager_test.go

+ 277 - 0
plugins/manager.go

@@ -0,0 +1,277 @@
+package plugins
+
+import (
+	"archive/zip"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"unicode"
+)
+
+type PluginType int
+
+const (
+	SOURCE PluginType = iota
+	SINK
+	FUNCTION
+)
+
+var (
+	pluginFolders = []string{"sources", "sinks", "functions"}
+	once          sync.Once
+	singleton     *Manager
+)
+
+type OnRegistered func()
+
+//Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
+type Registry struct {
+	sync.RWMutex
+	internal [][]string
+}
+
+func (rr *Registry) Store(t PluginType, value string) {
+	rr.Lock()
+	rr.internal[t] = append(rr.internal[t], value)
+	rr.Unlock()
+}
+
+func (rr *Registry) List(t PluginType) (values []string) {
+	rr.RLock()
+	result := rr.internal[t]
+	rr.RUnlock()
+	return result
+}
+
+//func (rr *Registry) Delete(t PluginType, value string) {
+//	rr.Lock()
+//	s := rr.internal[t]
+//	for i, f := range s{
+//		if f == value{
+//			s[len(s)-1], s[i] = s[i], s[len(s)-1]
+//			rr.internal[t] = s
+//			break
+//		}
+//	}
+//	rr.Unlock()
+//}
+
+type Manager struct {
+	pluginDir string
+	etcDir    string
+	registry  *Registry
+}
+
+func NewPluginManager() (*Manager, error) {
+	var err error
+	once.Do(func() {
+		dir, err := common.GetLoc("/plugins")
+		if err != nil {
+			err = fmt.Errorf("cannot find plugins folder: %s", err)
+			return
+		}
+		etcDir, err := common.GetLoc("/etc")
+		if err != nil {
+			err = fmt.Errorf("cannot find etc folder: %s", err)
+			return
+		}
+
+		plugins := make([][]string, 3)
+		for i := 0; i < 3; i++ {
+			names, err := findAll(PluginType(i), dir)
+			if err != nil {
+				err = fmt.Errorf("fail to find existing plugins: %s", err)
+				return
+			}
+			plugins[i] = names
+		}
+		registry := &Registry{internal: plugins}
+
+		singleton = &Manager{
+			pluginDir: dir,
+			etcDir:    etcDir,
+			registry:  registry,
+		}
+	})
+	return singleton, err
+}
+
+func findAll(t PluginType, pluginDir string) (result []string, err error) {
+	dir := path.Join(pluginDir, pluginFolders[t])
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return
+	}
+
+	for _, file := range files {
+		result = append(result, file.Name())
+	}
+	return
+}
+
+func (m *Manager) Register(t PluginType, name string, uri string, callback OnRegistered) error {
+	//Validation
+	name = strings.Trim(name, " ")
+	if name == "" {
+		return fmt.Errorf("invalid name %s: should not be empty", name)
+	}
+	if !isValidUrl(uri) && strings.HasSuffix(uri, ".zip") {
+		return fmt.Errorf("invalid uri %s", uri)
+	}
+
+	for _, n := range m.registry.List(t) {
+		if n == name {
+			return fmt.Errorf("invalid name %s: duplicate", name)
+		}
+	}
+	zipPath := path.Join(m.pluginDir, name+".zip")
+	var unzipFiles []string
+	//clean up: delete zip file and unzip files in error
+	defer func() {
+		os.Remove(zipPath)
+		if len(unzipFiles) == 1 {
+			os.Remove(unzipFiles[0])
+		} else if len(unzipFiles) == 2 {
+			m.registry.Store(t, name)
+			callback()
+		}
+	}()
+	//download
+	err := downloadFile(zipPath, uri)
+	if err != nil {
+		return fmt.Errorf("fail to download file %s: %s", uri, err)
+	}
+	//unzip and copy to destination
+	unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
+	if err != nil {
+		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
+	}
+	return nil
+}
+
+func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
+	var filenames []string
+	r, err := zip.OpenReader(src)
+	if err != nil {
+		return filenames, err
+	}
+	defer r.Close()
+
+	soFileName := ucFirst(name) + ".so"
+	confFileName := name + ".yaml"
+	var soFile, confFile *zip.File
+	found := 0
+	for _, file := range r.File {
+		fileName := file.Name
+		if fileName == soFileName {
+			soFile = file
+			found++
+		} else if fileName == confFileName {
+			confFile = file
+			found++
+		}
+		if found == 2 {
+			break
+		}
+	}
+	if found < 2 {
+		return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
+	}
+
+	soPath := path.Join(m.pluginDir, pluginFolders[t], soFileName)
+	err = unzipTo(soFile, soPath)
+	if err != nil {
+		return filenames, err
+	}
+	filenames = append(filenames, soPath)
+
+	confPath := path.Join(m.etcDir, pluginFolders[t], confFileName)
+	err = unzipTo(confFile, confPath)
+	if err != nil {
+		return filenames, err
+	}
+	filenames = append(filenames, confPath)
+	return filenames, nil
+}
+
+func unzipTo(f *zip.File, fpath string) error {
+
+	if f.FileInfo().IsDir() {
+		// Make Folder
+		os.MkdirAll(fpath, os.ModePerm)
+		return fmt.Errorf("%s: not a file, but a directory", fpath)
+	}
+
+	// Make File
+	if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
+		return err
+	}
+
+	outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_TRUNC, f.Mode())
+	if err != nil {
+		return err
+	}
+
+	rc, err := f.Open()
+	if err != nil {
+		return err
+	}
+
+	_, err = io.Copy(outFile, rc)
+
+	// Close the file without defer to close before next iteration of loop
+	outFile.Close()
+	rc.Close()
+
+	return err
+}
+
+func isValidUrl(uri string) bool {
+	_, err := url.ParseRequestURI(uri)
+	if err != nil {
+		return false
+	}
+
+	u, err := url.Parse(uri)
+	if err != nil || u.Scheme == "" || u.Host == "" {
+		return false
+	}
+
+	return true
+}
+
+func downloadFile(filepath string, url string) error {
+
+	// Get the data
+	resp, err := http.Get(url)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Create the file
+	out, err := os.Create(filepath)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+
+	// Write the body to file
+	_, err = io.Copy(out, resp.Body)
+	return err
+}
+
+func ucFirst(str string) string {
+	for i, v := range str {
+		return string(unicode.ToUpper(v)) + str[i+1:]
+	}
+	return ""
+}

+ 49 - 0
plugins/manager_test.go

@@ -0,0 +1,49 @@
+package plugins
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+const endpoint = "http://127.0.0.1/plugins"
+
+func TestManager(t *testing.T) {
+
+	data := []struct {
+		t   PluginType
+		n   string
+		u   string
+		err error
+	}{
+		{
+			t:   SOURCE,
+			n:   "",
+			u:   "",
+			err: errors.New("invalid name : should not be empty"),
+		}, {
+			t: SOURCE,
+			n: "random",
+			u: endpoint + "/sources/random.zip",
+		},
+	}
+	callback := func() {
+		fmt.Printf("callback triggered")
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+	for i, tt := range data {
+		manager, err := NewPluginManager()
+		if err != nil {
+			t.Error(err)
+		}
+		err = manager.Register(tt.t, tt.n, tt.u, callback)
+		if tt.err != nil {
+			if !reflect.DeepEqual(tt.err, err) {
+				t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+			}
+		} else {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		}
+	}
+}