manager.go 8.2 KB


  1. package plugins
  2. import (
  3. "archive/zip"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "plugin"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "unicode"
  20. )
  // Plugin is the JSON payload describing a plugin to register.
  type Plugin struct {
  	// Name is the plugin name; Register trims surrounding spaces from it.
  	Name string `json:"name"`
  	// File is the URI of the plugin archive; Register requires it to end in ".zip".
  	File string `json:"file"`
  }
  // PluginType identifies the kind of a plugin. Its numeric value is used as
  // an index into PluginTypes and into the registry's per-type maps.
  type PluginType int

  // The supported plugin kinds, numbered from zero.
  const (
  	// SOURCE is the only kind that also carries a yaml configuration file.
  	SOURCE PluginType = iota
  	// SINK plugins consist of a shared object only.
  	SINK
  	// FUNCTION plugins consist of a shared object only.
  	FUNCTION
  )
  31. var (
  32. PluginTypes = []string{"sources", "sinks", "functions"}
  33. once sync.Once
  34. singleton *Manager
  35. )
  36. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  37. type Registry struct {
  38. sync.RWMutex
  39. internal []map[string]string
  40. }
  41. func (rr *Registry) Store(t PluginType, name string, version string) {
  42. rr.Lock()
  43. rr.internal[t][name] = version
  44. rr.Unlock()
  45. }
  46. func (rr *Registry) List(t PluginType) []string {
  47. rr.RLock()
  48. result := rr.internal[t]
  49. rr.RUnlock()
  50. keys := make([]string, 0, len(result))
  51. for k := range result {
  52. keys = append(keys, k)
  53. }
  54. return keys
  55. }
  56. func (rr *Registry) Get(t PluginType, name string) (string, bool) {
  57. rr.RLock()
  58. result := rr.internal[t]
  59. rr.RUnlock()
  60. r, ok := result[name]
  61. return r, ok
  62. }
  63. //func (rr *Registry) Delete(t PluginType, value string) {
  64. // rr.Lock()
  65. // s := rr.internal[t]
  66. // for i, f := range s{
  67. // if f == value{
  68. // s[len(s)-1], s[i] = s[i], s[len(s)-1]
  69. // rr.internal[t] = s
  70. // break
  71. // }
  72. // }
  73. // rr.Unlock()
  74. //}
  75. var symbolRegistry = make(map[string]plugin.Symbol)
  76. func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
  77. t = ucFirst(t)
  78. key := ptype + "/" + t
  79. var nf plugin.Symbol
  80. nf, ok := symbolRegistry[key]
  81. if !ok {
  82. loc, err := common.GetLoc("/plugins/")
  83. if err != nil {
  84. return nil, fmt.Errorf("cannot find the plugins folder")
  85. }
  86. mod := path.Join(loc, ptype, t+".so")
  87. plug, err := plugin.Open(mod)
  88. if err != nil {
  89. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  90. }
  91. nf, err = plug.Lookup(t)
  92. if err != nil {
  93. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  94. }
  95. symbolRegistry[key] = nf
  96. }
  97. return nf, nil
  98. }
  99. type Manager struct {
  100. pluginDir string
  101. etcDir string
  102. registry *Registry
  103. }
  104. func NewPluginManager() (*Manager, error) {
  105. var err error
  106. once.Do(func() {
  107. dir, err := common.GetLoc("/plugins")
  108. if err != nil {
  109. err = fmt.Errorf("cannot find plugins folder: %s", err)
  110. return
  111. }
  112. etcDir, err := common.GetLoc("/etc")
  113. if err != nil {
  114. err = fmt.Errorf("cannot find etc folder: %s", err)
  115. return
  116. }
  117. plugins := make([]map[string]string, 3)
  118. for i := 0; i < 3; i++ {
  119. names, err := findAll(PluginType(i), dir)
  120. if err != nil {
  121. err = fmt.Errorf("fail to find existing plugins: %s", err)
  122. return
  123. }
  124. plugins[i] = names
  125. }
  126. registry := &Registry{internal: plugins}
  127. singleton = &Manager{
  128. pluginDir: dir,
  129. etcDir: etcDir,
  130. registry: registry,
  131. }
  132. })
  133. return singleton, err
  134. }
  135. func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
  136. result = make(map[string]string)
  137. dir := path.Join(pluginDir, PluginTypes[t])
  138. files, err := ioutil.ReadDir(dir)
  139. if err != nil {
  140. return
  141. }
  142. for _, file := range files {
  143. baseName := filepath.Base(file.Name())
  144. if strings.HasSuffix(baseName, ".so") {
  145. n, v := parseName(baseName)
  146. result[n] = v
  147. }
  148. }
  149. return
  150. }
  151. func (m *Manager) List(t PluginType) (result []string, err error) {
  152. return m.registry.List(t), nil
  153. }
  154. func (m *Manager) Register(t PluginType, j *Plugin) error {
  155. name, uri := j.Name, j.File
  156. //Validation
  157. name = strings.Trim(name, " ")
  158. if name == "" {
  159. return fmt.Errorf("invalid name %s: should not be empty", name)
  160. }
  161. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  162. return fmt.Errorf("invalid uri %s", uri)
  163. }
  164. for _, n := range m.registry.List(t) {
  165. if n == name {
  166. return fmt.Errorf("invalid name %s: duplicate", name)
  167. }
  168. }
  169. zipPath := path.Join(m.pluginDir, name+".zip")
  170. var unzipFiles []string
  171. //clean up: delete zip file and unzip files in error
  172. defer os.Remove(zipPath)
  173. //download
  174. err := downloadFile(zipPath, uri)
  175. if err != nil {
  176. return fmt.Errorf("fail to download file %s: %s", uri, err)
  177. }
  178. //unzip and copy to destination
  179. unzipFiles, version, err := m.unzipAndCopy(t, name, zipPath)
  180. if err != nil {
  181. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  182. os.Remove(unzipFiles[0])
  183. }
  184. return fmt.Errorf("fail to unzip file %s: %s", uri, err)
  185. }
  186. m.registry.Store(t, name, version)
  187. return nil
  188. }
  189. func (m *Manager) Delete(t PluginType, name string, restart bool) error {
  190. name = strings.Trim(name, " ")
  191. if name == "" {
  192. return fmt.Errorf("invalid name %s: should not be empty", name)
  193. }
  194. v, ok := m.registry.Get(t, name)
  195. if !ok {
  196. return fmt.Errorf("invalid name %s: not exist", name)
  197. }
  198. var results []string
  199. soFile := ucFirst(name) + ".so"
  200. if v != "" {
  201. soFile = fmt.Sprintf("%s@v%s.so", ucFirst(name), v)
  202. }
  203. paths := []string{
  204. path.Join(m.pluginDir, PluginTypes[t], soFile),
  205. }
  206. if t == SOURCE {
  207. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  208. }
  209. for _, p := range paths {
  210. _, err := os.Stat(p)
  211. if err == nil {
  212. err = os.Remove(p)
  213. if err != nil {
  214. results = append(results, err.Error())
  215. }
  216. } else {
  217. results = append(results, fmt.Sprintf("can't find %s", p))
  218. }
  219. }
  220. if len(results) > 0 {
  221. return errors.New(strings.Join(results, "\n"))
  222. } else {
  223. if restart {
  224. go func() {
  225. time.Sleep(1 * time.Second)
  226. os.Exit(100)
  227. }()
  228. }
  229. return nil
  230. }
  231. }
  232. func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, string, error) {
  233. var filenames []string
  234. r, err := zip.OpenReader(src)
  235. if err != nil {
  236. return filenames, "", err
  237. }
  238. defer r.Close()
  239. soPrefix := regexp.MustCompile(fmt.Sprintf(`^%s(@v.*)?\.so$`, ucFirst(name)))
  240. var yamlFile, yamlPath, version string
  241. expFiles := 1
  242. if t == SOURCE {
  243. yamlFile = name + ".yaml"
  244. yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
  245. expFiles = 2
  246. }
  247. for _, file := range r.File {
  248. fileName := file.Name
  249. if yamlFile == fileName {
  250. err = unzipTo(file, yamlPath)
  251. if err != nil {
  252. return filenames, "", err
  253. }
  254. filenames = append(filenames, yamlPath)
  255. }
  256. if soPrefix.Match([]byte(fileName)) {
  257. soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
  258. err = unzipTo(file, soPath)
  259. if err != nil {
  260. return filenames, "", err
  261. }
  262. filenames = append(filenames, soPath)
  263. _, version = parseName(fileName)
  264. }
  265. }
  266. if len(filenames) != expFiles {
  267. return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
  268. }
  269. return filenames, version, nil
  270. }
  271. func parseName(n string) (string, string) {
  272. result := strings.Split(n, ".so")
  273. result = strings.Split(result[0], "@v")
  274. name := lcFirst(result[0])
  275. if len(result) > 1 {
  276. return name, result[1]
  277. }
  278. return name, ""
  279. }
  // unzipTo writes the content of the archive entry f to the file fpath,
  // creating missing parent directories and using the file mode recorded in
  // the archive.
  //
  // It refuses to overwrite: an error is returned when fpath already exists
  // or cannot be checked. Directory entries are rejected as well. When writing
  // fails, the partially written file is removed again.
  func unzipTo(f *zip.File, fpath string) error {
  	if _, err := os.Stat(fpath); err == nil || !os.IsNotExist(err) {
  		return fmt.Errorf("%s already exist", fpath)
  	}
  	if f.FileInfo().IsDir() {
  		return fmt.Errorf("%s: not a file, but a directory", fpath)
  	}
  	if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  		return err
  	}

  	// Open the source first so that a failure here leaves nothing on disk and
  	// no file handle behind.
  	rc, err := f.Open()
  	if err != nil {
  		return err
  	}
  	defer rc.Close()

  	// O_EXCL keeps the no-overwrite guarantee even if the file appeared after
  	// the check above.
  	outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, f.Mode())
  	if err != nil {
  		return err
  	}

  	_, err = io.Copy(outFile, rc)
  	// A failing Close can mean that buffered data never reached the disk.
  	if closeErr := outFile.Close(); err == nil {
  		err = closeErr
  	}
  	if err != nil {
  		// Best effort: do not leave a truncated file behind.
  		os.Remove(fpath)
  		return err
  	}
  	return nil
  }
  // isValidUrl reports whether uri is accepted by both url.ParseRequestURI
  // and url.Parse and carries a non-empty scheme and host.
  func isValidUrl(uri string) bool {
  	if _, err := url.ParseRequestURI(uri); err != nil {
  		return false
  	}
  	parsed, err := url.Parse(uri)
  	return err == nil && parsed.Scheme != "" && parsed.Host != ""
  }
  // downloadFile fetches url with an HTTP GET request and stores the response
  // body in the file named filepath, creating or truncating it.
  //
  // An error is returned when the request fails, the response status is not
  // 200 OK (in which case no file is created), or the file cannot be created
  // or completely written.
  func downloadFile(filepath string, url string) error {
  	// Get the data
  	resp, err := http.Get(url)
  	if err != nil {
  		return err
  	}
  	// Close the body on every path, including the status check below, so the
  	// connection is not leaked.
  	defer resp.Body.Close()
  	if resp.StatusCode != http.StatusOK {
  		return fmt.Errorf("cannot download the file with status: %s", resp.Status)
  	}

  	// Create the file
  	out, err := os.Create(filepath)
  	if err != nil {
  		return err
  	}

  	// Write the body to file
  	_, err = io.Copy(out, resp.Body)
  	// A failing Close can mean that the data was not fully written.
  	if closeErr := out.Close(); err == nil {
  		err = closeErr
  	}
  	return err
  }
  // ucFirst returns str with its first character converted to upper case.
  // An empty string is returned unchanged.
  func ucFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	// The first character may occupy several bytes, so the remainder starts
  	// at the byte offset of the second character, not at offset 1.
  	var first rune
  	rest := ""
  	for i, v := range str {
  		if i > 0 {
  			rest = str[i:]
  			break
  		}
  		first = v
  	}
  	return string(unicode.ToUpper(first)) + rest
  }
  // lcFirst returns str with its first character converted to lower case.
  // An empty string is returned unchanged.
  func lcFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	// The first character may occupy several bytes, so the remainder starts
  	// at the byte offset of the second character, not at offset 1.
  	var first rune
  	rest := ""
  	for i, v := range str {
  		if i > 0 {
  			rest = str[i:]
  			break
  		}
  		first = v
  	}
  	return string(unicode.ToLower(first)) + rest
  }