123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- package plugins
- import (
- "archive/zip"
- "errors"
- "fmt"
- "github.com/emqx/kuiper/common"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
- "sync"
- "unicode"
- )
- type Plugin struct {
- Name string `json:"name"`
- File string `json:"file"`
- Callback string `json:"callback"`
- }
- type PluginType int
- const (
- SOURCE PluginType = iota
- SINK
- FUNCTION
- )
- var (
- PluginTypes = []string{"sources", "sinks", "functions"}
- once sync.Once
- singleton *Manager
- )
- //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
- type Registry struct {
- sync.RWMutex
- internal [][]string
- }
- func (rr *Registry) Store(t PluginType, value string) {
- rr.Lock()
- rr.internal[t] = append(rr.internal[t], value)
- rr.Unlock()
- }
- func (rr *Registry) List(t PluginType) (values []string) {
- rr.RLock()
- result := rr.internal[t]
- rr.RUnlock()
- return result
- }
- //func (rr *Registry) Delete(t PluginType, value string) {
- // rr.Lock()
- // s := rr.internal[t]
- // for i, f := range s{
- // if f == value{
- // s[len(s)-1], s[i] = s[i], s[len(s)-1]
- // rr.internal[t] = s
- // break
- // }
- // }
- // rr.Unlock()
- //}
- type Manager struct {
- pluginDir string
- etcDir string
- registry *Registry
- }
- func NewPluginManager() (*Manager, error) {
- var err error
- once.Do(func() {
- dir, err := common.GetLoc("/plugins")
- if err != nil {
- err = fmt.Errorf("cannot find plugins folder: %s", err)
- return
- }
- etcDir, err := common.GetLoc("/etc")
- if err != nil {
- err = fmt.Errorf("cannot find etc folder: %s", err)
- return
- }
- plugins := make([][]string, 3)
- for i := 0; i < 3; i++ {
- names, err := findAll(PluginType(i), dir)
- if err != nil {
- err = fmt.Errorf("fail to find existing plugins: %s", err)
- return
- }
- plugins[i] = names
- }
- registry := &Registry{internal: plugins}
- singleton = &Manager{
- pluginDir: dir,
- etcDir: etcDir,
- registry: registry,
- }
- })
- return singleton, err
- }
- func findAll(t PluginType, pluginDir string) (result []string, err error) {
- dir := path.Join(pluginDir, PluginTypes[t])
- files, err := ioutil.ReadDir(dir)
- if err != nil {
- return
- }
- for _, file := range files {
- baseName := filepath.Base(file.Name())
- if strings.HasSuffix(baseName, ".so") {
- result = append(result, lcFirst(baseName[0:len(baseName)-3]))
- }
- }
- return
- }
- func (m *Manager) List(t PluginType) (result []string, err error) {
- return m.registry.List(t), nil
- }
- func (m *Manager) Register(t PluginType, j *Plugin) error {
- name, uri, cb := j.Name, j.File, j.Callback
- //Validation
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
- return fmt.Errorf("invalid uri %s", uri)
- }
- for _, n := range m.registry.List(t) {
- if n == name {
- return fmt.Errorf("invalid name %s: duplicate", name)
- }
- }
- zipPath := path.Join(m.pluginDir, name+".zip")
- var unzipFiles []string
- //clean up: delete zip file and unzip files in error
- defer os.Remove(zipPath)
- //download
- err := downloadFile(zipPath, uri)
- if err != nil {
- return fmt.Errorf("fail to download file %s: %s", uri, err)
- }
- //unzip and copy to destination
- unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
- if err != nil {
- if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
- os.Remove(unzipFiles[0])
- }
- return fmt.Errorf("fail to unzip file %s: %s", uri, err)
- }
- m.registry.Store(t, name)
- return callback(cb)
- }
- func (m *Manager) Delete(t PluginType, name string) (result error) {
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- found := false
- for _, n := range m.registry.List(t) {
- if n == name {
- found = true
- }
- }
- if !found {
- return fmt.Errorf("invalid name %s: not exist", name)
- }
- var results []string
- paths := []string{
- path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
- }
- if t == SOURCE {
- paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
- }
- for _, p := range paths {
- _, err := os.Stat(p)
- if err == nil {
- err = os.Remove(p)
- if err != nil {
- results = append(results, err.Error())
- }
- } else {
- results = append(results, fmt.Sprintf("can't find %s", p))
- }
- }
- if len(results) > 0 {
- return errors.New(strings.Join(results, "\n"))
- } else {
- return nil
- }
- }
- func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
- var filenames []string
- r, err := zip.OpenReader(src)
- if err != nil {
- return filenames, err
- }
- defer r.Close()
- files := []string{
- ucFirst(name) + ".so",
- }
- paths := []string{
- path.Join(m.pluginDir, PluginTypes[t], files[0]),
- }
- if t == SOURCE {
- files = append(files, name+".yaml")
- paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
- }
- for i, d := range files {
- var z *zip.File
- for _, file := range r.File {
- fileName := file.Name
- if fileName == d {
- z = file
- }
- }
- if z == nil {
- return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
- }
- err = unzipTo(z, paths[i])
- if err != nil {
- return filenames, err
- }
- filenames = append(filenames, paths[i])
- }
- return filenames, nil
- }
- func unzipTo(f *zip.File, fpath string) error {
- _, err := os.Stat(fpath)
- if err == nil || !os.IsNotExist(err) {
- return fmt.Errorf("%s already exist", fpath)
- }
- if f.FileInfo().IsDir() {
- return fmt.Errorf("%s: not a file, but a directory", fpath)
- }
- if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
- return err
- }
- outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
- if err != nil {
- return err
- }
- rc, err := f.Open()
- if err != nil {
- return err
- }
- _, err = io.Copy(outFile, rc)
- outFile.Close()
- rc.Close()
- return err
- }
- func isValidUrl(uri string) bool {
- _, err := url.ParseRequestURI(uri)
- if err != nil {
- return false
- }
- u, err := url.Parse(uri)
- if err != nil || u.Scheme == "" || u.Host == "" {
- return false
- }
- return true
- }
- func downloadFile(filepath string, url string) error {
- // Get the data
- resp, err := http.Get(url)
- if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("cannot download the file with status: %d %s", resp.StatusCode, resp.Status)
- }
- defer resp.Body.Close()
- // Create the file
- out, err := os.Create(filepath)
- if err != nil {
- return err
- }
- defer out.Close()
- // Write the body to file
- _, err = io.Copy(out, resp.Body)
- return err
- }
- func ucFirst(str string) string {
- for i, v := range str {
- return string(unicode.ToUpper(v)) + str[i+1:]
- }
- return ""
- }
- func lcFirst(str string) string {
- for i, v := range str {
- return string(unicode.ToLower(v)) + str[i+1:]
- }
- return ""
- }
- func callback(u string) error {
- return nil
- }
|