12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911 |
- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Manage the loading of both native and portable plugins
- package native
- import (
- "archive/zip"
- "bytes"
- "encoding/json"
- "fmt"
- "os"
- "os/exec"
- "path"
- "path/filepath"
- "plugin"
- "regexp"
- "strings"
- "sync"
- "time"
- "unicode"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/meta"
- "github.com/lf-edge/ekuiper/internal/pkg/filex"
- "github.com/lf-edge/ekuiper/internal/pkg/httpx"
- "github.com/lf-edge/ekuiper/internal/pkg/store"
- plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/cast"
- "github.com/lf-edge/ekuiper/pkg/errorx"
- "github.com/lf-edge/ekuiper/pkg/kv"
- )
// manager is the package-level Manager instance. It is assigned by
// InitManager (called from the binder) and returned by GetManager.
var manager *Manager

// DELETED is the version value stored for a plugin that was deleted during
// the current run. Register rejects a name whose stored version is DELETED.
const DELETED = "$deleted"
- // Manager is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
- type Manager struct {
- sync.RWMutex
- // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
- plugins []map[string]string
- // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
- symbols map[string]string
- // loaded symbols in current runtime
- runtime map[string]*plugin.Plugin
- // dirs
- pluginDir string
- pluginConfDir string
- // the access to func symbols db
- funcSymbolsDb kv.KeyValue
- // the access to plugin install script db
- plgInstallDb kv.KeyValue
- // the access to plugin install status db
- plgStatusDb kv.KeyValue
- }
- // InitManager must only be called once
- func InitManager() (*Manager, error) {
- pluginDir, err := conf.GetPluginsLoc()
- if err != nil {
- return nil, fmt.Errorf("cannot find plugins folder: %s", err)
- }
- dataDir, err := conf.GetDataLoc()
- if err != nil {
- return nil, fmt.Errorf("cannot find data folder: %s", err)
- }
- func_db, err := store.GetKV("pluginFuncs")
- if err != nil {
- return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
- }
- plg_db, err := store.GetKV("nativePlugin")
- if err != nil {
- return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
- }
- plg_status_db, err := store.GetKV("nativePluginStatus")
- if err != nil {
- return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
- }
- registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
- manager = registry
- plugins := make([]map[string]string, 3)
- for i := range plugins {
- names, err := findAll(plugin2.PluginType(i), pluginDir)
- if err != nil {
- return nil, fmt.Errorf("fail to find existing plugins: %s", err)
- }
- plugins[i] = names
- }
- registry.plugins = plugins
- for pf := range plugins[plugin2.FUNCTION] {
- l := make([]string, 0)
- if ok, err := func_db.Get(pf, &l); ok {
- registry.storeSymbols(pf, l)
- } else if err != nil {
- return nil, fmt.Errorf("error when querying kv: %s", err)
- } else {
- registry.storeSymbols(pf, []string{pf})
- }
- }
- if manager.hasInstallFlag() {
- manager.pluginInstallWhenReboot()
- manager.clearInstallFlag()
- }
- return registry, nil
- }
- func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string, err error) {
- result = make(map[string]string)
- dir := path.Join(pluginDir, plugin2.PluginTypes[t])
- files, err := os.ReadDir(dir)
- if err != nil {
- return
- }
- for _, file := range files {
- baseName := filepath.Base(file.Name())
- if strings.HasSuffix(baseName, ".so") {
- n, v := parseName(baseName)
- // load the plugins when ekuiper set up
- if !conf.IsTesting {
- if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
- continue
- }
- }
- result[n] = v
- }
- }
- return
- }
// GetManager returns the package-level Manager. The result is nil until
// InitManager has assigned it.
func GetManager() *Manager {
	return manager
}
- func (rr *Manager) get(t plugin2.PluginType, name string) (string, bool) {
- rr.RLock()
- result := rr.plugins[t]
- rr.RUnlock()
- r, ok := result[name]
- return r, ok
- }
- func (rr *Manager) store(t plugin2.PluginType, name string, version string) {
- rr.Lock()
- rr.plugins[t][name] = version
- rr.Unlock()
- }
- func (rr *Manager) storeSymbols(name string, symbols []string) error {
- rr.Lock()
- defer rr.Unlock()
- for _, s := range symbols {
- if _, ok := rr.symbols[s]; ok {
- return fmt.Errorf("function name %s already exists", s)
- } else {
- rr.symbols[s] = name
- }
- }
- return nil
- }
- func (rr *Manager) removeSymbols(symbols []string) {
- rr.Lock()
- for _, s := range symbols {
- delete(rr.symbols, s)
- }
- rr.Unlock()
- }
- // API for management
- func (rr *Manager) List(t plugin2.PluginType) []string {
- rr.RLock()
- result := rr.plugins[t]
- rr.RUnlock()
- keys := make([]string, 0, len(result))
- for k := range result {
- keys = append(keys, k)
- }
- return keys
- }
- func (rr *Manager) ListSymbols() []string {
- rr.RLock()
- result := rr.symbols
- rr.RUnlock()
- keys := make([]string, 0, len(result))
- for k := range result {
- keys = append(keys, k)
- }
- return keys
- }
- func (rr *Manager) GetPluginVersionBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
- switch t {
- case plugin2.FUNCTION:
- rr.RLock()
- result := rr.plugins[t]
- name, ok := rr.symbols[symbolName]
- rr.RUnlock()
- if ok {
- r, nok := result[name]
- return r, nok
- } else {
- return "", false
- }
- default:
- return rr.get(t, symbolName)
- }
- }
- func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
- switch t {
- case plugin2.FUNCTION:
- rr.RLock()
- defer rr.RUnlock()
- name, ok := rr.symbols[symbolName]
- return name, ok
- default:
- return symbolName, true
- }
- }
- func (rr *Manager) storePluginInstallScript(name string, t plugin2.PluginType, j plugin2.Plugin) {
- key := plugin2.PluginTypes[t] + "_" + name
- val := string(j.GetInstallScripts())
- _ = rr.plgInstallDb.Set(key, val)
- }
- func (rr *Manager) removePluginInstallScript(name string, t plugin2.PluginType) {
- key := plugin2.PluginTypes[t] + "_" + name
- _ = rr.plgInstallDb.Delete(key)
- }
- func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
- name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
- // Validation
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
- return fmt.Errorf("invalid uri %s", uri)
- }
- if v, ok := rr.get(t, name); ok {
- if v == DELETED {
- return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
- } else {
- return fmt.Errorf("invalid name %s: duplicate", name)
- }
- }
- var err error
- zipPath := path.Join(rr.pluginDir, name+".zip")
- // clean up: delete zip file and unzip files in error
- defer os.Remove(zipPath)
- // download
- err = httpx.DownloadFile(zipPath, uri)
- if err != nil {
- return fmt.Errorf("fail to download file %s: %s", uri, err)
- }
- if t == plugin2.FUNCTION {
- if len(j.GetSymbols()) > 0 {
- err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
- if err != nil {
- return err
- }
- err = rr.storeSymbols(name, j.GetSymbols())
- } else {
- err = rr.storeSymbols(name, []string{name})
- }
- }
- if err != nil {
- return err
- }
- // unzip and copy to destination
- version, err := rr.install(t, name, zipPath, shellParas)
- if err == nil && len(j.GetSymbols()) > 0 {
- err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
- }
- if err != nil { // Revert for any errors
- if len(j.GetSymbols()) > 0 {
- rr.removeSymbols(j.GetSymbols())
- } else {
- rr.removeSymbols([]string{name})
- }
- return fmt.Errorf("fail to install plugin: %s", err)
- }
- rr.store(t, name, version)
- rr.storePluginInstallScript(name, t, j)
- switch t {
- case plugin2.SINK:
- if err := meta.ReadSinkMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
- conf.Log.Errorf("readSinkFile:%v", err)
- }
- case plugin2.SOURCE:
- isScan := true
- isLookup := true
- _, err := rr.Source(name)
- if err != nil {
- isScan = false
- }
- _, err = rr.LookupSource(name)
- if err != nil {
- isLookup = false
- }
- if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
- conf.Log.Errorf("readSourceFile:%v", err)
- }
- }
- return nil
- }
- // RegisterFuncs prerequisite:function plugin of name exists
- func (rr *Manager) RegisterFuncs(name string, functions []string) error {
- if len(functions) == 0 {
- return fmt.Errorf("property 'functions' must not be empty")
- }
- old := make([]string, 0)
- if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
- return err
- } else if ok {
- rr.removeSymbols(old)
- } else if !ok {
- rr.removeSymbols([]string{name})
- }
- err := rr.funcSymbolsDb.Set(name, functions)
- if err != nil {
- return err
- }
- return rr.storeSymbols(name, functions)
- }
- func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- soPath, err := rr.getSoFilePath(t, name, true)
- if err != nil {
- return err
- }
- var results []string
- paths := []string{
- soPath,
- }
- // Find etc folder
- etcPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name)
- if fi, err := os.Stat(etcPath); err == nil {
- if fi.Mode().IsDir() {
- paths = append(paths, etcPath)
- }
- }
- switch t {
- case plugin2.SOURCE:
- yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".yaml")
- _ = os.Remove(yamlPaths)
- srcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".json")
- _ = os.Remove(srcJsonPath)
- meta.UninstallSource(name)
- case plugin2.SINK:
- yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".yaml")
- _ = os.Remove(yamlPaths)
- sinkJsonPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".json")
- _ = os.Remove(sinkJsonPaths)
- meta.UninstallSink(name)
- case plugin2.FUNCTION:
- funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
- _ = os.Remove(funcJsonPath)
- old := make([]string, 0)
- if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
- return err
- } else if ok {
- rr.removeSymbols(old)
- err := rr.funcSymbolsDb.Delete(name)
- if err != nil {
- return err
- }
- } else if !ok {
- rr.removeSymbols([]string{name})
- }
- }
- for _, p := range paths {
- _, err := os.Stat(p)
- if err == nil {
- err = os.RemoveAll(p)
- if err != nil {
- results = append(results, err.Error())
- }
- } else {
- results = append(results, fmt.Sprintf("can't find %s", p))
- }
- }
- rr.removePluginInstallScript(name, t)
- if len(results) > 0 {
- return fmt.Errorf(strings.Join(results, "\n"))
- } else {
- rr.store(t, name, DELETED)
- if stop {
- go func() {
- time.Sleep(1 * time.Second)
- os.Exit(100)
- }()
- }
- return nil
- }
- }
- func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]interface{}, bool) {
- v, ok := rr.get(t, name)
- if strings.HasPrefix(v, "v") {
- v = v[1:]
- }
- if ok {
- r := map[string]interface{}{
- "name": name,
- "version": v,
- }
- if t == plugin2.FUNCTION {
- l := make([]string, 0)
- if ok, _ := rr.funcSymbolsDb.Get(name, &l); ok {
- r["functions"] = l
- }
- // ignore the error
- }
- return r, ok
- }
- return nil, false
- }
- func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
- var filenames []string
- tempPath := path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
- defer os.RemoveAll(tempPath)
- r, err := zip.OpenReader(src)
- if err != nil {
- return "", err
- }
- defer r.Close()
- haveInstallFile := false
- for _, file := range r.File {
- fileName := file.Name
- if fileName == "install.sh" {
- haveInstallFile = true
- }
- }
- if len(shellParas) != 0 && !haveInstallFile {
- return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
- }
- soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
- var soPath string
- var yamlFile, yamlPath, version, soName string
- expFiles := 1
- if t == plugin2.SOURCE {
- yamlFile = name + ".yaml"
- yamlPath = path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], yamlFile)
- expFiles = 2
- }
- var revokeFiles []string
- defer func() {
- if err != nil {
- for _, f := range revokeFiles {
- os.RemoveAll(f)
- }
- }
- }()
- for _, file := range r.File {
- fileName := file.Name
- if yamlFile == fileName {
- err = filex.UnzipTo(file, yamlPath)
- if err != nil {
- return version, err
- }
- revokeFiles = append(revokeFiles, yamlPath)
- filenames = append(filenames, yamlPath)
- } else if fileName == name+".json" {
- jsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], fileName)
- if err := filex.UnzipTo(file, jsonPath); nil != err {
- conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
- } else {
- revokeFiles = append(revokeFiles, jsonPath)
- }
- } else if soPrefix.Match([]byte(fileName)) {
- soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
- err = filex.UnzipTo(file, soPath)
- if err != nil {
- return version, err
- }
- filenames = append(filenames, soPath)
- revokeFiles = append(revokeFiles, soPath)
- soName, version = parseName(fileName)
- } else if strings.HasPrefix(fileName, "etc/") {
- err = filex.UnzipTo(file, path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
- if err != nil {
- return version, err
- }
- } else { // unzip other files
- err = filex.UnzipTo(file, path.Join(tempPath, fileName))
- if err != nil {
- return version, err
- }
- }
- }
- if len(filenames) != expFiles {
- err = fmt.Errorf("invalid zip file: so file or conf file is missing")
- return version, err
- } else if haveInstallFile {
- // run install script if there is
- shell := make([]string, len(shellParas))
- copy(shell, shellParas)
- spath := path.Join(tempPath, "install.sh")
- shell = append(shell, spath)
- if 1 != len(shell) {
- copy(shell[1:], shell[0:])
- shell[0] = spath
- }
- conf.Log.Infof("run install script %s", strings.Join(shell, " "))
- cmd := exec.Command("/bin/sh", shell...)
- var outb, errb bytes.Buffer
- cmd.Stdout = &outb
- cmd.Stderr = &errb
- err := cmd.Run()
- if err != nil {
- conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
- return version, err
- }
- conf.Log.Infof(`run install script:%s`, outb.String())
- }
- if !conf.IsTesting {
- // load the runtime first
- _, err = manager.loadRuntime(t, soName, soPath, "")
- if err != nil {
- return version, err
- }
- }
- conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
- return version, nil
- }
- // binder factory implementations
- func (rr *Manager) Source(name string) (api.Source, error) {
- nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
- if err != nil {
- return nil, err
- }
- if nf == nil {
- return nil, nil
- }
- switch t := nf.(type) {
- case api.Source:
- return t, nil
- case func() api.Source:
- return t(), nil
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
- }
- }
- func (rr *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
- _, ok := rr.GetPluginVersionBySymbol(plugin2.SOURCE, name)
- if ok {
- pluginName, _ := rr.GetPluginBySymbol(plugin2.SOURCE, name)
- installScript := ""
- pluginKey := plugin2.PluginTypes[plugin2.SOURCE] + "_" + pluginName
- rr.plgInstallDb.Get(pluginKey, &installScript)
- return plugin2.NATIVE_EXTENSION, pluginKey, installScript
- } else {
- return plugin2.NONE_EXTENSION, "", ""
- }
- }
- func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
- nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
- if err != nil {
- return nil, err
- }
- if nf == nil {
- return nil, nil
- }
- switch t := nf.(type) {
- case api.LookupSource:
- return t, nil
- case func() api.LookupSource:
- return t(), nil
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
- }
- }
- func (rr *Manager) Sink(name string) (api.Sink, error) {
- nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
- if err != nil {
- return nil, err
- }
- if nf == nil {
- return nil, nil
- }
- var s api.Sink
- switch t := nf.(type) {
- case api.Sink:
- s = t
- case func() api.Sink:
- s = t()
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
- }
- return s, nil
- }
- func (rr *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
- _, ok := rr.GetPluginVersionBySymbol(plugin2.SINK, name)
- if ok {
- pluginName, _ := rr.GetPluginBySymbol(plugin2.SINK, name)
- installScript := ""
- pluginKey := plugin2.PluginTypes[plugin2.SINK] + "_" + pluginName
- rr.plgInstallDb.Get(pluginKey, &installScript)
- return plugin2.NATIVE_EXTENSION, pluginKey, installScript
- } else {
- return plugin2.NONE_EXTENSION, "", ""
- }
- }
- func (rr *Manager) Function(name string) (api.Function, error) {
- nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
- if err != nil {
- return nil, err
- }
- if nf == nil {
- return nil, nil
- }
- var s api.Function
- switch t := nf.(type) {
- case api.Function:
- s = t
- case func() api.Function:
- s = t()
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
- }
- return s, nil
- }
- func (rr *Manager) HasFunctionSet(name string) bool {
- _, ok := rr.get(plugin2.FUNCTION, name)
- return ok
- }
- func (rr *Manager) FunctionPluginInfo(funcName string) (plugin2.EXTENSION_TYPE, string, string) {
- pluginName, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, funcName)
- if ok {
- installScript := ""
- pluginKey := plugin2.PluginTypes[plugin2.FUNCTION] + "_" + pluginName
- rr.plgInstallDb.Get(pluginKey, &installScript)
- return plugin2.NATIVE_EXTENSION, pluginKey, installScript
- } else {
- return plugin2.NONE_EXTENSION, "", ""
- }
- }
- func (rr *Manager) ConvName(name string) (string, bool) {
- _, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, name)
- if ok {
- return name, true
- }
- return name, false
- }
- // If not found, return nil,nil; Other errors return nil, err
- func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
- ptype := plugin2.PluginTypes[t]
- key := ptype + "/" + soName
- var (
- plug *plugin.Plugin
- ok bool
- err error
- )
- rr.RLock()
- plug, ok = rr.runtime[key]
- rr.RUnlock()
- if !ok {
- var soPath string
- if soFilepath != "" {
- soPath = soFilepath
- } else {
- mod, err := rr.getSoFilePath(t, soName, false)
- if err != nil {
- conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
- return nil, nil
- }
- soPath = mod
- }
- conf.Log.Debugf("Opening plugin %s", soPath)
- plug, err = plugin.Open(soPath)
- if err != nil {
- conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
- return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
- }
- rr.Lock()
- rr.runtime[key] = plug
- rr.Unlock()
- conf.Log.Debugf("Successfully open plugin %s", soPath)
- }
- if symbolName == "" {
- symbolName = ucFirst(soName)
- }
- conf.Log.Debugf("Loading symbol %s", symbolName)
- nf, err := plug.Lookup(symbolName)
- if err != nil {
- conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
- return nil, nil
- }
- conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
- return nf, nil
- }
- // Return the lowercase version of so name. It may be upper case in path.
- func (rr *Manager) getSoFilePath(t plugin2.PluginType, name string, isSoName bool) (string, error) {
- var (
- v string
- soname string
- ok bool
- )
- // We must identify plugin or symbol when deleting function plugin
- if isSoName {
- soname = name
- } else {
- soname, ok = rr.GetPluginBySymbol(t, name)
- if !ok {
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
- }
- }
- v, ok = rr.get(t, soname)
- if !ok {
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
- }
- soFile := soname + ".so"
- if v != "" {
- soFile = fmt.Sprintf("%s@%s.so", soname, v)
- }
- p := path.Join(rr.pluginDir, plugin2.PluginTypes[t], soFile)
- if _, err := os.Stat(p); err != nil {
- p = path.Join(rr.pluginDir, plugin2.PluginTypes[t], ucFirst(soFile))
- }
- if _, err := os.Stat(p); err != nil {
- return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
- }
- return p, nil
- }
- func parseName(n string) (string, string) {
- result := strings.Split(n, ".so")
- result = strings.Split(result[0], "@")
- name := lcFirst(result[0])
- if len(result) > 1 {
- return name, result[1]
- }
- return name, ""
- }
// ucFirst returns str with its first character converted to upper case.
// An empty string is returned unchanged.
//
// The remainder is taken from the byte offset of the second character, so a
// multi-byte first character is replaced as a whole instead of being cut
// after its first byte.
func ucFirst(str string) string {
	if str == "" {
		return ""
	}
	var first rune
	rest := ""
	for i, r := range str {
		if i == 0 {
			first = r
			continue
		}
		// i is the offset of the second character, i.e. the width of the first.
		rest = str[i:]
		break
	}
	return string(unicode.ToUpper(first)) + rest
}
// lcFirst returns str with its first character converted to lower case.
// An empty string is returned unchanged.
//
// The remainder is taken from the byte offset of the second character, so a
// multi-byte first character is replaced as a whole instead of being cut
// after its first byte.
func lcFirst(str string) string {
	if str == "" {
		return ""
	}
	var first rune
	rest := ""
	for i, r := range str {
		if i == 0 {
			first = r
			continue
		}
		// i is the offset of the second character, i.e. the width of the first.
		rest = str[i:]
		break
	}
	return string(unicode.ToLower(first)) + rest
}
- func (rr *Manager) UninstallAllPlugins() {
- keys, err := rr.plgInstallDb.Keys()
- if err != nil {
- return
- }
- for _, v := range keys {
- plgType := plugin2.PluginTypeMap[strings.Split(v, "_")[0]]
- plgName := strings.Split(v, "_")[1]
- _ = rr.Delete(plgType, plgName, false)
- }
- }
- func (rr *Manager) GetAllPlugins() map[string]string {
- allPlgs, err := rr.plgInstallDb.All()
- if err != nil {
- return nil
- }
- delete(allPlgs, BOOT_INSTALL)
- return allPlgs
- }
- func (rr *Manager) GetAllPluginsStatus() map[string]string {
- allPlgs, err := rr.plgStatusDb.All()
- if err != nil {
- return nil
- }
- return allPlgs
- }
// BOOT_INSTALL is the key (and value) of the flag entry in the install script
// db. PluginImport sets it; InitManager checks it to install the stored
// plugins at start-up and then clears it.
const BOOT_INSTALL = "$boot_install"
- // PluginImport save the plugin install information and wait for restart
- func (rr *Manager) PluginImport(plugins map[string]string) error {
- if len(plugins) == 0 {
- return nil
- }
- for k, v := range plugins {
- err := rr.plgInstallDb.Set(k, v)
- if err != nil {
- return err
- }
- }
- // set the flag to install the plugins when eKuiper reboot
- err := rr.plgInstallDb.Set(BOOT_INSTALL, BOOT_INSTALL)
- if err != nil {
- return err
- }
- return nil
- }
- // PluginPartialImport compare the plugin to be installed and the one in database
- // if not exist in database, install;
- // if exist, ignore
- func (rr *Manager) PluginPartialImport(plugins map[string]string) map[string]string {
- errMap := map[string]string{}
- for k, v := range plugins {
- plugInScript := ""
- found, _ := rr.plgInstallDb.Get(k, &plugInScript)
- if !found {
- err := rr.pluginRegisterForImport(k, v)
- if err != nil {
- errMap[k] = err.Error()
- }
- }
- }
- return errMap
- }
- func (rr *Manager) hasInstallFlag() bool {
- val := ""
- found, _ := rr.plgInstallDb.Get(BOOT_INSTALL, &val)
- return found
- }
// clearInstallFlag removes the boot install flag from the install script db.
// A failure to delete is ignored.
func (rr *Manager) clearInstallFlag() {
	_ = rr.plgInstallDb.Delete(BOOT_INSTALL)
}
- func (rr *Manager) pluginRegisterForImport(key, script string) error {
- plgType := plugin2.PluginTypeMap[strings.Split(key, "_")[0]]
- sd := plugin2.NewPluginByType(plgType)
- err := json.Unmarshal(cast.StringToBytes(script), &sd)
- if err != nil {
- return err
- }
- err = rr.Register(plgType, sd)
- if err != nil {
- conf.Log.Errorf(`install native plugin %s error: %v`, key, err)
- return err
- }
- return nil
- }
- func (rr *Manager) pluginInstallWhenReboot() {
- allPlgs, err := rr.plgInstallDb.All()
- if err != nil {
- return
- }
- delete(allPlgs, BOOT_INSTALL)
- _ = rr.plgStatusDb.Clean()
- for k, v := range allPlgs {
- err := rr.pluginRegisterForImport(k, v)
- _ = rr.plgStatusDb.Set(k, err.Error())
- }
- }
|