|
@@ -12,20 +12,23 @@
|
|
|
// See the License for the specific language governing permissions and
|
|
|
// limitations under the License.
|
|
|
|
|
|
-package plugin
|
|
|
+// Manage the loading of both native and portable plugins
|
|
|
+
|
|
|
+package native
|
|
|
|
|
|
import (
|
|
|
"archive/zip"
|
|
|
"bytes"
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/meta"
|
|
|
"github.com/lf-edge/ekuiper/internal/pkg/filex"
|
|
|
"github.com/lf-edge/ekuiper/internal/pkg/httpx"
|
|
|
"github.com/lf-edge/ekuiper/internal/pkg/store"
|
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
|
"github.com/lf-edge/ekuiper/pkg/errorx"
|
|
|
"github.com/lf-edge/ekuiper/pkg/kv"
|
|
|
+ "github.com/pkg/errors"
|
|
|
"io/ioutil"
|
|
|
"os"
|
|
|
"os/exec"
|
|
@@ -39,91 +42,102 @@ import (
|
|
|
"unicode"
|
|
|
)
|
|
|
|
|
|
-type Plugin interface {
|
|
|
- GetName() string
|
|
|
- GetFile() string
|
|
|
- GetShellParas() []string
|
|
|
- GetSymbols() []string
|
|
|
- SetName(n string)
|
|
|
-}
|
|
|
-
|
|
|
-type IOPlugin struct {
|
|
|
- Name string `json:"name"`
|
|
|
- File string `json:"file"`
|
|
|
- ShellParas []string `json:"shellParas"`
|
|
|
-}
|
|
|
-
|
|
|
-func (p *IOPlugin) GetName() string {
|
|
|
- return p.Name
|
|
|
-}
|
|
|
+// Manager Initialized in the binder
|
|
|
+var manager *Manager
|
|
|
|
|
|
-func (p *IOPlugin) GetFile() string {
|
|
|
- return p.File
|
|
|
-}
|
|
|
-
|
|
|
-func (p *IOPlugin) GetShellParas() []string {
|
|
|
- return p.ShellParas
|
|
|
-}
|
|
|
-
|
|
|
-func (p *IOPlugin) GetSymbols() []string {
|
|
|
- return nil
|
|
|
-}
|
|
|
+const DELETED = "$deleted"
|
|
|
|
|
|
-func (p *IOPlugin) SetName(n string) {
|
|
|
- p.Name = n
|
|
|
+//Manager is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
|
|
|
+type Manager struct {
|
|
|
+ sync.RWMutex
|
|
|
+ // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
|
|
|
+ plugins []map[string]string
|
|
|
+ // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
|
|
|
+ symbols map[string]string
|
|
|
+ // loaded symbols in current runtime
|
|
|
+ runtime map[string]plugin.Symbol
|
|
|
+ // dirs
|
|
|
+ pluginDir string
|
|
|
+ etcDir string
|
|
|
+ // the access to db
|
|
|
+ db kv.KeyValue
|
|
|
}
|
|
|
|
|
|
-type FuncPlugin struct {
|
|
|
- IOPlugin
|
|
|
- // Optional, if not specified, a default element with the same name of the file will be registered
|
|
|
- Functions []string `json:"functions"`
|
|
|
-}
|
|
|
+// InitManager must only be called once
|
|
|
+func InitManager() (*Manager, error) {
|
|
|
+ pluginDir, err := conf.GetPluginsLoc()
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("cannot find plugins folder: %s", err)
|
|
|
+ }
|
|
|
+ etcDir, err := conf.GetConfLoc()
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("cannot find etc folder: %s", err)
|
|
|
+ }
|
|
|
+ err, db := store.GetKV("pluginFuncs")
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error when opening db: %v", err)
|
|
|
+ }
|
|
|
+ plugins := make([]map[string]string, 3)
|
|
|
+ for i := range PluginTypes {
|
|
|
+ names, err := findAll(PluginType(i), pluginDir)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("fail to find existing plugins: %s", err)
|
|
|
+ }
|
|
|
+ plugins[i] = names
|
|
|
+ }
|
|
|
+ registry := &Manager{plugins: plugins, symbols: make(map[string]string), db: db, pluginDir: pluginDir, etcDir: etcDir, runtime: make(map[string]plugin.Symbol)}
|
|
|
|
|
|
-func (fp *FuncPlugin) GetSymbols() []string {
|
|
|
- return fp.Functions
|
|
|
+ for pf := range plugins[FUNCTION] {
|
|
|
+ l := make([]string, 0)
|
|
|
+ if ok, err := db.Get(pf, &l); ok {
|
|
|
+ registry.storeSymbols(pf, l)
|
|
|
+ } else if err != nil {
|
|
|
+ return nil, fmt.Errorf("error when querying kv: %s", err)
|
|
|
+ } else {
|
|
|
+ registry.storeSymbols(pf, []string{pf})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ manager = registry
|
|
|
+ return registry, nil
|
|
|
}
|
|
|
|
|
|
-type PluginType int
|
|
|
+func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
|
|
|
+ result = make(map[string]string)
|
|
|
+ dir := path.Join(pluginDir, PluginTypes[t])
|
|
|
+ files, err := ioutil.ReadDir(dir)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
-func NewPluginByType(t PluginType) Plugin {
|
|
|
- switch t {
|
|
|
- case FUNCTION:
|
|
|
- return &FuncPlugin{}
|
|
|
- default:
|
|
|
- return &IOPlugin{}
|
|
|
+ for _, file := range files {
|
|
|
+ baseName := filepath.Base(file.Name())
|
|
|
+ if strings.HasSuffix(baseName, ".so") {
|
|
|
+ n, v := parseName(baseName)
|
|
|
+ result[n] = v
|
|
|
+ }
|
|
|
}
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-const (
|
|
|
- SOURCE PluginType = iota
|
|
|
- SINK
|
|
|
- FUNCTION
|
|
|
-)
|
|
|
-
|
|
|
-const DELETED = "$deleted"
|
|
|
-
|
|
|
-var (
|
|
|
- PluginTypes = []string{"sources", "sinks", "functions"}
|
|
|
- once sync.Once
|
|
|
- singleton *Manager
|
|
|
-)
|
|
|
+func GetManager() *Manager {
|
|
|
+ return manager
|
|
|
+}
|
|
|
|
|
|
-//Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
|
|
|
-type Registry struct {
|
|
|
- sync.RWMutex
|
|
|
- // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
|
|
|
- plugins []map[string]string
|
|
|
- // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
|
|
|
- symbols map[string]string
|
|
|
+func (rr *Manager) get(t PluginType, name string) (string, bool) {
|
|
|
+ rr.RLock()
|
|
|
+ result := rr.plugins[t]
|
|
|
+ rr.RUnlock()
|
|
|
+ r, ok := result[name]
|
|
|
+ return r, ok
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) Store(t PluginType, name string, version string) {
|
|
|
+func (rr *Manager) store(t PluginType, name string, version string) {
|
|
|
rr.Lock()
|
|
|
rr.plugins[t][name] = version
|
|
|
rr.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) StoreSymbols(name string, symbols []string) error {
|
|
|
+func (rr *Manager) storeSymbols(name string, symbols []string) error {
|
|
|
rr.Lock()
|
|
|
defer rr.Unlock()
|
|
|
for _, s := range symbols {
|
|
@@ -137,7 +151,7 @@ func (rr *Registry) StoreSymbols(name string, symbols []string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) RemoveSymbols(symbols []string) {
|
|
|
+func (rr *Manager) removeSymbols(symbols []string) {
|
|
|
rr.Lock()
|
|
|
for _, s := range symbols {
|
|
|
delete(rr.symbols, s)
|
|
@@ -145,7 +159,9 @@ func (rr *Registry) RemoveSymbols(symbols []string) {
|
|
|
rr.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) List(t PluginType) []string {
|
|
|
+// API for management
|
|
|
+
|
|
|
+func (rr *Manager) List(t PluginType) []string {
|
|
|
rr.RLock()
|
|
|
result := rr.plugins[t]
|
|
|
rr.RUnlock()
|
|
@@ -156,7 +172,7 @@ func (rr *Registry) List(t PluginType) []string {
|
|
|
return keys
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) ListSymbols() []string {
|
|
|
+func (rr *Manager) ListSymbols() []string {
|
|
|
rr.RLock()
|
|
|
result := rr.symbols
|
|
|
rr.RUnlock()
|
|
@@ -167,15 +183,7 @@ func (rr *Registry) ListSymbols() []string {
|
|
|
return keys
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) Get(t PluginType, name string) (string, bool) {
|
|
|
- rr.RLock()
|
|
|
- result := rr.plugins[t]
|
|
|
- rr.RUnlock()
|
|
|
- r, ok := result[name]
|
|
|
- return r, ok
|
|
|
-}
|
|
|
-
|
|
|
-func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
+func (rr *Manager) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
switch t {
|
|
|
case FUNCTION:
|
|
|
rr.RLock()
|
|
@@ -189,11 +197,11 @@ func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (s
|
|
|
return "", false
|
|
|
}
|
|
|
default:
|
|
|
- return rr.Get(t, symbolName)
|
|
|
+ return rr.get(t, symbolName)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
+func (rr *Manager) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
|
|
|
switch t {
|
|
|
case FUNCTION:
|
|
|
rr.RLock()
|
|
@@ -205,175 +213,7 @@ func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-var symbolRegistry = make(map[string]plugin.Symbol)
|
|
|
-var mu sync.RWMutex
|
|
|
-
|
|
|
-func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
|
|
|
- ut := ucFirst(t)
|
|
|
- ptype := PluginTypes[pt]
|
|
|
- key := ptype + "/" + t
|
|
|
- mu.Lock()
|
|
|
- defer mu.Unlock()
|
|
|
- var nf plugin.Symbol
|
|
|
- nf, ok := symbolRegistry[key]
|
|
|
- if !ok {
|
|
|
- m, err := NewPluginManager()
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("fail to initialize the plugin manager")
|
|
|
- }
|
|
|
- mod, err := getSoFilePath(m, pt, t, false)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
|
|
|
- }
|
|
|
- conf.Log.Debugf("Opening plugin %s", mod)
|
|
|
- plug, err := plugin.Open(mod)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("cannot open %s: %v", mod, err)
|
|
|
- }
|
|
|
- conf.Log.Debugf("Successfully open plugin %s", mod)
|
|
|
- nf, err = plug.Lookup(ut)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
|
|
|
- }
|
|
|
- conf.Log.Debugf("Successfully look-up plugin %s", mod)
|
|
|
- symbolRegistry[key] = nf
|
|
|
- }
|
|
|
- return nf, nil
|
|
|
-}
|
|
|
-
|
|
|
-func GetSource(t string) (api.Source, error) {
|
|
|
- nf, err := getPlugin(t, SOURCE)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- var s api.Source
|
|
|
- switch t := nf.(type) {
|
|
|
- case api.Source:
|
|
|
- s = t
|
|
|
- case func() api.Source:
|
|
|
- s = t()
|
|
|
- default:
|
|
|
- return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
|
|
|
- }
|
|
|
- return s, nil
|
|
|
-}
|
|
|
-
|
|
|
-func GetSink(t string) (api.Sink, error) {
|
|
|
- nf, err := getPlugin(t, SINK)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- var s api.Sink
|
|
|
- switch t := nf.(type) {
|
|
|
- case api.Sink:
|
|
|
- s = t
|
|
|
- case func() api.Sink:
|
|
|
- s = t()
|
|
|
- default:
|
|
|
- return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
|
|
|
- }
|
|
|
- return s, nil
|
|
|
-}
|
|
|
-
|
|
|
-type Manager struct {
|
|
|
- pluginDir string
|
|
|
- etcDir string
|
|
|
- registry *Registry
|
|
|
- db kv.KeyValue
|
|
|
-}
|
|
|
-
|
|
|
-func NewPluginManager() (*Manager, error) {
|
|
|
- var outerErr error
|
|
|
- once.Do(func() {
|
|
|
- dir, err := conf.GetPluginsLoc()
|
|
|
- if err != nil {
|
|
|
- outerErr = fmt.Errorf("cannot find plugins folder: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
- etcDir, err := conf.GetConfLoc()
|
|
|
- if err != nil {
|
|
|
- outerErr = fmt.Errorf("cannot find etc folder: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
- err, db := store.GetKV("pluginFuncs")
|
|
|
- if err != nil {
|
|
|
- outerErr = fmt.Errorf("error when opening db: %v.", err)
|
|
|
- }
|
|
|
- plugins := make([]map[string]string, 3)
|
|
|
- for i := 0; i < 3; i++ {
|
|
|
- names, err := findAll(PluginType(i), dir)
|
|
|
- if err != nil {
|
|
|
- outerErr = fmt.Errorf("fail to find existing plugins: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
- plugins[i] = names
|
|
|
- }
|
|
|
- registry := &Registry{plugins: plugins, symbols: make(map[string]string)}
|
|
|
- for pf := range plugins[FUNCTION] {
|
|
|
- l := make([]string, 0)
|
|
|
- if ok, err := db.Get(pf, &l); ok {
|
|
|
- registry.StoreSymbols(pf, l)
|
|
|
- } else if err != nil {
|
|
|
- outerErr = fmt.Errorf("error when querying kv: %s", err)
|
|
|
- return
|
|
|
- } else {
|
|
|
- registry.StoreSymbols(pf, []string{pf})
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- singleton = &Manager{
|
|
|
- pluginDir: dir,
|
|
|
- etcDir: etcDir,
|
|
|
- registry: registry,
|
|
|
- db: db,
|
|
|
- }
|
|
|
- if err := singleton.readSourceMetaDir(); nil != err {
|
|
|
- conf.Log.Errorf("readSourceMetaDir:%v", err)
|
|
|
- }
|
|
|
- if err := singleton.readSinkMetaDir(); nil != err {
|
|
|
- conf.Log.Errorf("readSinkMetaDir:%v", err)
|
|
|
- }
|
|
|
- if err := singleton.readFuncMetaDir(); nil != err {
|
|
|
- conf.Log.Errorf("readFuncMetaDir:%v", err)
|
|
|
- }
|
|
|
- if err := singleton.readUiMsgDir(); nil != err {
|
|
|
- conf.Log.Errorf("readUiMsgDir:%v", err)
|
|
|
- }
|
|
|
- })
|
|
|
- return singleton, outerErr
|
|
|
-}
|
|
|
-
|
|
|
-func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
|
|
|
- result = make(map[string]string)
|
|
|
- dir := path.Join(pluginDir, PluginTypes[t])
|
|
|
- files, err := ioutil.ReadDir(dir)
|
|
|
- if err != nil {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- for _, file := range files {
|
|
|
- baseName := filepath.Base(file.Name())
|
|
|
- if strings.HasSuffix(baseName, ".so") {
|
|
|
- n, v := parseName(baseName)
|
|
|
- result[n] = v
|
|
|
- }
|
|
|
- }
|
|
|
- return
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) List(t PluginType) (result []string, err error) {
|
|
|
- return m.registry.List(t), nil
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) ListSymbols() (result []string, err error) {
|
|
|
- return m.registry.ListSymbols(), nil
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) GetSymbol(s string) (result string, ok bool) {
|
|
|
- return m.registry.GetPluginBySymbol(FUNCTION, s)
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) Register(t PluginType, j Plugin) error {
|
|
|
+func (rr *Manager) Register(t PluginType, j Plugin) error {
|
|
|
name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
|
|
|
//Validation
|
|
|
name = strings.Trim(name, " ")
|
|
@@ -384,7 +224,7 @@ func (m *Manager) Register(t PluginType, j Plugin) error {
|
|
|
return fmt.Errorf("invalid uri %s", uri)
|
|
|
}
|
|
|
|
|
|
- if v, ok := m.registry.Get(t, name); ok {
|
|
|
+ if v, ok := rr.get(t, name); ok {
|
|
|
if v == DELETED {
|
|
|
return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
|
|
|
} else {
|
|
@@ -394,20 +234,20 @@ func (m *Manager) Register(t PluginType, j Plugin) error {
|
|
|
var err error
|
|
|
if t == FUNCTION {
|
|
|
if len(j.GetSymbols()) > 0 {
|
|
|
- err = m.db.Set(name, j.GetSymbols())
|
|
|
+ err = rr.db.Set(name, j.GetSymbols())
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- err = m.registry.StoreSymbols(name, j.GetSymbols())
|
|
|
+ err = rr.storeSymbols(name, j.GetSymbols())
|
|
|
} else {
|
|
|
- err = m.registry.StoreSymbols(name, []string{name})
|
|
|
+ err = rr.storeSymbols(name, []string{name})
|
|
|
}
|
|
|
}
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- zipPath := path.Join(m.pluginDir, name+".zip")
|
|
|
+ zipPath := path.Join(rr.pluginDir, name+".zip")
|
|
|
var unzipFiles []string
|
|
|
//clean up: delete zip file and unzip files in error
|
|
|
defer os.Remove(zipPath)
|
|
@@ -417,66 +257,66 @@ func (m *Manager) Register(t PluginType, j Plugin) error {
|
|
|
return fmt.Errorf("fail to download file %s: %s", uri, err)
|
|
|
}
|
|
|
//unzip and copy to destination
|
|
|
- unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
|
|
|
+ unzipFiles, version, err := rr.install(t, name, zipPath, shellParas)
|
|
|
if err == nil && len(j.GetSymbols()) > 0 {
|
|
|
- err = m.db.Set(name, j.GetSymbols())
|
|
|
+ err = rr.db.Set(name, j.GetSymbols())
|
|
|
}
|
|
|
if err != nil { //Revert for any errors
|
|
|
if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
|
|
|
os.RemoveAll(unzipFiles[0])
|
|
|
}
|
|
|
if len(j.GetSymbols()) > 0 {
|
|
|
- m.registry.RemoveSymbols(j.GetSymbols())
|
|
|
+ rr.removeSymbols(j.GetSymbols())
|
|
|
} else {
|
|
|
- m.registry.RemoveSymbols([]string{name})
|
|
|
+ rr.removeSymbols([]string{name})
|
|
|
}
|
|
|
return fmt.Errorf("fail to install plugin: %s", err)
|
|
|
}
|
|
|
- m.registry.Store(t, name, version)
|
|
|
+ rr.store(t, name, version)
|
|
|
|
|
|
switch t {
|
|
|
case SINK:
|
|
|
- if err := m.readSinkMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
|
|
|
+ if err := meta.ReadSinkMetaFile(path.Join(rr.etcDir, PluginTypes[t], name+`.json`), true); nil != err {
|
|
|
conf.Log.Errorf("readSinkFile:%v", err)
|
|
|
}
|
|
|
case SOURCE:
|
|
|
- if err := m.readSourceMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
|
|
|
+ if err := meta.ReadSourceMetaFile(path.Join(rr.etcDir, PluginTypes[t], name+`.json`), true); nil != err {
|
|
|
conf.Log.Errorf("readSourceFile:%v", err)
|
|
|
}
|
|
|
case FUNCTION:
|
|
|
- if err := m.readFuncMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
|
|
|
+ if err := meta.ReadFuncMetaFile(path.Join(rr.etcDir, PluginTypes[t], name+`.json`), true); nil != err {
|
|
|
conf.Log.Errorf("readFuncFile:%v", err)
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// prerequisite:function plugin of name exists
|
|
|
-func (m *Manager) RegisterFuncs(name string, functions []string) error {
|
|
|
+// RegisterFuncs prerequisite:function plugin of name exists
|
|
|
+func (rr *Manager) RegisterFuncs(name string, functions []string) error {
|
|
|
if len(functions) == 0 {
|
|
|
return fmt.Errorf("property 'functions' must not be empty")
|
|
|
}
|
|
|
old := make([]string, 0)
|
|
|
- if ok, err := m.db.Get(name, &old); err != nil {
|
|
|
+ if ok, err := rr.db.Get(name, &old); err != nil {
|
|
|
return err
|
|
|
} else if ok {
|
|
|
- m.registry.RemoveSymbols(old)
|
|
|
+ rr.removeSymbols(old)
|
|
|
} else if !ok {
|
|
|
- m.registry.RemoveSymbols([]string{name})
|
|
|
+ rr.removeSymbols([]string{name})
|
|
|
}
|
|
|
- err := m.db.Set(name, functions)
|
|
|
+ err := rr.db.Set(name, functions)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- return m.registry.StoreSymbols(name, functions)
|
|
|
+ return rr.storeSymbols(name, functions)
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
+func (rr *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
name = strings.Trim(name, " ")
|
|
|
if name == "" {
|
|
|
return fmt.Errorf("invalid name %s: should not be empty", name)
|
|
|
}
|
|
|
- soPath, err := getSoFilePath(m, t, name, true)
|
|
|
+ soPath, err := rr.getSoFilePath(t, name, true)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -485,7 +325,7 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
soPath,
|
|
|
}
|
|
|
// Find etc folder
|
|
|
- etcPath := path.Join(m.etcDir, PluginTypes[t], name)
|
|
|
+ etcPath := path.Join(rr.etcDir, PluginTypes[t], name)
|
|
|
if fi, err := os.Stat(etcPath); err == nil {
|
|
|
if fi.Mode().IsDir() {
|
|
|
paths = append(paths, etcPath)
|
|
@@ -493,24 +333,24 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
}
|
|
|
switch t {
|
|
|
case SOURCE:
|
|
|
- paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
|
|
|
- m.uninstalSource(name)
|
|
|
+ paths = append(paths, path.Join(rr.etcDir, PluginTypes[t], name+".yaml"))
|
|
|
+ meta.UninstallSource(name)
|
|
|
case SINK:
|
|
|
- m.uninstalSink(name)
|
|
|
+ meta.UninstallSink(name)
|
|
|
case FUNCTION:
|
|
|
old := make([]string, 0)
|
|
|
- if ok, err := m.db.Get(name, &old); err != nil {
|
|
|
+ if ok, err := rr.db.Get(name, &old); err != nil {
|
|
|
return err
|
|
|
} else if ok {
|
|
|
- m.registry.RemoveSymbols(old)
|
|
|
- err := m.db.Delete(name)
|
|
|
+ rr.removeSymbols(old)
|
|
|
+ err := rr.db.Delete(name)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else if !ok {
|
|
|
- m.registry.RemoveSymbols([]string{name})
|
|
|
+ rr.removeSymbols([]string{name})
|
|
|
}
|
|
|
- m.uninstalFunc(name)
|
|
|
+ meta.UninstallFunc(name)
|
|
|
}
|
|
|
|
|
|
for _, p := range paths {
|
|
@@ -528,7 +368,7 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
if len(results) > 0 {
|
|
|
return errors.New(strings.Join(results, "\n"))
|
|
|
} else {
|
|
|
- m.registry.Store(t, name, DELETED)
|
|
|
+ rr.store(t, name, DELETED)
|
|
|
if stop {
|
|
|
go func() {
|
|
|
time.Sleep(1 * time.Second)
|
|
@@ -538,8 +378,8 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
-func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool) {
|
|
|
- v, ok := m.registry.Get(t, name)
|
|
|
+func (rr *Manager) GetPluginInfo(t PluginType, name string) (map[string]interface{}, bool) {
|
|
|
+ v, ok := rr.get(t, name)
|
|
|
if strings.HasPrefix(v, "v") {
|
|
|
v = v[1:]
|
|
|
}
|
|
@@ -550,7 +390,7 @@ func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool)
|
|
|
}
|
|
|
if t == FUNCTION {
|
|
|
l := make([]string, 0)
|
|
|
- if ok, _ := m.db.Get(name, &l); ok {
|
|
|
+ if ok, _ := rr.db.Get(name, &l); ok {
|
|
|
r["functions"] = l
|
|
|
}
|
|
|
// ignore the error
|
|
@@ -560,70 +400,9 @@ func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool)
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
|
-// Start implement xsql.FunctionRegister
|
|
|
-
|
|
|
-func (m *Manager) HasFunction(name string) bool {
|
|
|
- _, ok := m.GetSymbol(name)
|
|
|
- return ok
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) Function(name string) (api.Function, error) {
|
|
|
- nf, err := getPlugin(name, FUNCTION)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- var s api.Function
|
|
|
- switch t := nf.(type) {
|
|
|
- case api.Function:
|
|
|
- s = t
|
|
|
- case func() api.Function:
|
|
|
- s = t()
|
|
|
- default:
|
|
|
- return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
|
|
|
- }
|
|
|
- return s, nil
|
|
|
-}
|
|
|
-
|
|
|
-// End Implement FunctionRegister
|
|
|
-
|
|
|
-// Return the lowercase version of so name. It may be upper case in path.
|
|
|
-func getSoFilePath(m *Manager, t PluginType, name string, isSoName bool) (string, error) {
|
|
|
- var (
|
|
|
- v string
|
|
|
- soname string
|
|
|
- ok bool
|
|
|
- )
|
|
|
- // We must identify plugin or symbol when deleting function plugin
|
|
|
- if isSoName {
|
|
|
- soname = name
|
|
|
- } else {
|
|
|
- soname, ok = m.registry.GetPluginBySymbol(t, name)
|
|
|
- if !ok {
|
|
|
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
|
|
|
- }
|
|
|
- }
|
|
|
- v, ok = m.registry.Get(t, soname)
|
|
|
- if !ok {
|
|
|
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
|
|
|
- }
|
|
|
-
|
|
|
- soFile := soname + ".so"
|
|
|
- if v != "" {
|
|
|
- soFile = fmt.Sprintf("%s@%s.so", soname, v)
|
|
|
- }
|
|
|
- p := path.Join(m.pluginDir, PluginTypes[t], soFile)
|
|
|
- if _, err := os.Stat(p); err != nil {
|
|
|
- p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
|
|
|
- }
|
|
|
- if _, err := os.Stat(p); err != nil {
|
|
|
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
|
|
|
- }
|
|
|
- return p, nil
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
|
|
|
+func (rr *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
|
|
|
var filenames []string
|
|
|
- var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
|
|
|
+ var tempPath = path.Join(rr.pluginDir, "temp", PluginTypes[t], name)
|
|
|
defer os.RemoveAll(tempPath)
|
|
|
r, err := zip.OpenReader(src)
|
|
|
if err != nil {
|
|
@@ -636,7 +415,7 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
|
expFiles := 1
|
|
|
if t == SOURCE {
|
|
|
yamlFile = name + ".yaml"
|
|
|
- yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
|
|
|
+ yamlPath = path.Join(rr.etcDir, PluginTypes[t], yamlFile)
|
|
|
expFiles = 2
|
|
|
}
|
|
|
var revokeFiles []string
|
|
@@ -651,14 +430,14 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
|
revokeFiles = append(revokeFiles, yamlPath)
|
|
|
filenames = append(filenames, yamlPath)
|
|
|
} else if fileName == name+".json" {
|
|
|
- jsonPath := path.Join(m.etcDir, PluginTypes[t], fileName)
|
|
|
+ jsonPath := path.Join(rr.etcDir, PluginTypes[t], fileName)
|
|
|
if err := filex.UnzipTo(file, jsonPath); nil != err {
|
|
|
conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
|
|
|
} else {
|
|
|
revokeFiles = append(revokeFiles, jsonPath)
|
|
|
}
|
|
|
} else if soPrefix.Match([]byte(fileName)) {
|
|
|
- soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
|
|
|
+ soPath := path.Join(rr.pluginDir, PluginTypes[t], fileName)
|
|
|
err = filex.UnzipTo(file, soPath)
|
|
|
if err != nil {
|
|
|
return filenames, "", err
|
|
@@ -667,7 +446,7 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
|
revokeFiles = append(revokeFiles, soPath)
|
|
|
_, version = parseName(fileName)
|
|
|
} else if strings.HasPrefix(fileName, "etc/") {
|
|
|
- err = filex.UnzipTo(file, path.Join(m.etcDir, PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
|
|
|
+ err = filex.UnzipTo(file, path.Join(rr.etcDir, PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
|
|
|
if err != nil {
|
|
|
return filenames, "", err
|
|
|
}
|
|
@@ -711,6 +490,140 @@ func (m *Manager) install(t PluginType, name, src string, shellParas []string) (
|
|
|
return filenames, version, nil
|
|
|
}
|
|
|
|
|
|
+// binder factory implementations
|
|
|
+
|
|
|
+func (rr *Manager) Source(name string) (api.Source, error) {
|
|
|
+ nf, err := rr.loadRuntime(SOURCE, name)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if nf == nil {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ switch t := nf.(type) {
|
|
|
+ case api.Source:
|
|
|
+ return t, nil
|
|
|
+ case func() api.Source:
|
|
|
+ return t(), nil
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (rr *Manager) Sink(name string) (api.Sink, error) {
|
|
|
+ nf, err := rr.loadRuntime(SINK, name)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if nf == nil {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ var s api.Sink
|
|
|
+ switch t := nf.(type) {
|
|
|
+ case api.Sink:
|
|
|
+ s = t
|
|
|
+ case func() api.Sink:
|
|
|
+ s = t()
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
|
|
|
+ }
|
|
|
+ return s, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (rr *Manager) Function(name string) (api.Function, error) {
|
|
|
+ nf, err := rr.loadRuntime(FUNCTION, name)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if nf == nil {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ var s api.Function
|
|
|
+ switch t := nf.(type) {
|
|
|
+ case api.Function:
|
|
|
+ s = t
|
|
|
+ case func() api.Function:
|
|
|
+ s = t()
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
|
|
|
+ }
|
|
|
+ return s, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (rr *Manager) HasFunctionSet(name string) bool {
|
|
|
+ _, ok := rr.get(FUNCTION, name)
|
|
|
+ return ok
|
|
|
+}
|
|
|
+
|
|
|
+// If not found, return nil,nil; Other errors return nil, err
|
|
|
+func (rr *Manager) loadRuntime(t PluginType, name string) (plugin.Symbol, error) {
|
|
|
+ ut := ucFirst(name)
|
|
|
+ ptype := PluginTypes[t]
|
|
|
+ key := ptype + "/" + name
|
|
|
+ var nf plugin.Symbol
|
|
|
+ rr.RLock()
|
|
|
+ nf, ok := rr.runtime[key]
|
|
|
+ rr.RUnlock()
|
|
|
+ if !ok {
|
|
|
+ mod, err := rr.getSoFilePath(t, name, false)
|
|
|
+ if err != nil {
|
|
|
+ conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin in path: %v", err))
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ conf.Log.Debugf("Opening plugin %s", mod)
|
|
|
+ plug, err := plugin.Open(mod)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("cannot open %s: %v", mod, err)
|
|
|
+ }
|
|
|
+ conf.Log.Debugf("Successfully open plugin %s", mod)
|
|
|
+ nf, err = plug.Lookup(ut)
|
|
|
+ if err != nil {
|
|
|
+ conf.Log.Debugf(fmt.Sprintf("cannot find symbol %s, please check if it is exported", name))
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ conf.Log.Debugf("Successfully look-up plugin %s", mod)
|
|
|
+ rr.Lock()
|
|
|
+ rr.runtime[key] = nf
|
|
|
+ rr.Unlock()
|
|
|
+ }
|
|
|
+ return nf, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Return the lowercase version of so name. It may be upper case in path.
|
|
|
+func (rr *Manager) getSoFilePath(t PluginType, name string, isSoName bool) (string, error) {
|
|
|
+ var (
|
|
|
+ v string
|
|
|
+ soname string
|
|
|
+ ok bool
|
|
|
+ )
|
|
|
+ // We must identify plugin or symbol when deleting function plugin
|
|
|
+ if isSoName {
|
|
|
+ soname = name
|
|
|
+ } else {
|
|
|
+ soname, ok = rr.GetPluginBySymbol(t, name)
|
|
|
+ if !ok {
|
|
|
+ return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ v, ok = rr.get(t, soname)
|
|
|
+ if !ok {
|
|
|
+ return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
|
|
|
+ }
|
|
|
+
|
|
|
+ soFile := soname + ".so"
|
|
|
+ if v != "" {
|
|
|
+ soFile = fmt.Sprintf("%s@%s.so", soname, v)
|
|
|
+ }
|
|
|
+ p := path.Join(rr.pluginDir, PluginTypes[t], soFile)
|
|
|
+ if _, err := os.Stat(p); err != nil {
|
|
|
+ p = path.Join(rr.pluginDir, PluginTypes[t], ucFirst(soFile))
|
|
|
+ }
|
|
|
+ if _, err := os.Stat(p); err != nil {
|
|
|
+ return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
|
|
|
+ }
|
|
|
+ return p, nil
|
|
|
+}
|
|
|
+
|
|
|
func parseName(n string) (string, string) {
|
|
|
result := strings.Split(n, ".so")
|
|
|
result = strings.Split(result[0], "@")
|