123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- package plugins
- import (
- "archive/zip"
- "errors"
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xstream/api"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path"
- "path/filepath"
- "plugin"
- "regexp"
- "strings"
- "sync"
- "time"
- "unicode"
- )
- type Plugin struct {
- Name string `json:"name"`
- File string `json:"file"`
- }
- type PluginType int
- const (
- SOURCE PluginType = iota
- SINK
- FUNCTION
- )
- const DELETED = "$deleted"
- var (
- PluginTypes = []string{"sources", "sinks", "functions"}
- once sync.Once
- singleton *Manager
- )
- //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
- type Registry struct {
- sync.RWMutex
- internal []map[string]string
- }
- func (rr *Registry) Store(t PluginType, name string, version string) {
- rr.Lock()
- rr.internal[t][name] = version
- rr.Unlock()
- }
- func (rr *Registry) List(t PluginType) []string {
- rr.RLock()
- result := rr.internal[t]
- rr.RUnlock()
- keys := make([]string, 0, len(result))
- for k := range result {
- keys = append(keys, k)
- }
- return keys
- }
- func (rr *Registry) Get(t PluginType, name string) (string, bool) {
- rr.RLock()
- result := rr.internal[t]
- rr.RUnlock()
- r, ok := result[name]
- return r, ok
- }
- //func (rr *Registry) Delete(t PluginType, value string) {
- // rr.Lock()
- // s := rr.internal[t]
- // for i, f := range s{
- // if f == value{
- // s[len(s)-1], s[i] = s[i], s[len(s)-1]
- // rr.internal[t] = s
- // break
- // }
- // }
- // rr.Unlock()
- //}
- var symbolRegistry = make(map[string]plugin.Symbol)
- func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
- ut := ucFirst(t)
- ptype := PluginTypes[pt]
- key := ptype + "/" + t
- var nf plugin.Symbol
- nf, ok := symbolRegistry[key]
- if !ok {
- loc, err := common.GetLoc("/plugins/")
- if err != nil {
- return nil, fmt.Errorf("cannot find the plugins folder")
- }
- m, err := NewPluginManager()
- if err != nil {
- return nil, fmt.Errorf("fail to initialize the plugin manager")
- }
- soFile, err := getSoFileName(m, pt, t)
- if err != nil {
- return nil, fmt.Errorf("cannot get the plugin file name: %v", err)
- }
- mod := path.Join(loc, ptype, soFile)
- plug, err := plugin.Open(mod)
- if err != nil {
- return nil, fmt.Errorf("cannot open %s: %v", mod, err)
- }
- nf, err = plug.Lookup(ut)
- if err != nil {
- return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
- }
- symbolRegistry[key] = nf
- }
- return nf, nil
- }
- func GetSource(t string) (api.Source, error) {
- nf, err := getPlugin(t, SOURCE)
- if err != nil {
- return nil, err
- }
- var s api.Source
- switch t := nf.(type) {
- case api.Source:
- s = t
- case func() api.Source:
- s = t()
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
- }
- return s, nil
- }
- func GetSink(t string) (api.Sink, error) {
- nf, err := getPlugin(t, SINK)
- if err != nil {
- return nil, err
- }
- var s api.Sink
- switch t := nf.(type) {
- case api.Sink:
- s = t
- case func() api.Sink:
- s = t()
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
- }
- return s, nil
- }
- func GetFunction(t string) (api.Function, error) {
- nf, err := getPlugin(t, FUNCTION)
- if err != nil {
- return nil, err
- }
- var s api.Function
- switch t := nf.(type) {
- case api.Function:
- s = t
- case func() api.Function:
- s = t()
- default:
- return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
- }
- return s, nil
- }
- type Manager struct {
- pluginDir string
- etcDir string
- registry *Registry
- }
- func NewPluginManager() (*Manager, error) {
- var err error
- once.Do(func() {
- dir, err := common.GetLoc("/plugins")
- if err != nil {
- err = fmt.Errorf("cannot find plugins folder: %s", err)
- return
- }
- etcDir, err := common.GetLoc("/etc")
- if err != nil {
- err = fmt.Errorf("cannot find etc folder: %s", err)
- return
- }
- plugins := make([]map[string]string, 3)
- for i := 0; i < 3; i++ {
- names, err := findAll(PluginType(i), dir)
- if err != nil {
- err = fmt.Errorf("fail to find existing plugins: %s", err)
- return
- }
- plugins[i] = names
- }
- registry := &Registry{internal: plugins}
- singleton = &Manager{
- pluginDir: dir,
- etcDir: etcDir,
- registry: registry,
- }
- })
- return singleton, err
- }
- func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
- result = make(map[string]string)
- dir := path.Join(pluginDir, PluginTypes[t])
- files, err := ioutil.ReadDir(dir)
- if err != nil {
- return
- }
- for _, file := range files {
- baseName := filepath.Base(file.Name())
- if strings.HasSuffix(baseName, ".so") {
- n, v := parseName(baseName)
- result[n] = v
- }
- }
- return
- }
- func (m *Manager) List(t PluginType) (result []string, err error) {
- return m.registry.List(t), nil
- }
- func (m *Manager) Register(t PluginType, j *Plugin) error {
- name, uri := j.Name, j.File
- //Validation
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
- return fmt.Errorf("invalid uri %s", uri)
- }
- if v, ok := m.registry.Get(t, name); ok {
- if v == DELETED {
- return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
- } else {
- return fmt.Errorf("invalid name %s: duplicate", name)
- }
- }
- zipPath := path.Join(m.pluginDir, name+".zip")
- var unzipFiles []string
- //clean up: delete zip file and unzip files in error
- defer os.Remove(zipPath)
- //download
- err := downloadFile(zipPath, uri)
- if err != nil {
- return fmt.Errorf("fail to download file %s: %s", uri, err)
- }
- //unzip and copy to destination
- unzipFiles, version, err := m.install(t, name, zipPath)
- if err != nil {
- if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
- os.Remove(unzipFiles[0])
- }
- return fmt.Errorf("fail to unzip file %s: %s", uri, err)
- }
- m.registry.Store(t, name, version)
- return nil
- }
- func (m *Manager) Delete(t PluginType, name string, stop bool) error {
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- soFile, err := getSoFileName(m, t, name)
- if err != nil {
- return err
- }
- var results []string
- paths := []string{
- path.Join(m.pluginDir, PluginTypes[t], soFile),
- }
- if t == SOURCE {
- paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
- }
- for _, p := range paths {
- _, err := os.Stat(p)
- if err == nil {
- err = os.Remove(p)
- if err != nil {
- results = append(results, err.Error())
- }
- } else {
- results = append(results, fmt.Sprintf("can't find %s", p))
- }
- }
- if len(results) > 0 {
- return errors.New(strings.Join(results, "\n"))
- } else {
- m.registry.Store(t, name, DELETED)
- if stop {
- go func() {
- time.Sleep(1 * time.Second)
- os.Exit(100)
- }()
- }
- return nil
- }
- }
- func (m *Manager) Get(t PluginType, name string) (map[string]string, bool) {
- v, ok := m.registry.Get(t, name)
- if ok {
- m := map[string]string{
- "name": name,
- "version": v,
- }
- return m, ok
- }
- return nil, false
- }
- func getSoFileName(m *Manager, t PluginType, name string) (string, error) {
- v, ok := m.registry.Get(t, name)
- if !ok {
- return "", fmt.Errorf("invalid name %s: not exist", name)
- }
- soFile := ucFirst(name) + ".so"
- if v != "" {
- soFile = fmt.Sprintf("%s@v%s.so", ucFirst(name), v)
- }
- return soFile, nil
- }
- func (m *Manager) install(t PluginType, name string, src string) ([]string, string, error) {
- var filenames []string
- var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
- defer os.RemoveAll(tempPath)
- r, err := zip.OpenReader(src)
- if err != nil {
- return filenames, "", err
- }
- defer r.Close()
- soPrefix := regexp.MustCompile(fmt.Sprintf(`^%s(@v.*)?\.so$`, ucFirst(name)))
- var yamlFile, yamlPath, version string
- expFiles := 1
- if t == SOURCE {
- yamlFile = name + ".yaml"
- yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
- expFiles = 2
- }
- needInstall := false
- for _, file := range r.File {
- fileName := file.Name
- if yamlFile == fileName {
- err = unzipTo(file, yamlPath)
- if err != nil {
- return filenames, "", err
- }
- filenames = append(filenames, yamlPath)
- } else if soPrefix.Match([]byte(fileName)) {
- soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
- err = unzipTo(file, soPath)
- if err != nil {
- return filenames, "", err
- }
- filenames = append(filenames, soPath)
- _, version = parseName(fileName)
- } else { //unzip other files
- err = unzipTo(file, path.Join(tempPath, fileName))
- if err != nil {
- return filenames, "", err
- }
- if fileName == "install.sh" {
- needInstall = true
- }
- }
- }
- if len(filenames) != expFiles {
- return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
- } else if needInstall {
- //run install script if there is
- spath := path.Join(tempPath, "install.sh")
- out, err := exec.Command("/bin/sh", spath).Output()
- if err != nil {
- return filenames, "", err
- } else {
- common.Log.Infof("install %s plugin %s log: %s", PluginTypes[t], name, out)
- }
- }
- return filenames, version, nil
- }
- func parseName(n string) (string, string) {
- result := strings.Split(n, ".so")
- result = strings.Split(result[0], "@v")
- name := lcFirst(result[0])
- if len(result) > 1 {
- return name, result[1]
- }
- return name, ""
- }
- func unzipTo(f *zip.File, fpath string) error {
- _, err := os.Stat(fpath)
- if err == nil || !os.IsNotExist(err) {
- if err = os.Remove(fpath); err != nil {
- return fmt.Errorf("failed to delete file %s", fpath)
- }
- }
- if f.FileInfo().IsDir() {
- return fmt.Errorf("%s: not a file, but a directory", fpath)
- }
- if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
- return err
- }
- outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
- if err != nil {
- return err
- }
- rc, err := f.Open()
- if err != nil {
- return err
- }
- _, err = io.Copy(outFile, rc)
- outFile.Close()
- rc.Close()
- return err
- }
- func isValidUrl(uri string) bool {
- _, err := url.ParseRequestURI(uri)
- if err != nil {
- return false
- }
- u, err := url.Parse(uri)
- if err != nil || u.Scheme == "" || u.Host == "" {
- return false
- }
- return true
- }
- func downloadFile(filepath string, url string) error {
- // Get the data
- resp, err := http.Get(url)
- if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("cannot download the file with status: %s", resp.Status)
- }
- defer resp.Body.Close()
- // Create the file
- out, err := os.Create(filepath)
- if err != nil {
- return err
- }
- defer out.Close()
- // Write the body to file
- _, err = io.Copy(out, resp.Body)
- return err
- }
- func ucFirst(str string) string {
- for i, v := range str {
- return string(unicode.ToUpper(v)) + str[i+1:]
- }
- return ""
- }
- func lcFirst(str string) string {
- for i, v := range str {
- return string(unicode.ToLower(v)) + str[i+1:]
- }
- return ""
- }
|