manager.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package plugins
  2. import (
  3. "archive/zip"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "strings"
  15. "sync"
  16. "unicode"
  17. )
  18. type PluginType int
  19. const (
  20. SOURCE PluginType = iota
  21. SINK
  22. FUNCTION
  23. )
  24. var (
  25. pluginFolders = []string{"sources", "sinks", "functions"}
  26. once sync.Once
  27. singleton *Manager
  28. )
  29. type OnRegistered func()
  30. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  31. type Registry struct {
  32. sync.RWMutex
  33. internal [][]string
  34. }
  35. func (rr *Registry) Store(t PluginType, value string) {
  36. rr.Lock()
  37. rr.internal[t] = append(rr.internal[t], value)
  38. rr.Unlock()
  39. }
  40. func (rr *Registry) List(t PluginType) (values []string) {
  41. rr.RLock()
  42. result := rr.internal[t]
  43. rr.RUnlock()
  44. return result
  45. }
  46. //func (rr *Registry) Delete(t PluginType, value string) {
  47. // rr.Lock()
  48. // s := rr.internal[t]
  49. // for i, f := range s{
  50. // if f == value{
  51. // s[len(s)-1], s[i] = s[i], s[len(s)-1]
  52. // rr.internal[t] = s
  53. // break
  54. // }
  55. // }
  56. // rr.Unlock()
  57. //}
  58. type Manager struct {
  59. pluginDir string
  60. etcDir string
  61. registry *Registry
  62. }
  63. func NewPluginManager() (*Manager, error) {
  64. var err error
  65. once.Do(func() {
  66. dir, err := common.GetLoc("/plugins")
  67. if err != nil {
  68. err = fmt.Errorf("cannot find plugins folder: %s", err)
  69. return
  70. }
  71. etcDir, err := common.GetLoc("/etc")
  72. if err != nil {
  73. err = fmt.Errorf("cannot find etc folder: %s", err)
  74. return
  75. }
  76. plugins := make([][]string, 3)
  77. for i := 0; i < 3; i++ {
  78. names, err := findAll(PluginType(i), dir)
  79. if err != nil {
  80. err = fmt.Errorf("fail to find existing plugins: %s", err)
  81. return
  82. }
  83. plugins[i] = names
  84. }
  85. registry := &Registry{internal: plugins}
  86. singleton = &Manager{
  87. pluginDir: dir,
  88. etcDir: etcDir,
  89. registry: registry,
  90. }
  91. })
  92. return singleton, err
  93. }
  94. func findAll(t PluginType, pluginDir string) (result []string, err error) {
  95. dir := path.Join(pluginDir, pluginFolders[t])
  96. files, err := ioutil.ReadDir(dir)
  97. if err != nil {
  98. return
  99. }
  100. for _, file := range files {
  101. baseName := filepath.Base(file.Name())
  102. if strings.HasSuffix(baseName, ".so") {
  103. result = append(result, lcFirst(baseName[0:len(baseName)-3]))
  104. }
  105. }
  106. return
  107. }
  108. func (m *Manager) Register(t PluginType, name string, uri string, callback OnRegistered) error {
  109. //Validation
  110. name = strings.Trim(name, " ")
  111. if name == "" {
  112. return fmt.Errorf("invalid name %s: should not be empty", name)
  113. }
  114. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  115. return fmt.Errorf("invalid uri %s", uri)
  116. }
  117. for _, n := range m.registry.List(t) {
  118. if n == name {
  119. return fmt.Errorf("invalid name %s: duplicate", name)
  120. }
  121. }
  122. zipPath := path.Join(m.pluginDir, name+".zip")
  123. var unzipFiles []string
  124. //clean up: delete zip file and unzip files in error
  125. defer func() {
  126. os.Remove(zipPath)
  127. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  128. os.Remove(unzipFiles[0])
  129. } else if (t == SOURCE && len(unzipFiles) == 2) || (t != SOURCE && len(unzipFiles) == 1) { //no error
  130. m.registry.Store(t, name)
  131. callback()
  132. }
  133. }()
  134. //download
  135. err := downloadFile(zipPath, uri)
  136. if err != nil {
  137. return fmt.Errorf("fail to download file %s: %s", uri, err)
  138. }
  139. //unzip and copy to destination
  140. unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
  141. if err != nil {
  142. return fmt.Errorf("fail to unzip file %s: %s", uri, err)
  143. }
  144. return nil
  145. }
  146. func (m *Manager) Delete(t PluginType, name string, callback OnRegistered) (result error) {
  147. name = strings.Trim(name, " ")
  148. if name == "" {
  149. return fmt.Errorf("invalid name %s: should not be empty", name)
  150. }
  151. found := false
  152. for _, n := range m.registry.List(t) {
  153. if n == name {
  154. found = true
  155. }
  156. }
  157. if !found {
  158. return fmt.Errorf("invalid name %s: not exist", name)
  159. }
  160. var results []string
  161. paths := []string{
  162. path.Join(m.pluginDir, pluginFolders[t], ucFirst(name)+".so"),
  163. }
  164. if t == SOURCE {
  165. paths = append(paths, path.Join(m.etcDir, pluginFolders[t], name+".yaml"))
  166. }
  167. for _, p := range paths {
  168. _, err := os.Stat(p)
  169. if err == nil {
  170. err = os.Remove(p)
  171. if err != nil {
  172. results = append(results, err.Error())
  173. }
  174. } else {
  175. results = append(results, fmt.Sprintf("can't find %s", p))
  176. }
  177. }
  178. if len(results) > 0 {
  179. return errors.New(strings.Join(results, "\n"))
  180. } else {
  181. return nil
  182. }
  183. }
  184. func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
  185. var filenames []string
  186. r, err := zip.OpenReader(src)
  187. if err != nil {
  188. return filenames, err
  189. }
  190. defer r.Close()
  191. files := []string{
  192. ucFirst(name) + ".so",
  193. }
  194. paths := []string{
  195. path.Join(m.pluginDir, pluginFolders[t], files[0]),
  196. }
  197. if t == SOURCE {
  198. files = append(files, name+".yaml")
  199. paths = append(paths, path.Join(m.etcDir, pluginFolders[t], files[1]))
  200. }
  201. for i, d := range files {
  202. var z *zip.File
  203. for _, file := range r.File {
  204. fileName := file.Name
  205. if fileName == d {
  206. z = file
  207. }
  208. }
  209. if z == nil {
  210. return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
  211. }
  212. err = unzipTo(z, paths[i])
  213. if err != nil {
  214. return filenames, err
  215. }
  216. filenames = append(filenames, paths[i])
  217. }
  218. return filenames, nil
  219. }
  220. func unzipTo(f *zip.File, fpath string) error {
  221. _, err := os.Stat(fpath)
  222. if err == nil || !os.IsNotExist(err) {
  223. return fmt.Errorf("%s already exist", fpath)
  224. }
  225. if f.FileInfo().IsDir() {
  226. return fmt.Errorf("%s: not a file, but a directory", fpath)
  227. }
  228. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  229. return err
  230. }
  231. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  232. if err != nil {
  233. return err
  234. }
  235. rc, err := f.Open()
  236. if err != nil {
  237. return err
  238. }
  239. _, err = io.Copy(outFile, rc)
  240. outFile.Close()
  241. rc.Close()
  242. return err
  243. }
  244. func isValidUrl(uri string) bool {
  245. _, err := url.ParseRequestURI(uri)
  246. if err != nil {
  247. return false
  248. }
  249. u, err := url.Parse(uri)
  250. if err != nil || u.Scheme == "" || u.Host == "" {
  251. return false
  252. }
  253. return true
  254. }
  255. func downloadFile(filepath string, url string) error {
  256. // Get the data
  257. resp, err := http.Get(url)
  258. if err != nil {
  259. return err
  260. }
  261. defer resp.Body.Close()
  262. // Create the file
  263. out, err := os.Create(filepath)
  264. if err != nil {
  265. return err
  266. }
  267. defer out.Close()
  268. // Write the body to file
  269. _, err = io.Copy(out, resp.Body)
  270. return err
  271. }
  272. func ucFirst(str string) string {
  273. for i, v := range str {
  274. return string(unicode.ToUpper(v)) + str[i+1:]
  275. }
  276. return ""
  277. }
  278. func lcFirst(str string) string {
  279. for i, v := range str {
  280. return string(unicode.ToLower(v)) + str[i+1:]
  281. }
  282. return ""
  283. }