manager.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package plugins
  2. import (
  3. "archive/zip"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "path"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "unicode"
  16. )
  17. type PluginType int
  18. const (
  19. SOURCE PluginType = iota
  20. SINK
  21. FUNCTION
  22. )
  23. var (
  24. pluginFolders = []string{"sources", "sinks", "functions"}
  25. once sync.Once
  26. singleton *Manager
  27. )
  28. type OnRegistered func()
  29. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  30. type Registry struct {
  31. sync.RWMutex
  32. internal [][]string
  33. }
  34. func (rr *Registry) Store(t PluginType, value string) {
  35. rr.Lock()
  36. rr.internal[t] = append(rr.internal[t], value)
  37. rr.Unlock()
  38. }
  39. func (rr *Registry) List(t PluginType) (values []string) {
  40. rr.RLock()
  41. result := rr.internal[t]
  42. rr.RUnlock()
  43. return result
  44. }
  45. //func (rr *Registry) Delete(t PluginType, value string) {
  46. // rr.Lock()
  47. // s := rr.internal[t]
  48. // for i, f := range s{
  49. // if f == value{
  50. // s[len(s)-1], s[i] = s[i], s[len(s)-1]
  51. // rr.internal[t] = s
  52. // break
  53. // }
  54. // }
  55. // rr.Unlock()
  56. //}
  57. type Manager struct {
  58. pluginDir string
  59. etcDir string
  60. registry *Registry
  61. }
  62. func NewPluginManager() (*Manager, error) {
  63. var err error
  64. once.Do(func() {
  65. dir, err := common.GetLoc("/plugins")
  66. if err != nil {
  67. err = fmt.Errorf("cannot find plugins folder: %s", err)
  68. return
  69. }
  70. etcDir, err := common.GetLoc("/etc")
  71. if err != nil {
  72. err = fmt.Errorf("cannot find etc folder: %s", err)
  73. return
  74. }
  75. plugins := make([][]string, 3)
  76. for i := 0; i < 3; i++ {
  77. names, err := findAll(PluginType(i), dir)
  78. if err != nil {
  79. err = fmt.Errorf("fail to find existing plugins: %s", err)
  80. return
  81. }
  82. plugins[i] = names
  83. }
  84. registry := &Registry{internal: plugins}
  85. singleton = &Manager{
  86. pluginDir: dir,
  87. etcDir: etcDir,
  88. registry: registry,
  89. }
  90. })
  91. return singleton, err
  92. }
  93. func findAll(t PluginType, pluginDir string) (result []string, err error) {
  94. dir := path.Join(pluginDir, pluginFolders[t])
  95. files, err := ioutil.ReadDir(dir)
  96. if err != nil {
  97. return
  98. }
  99. for _, file := range files {
  100. result = append(result, file.Name())
  101. }
  102. return
  103. }
  104. func (m *Manager) Register(t PluginType, name string, uri string, callback OnRegistered) error {
  105. //Validation
  106. name = strings.Trim(name, " ")
  107. if name == "" {
  108. return fmt.Errorf("invalid name %s: should not be empty", name)
  109. }
  110. if !isValidUrl(uri) && strings.HasSuffix(uri, ".zip") {
  111. return fmt.Errorf("invalid uri %s", uri)
  112. }
  113. for _, n := range m.registry.List(t) {
  114. if n == name {
  115. return fmt.Errorf("invalid name %s: duplicate", name)
  116. }
  117. }
  118. zipPath := path.Join(m.pluginDir, name+".zip")
  119. var unzipFiles []string
  120. //clean up: delete zip file and unzip files in error
  121. defer func() {
  122. os.Remove(zipPath)
  123. if len(unzipFiles) == 1 {
  124. os.Remove(unzipFiles[0])
  125. } else if len(unzipFiles) == 2 {
  126. m.registry.Store(t, name)
  127. callback()
  128. }
  129. }()
  130. //download
  131. err := downloadFile(zipPath, uri)
  132. if err != nil {
  133. return fmt.Errorf("fail to download file %s: %s", uri, err)
  134. }
  135. //unzip and copy to destination
  136. unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
  137. if err != nil {
  138. return fmt.Errorf("fail to unzip file %s: %s", uri, err)
  139. }
  140. return nil
  141. }
  142. func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
  143. var filenames []string
  144. r, err := zip.OpenReader(src)
  145. if err != nil {
  146. return filenames, err
  147. }
  148. defer r.Close()
  149. soFileName := ucFirst(name) + ".so"
  150. confFileName := name + ".yaml"
  151. var soFile, confFile *zip.File
  152. found := 0
  153. for _, file := range r.File {
  154. fileName := file.Name
  155. if fileName == soFileName {
  156. soFile = file
  157. found++
  158. } else if fileName == confFileName {
  159. confFile = file
  160. found++
  161. }
  162. if found == 2 {
  163. break
  164. }
  165. }
  166. if found < 2 {
  167. return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
  168. }
  169. soPath := path.Join(m.pluginDir, pluginFolders[t], soFileName)
  170. err = unzipTo(soFile, soPath)
  171. if err != nil {
  172. return filenames, err
  173. }
  174. filenames = append(filenames, soPath)
  175. confPath := path.Join(m.etcDir, pluginFolders[t], confFileName)
  176. err = unzipTo(confFile, confPath)
  177. if err != nil {
  178. return filenames, err
  179. }
  180. filenames = append(filenames, confPath)
  181. return filenames, nil
  182. }
  183. func unzipTo(f *zip.File, fpath string) error {
  184. if f.FileInfo().IsDir() {
  185. // Make Folder
  186. os.MkdirAll(fpath, os.ModePerm)
  187. return fmt.Errorf("%s: not a file, but a directory", fpath)
  188. }
  189. // Make File
  190. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  191. return err
  192. }
  193. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_TRUNC, f.Mode())
  194. if err != nil {
  195. return err
  196. }
  197. rc, err := f.Open()
  198. if err != nil {
  199. return err
  200. }
  201. _, err = io.Copy(outFile, rc)
  202. // Close the file without defer to close before next iteration of loop
  203. outFile.Close()
  204. rc.Close()
  205. return err
  206. }
  207. func isValidUrl(uri string) bool {
  208. _, err := url.ParseRequestURI(uri)
  209. if err != nil {
  210. return false
  211. }
  212. u, err := url.Parse(uri)
  213. if err != nil || u.Scheme == "" || u.Host == "" {
  214. return false
  215. }
  216. return true
  217. }
  218. func downloadFile(filepath string, url string) error {
  219. // Get the data
  220. resp, err := http.Get(url)
  221. if err != nil {
  222. return err
  223. }
  224. defer resp.Body.Close()
  225. // Create the file
  226. out, err := os.Create(filepath)
  227. if err != nil {
  228. return err
  229. }
  230. defer out.Close()
  231. // Write the body to file
  232. _, err = io.Copy(out, resp.Body)
  233. return err
  234. }
  235. func ucFirst(str string) string {
  236. for i, v := range str {
  237. return string(unicode.ToUpper(v)) + str[i+1:]
  238. }
  239. return ""
  240. }