|
@@ -7,6 +7,7 @@ import (
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
"github.com/emqx/kuiper/common"
|
|
"github.com/emqx/kuiper/common"
|
|
|
|
+ "github.com/emqx/kuiper/common/kv"
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"io"
|
|
"io"
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
@@ -24,14 +25,61 @@ import (
|
|
"unicode"
|
|
"unicode"
|
|
)
|
|
)
|
|
|
|
|
|
-type Plugin struct {
|
|
|
|
|
|
+type Plugin interface {
|
|
|
|
+ GetName() string
|
|
|
|
+ GetFile() string
|
|
|
|
+ GetShellParas() []string
|
|
|
|
+ GetSymbols() []string
|
|
|
|
+ SetName(n string)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type IOPlugin struct {
|
|
Name string `json:"name"`
|
|
Name string `json:"name"`
|
|
File string `json:"file"`
|
|
File string `json:"file"`
|
|
ShellParas []string `json:"shellParas"`
|
|
ShellParas []string `json:"shellParas"`
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (p *IOPlugin) GetName() string {
|
|
|
|
+ return p.Name
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *IOPlugin) GetFile() string {
|
|
|
|
+ return p.File
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *IOPlugin) GetShellParas() []string {
|
|
|
|
+ return p.ShellParas
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *IOPlugin) GetSymbols() []string {
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *IOPlugin) SetName(n string) {
|
|
|
|
+ p.Name = n
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type FuncPlugin struct {
|
|
|
|
+ IOPlugin
|
|
|
|
+ // Optional, if not specified, a default element with the same name of the file will be registered
|
|
|
|
+ Functions []string `json:"functions"`
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (fp *FuncPlugin) GetSymbols() []string {
|
|
|
|
+ return fp.Functions
|
|
|
|
+}
|
|
|
|
+
|
|
type PluginType int
|
|
type PluginType int
|
|
|
|
|
|
|
|
+func NewPluginByType(t PluginType) Plugin {
|
|
|
|
+ switch t {
|
|
|
|
+ case FUNCTION:
|
|
|
|
+ return &FuncPlugin{}
|
|
|
|
+ default:
|
|
|
|
+ return &IOPlugin{}
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
const (
|
|
const (
|
|
SOURCE PluginType = iota
|
|
SOURCE PluginType = iota
|
|
SINK
|
|
SINK
|
|
@@ -49,18 +97,54 @@ var (
|
|
//Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
|
|
//Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
|
|
type Registry struct {
|
|
type Registry struct {
|
|
sync.RWMutex
|
|
sync.RWMutex
|
|
- internal []map[string]string
|
|
|
|
|
|
+ // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
|
|
|
|
+ plugins []map[string]string
|
|
|
|
+ // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
|
|
|
|
+ symbols map[string]string
|
|
}
|
|
}
|
|
|
|
|
|
func (rr *Registry) Store(t PluginType, name string, version string) {
|
|
func (rr *Registry) Store(t PluginType, name string, version string) {
|
|
rr.Lock()
|
|
rr.Lock()
|
|
- rr.internal[t][name] = version
|
|
|
|
|
|
+ rr.plugins[t][name] = version
|
|
|
|
+ rr.Unlock()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (rr *Registry) StoreSymbols(name string, symbols []string) error {
|
|
|
|
+ rr.Lock()
|
|
|
|
+ defer rr.Unlock()
|
|
|
|
+ for _, s := range symbols {
|
|
|
|
+ if _, ok := rr.symbols[s]; ok {
|
|
|
|
+ return fmt.Errorf("function name %s already exists", s)
|
|
|
|
+ } else {
|
|
|
|
+ rr.symbols[s] = name
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (rr *Registry) RemoveSymbols(symbols []string) {
|
|
|
|
+ rr.Lock()
|
|
|
|
+ for _, s := range symbols {
|
|
|
|
+ delete(rr.symbols, s)
|
|
|
|
+ }
|
|
rr.Unlock()
|
|
rr.Unlock()
|
|
}
|
|
}
|
|
|
|
|
|
func (rr *Registry) List(t PluginType) []string {
|
|
func (rr *Registry) List(t PluginType) []string {
|
|
rr.RLock()
|
|
rr.RLock()
|
|
- result := rr.internal[t]
|
|
|
|
|
|
+ result := rr.plugins[t]
|
|
|
|
+ rr.RUnlock()
|
|
|
|
+ keys := make([]string, 0, len(result))
|
|
|
|
+ for k := range result {
|
|
|
|
+ keys = append(keys, k)
|
|
|
|
+ }
|
|
|
|
+ return keys
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (rr *Registry) ListSymbols() []string {
|
|
|
|
+ rr.RLock()
|
|
|
|
+ result := rr.symbols
|
|
rr.RUnlock()
|
|
rr.RUnlock()
|
|
keys := make([]string, 0, len(result))
|
|
keys := make([]string, 0, len(result))
|
|
for k := range result {
|
|
for k := range result {
|
|
@@ -71,24 +155,41 @@ func (rr *Registry) List(t PluginType) []string {
|
|
|
|
|
|
func (rr *Registry) Get(t PluginType, name string) (string, bool) {
|
|
func (rr *Registry) Get(t PluginType, name string) (string, bool) {
|
|
rr.RLock()
|
|
rr.RLock()
|
|
- result := rr.internal[t]
|
|
|
|
|
|
+ result := rr.plugins[t]
|
|
rr.RUnlock()
|
|
rr.RUnlock()
|
|
r, ok := result[name]
|
|
r, ok := result[name]
|
|
return r, ok
|
|
return r, ok
|
|
}
|
|
}
|
|
|
|
|
|
-//func (rr *Registry) Delete(t PluginType, value string) {
|
|
|
|
-// rr.Lock()
|
|
|
|
-// s := rr.internal[t]
|
|
|
|
-// for i, f := range s{
|
|
|
|
-// if f == value{
|
|
|
|
-// s[len(s)-1], s[i] = s[i], s[len(s)-1]
|
|
|
|
-// rr.internal[t] = s
|
|
|
|
-// break
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// rr.Unlock()
|
|
|
|
-//}
|
|
|
|
|
|
+func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
|
+ switch t {
|
|
|
|
+ case FUNCTION:
|
|
|
|
+ rr.RLock()
|
|
|
|
+ result := rr.plugins[t]
|
|
|
|
+ name, ok := rr.symbols[symbolName]
|
|
|
|
+ rr.RUnlock()
|
|
|
|
+ if ok {
|
|
|
|
+ r, nok := result[name]
|
|
|
|
+ return r, nok
|
|
|
|
+ } else {
|
|
|
|
+ return "", false
|
|
|
|
+ }
|
|
|
|
+ default:
|
|
|
|
+ return rr.Get(t, symbolName)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
|
+ switch t {
|
|
|
|
+ case FUNCTION:
|
|
|
|
+ rr.RLock()
|
|
|
|
+ defer rr.RUnlock()
|
|
|
|
+ name, ok := rr.symbols[symbolName]
|
|
|
|
+ return name, ok
|
|
|
|
+ default:
|
|
|
|
+ return symbolName, true
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
var symbolRegistry = make(map[string]plugin.Symbol)
|
|
var symbolRegistry = make(map[string]plugin.Symbol)
|
|
var mu sync.RWMutex
|
|
var mu sync.RWMutex
|
|
@@ -106,7 +207,7 @@ func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("fail to initialize the plugin manager")
|
|
return nil, fmt.Errorf("fail to initialize the plugin manager")
|
|
}
|
|
}
|
|
- mod, err := getSoFilePath(m, pt, t)
|
|
|
|
|
|
+ mod, err := getSoFilePath(m, pt, t, false)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
|
|
return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
|
|
}
|
|
}
|
|
@@ -181,6 +282,7 @@ type Manager struct {
|
|
pluginDir string
|
|
pluginDir string
|
|
etcDir string
|
|
etcDir string
|
|
registry *Registry
|
|
registry *Registry
|
|
|
|
+ db kv.KeyValue
|
|
}
|
|
}
|
|
|
|
|
|
func NewPluginManager() (*Manager, error) {
|
|
func NewPluginManager() (*Manager, error) {
|
|
@@ -196,7 +298,17 @@ func NewPluginManager() (*Manager, error) {
|
|
outerErr = fmt.Errorf("cannot find etc folder: %s", err)
|
|
outerErr = fmt.Errorf("cannot find etc folder: %s", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ dbDir, err := common.GetDataLoc()
|
|
|
|
+ if err != nil {
|
|
|
|
+ outerErr = fmt.Errorf("cannot find db folder: %s", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
|
|
|
|
+ err = db.Open()
|
|
|
|
+ if err != nil {
|
|
|
|
+ outerErr = fmt.Errorf("error when opening db: %v.", err)
|
|
|
|
+ }
|
|
|
|
+ defer db.Close()
|
|
plugins := make([]map[string]string, 3)
|
|
plugins := make([]map[string]string, 3)
|
|
for i := 0; i < 3; i++ {
|
|
for i := 0; i < 3; i++ {
|
|
names, err := findAll(PluginType(i), dir)
|
|
names, err := findAll(PluginType(i), dir)
|
|
@@ -206,12 +318,24 @@ func NewPluginManager() (*Manager, error) {
|
|
}
|
|
}
|
|
plugins[i] = names
|
|
plugins[i] = names
|
|
}
|
|
}
|
|
- registry := &Registry{internal: plugins}
|
|
|
|
|
|
+ registry := &Registry{plugins: plugins, symbols: make(map[string]string)}
|
|
|
|
+ for pf, _ := range plugins[FUNCTION] {
|
|
|
|
+ l := make([]string, 0)
|
|
|
|
+ if ok, err := db.Get(pf, &l); ok {
|
|
|
|
+ registry.StoreSymbols(pf, l)
|
|
|
|
+ } else if err != nil {
|
|
|
|
+ outerErr = fmt.Errorf("error when querying kv: %s", err)
|
|
|
|
+ return
|
|
|
|
+ } else {
|
|
|
|
+ registry.StoreSymbols(pf, []string{pf})
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
singleton = &Manager{
|
|
singleton = &Manager{
|
|
pluginDir: dir,
|
|
pluginDir: dir,
|
|
etcDir: etcDir,
|
|
etcDir: etcDir,
|
|
registry: registry,
|
|
registry: registry,
|
|
|
|
+ db: db,
|
|
}
|
|
}
|
|
if err := singleton.readSourceMetaDir(); nil != err {
|
|
if err := singleton.readSourceMetaDir(); nil != err {
|
|
common.Log.Errorf("readSourceMetaDir:%v", err)
|
|
common.Log.Errorf("readSourceMetaDir:%v", err)
|
|
@@ -251,8 +375,16 @@ func (m *Manager) List(t PluginType) (result []string, err error) {
|
|
return m.registry.List(t), nil
|
|
return m.registry.List(t), nil
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *Manager) Register(t PluginType, j *Plugin) error {
|
|
|
|
- name, uri, shellParas := j.Name, j.File, j.ShellParas
|
|
|
|
|
|
+func (m *Manager) ListSymbols() (result []string, err error) {
|
|
|
|
+ return m.registry.ListSymbols(), nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *Manager) GetSymbol(s string) (result string, ok bool) {
|
|
|
|
+ return m.registry.GetPluginBySymbol(FUNCTION, s)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *Manager) Register(t PluginType, j Plugin) error {
|
|
|
|
+ name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
|
|
//Validation
|
|
//Validation
|
|
name = strings.Trim(name, " ")
|
|
name = strings.Trim(name, " ")
|
|
if name == "" {
|
|
if name == "" {
|
|
@@ -269,23 +401,54 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
|
|
return fmt.Errorf("invalid name %s: duplicate", name)
|
|
return fmt.Errorf("invalid name %s: duplicate", name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ var err error
|
|
|
|
+ if t == FUNCTION {
|
|
|
|
+ if len(j.GetSymbols()) > 0 {
|
|
|
|
+ err = m.db.Open()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ err = m.db.Set(name, j.GetSymbols())
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ m.db.Close()
|
|
|
|
+ err = m.registry.StoreSymbols(name, j.GetSymbols())
|
|
|
|
+ } else {
|
|
|
|
+ err = m.registry.StoreSymbols(name, []string{name})
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
|
|
zipPath := path.Join(m.pluginDir, name+".zip")
|
|
zipPath := path.Join(m.pluginDir, name+".zip")
|
|
var unzipFiles []string
|
|
var unzipFiles []string
|
|
//clean up: delete zip file and unzip files in error
|
|
//clean up: delete zip file and unzip files in error
|
|
defer os.Remove(zipPath)
|
|
defer os.Remove(zipPath)
|
|
//download
|
|
//download
|
|
- err := downloadFile(zipPath, uri)
|
|
|
|
|
|
+ err = downloadFile(zipPath, uri)
|
|
if err != nil {
|
|
if err != nil {
|
|
return fmt.Errorf("fail to download file %s: %s", uri, err)
|
|
return fmt.Errorf("fail to download file %s: %s", uri, err)
|
|
}
|
|
}
|
|
//unzip and copy to destination
|
|
//unzip and copy to destination
|
|
unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
|
|
unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
|
|
- if err != nil {
|
|
|
|
|
|
+ if err == nil && len(j.GetSymbols()) > 0 {
|
|
|
|
+ if err = m.db.Open(); err == nil {
|
|
|
|
+ err = m.db.Set(name, j.GetSymbols())
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if err != nil { //Revert for any errors
|
|
if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
|
|
if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
|
|
- os.Remove(unzipFiles[0])
|
|
|
|
|
|
+ os.RemoveAll(unzipFiles[0])
|
|
|
|
+ }
|
|
|
|
+ if len(j.GetSymbols()) > 0 {
|
|
|
|
+ m.db.Close()
|
|
|
|
+ m.registry.RemoveSymbols(j.GetSymbols())
|
|
|
|
+ } else {
|
|
|
|
+ m.registry.RemoveSymbols([]string{name})
|
|
}
|
|
}
|
|
- return fmt.Errorf("fail to unzip file %s: %s", uri, err)
|
|
|
|
|
|
+ return fmt.Errorf("fail to install plugin: %s", err)
|
|
}
|
|
}
|
|
m.registry.Store(t, name, version)
|
|
m.registry.Store(t, name, version)
|
|
|
|
|
|
@@ -306,12 +469,37 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// prerequisite:function plugin of name exists
|
|
|
|
+func (m *Manager) RegisterFuncs(name string, functions []string) error {
|
|
|
|
+ if len(functions) == 0 {
|
|
|
|
+ return fmt.Errorf("property 'functions' must not be empty")
|
|
|
|
+ }
|
|
|
|
+ err := m.db.Open()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer m.db.Close()
|
|
|
|
+ old := make([]string, 0)
|
|
|
|
+ if ok, err := m.db.Get(name, &old); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ } else if ok {
|
|
|
|
+ m.registry.RemoveSymbols(old)
|
|
|
|
+ } else if !ok {
|
|
|
|
+ m.registry.RemoveSymbols([]string{name})
|
|
|
|
+ }
|
|
|
|
+ err = m.db.Set(name, functions)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ return m.registry.StoreSymbols(name, functions)
|
|
|
|
+}
|
|
|
|
+
|
|
func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
name = strings.Trim(name, " ")
|
|
name = strings.Trim(name, " ")
|
|
if name == "" {
|
|
if name == "" {
|
|
return fmt.Errorf("invalid name %s: should not be empty", name)
|
|
return fmt.Errorf("invalid name %s: should not be empty", name)
|
|
}
|
|
}
|
|
- soPath, err := getSoFilePath(m, t, name)
|
|
|
|
|
|
+ soPath, err := getSoFilePath(m, t, name, true)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -319,6 +507,13 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
paths := []string{
|
|
paths := []string{
|
|
soPath,
|
|
soPath,
|
|
}
|
|
}
|
|
|
|
+ // Find etc folder
|
|
|
|
+ etcPath := path.Join(m.etcDir, PluginTypes[t], name)
|
|
|
|
+ if fi, err := os.Stat(etcPath); err == nil {
|
|
|
|
+ if fi.Mode().IsDir() {
|
|
|
|
+ paths = append(paths, etcPath)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
switch t {
|
|
switch t {
|
|
case SOURCE:
|
|
case SOURCE:
|
|
paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
|
|
paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
|
|
@@ -326,13 +521,30 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
case SINK:
|
|
case SINK:
|
|
m.uninstalSink(name)
|
|
m.uninstalSink(name)
|
|
case FUNCTION:
|
|
case FUNCTION:
|
|
|
|
+ old := make([]string, 0)
|
|
|
|
+ err = m.db.Open()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if ok, err := m.db.Get(name, &old); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ } else if ok {
|
|
|
|
+ m.registry.RemoveSymbols(old)
|
|
|
|
+ err := m.db.Delete(name)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ } else if !ok {
|
|
|
|
+ m.registry.RemoveSymbols([]string{name})
|
|
|
|
+ }
|
|
|
|
+ m.db.Close()
|
|
m.uninstalFunc(name)
|
|
m.uninstalFunc(name)
|
|
}
|
|
}
|
|
|
|
|
|
for _, p := range paths {
|
|
for _, p := range paths {
|
|
_, err := os.Stat(p)
|
|
_, err := os.Stat(p)
|
|
if err == nil {
|
|
if err == nil {
|
|
- err = os.Remove(p)
|
|
|
|
|
|
+ err = os.RemoveAll(p)
|
|
if err != nil {
|
|
if err != nil {
|
|
results = append(results, err.Error())
|
|
results = append(results, err.Error())
|
|
}
|
|
}
|
|
@@ -354,38 +566,62 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-func (m *Manager) Get(t PluginType, name string) (map[string]string, bool) {
|
|
|
|
|
|
+func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool) {
|
|
v, ok := m.registry.Get(t, name)
|
|
v, ok := m.registry.Get(t, name)
|
|
if strings.HasPrefix(v, "v") {
|
|
if strings.HasPrefix(v, "v") {
|
|
v = v[1:]
|
|
v = v[1:]
|
|
}
|
|
}
|
|
if ok {
|
|
if ok {
|
|
- m := map[string]string{
|
|
|
|
|
|
+ r := map[string]interface{}{
|
|
"name": name,
|
|
"name": name,
|
|
"version": v,
|
|
"version": v,
|
|
}
|
|
}
|
|
- return m, ok
|
|
|
|
|
|
+ if t == FUNCTION {
|
|
|
|
+ if err := m.db.Open(); err == nil {
|
|
|
|
+ l := make([]string, 0)
|
|
|
|
+ if ok, _ := m.db.Get(name, &l); ok {
|
|
|
|
+ r["functions"] = l
|
|
|
|
+ }
|
|
|
|
+ m.db.Close()
|
|
|
|
+ }
|
|
|
|
+ // ignore the error
|
|
|
|
+ }
|
|
|
|
+ return r, ok
|
|
}
|
|
}
|
|
return nil, false
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
|
|
// Return the lowercase version of so name. It may be upper case in path.
|
|
// Return the lowercase version of so name. It may be upper case in path.
|
|
-func getSoFilePath(m *Manager, t PluginType, name string) (string, error) {
|
|
|
|
- v, ok := m.registry.Get(t, name)
|
|
|
|
|
|
+func getSoFilePath(m *Manager, t PluginType, name string, isSoName bool) (string, error) {
|
|
|
|
+ var (
|
|
|
|
+ v string
|
|
|
|
+ soname string
|
|
|
|
+ ok bool
|
|
|
|
+ )
|
|
|
|
+ // We must identify plugin or symbol when deleting function plugin
|
|
|
|
+ if isSoName {
|
|
|
|
+ soname = name
|
|
|
|
+ } else {
|
|
|
|
+ soname, ok = m.registry.GetPluginBySymbol(t, name)
|
|
|
|
+ if !ok {
|
|
|
|
+ return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ v, ok = m.registry.Get(t, soname)
|
|
if !ok {
|
|
if !ok {
|
|
- return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", name))
|
|
|
|
|
|
+ return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
|
|
}
|
|
}
|
|
|
|
|
|
- soFile := name + ".so"
|
|
|
|
|
|
+ soFile := soname + ".so"
|
|
if v != "" {
|
|
if v != "" {
|
|
- soFile = fmt.Sprintf("%s@%s.so", name, v)
|
|
|
|
|
|
+ soFile = fmt.Sprintf("%s@%s.so", soname, v)
|
|
}
|
|
}
|
|
p := path.Join(m.pluginDir, PluginTypes[t], soFile)
|
|
p := path.Join(m.pluginDir, PluginTypes[t], soFile)
|
|
if _, err := os.Stat(p); err != nil {
|
|
if _, err := os.Stat(p); err != nil {
|
|
p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
|
|
p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
|
|
}
|
|
}
|
|
if _, err := os.Stat(p); err != nil {
|
|
if _, err := os.Stat(p); err != nil {
|
|
- return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", name))
|
|
|
|
|
|
+ return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
|
|
}
|
|
}
|
|
return p, nil
|
|
return p, nil
|
|
}
|
|
}
|
|
@@ -435,6 +671,11 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
filenames = append(filenames, soPath)
|
|
filenames = append(filenames, soPath)
|
|
revokeFiles = append(revokeFiles, soPath)
|
|
revokeFiles = append(revokeFiles, soPath)
|
|
_, version = parseName(fileName)
|
|
_, version = parseName(fileName)
|
|
|
|
+ } else if strings.HasPrefix(fileName, "etc/") {
|
|
|
|
+ err = unzipTo(file, path.Join(m.etcDir, PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
|
|
|
|
+ if err != nil {
|
|
|
|
+ return filenames, "", err
|
|
|
|
+ }
|
|
} else { //unzip other files
|
|
} else { //unzip other files
|
|
err = unzipTo(file, path.Join(tempPath, fileName))
|
|
err = unzipTo(file, path.Join(tempPath, fileName))
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -463,11 +704,12 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
for _, f := range revokeFiles {
|
|
for _, f := range revokeFiles {
|
|
- os.Remove(f)
|
|
|
|
|
|
+ os.RemoveAll(f)
|
|
}
|
|
}
|
|
common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
|
|
common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
|
|
return filenames, "", err
|
|
return filenames, "", err
|
|
} else {
|
|
} else {
|
|
|
|
+ common.Log.Infof(`run install script:%s`, outb.String())
|
|
common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
|
|
common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -487,17 +729,21 @@ func parseName(n string) (string, string) {
|
|
func unzipTo(f *zip.File, fpath string) error {
|
|
func unzipTo(f *zip.File, fpath string) error {
|
|
_, err := os.Stat(fpath)
|
|
_, err := os.Stat(fpath)
|
|
if err == nil || !os.IsNotExist(err) {
|
|
if err == nil || !os.IsNotExist(err) {
|
|
- if err = os.Remove(fpath); err != nil {
|
|
|
|
|
|
+ if err = os.RemoveAll(fpath); err != nil {
|
|
return fmt.Errorf("failed to delete file %s", fpath)
|
|
return fmt.Errorf("failed to delete file %s", fpath)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
if f.FileInfo().IsDir() {
|
|
if f.FileInfo().IsDir() {
|
|
- return fmt.Errorf("%s: not a file, but a directory", fpath)
|
|
|
|
|
|
+ // Make Folder
|
|
|
|
+ os.MkdirAll(fpath, os.ModePerm)
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
- if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
|
|
|
|
- return err
|
|
|
|
|
|
+ if _, err := os.Stat(filepath.Dir(fpath)); os.IsNotExist(err) {
|
|
|
|
+ if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
|
|
outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
|