manager.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. package plugins
import (
	"archive/zip"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"plugin"
	"regexp"
	"strings"
	"sync"
	"time"
	"unicode"
	"unicode/utf8"

	"github.com/emqx/kuiper/common"
)
// Plugin is the description of a plugin to register: its name and the
// location of its package. Register requires File to be a URL with a scheme
// and host that ends in ".zip".
type Plugin struct {
	Name string `json:"name"`
	File string `json:"file"`
}
// PluginType identifies the kind of a plugin. Its integer value is also used
// as an index into PluginTypes and into the registry's per-type maps.
type PluginType int

// The supported plugin kinds, declared in the same order as PluginTypes.
const (
	SOURCE PluginType = iota
	SINK
	FUNCTION
)
var (
	// PluginTypes holds the folder name used for each PluginType, indexed by
	// the type's integer value.
	PluginTypes = []string{"sources", "sinks", "functions"}
	// once guards the one-time construction of singleton in NewPluginManager.
	once sync.Once
	// singleton is the shared Manager handed out by NewPluginManager.
	singleton *Manager
)
// Registry maps plugin names to their versions, with one map per PluginType.
// It is append-only because a plugin cannot be deleted or reloaded once
// loaded. To delete a plugin, restart the server so the registry is re-indexed.
type Registry struct {
	sync.RWMutex
	internal []map[string]string
}
  41. func (rr *Registry) Store(t PluginType, name string, version string) {
  42. rr.Lock()
  43. rr.internal[t][name] = version
  44. rr.Unlock()
  45. }
  46. func (rr *Registry) List(t PluginType) []string {
  47. rr.RLock()
  48. result := rr.internal[t]
  49. rr.RUnlock()
  50. keys := make([]string, 0, len(result))
  51. for k := range result {
  52. keys = append(keys, k)
  53. }
  54. return keys
  55. }
  56. func (rr *Registry) Get(t PluginType, name string) (string, bool) {
  57. rr.RLock()
  58. result := rr.internal[t]
  59. rr.RUnlock()
  60. r, ok := result[name]
  61. return r, ok
  62. }
  63. //func (rr *Registry) Delete(t PluginType, value string) {
  64. // rr.Lock()
  65. // s := rr.internal[t]
  66. // for i, f := range s{
  67. // if f == value{
  68. // s[len(s)-1], s[i] = s[i], s[len(s)-1]
  69. // rr.internal[t] = s
  70. // break
  71. // }
  72. // }
  73. // rr.Unlock()
  74. //}
// symbolRegistry caches the symbols already looked up by GetPlugin, keyed by
// "<plugin type folder>/<plugin name>".
var symbolRegistry = make(map[string]plugin.Symbol)
  76. func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
  77. ut := ucFirst(t)
  78. ptype := PluginTypes[pt]
  79. key := ptype + "/" + t
  80. var nf plugin.Symbol
  81. nf, ok := symbolRegistry[key]
  82. if !ok {
  83. loc, err := common.GetLoc("/plugins/")
  84. if err != nil {
  85. return nil, fmt.Errorf("cannot find the plugins folder")
  86. }
  87. m, err := NewPluginManager()
  88. if err != nil {
  89. return nil, fmt.Errorf("fail to initialize the plugin manager")
  90. }
  91. soFile, err := getSoFileName(m, pt, t)
  92. if err != nil {
  93. return nil, fmt.Errorf("cannot get the plugin file name: %v", err)
  94. }
  95. mod := path.Join(loc, ptype, soFile)
  96. plug, err := plugin.Open(mod)
  97. if err != nil {
  98. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  99. }
  100. nf, err = plug.Lookup(ut)
  101. if err != nil {
  102. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  103. }
  104. symbolRegistry[key] = nf
  105. }
  106. return nf, nil
  107. }
// Manager installs and removes plugin files on disk and tracks the known
// plugins in a Registry.
type Manager struct {
	// pluginDir is the root plugins folder; shared objects are stored in a
	// sub-folder named after the plugin type.
	pluginDir string
	// etcDir is the configuration folder; the yaml file of a source plugin is
	// stored in its sub-folder named after the plugin type.
	etcDir string
	// registry indexes the known plugins and their versions by type.
	registry *Registry
}
  113. func NewPluginManager() (*Manager, error) {
  114. var err error
  115. once.Do(func() {
  116. dir, err := common.GetLoc("/plugins")
  117. if err != nil {
  118. err = fmt.Errorf("cannot find plugins folder: %s", err)
  119. return
  120. }
  121. etcDir, err := common.GetLoc("/etc")
  122. if err != nil {
  123. err = fmt.Errorf("cannot find etc folder: %s", err)
  124. return
  125. }
  126. plugins := make([]map[string]string, 3)
  127. for i := 0; i < 3; i++ {
  128. names, err := findAll(PluginType(i), dir)
  129. if err != nil {
  130. err = fmt.Errorf("fail to find existing plugins: %s", err)
  131. return
  132. }
  133. plugins[i] = names
  134. }
  135. registry := &Registry{internal: plugins}
  136. singleton = &Manager{
  137. pluginDir: dir,
  138. etcDir: etcDir,
  139. registry: registry,
  140. }
  141. })
  142. return singleton, err
  143. }
  144. func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
  145. result = make(map[string]string)
  146. dir := path.Join(pluginDir, PluginTypes[t])
  147. files, err := ioutil.ReadDir(dir)
  148. if err != nil {
  149. return
  150. }
  151. for _, file := range files {
  152. baseName := filepath.Base(file.Name())
  153. if strings.HasSuffix(baseName, ".so") {
  154. n, v := parseName(baseName)
  155. result[n] = v
  156. }
  157. }
  158. return
  159. }
  160. func (m *Manager) List(t PluginType) (result []string, err error) {
  161. return m.registry.List(t), nil
  162. }
  163. func (m *Manager) Register(t PluginType, j *Plugin) error {
  164. name, uri := j.Name, j.File
  165. //Validation
  166. name = strings.Trim(name, " ")
  167. if name == "" {
  168. return fmt.Errorf("invalid name %s: should not be empty", name)
  169. }
  170. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  171. return fmt.Errorf("invalid uri %s", uri)
  172. }
  173. for _, n := range m.registry.List(t) {
  174. if n == name {
  175. return fmt.Errorf("invalid name %s: duplicate", name)
  176. }
  177. }
  178. zipPath := path.Join(m.pluginDir, name+".zip")
  179. var unzipFiles []string
  180. //clean up: delete zip file and unzip files in error
  181. defer os.Remove(zipPath)
  182. //download
  183. err := downloadFile(zipPath, uri)
  184. if err != nil {
  185. return fmt.Errorf("fail to download file %s: %s", uri, err)
  186. }
  187. //unzip and copy to destination
  188. unzipFiles, version, err := m.unzipAndCopy(t, name, zipPath)
  189. if err != nil {
  190. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  191. os.Remove(unzipFiles[0])
  192. }
  193. return fmt.Errorf("fail to unzip file %s: %s", uri, err)
  194. }
  195. m.registry.Store(t, name, version)
  196. return nil
  197. }
  198. func (m *Manager) Delete(t PluginType, name string, stop bool) error {
  199. name = strings.Trim(name, " ")
  200. if name == "" {
  201. return fmt.Errorf("invalid name %s: should not be empty", name)
  202. }
  203. soFile, err := getSoFileName(m, t, name)
  204. if err != nil {
  205. return err
  206. }
  207. var results []string
  208. paths := []string{
  209. path.Join(m.pluginDir, PluginTypes[t], soFile),
  210. }
  211. if t == SOURCE {
  212. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  213. }
  214. for _, p := range paths {
  215. _, err := os.Stat(p)
  216. if err == nil {
  217. err = os.Remove(p)
  218. if err != nil {
  219. results = append(results, err.Error())
  220. }
  221. } else {
  222. results = append(results, fmt.Sprintf("can't find %s", p))
  223. }
  224. }
  225. if len(results) > 0 {
  226. return errors.New(strings.Join(results, "\n"))
  227. } else {
  228. if stop {
  229. go func() {
  230. time.Sleep(1 * time.Second)
  231. os.Exit(100)
  232. }()
  233. }
  234. return nil
  235. }
  236. }
  237. func (m *Manager) Get(t PluginType, name string) (map[string]string, bool) {
  238. v, ok := m.registry.Get(t, name)
  239. if ok {
  240. m := map[string]string{
  241. "name": name,
  242. "version": v,
  243. }
  244. return m, ok
  245. }
  246. return nil, false
  247. }
  248. func getSoFileName(m *Manager, t PluginType, name string) (string, error) {
  249. v, ok := m.registry.Get(t, name)
  250. if !ok {
  251. return "", fmt.Errorf("invalid name %s: not exist", name)
  252. }
  253. soFile := ucFirst(name) + ".so"
  254. if v != "" {
  255. soFile = fmt.Sprintf("%s@v%s.so", ucFirst(name), v)
  256. }
  257. return soFile, nil
  258. }
  259. func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, string, error) {
  260. var filenames []string
  261. r, err := zip.OpenReader(src)
  262. if err != nil {
  263. return filenames, "", err
  264. }
  265. defer r.Close()
  266. soPrefix := regexp.MustCompile(fmt.Sprintf(`^%s(@v.*)?\.so$`, ucFirst(name)))
  267. var yamlFile, yamlPath, version string
  268. expFiles := 1
  269. if t == SOURCE {
  270. yamlFile = name + ".yaml"
  271. yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
  272. expFiles = 2
  273. }
  274. for _, file := range r.File {
  275. fileName := file.Name
  276. if yamlFile == fileName {
  277. err = unzipTo(file, yamlPath)
  278. if err != nil {
  279. return filenames, "", err
  280. }
  281. filenames = append(filenames, yamlPath)
  282. }
  283. if soPrefix.Match([]byte(fileName)) {
  284. soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
  285. err = unzipTo(file, soPath)
  286. if err != nil {
  287. return filenames, "", err
  288. }
  289. filenames = append(filenames, soPath)
  290. _, version = parseName(fileName)
  291. }
  292. }
  293. if len(filenames) != expFiles {
  294. return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
  295. }
  296. return filenames, version, nil
  297. }
  298. func parseName(n string) (string, string) {
  299. result := strings.Split(n, ".so")
  300. result = strings.Split(result[0], "@v")
  301. name := lcFirst(result[0])
  302. if len(result) > 1 {
  303. return name, result[1]
  304. }
  305. return name, ""
  306. }
  307. func unzipTo(f *zip.File, fpath string) error {
  308. _, err := os.Stat(fpath)
  309. if err == nil || !os.IsNotExist(err) {
  310. return fmt.Errorf("%s already exist", fpath)
  311. }
  312. if f.FileInfo().IsDir() {
  313. return fmt.Errorf("%s: not a file, but a directory", fpath)
  314. }
  315. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  316. return err
  317. }
  318. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  319. if err != nil {
  320. return err
  321. }
  322. rc, err := f.Open()
  323. if err != nil {
  324. return err
  325. }
  326. _, err = io.Copy(outFile, rc)
  327. outFile.Close()
  328. rc.Close()
  329. return err
  330. }
  331. func isValidUrl(uri string) bool {
  332. _, err := url.ParseRequestURI(uri)
  333. if err != nil {
  334. return false
  335. }
  336. u, err := url.Parse(uri)
  337. if err != nil || u.Scheme == "" || u.Host == "" {
  338. return false
  339. }
  340. return true
  341. }
  342. func downloadFile(filepath string, url string) error {
  343. // Get the data
  344. resp, err := http.Get(url)
  345. if err != nil {
  346. return err
  347. }
  348. if resp.StatusCode != http.StatusOK {
  349. return fmt.Errorf("cannot download the file with status: %s", resp.Status)
  350. }
  351. defer resp.Body.Close()
  352. // Create the file
  353. out, err := os.Create(filepath)
  354. if err != nil {
  355. return err
  356. }
  357. defer out.Close()
  358. // Write the body to file
  359. _, err = io.Copy(out, resp.Body)
  360. return err
  361. }
  362. func ucFirst(str string) string {
  363. for i, v := range str {
  364. return string(unicode.ToUpper(v)) + str[i+1:]
  365. }
  366. return ""
  367. }
  368. func lcFirst(str string) string {
  369. for i, v := range str {
  370. return string(unicode.ToLower(v)) + str[i+1:]
  371. }
  372. return ""
  373. }