123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- package plugins
- import (
- "archive/zip"
- "fmt"
- "github.com/emqx/kuiper/common"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
- "sync"
- "unicode"
- )
// PluginType identifies the category a plugin belongs to. Its numeric value
// is used as an index (into pluginFolders and into the registry), so the
// order of the constants below must not change.
type PluginType int

const (
	// SOURCE is the plugin category with numeric value 0.
	SOURCE PluginType = iota
	// SINK is the plugin category with numeric value 1.
	SINK
	// FUNCTION is the plugin category with numeric value 2.
	FUNCTION
)
- var (
- pluginFolders = []string{"sources", "sinks", "functions"}
- once sync.Once
- singleton *Manager
- )
- type OnRegistered func()
- //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
- type Registry struct {
- sync.RWMutex
- internal [][]string
- }
- func (rr *Registry) Store(t PluginType, value string) {
- rr.Lock()
- rr.internal[t] = append(rr.internal[t], value)
- rr.Unlock()
- }
- func (rr *Registry) List(t PluginType) (values []string) {
- rr.RLock()
- result := rr.internal[t]
- rr.RUnlock()
- return result
- }
- //func (rr *Registry) Delete(t PluginType, value string) {
- // rr.Lock()
- // s := rr.internal[t]
- // for i, f := range s{
- // if f == value{
- // s[len(s)-1], s[i] = s[i], s[len(s)-1]
- // rr.internal[t] = s
- // break
- // }
- // }
- // rr.Unlock()
- //}
- type Manager struct {
- pluginDir string
- etcDir string
- registry *Registry
- }
- func NewPluginManager() (*Manager, error) {
- var err error
- once.Do(func() {
- dir, err := common.GetLoc("/plugins")
- if err != nil {
- err = fmt.Errorf("cannot find plugins folder: %s", err)
- return
- }
- etcDir, err := common.GetLoc("/etc")
- if err != nil {
- err = fmt.Errorf("cannot find etc folder: %s", err)
- return
- }
- plugins := make([][]string, 3)
- for i := 0; i < 3; i++ {
- names, err := findAll(PluginType(i), dir)
- if err != nil {
- err = fmt.Errorf("fail to find existing plugins: %s", err)
- return
- }
- plugins[i] = names
- }
- registry := &Registry{internal: plugins}
- singleton = &Manager{
- pluginDir: dir,
- etcDir: etcDir,
- registry: registry,
- }
- })
- return singleton, err
- }
- func findAll(t PluginType, pluginDir string) (result []string, err error) {
- dir := path.Join(pluginDir, pluginFolders[t])
- files, err := ioutil.ReadDir(dir)
- if err != nil {
- return
- }
- for _, file := range files {
- result = append(result, file.Name())
- }
- return
- }
- func (m *Manager) Register(t PluginType, name string, uri string, callback OnRegistered) error {
- //Validation
- name = strings.Trim(name, " ")
- if name == "" {
- return fmt.Errorf("invalid name %s: should not be empty", name)
- }
- if !isValidUrl(uri) && strings.HasSuffix(uri, ".zip") {
- return fmt.Errorf("invalid uri %s", uri)
- }
- for _, n := range m.registry.List(t) {
- if n == name {
- return fmt.Errorf("invalid name %s: duplicate", name)
- }
- }
- zipPath := path.Join(m.pluginDir, name+".zip")
- var unzipFiles []string
- //clean up: delete zip file and unzip files in error
- defer func() {
- os.Remove(zipPath)
- if len(unzipFiles) == 1 {
- os.Remove(unzipFiles[0])
- } else if len(unzipFiles) == 2 {
- m.registry.Store(t, name)
- callback()
- }
- }()
- //download
- err := downloadFile(zipPath, uri)
- if err != nil {
- return fmt.Errorf("fail to download file %s: %s", uri, err)
- }
- //unzip and copy to destination
- unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
- if err != nil {
- return fmt.Errorf("fail to unzip file %s: %s", uri, err)
- }
- return nil
- }
- func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
- var filenames []string
- r, err := zip.OpenReader(src)
- if err != nil {
- return filenames, err
- }
- defer r.Close()
- soFileName := ucFirst(name) + ".so"
- confFileName := name + ".yaml"
- var soFile, confFile *zip.File
- found := 0
- for _, file := range r.File {
- fileName := file.Name
- if fileName == soFileName {
- soFile = file
- found++
- } else if fileName == confFileName {
- confFile = file
- found++
- }
- if found == 2 {
- break
- }
- }
- if found < 2 {
- return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
- }
- soPath := path.Join(m.pluginDir, pluginFolders[t], soFileName)
- err = unzipTo(soFile, soPath)
- if err != nil {
- return filenames, err
- }
- filenames = append(filenames, soPath)
- confPath := path.Join(m.etcDir, pluginFolders[t], confFileName)
- err = unzipTo(confFile, confPath)
- if err != nil {
- return filenames, err
- }
- filenames = append(filenames, confPath)
- return filenames, nil
- }
// unzipTo extracts the single zip entry f to the file fpath.
//
// Missing parent directories of fpath are created. The destination is
// created with the entry's file mode if it does not exist and truncated if
// it does. A directory entry is rejected with an error and nothing is
// created at fpath. Errors from opening the entry, creating or writing the
// destination, and closing the destination are returned.
func unzipTo(f *zip.File, fpath string) error {
	if f.FileInfo().IsDir() {
		return fmt.Errorf("%s: not a file, but a directory", fpath)
	}

	if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
		return err
	}

	// Open the source first so that a failure leaves no empty destination.
	rc, err := f.Open()
	if err != nil {
		return err
	}
	defer rc.Close()

	// O_CREATE is required: the destination normally does not exist yet.
	outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
	if err != nil {
		return err
	}
	if _, err = io.Copy(outFile, rc); err != nil {
		outFile.Close()
		return err
	}
	// A failing Close can mean the data never reached the disk.
	return outFile.Close()
}
// isValidUrl reports whether uri is acceptable both to url.ParseRequestURI
// and to url.Parse, and additionally carries a scheme and a host.
func isValidUrl(uri string) bool {
	if _, reqErr := url.ParseRequestURI(uri); reqErr != nil {
		return false
	}
	parsed, parseErr := url.Parse(uri)
	return parseErr == nil && parsed.Scheme != "" && parsed.Host != ""
}
// downloadFile fetches url with an HTTP GET and writes the response body to
// the file at filepath, creating or truncating it.
//
// http.Get reports only transport failures as errors, so the status code is
// checked explicitly: a response outside the 2xx range is rejected before
// the destination file is touched, instead of saving an error page as the
// download. Errors from creating, writing and closing the file are returned.
func downloadFile(filepath string, url string) error {
	// Get the data
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("cannot download the file with status: %s", resp.Status)
	}

	// Create the file
	out, err := os.Create(filepath)
	if err != nil {
		return err
	}

	// Write the body to file
	if _, err = io.Copy(out, resp.Body); err != nil {
		out.Close()
		return err
	}
	// A failing Close can mean the data never reached the disk.
	return out.Close()
}
// ucFirst returns str with its first character converted to upper case.
// The empty string is returned unchanged.
//
// The first character is handled as a rune, so a leading multi-byte
// character is upper-cased as a whole and the remainder of the string is
// kept intact.
func ucFirst(str string) string {
	if str == "" {
		return ""
	}
	var head rune
	rest := ""
	for i, r := range str {
		if i > 0 {
			// i is the byte offset of the second rune, i.e. the width of
			// the first one, whatever its encoding length.
			rest = str[i:]
			break
		}
		head = r
	}
	return string(unicode.ToUpper(head)) + rest
}
|