Преглед на файлове

feat(plugins): support version

ngjaying преди 5 години
родител
ревизия
fb59d1fdd0
променени са 3 файла, в които са добавени 120 реда и са изтрити 46 реда
  1. 69 43
      plugins/manager.go
  2. 51 3
      plugins/manager_test.go
  3. BIN
      plugins/testzips/sources/random3.zip

+ 69 - 43
plugins/manager.go

@@ -13,6 +13,7 @@ import (
 	"path"
 	"path/filepath"
 	"plugin"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -41,20 +42,32 @@ var (
 //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
 type Registry struct {
 	sync.RWMutex
-	internal [][]string
+	internal []map[string]string
 }
 
-func (rr *Registry) Store(t PluginType, value string) {
+func (rr *Registry) Store(t PluginType, name string, version string) {
 	rr.Lock()
-	rr.internal[t] = append(rr.internal[t], value)
+	rr.internal[t][name] = version
 	rr.Unlock()
 }
 
-func (rr *Registry) List(t PluginType) (values []string) {
+func (rr *Registry) List(t PluginType) []string {
 	rr.RLock()
 	result := rr.internal[t]
 	rr.RUnlock()
-	return result
+	keys := make([]string, 0, len(result))
+	for k := range result {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
+func (rr *Registry) Get(t PluginType, name string) (string, bool) {
+	rr.RLock()
+	result := rr.internal[t]
+	rr.RUnlock()
+	r, ok := result[name]
+	return r, ok
 }
 
 //func (rr *Registry) Delete(t PluginType, value string) {
@@ -116,7 +129,7 @@ func NewPluginManager() (*Manager, error) {
 			return
 		}
 
-		plugins := make([][]string, 3)
+		plugins := make([]map[string]string, 3)
 		for i := 0; i < 3; i++ {
 			names, err := findAll(PluginType(i), dir)
 			if err != nil {
@@ -136,7 +149,8 @@ func NewPluginManager() (*Manager, error) {
 	return singleton, err
 }
 
-func findAll(t PluginType, pluginDir string) (result []string, err error) {
+func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
+	result = make(map[string]string)
 	dir := path.Join(pluginDir, PluginTypes[t])
 	files, err := ioutil.ReadDir(dir)
 	if err != nil {
@@ -146,7 +160,8 @@ func findAll(t PluginType, pluginDir string) (result []string, err error) {
 	for _, file := range files {
 		baseName := filepath.Base(file.Name())
 		if strings.HasSuffix(baseName, ".so") {
-			result = append(result, lcFirst(baseName[0:len(baseName)-3]))
+			n, v := parseName(baseName)
+			result[n] = v
 		}
 	}
 	return
@@ -182,7 +197,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 	}
 	//unzip and copy to destination
-	unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
+	unzipFiles, version, err := m.unzipAndCopy(t, name, zipPath)
 	if err != nil {
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 			os.Remove(unzipFiles[0])
@@ -190,7 +205,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
 	}
 
-	m.registry.Store(t, name)
+	m.registry.Store(t, name, version)
 	return nil
 }
 
@@ -199,18 +214,17 @@ func (m *Manager) Delete(t PluginType, name string, restart bool) error {
 	if name == "" {
 		return fmt.Errorf("invalid name %s: should not be empty", name)
 	}
-	found := false
-	for _, n := range m.registry.List(t) {
-		if n == name {
-			found = true
-		}
-	}
-	if !found {
+	v, ok := m.registry.Get(t, name)
+	if !ok {
 		return fmt.Errorf("invalid name %s: not exist", name)
 	}
 	var results []string
+	soFile := ucFirst(name) + ".so"
+	if v != "" {
+		soFile = fmt.Sprintf("%s@v%s.so", ucFirst(name), v)
+	}
 	paths := []string{
-		path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
+		path.Join(m.pluginDir, PluginTypes[t], soFile),
 	}
 	if t == SOURCE {
 		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
@@ -240,43 +254,55 @@ func (m *Manager) Delete(t PluginType, name string, restart bool) error {
 	}
 }
 
-func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
+func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, string, error) {
 	var filenames []string
 	r, err := zip.OpenReader(src)
 	if err != nil {
-		return filenames, err
+		return filenames, "", err
 	}
 	defer r.Close()
 
-	files := []string{
-		ucFirst(name) + ".so",
-	}
-	paths := []string{
-		path.Join(m.pluginDir, PluginTypes[t], files[0]),
-	}
+	soPrefix := regexp.MustCompile(fmt.Sprintf(`^%s(@v.*)?\.so$`, ucFirst(name)))
+	var yamlFile, yamlPath, version string
+	expFiles := 1
 	if t == SOURCE {
-		files = append(files, name+".yaml")
-		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
-	}
-	for i, d := range files {
-		var z *zip.File
-		for _, file := range r.File {
-			fileName := file.Name
-			if fileName == d {
-				z = file
+		yamlFile = name + ".yaml"
+		yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
+		expFiles = 2
+	}
+	for _, file := range r.File {
+		fileName := file.Name
+		if yamlFile == fileName {
+			err = unzipTo(file, yamlPath)
+			if err != nil {
+				return filenames, "", err
 			}
+			filenames = append(filenames, yamlPath)
 		}
-		if z == nil {
-			return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
+		if soPrefix.Match([]byte(fileName)) {
+			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
+			err = unzipTo(file, soPath)
+			if err != nil {
+				return filenames, "", err
+			}
+			filenames = append(filenames, soPath)
+			_, version = parseName(fileName)
 		}
+	}
+	if len(filenames) != expFiles {
+		return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
+	}
+	return filenames, version, nil
+}
 
-		err = unzipTo(z, paths[i])
-		if err != nil {
-			return filenames, err
-		}
-		filenames = append(filenames, paths[i])
+func parseName(n string) (string, string) {
+	result := strings.Split(n, ".so")
+	result = strings.Split(result[0], "@v")
+	name := lcFirst(result[0])
+	if len(result) > 1 {
+		return name, result[1]
 	}
-	return filenames, nil
+	return name, ""
 }
 
 func unzipTo(f *zip.File, fpath string) error {

+ 51 - 3
plugins/manager_test.go

@@ -8,6 +8,7 @@ import (
 	"os"
 	"path"
 	"reflect"
+	"sort"
 	"testing"
 )
 
@@ -22,6 +23,7 @@ func TestManager_Register(t *testing.T) {
 		t   PluginType
 		n   string
 		u   string
+		v   string
 		err error
 	}{
 		{
@@ -54,6 +56,11 @@ func TestManager_Register(t *testing.T) {
 			n: "random2",
 			u: endpoint + "/sources/random2.zip",
 		}, {
+			t: SOURCE,
+			n: "random3",
+			u: endpoint + "/sources/random3.zip",
+			v: "1.0.0",
+		}, {
 			t: SINK,
 			n: "file2",
 			u: endpoint + "/sinks/file2.zip",
@@ -82,7 +89,7 @@ func TestManager_Register(t *testing.T) {
 		if !reflect.DeepEqual(tt.err, err) {
 			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
 		} else if tt.err == nil {
-			err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n)
+			err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n, tt.v)
 			if err != nil {
 				t.Errorf("%d: error : %s\n\n", i, err)
 			}
@@ -91,6 +98,40 @@ func TestManager_Register(t *testing.T) {
 
 }
 
+func TestManager_List(t *testing.T) {
+	data := []struct {
+		t PluginType
+		r []string
+	}{
+		{
+			t: SOURCE,
+			r: []string{"random2", "random3"},
+		}, {
+			t: SINK,
+			r: []string{"file2"},
+		}, {
+			t: FUNCTION,
+			r: []string{"echo2"},
+		},
+	}
+	manager, err := NewPluginManager()
+	if err != nil {
+		t.Error(err)
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+
+	for i, p := range data {
+		result, err := manager.List(p.t)
+		if err != nil {
+			t.Errorf("%d: list error : %s\n\n", i, err)
+		}
+		sort.Strings(result)
+		if !reflect.DeepEqual(p.r, result) {
+			t.Errorf("%d: result mismatch:\n  exp=%v\n  got=%v\n\n", i, p.r, result)
+		}
+	}
+}
+
 func TestManager_Delete(t *testing.T) {
 	data := []struct {
 		t   PluginType
@@ -106,6 +147,9 @@ func TestManager_Delete(t *testing.T) {
 		}, {
 			t: FUNCTION,
 			n: "echo2",
+		}, {
+			t: SOURCE,
+			n: "random3",
 		},
 	}
 	manager, err := NewPluginManager()
@@ -122,8 +166,12 @@ func TestManager_Delete(t *testing.T) {
 	}
 }
 
-func checkFile(pluginDir string, etcDir string, t PluginType, name string) error {
-	soPath := path.Join(pluginDir, PluginTypes[t], ucFirst(name)+".so")
+func checkFile(pluginDir string, etcDir string, t PluginType, name string, version string) error {
+	soName := ucFirst(name) + ".so"
+	if version != "" {
+		soName = fmt.Sprintf("%s@v%s.so", ucFirst(name), version)
+	}
+	soPath := path.Join(pluginDir, PluginTypes[t], soName)
 	_, err := os.Stat(soPath)
 	if err != nil {
 		return err

BIN
plugins/testzips/sources/random3.zip