manager.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package plugins
import (
	"archive/zip"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"plugin"
	"strings"
	"sync"
	"unicode"
	"unicode/utf8"

	"github.com/emqx/kuiper/common"
)
// Plugin is the JSON description of a plugin to be registered.
type Plugin struct {
	// Name is the plugin name; Register trims surrounding spaces and rejects
	// an empty or already registered name.
	Name string `json:"name"`
	// File is the URI of the plugin archive; Register requires a URL with a
	// scheme and host that ends in ".zip".
	File string `json:"file"`
	// Callback is an optional URL that is requested with HTTP GET after the
	// action has succeeded; a blank value disables the callback.
	Callback string `json:"callback"`
}
// PluginType identifies the kind of a plugin. Its integer value is used as an
// index into PluginTypes and into the registry's per-type name lists.
type PluginType int

// The supported plugin kinds, numbered from 0 in the same order as the
// entries of PluginTypes.
const (
	SOURCE PluginType = iota
	SINK
	FUNCTION
)
var (
	// PluginTypes holds the sub-folder name used for each PluginType,
	// indexed by the PluginType value.
	PluginTypes = []string{"sources", "sinks", "functions"}
	// once guards the one-time initialization performed by NewPluginManager.
	once sync.Once
	// singleton is the Manager instance created by NewPluginManager.
	singleton *Manager
)
// Registry records the names of the known plugins, one list per PluginType.
// It is append-only because a plugin cannot be deleted or reloaded; to drop a
// plugin, restart the server so that the plugins are re-indexed.
type Registry struct {
	// The embedded lock guards internal.
	sync.RWMutex
	// internal is indexed by PluginType; each entry lists the plugin names
	// of that type.
	internal [][]string
}
  40. func (rr *Registry) Store(t PluginType, value string) {
  41. rr.Lock()
  42. rr.internal[t] = append(rr.internal[t], value)
  43. rr.Unlock()
  44. }
  45. func (rr *Registry) List(t PluginType) (values []string) {
  46. rr.RLock()
  47. result := rr.internal[t]
  48. rr.RUnlock()
  49. return result
  50. }
  51. //func (rr *Registry) Delete(t PluginType, value string) {
  52. // rr.Lock()
  53. // s := rr.internal[t]
  54. // for i, f := range s{
  55. // if f == value{
  56. // s[len(s)-1], s[i] = s[i], s[len(s)-1]
  57. // rr.internal[t] = s
  58. // break
  59. // }
  60. // }
  61. // rr.Unlock()
  62. //}
// symbolRegistry caches the symbols already looked up by GetPlugin, keyed by
// "<ptype>/<Name>". It is a plain map with no lock of its own.
var symbolRegistry = make(map[string]plugin.Symbol)
  64. func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
  65. t = ucFirst(t)
  66. key := ptype + "/" + t
  67. var nf plugin.Symbol
  68. nf, ok := symbolRegistry[key]
  69. if !ok {
  70. loc, err := common.GetLoc("/plugins/")
  71. if err != nil {
  72. return nil, fmt.Errorf("cannot find the plugins folder")
  73. }
  74. mod := path.Join(loc, ptype, t+".so")
  75. plug, err := plugin.Open(mod)
  76. if err != nil {
  77. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  78. }
  79. nf, err = plug.Lookup(t)
  80. if err != nil {
  81. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  82. }
  83. symbolRegistry[key] = nf
  84. }
  85. return nf, nil
  86. }
// Manager installs, lists and deletes plugins.
type Manager struct {
	// pluginDir is the plugins folder; downloaded archives and the
	// per-type ".so" sub-folders live under it.
	pluginDir string
	// etcDir is the etc folder; the ".yaml" file of a source plugin is
	// stored in its per-type sub-folder.
	etcDir string
	// registry holds the names of the known plugins per type.
	registry *Registry
}
  92. func NewPluginManager() (*Manager, error) {
  93. var err error
  94. once.Do(func() {
  95. dir, err := common.GetLoc("/plugins")
  96. if err != nil {
  97. err = fmt.Errorf("cannot find plugins folder: %s", err)
  98. return
  99. }
  100. etcDir, err := common.GetLoc("/etc")
  101. if err != nil {
  102. err = fmt.Errorf("cannot find etc folder: %s", err)
  103. return
  104. }
  105. plugins := make([][]string, 3)
  106. for i := 0; i < 3; i++ {
  107. names, err := findAll(PluginType(i), dir)
  108. if err != nil {
  109. err = fmt.Errorf("fail to find existing plugins: %s", err)
  110. return
  111. }
  112. plugins[i] = names
  113. }
  114. registry := &Registry{internal: plugins}
  115. singleton = &Manager{
  116. pluginDir: dir,
  117. etcDir: etcDir,
  118. registry: registry,
  119. }
  120. })
  121. return singleton, err
  122. }
  123. func findAll(t PluginType, pluginDir string) (result []string, err error) {
  124. dir := path.Join(pluginDir, PluginTypes[t])
  125. files, err := ioutil.ReadDir(dir)
  126. if err != nil {
  127. return
  128. }
  129. for _, file := range files {
  130. baseName := filepath.Base(file.Name())
  131. if strings.HasSuffix(baseName, ".so") {
  132. result = append(result, lcFirst(baseName[0:len(baseName)-3]))
  133. }
  134. }
  135. return
  136. }
  137. func (m *Manager) List(t PluginType) (result []string, err error) {
  138. return m.registry.List(t), nil
  139. }
  140. func (m *Manager) Register(t PluginType, j *Plugin) error {
  141. name, uri, cb := j.Name, j.File, j.Callback
  142. //Validation
  143. name = strings.Trim(name, " ")
  144. if name == "" {
  145. return fmt.Errorf("invalid name %s: should not be empty", name)
  146. }
  147. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  148. return fmt.Errorf("invalid uri %s", uri)
  149. }
  150. for _, n := range m.registry.List(t) {
  151. if n == name {
  152. return fmt.Errorf("invalid name %s: duplicate", name)
  153. }
  154. }
  155. zipPath := path.Join(m.pluginDir, name+".zip")
  156. var unzipFiles []string
  157. //clean up: delete zip file and unzip files in error
  158. defer os.Remove(zipPath)
  159. //download
  160. err := downloadFile(zipPath, uri)
  161. if err != nil {
  162. return fmt.Errorf("fail to download file %s: %s", uri, err)
  163. }
  164. //unzip and copy to destination
  165. unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
  166. if err != nil {
  167. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  168. os.Remove(unzipFiles[0])
  169. }
  170. return fmt.Errorf("fail to unzip file %s: %s", uri, err)
  171. }
  172. m.registry.Store(t, name)
  173. return callback(cb)
  174. }
  175. func (m *Manager) Delete(t PluginType, name string, cb string) error {
  176. name = strings.Trim(name, " ")
  177. if name == "" {
  178. return fmt.Errorf("invalid name %s: should not be empty", name)
  179. }
  180. found := false
  181. for _, n := range m.registry.List(t) {
  182. if n == name {
  183. found = true
  184. }
  185. }
  186. if !found {
  187. return fmt.Errorf("invalid name %s: not exist", name)
  188. }
  189. var results []string
  190. paths := []string{
  191. path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
  192. }
  193. if t == SOURCE {
  194. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  195. }
  196. for _, p := range paths {
  197. _, err := os.Stat(p)
  198. if err == nil {
  199. err = os.Remove(p)
  200. if err != nil {
  201. results = append(results, err.Error())
  202. }
  203. } else {
  204. results = append(results, fmt.Sprintf("can't find %s", p))
  205. }
  206. }
  207. if len(results) > 0 {
  208. return errors.New(strings.Join(results, "\n"))
  209. } else {
  210. return callback(cb)
  211. }
  212. }
  213. func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
  214. var filenames []string
  215. r, err := zip.OpenReader(src)
  216. if err != nil {
  217. return filenames, err
  218. }
  219. defer r.Close()
  220. files := []string{
  221. ucFirst(name) + ".so",
  222. }
  223. paths := []string{
  224. path.Join(m.pluginDir, PluginTypes[t], files[0]),
  225. }
  226. if t == SOURCE {
  227. files = append(files, name+".yaml")
  228. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
  229. }
  230. for i, d := range files {
  231. var z *zip.File
  232. for _, file := range r.File {
  233. fileName := file.Name
  234. if fileName == d {
  235. z = file
  236. }
  237. }
  238. if z == nil {
  239. return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
  240. }
  241. err = unzipTo(z, paths[i])
  242. if err != nil {
  243. return filenames, err
  244. }
  245. filenames = append(filenames, paths[i])
  246. }
  247. return filenames, nil
  248. }
  249. func unzipTo(f *zip.File, fpath string) error {
  250. _, err := os.Stat(fpath)
  251. if err == nil || !os.IsNotExist(err) {
  252. return fmt.Errorf("%s already exist", fpath)
  253. }
  254. if f.FileInfo().IsDir() {
  255. return fmt.Errorf("%s: not a file, but a directory", fpath)
  256. }
  257. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  258. return err
  259. }
  260. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  261. if err != nil {
  262. return err
  263. }
  264. rc, err := f.Open()
  265. if err != nil {
  266. return err
  267. }
  268. _, err = io.Copy(outFile, rc)
  269. outFile.Close()
  270. rc.Close()
  271. return err
  272. }
  273. func isValidUrl(uri string) bool {
  274. _, err := url.ParseRequestURI(uri)
  275. if err != nil {
  276. return false
  277. }
  278. u, err := url.Parse(uri)
  279. if err != nil || u.Scheme == "" || u.Host == "" {
  280. return false
  281. }
  282. return true
  283. }
  284. func downloadFile(filepath string, url string) error {
  285. // Get the data
  286. resp, err := http.Get(url)
  287. if err != nil {
  288. return err
  289. }
  290. if resp.StatusCode != http.StatusOK {
  291. return fmt.Errorf("cannot download the file with status: %s", resp.Status)
  292. }
  293. defer resp.Body.Close()
  294. // Create the file
  295. out, err := os.Create(filepath)
  296. if err != nil {
  297. return err
  298. }
  299. defer out.Close()
  300. // Write the body to file
  301. _, err = io.Copy(out, resp.Body)
  302. return err
  303. }
  304. func ucFirst(str string) string {
  305. for i, v := range str {
  306. return string(unicode.ToUpper(v)) + str[i+1:]
  307. }
  308. return ""
  309. }
  310. func lcFirst(str string) string {
  311. for i, v := range str {
  312. return string(unicode.ToLower(v)) + str[i+1:]
  313. }
  314. return ""
  315. }
  316. func callback(u string) error {
  317. if strings.Trim(u, " ") == "" {
  318. return nil
  319. } else {
  320. resp, err := http.Get(u)
  321. if err != nil {
  322. return fmt.Errorf("action succeded but callback failed: %v", err)
  323. } else {
  324. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  325. return fmt.Errorf("action succeeded but callback failed: status %s", resp.Status)
  326. }
  327. }
  328. }
  329. return nil
  330. }