manager.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861
  1. package plugins
  2. import (
  3. "archive/zip"
  4. "bytes"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "github.com/emqx/kuiper/common"
  9. "github.com/emqx/kuiper/common/kv"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "os/exec"
  17. "path"
  18. "path/filepath"
  19. "plugin"
  20. "regexp"
  21. "strings"
  22. "sync"
  23. "time"
  24. "unicode"
  25. )
  26. type Plugin interface {
  27. GetName() string
  28. GetFile() string
  29. GetShellParas() []string
  30. GetSymbols() []string
  31. SetName(n string)
  32. }
  33. type IOPlugin struct {
  34. Name string `json:"name"`
  35. File string `json:"file"`
  36. ShellParas []string `json:"shellParas"`
  37. }
  38. func (p *IOPlugin) GetName() string {
  39. return p.Name
  40. }
  41. func (p *IOPlugin) GetFile() string {
  42. return p.File
  43. }
  44. func (p *IOPlugin) GetShellParas() []string {
  45. return p.ShellParas
  46. }
  47. func (p *IOPlugin) GetSymbols() []string {
  48. return nil
  49. }
  50. func (p *IOPlugin) SetName(n string) {
  51. p.Name = n
  52. }
  53. type FuncPlugin struct {
  54. IOPlugin
  55. // Optional, if not specified, a default element with the same name of the file will be registered
  56. Functions []string `json:"functions"`
  57. }
  58. func (fp *FuncPlugin) GetSymbols() []string {
  59. return fp.Functions
  60. }
  61. type PluginType int
  62. func NewPluginByType(t PluginType) Plugin {
  63. switch t {
  64. case FUNCTION:
  65. return &FuncPlugin{}
  66. default:
  67. return &IOPlugin{}
  68. }
  69. }
  70. const (
  71. SOURCE PluginType = iota
  72. SINK
  73. FUNCTION
  74. )
  75. const DELETED = "$deleted"
  76. var (
  77. PluginTypes = []string{"sources", "sinks", "functions"}
  78. once sync.Once
  79. singleton *Manager
  80. )
  81. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  82. type Registry struct {
  83. sync.RWMutex
  84. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  85. plugins []map[string]string
  86. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  87. symbols map[string]string
  88. }
  89. func (rr *Registry) Store(t PluginType, name string, version string) {
  90. rr.Lock()
  91. rr.plugins[t][name] = version
  92. rr.Unlock()
  93. }
  94. func (rr *Registry) StoreSymbols(name string, symbols []string) error {
  95. rr.Lock()
  96. defer rr.Unlock()
  97. for _, s := range symbols {
  98. if _, ok := rr.symbols[s]; ok {
  99. return fmt.Errorf("function name %s already exists", s)
  100. } else {
  101. rr.symbols[s] = name
  102. }
  103. }
  104. return nil
  105. }
  106. func (rr *Registry) RemoveSymbols(symbols []string) {
  107. rr.Lock()
  108. for _, s := range symbols {
  109. delete(rr.symbols, s)
  110. }
  111. rr.Unlock()
  112. }
  113. func (rr *Registry) List(t PluginType) []string {
  114. rr.RLock()
  115. result := rr.plugins[t]
  116. rr.RUnlock()
  117. keys := make([]string, 0, len(result))
  118. for k := range result {
  119. keys = append(keys, k)
  120. }
  121. return keys
  122. }
  123. func (rr *Registry) ListSymbols() []string {
  124. rr.RLock()
  125. result := rr.symbols
  126. rr.RUnlock()
  127. keys := make([]string, 0, len(result))
  128. for k := range result {
  129. keys = append(keys, k)
  130. }
  131. return keys
  132. }
  133. func (rr *Registry) Get(t PluginType, name string) (string, bool) {
  134. rr.RLock()
  135. result := rr.plugins[t]
  136. rr.RUnlock()
  137. r, ok := result[name]
  138. return r, ok
  139. }
  140. func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
  141. switch t {
  142. case FUNCTION:
  143. rr.RLock()
  144. result := rr.plugins[t]
  145. name, ok := rr.symbols[symbolName]
  146. rr.RUnlock()
  147. if ok {
  148. r, nok := result[name]
  149. return r, nok
  150. } else {
  151. return "", false
  152. }
  153. default:
  154. return rr.Get(t, symbolName)
  155. }
  156. }
  157. func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
  158. switch t {
  159. case FUNCTION:
  160. rr.RLock()
  161. defer rr.RUnlock()
  162. name, ok := rr.symbols[symbolName]
  163. return name, ok
  164. default:
  165. return symbolName, true
  166. }
  167. }
  168. var symbolRegistry = make(map[string]plugin.Symbol)
  169. var mu sync.RWMutex
  170. func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
  171. ut := ucFirst(t)
  172. ptype := PluginTypes[pt]
  173. key := ptype + "/" + t
  174. mu.Lock()
  175. defer mu.Unlock()
  176. var nf plugin.Symbol
  177. nf, ok := symbolRegistry[key]
  178. if !ok {
  179. m, err := NewPluginManager()
  180. if err != nil {
  181. return nil, fmt.Errorf("fail to initialize the plugin manager")
  182. }
  183. mod, err := getSoFilePath(m, pt, t, false)
  184. if err != nil {
  185. return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
  186. }
  187. common.Log.Debugf("Opening plugin %s", mod)
  188. plug, err := plugin.Open(mod)
  189. if err != nil {
  190. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  191. }
  192. common.Log.Debugf("Successfully open plugin %s", mod)
  193. nf, err = plug.Lookup(ut)
  194. if err != nil {
  195. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  196. }
  197. common.Log.Debugf("Successfully look-up plugin %s", mod)
  198. symbolRegistry[key] = nf
  199. }
  200. return nf, nil
  201. }
  202. func GetSource(t string) (api.Source, error) {
  203. nf, err := getPlugin(t, SOURCE)
  204. if err != nil {
  205. return nil, err
  206. }
  207. var s api.Source
  208. switch t := nf.(type) {
  209. case api.Source:
  210. s = t
  211. case func() api.Source:
  212. s = t()
  213. default:
  214. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  215. }
  216. return s, nil
  217. }
  218. func GetSink(t string) (api.Sink, error) {
  219. nf, err := getPlugin(t, SINK)
  220. if err != nil {
  221. return nil, err
  222. }
  223. var s api.Sink
  224. switch t := nf.(type) {
  225. case api.Sink:
  226. s = t
  227. case func() api.Sink:
  228. s = t()
  229. default:
  230. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  231. }
  232. return s, nil
  233. }
  234. func GetFunction(t string) (api.Function, error) {
  235. nf, err := getPlugin(t, FUNCTION)
  236. if err != nil {
  237. return nil, err
  238. }
  239. var s api.Function
  240. switch t := nf.(type) {
  241. case api.Function:
  242. s = t
  243. case func() api.Function:
  244. s = t()
  245. default:
  246. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  247. }
  248. return s, nil
  249. }
  250. type Manager struct {
  251. pluginDir string
  252. etcDir string
  253. registry *Registry
  254. db kv.KeyValue
  255. }
  256. func NewPluginManager() (*Manager, error) {
  257. var outerErr error
  258. once.Do(func() {
  259. dir, err := common.GetLoc("/plugins")
  260. if err != nil {
  261. outerErr = fmt.Errorf("cannot find plugins folder: %s", err)
  262. return
  263. }
  264. etcDir, err := common.GetLoc("/etc")
  265. if err != nil {
  266. outerErr = fmt.Errorf("cannot find etc folder: %s", err)
  267. return
  268. }
  269. dbDir, err := common.GetDataLoc()
  270. if err != nil {
  271. outerErr = fmt.Errorf("cannot find db folder: %s", err)
  272. return
  273. }
  274. db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
  275. err = db.Open()
  276. if err != nil {
  277. outerErr = fmt.Errorf("error when opening db: %v.", err)
  278. }
  279. defer db.Close()
  280. plugins := make([]map[string]string, 3)
  281. for i := 0; i < 3; i++ {
  282. names, err := findAll(PluginType(i), dir)
  283. if err != nil {
  284. outerErr = fmt.Errorf("fail to find existing plugins: %s", err)
  285. return
  286. }
  287. plugins[i] = names
  288. }
  289. registry := &Registry{plugins: plugins, symbols: make(map[string]string)}
  290. for pf, _ := range plugins[FUNCTION] {
  291. l := make([]string, 0)
  292. if ok, err := db.Get(pf, &l); ok {
  293. registry.StoreSymbols(pf, l)
  294. } else if err != nil {
  295. outerErr = fmt.Errorf("error when querying kv: %s", err)
  296. return
  297. } else {
  298. registry.StoreSymbols(pf, []string{pf})
  299. }
  300. }
  301. singleton = &Manager{
  302. pluginDir: dir,
  303. etcDir: etcDir,
  304. registry: registry,
  305. db: db,
  306. }
  307. if err := singleton.readSourceMetaDir(); nil != err {
  308. common.Log.Errorf("readSourceMetaDir:%v", err)
  309. }
  310. if err := singleton.readSinkMetaDir(); nil != err {
  311. common.Log.Errorf("readSinkMetaDir:%v", err)
  312. }
  313. if err := singleton.readFuncMetaDir(); nil != err {
  314. common.Log.Errorf("readFuncMetaDir:%v", err)
  315. }
  316. if err := singleton.readUiMsgDir(); nil != err {
  317. common.Log.Errorf("readUiMsgDir:%v", err)
  318. }
  319. })
  320. return singleton, outerErr
  321. }
  322. func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
  323. result = make(map[string]string)
  324. dir := path.Join(pluginDir, PluginTypes[t])
  325. files, err := ioutil.ReadDir(dir)
  326. if err != nil {
  327. return
  328. }
  329. for _, file := range files {
  330. baseName := filepath.Base(file.Name())
  331. if strings.HasSuffix(baseName, ".so") {
  332. n, v := parseName(baseName)
  333. result[n] = v
  334. }
  335. }
  336. return
  337. }
  338. func (m *Manager) List(t PluginType) (result []string, err error) {
  339. return m.registry.List(t), nil
  340. }
  341. func (m *Manager) ListSymbols() (result []string, err error) {
  342. return m.registry.ListSymbols(), nil
  343. }
  344. func (m *Manager) GetSymbol(s string) (result string, ok bool) {
  345. return m.registry.GetPluginBySymbol(FUNCTION, s)
  346. }
  347. func (m *Manager) Register(t PluginType, j Plugin) error {
  348. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  349. //Validation
  350. name = strings.Trim(name, " ")
  351. if name == "" {
  352. return fmt.Errorf("invalid name %s: should not be empty", name)
  353. }
  354. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  355. return fmt.Errorf("invalid uri %s", uri)
  356. }
  357. if v, ok := m.registry.Get(t, name); ok {
  358. if v == DELETED {
  359. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  360. } else {
  361. return fmt.Errorf("invalid name %s: duplicate", name)
  362. }
  363. }
  364. var err error
  365. if t == FUNCTION {
  366. if len(j.GetSymbols()) > 0 {
  367. err = m.db.Open()
  368. if err != nil {
  369. return err
  370. }
  371. err = m.db.Set(name, j.GetSymbols())
  372. if err != nil {
  373. return err
  374. }
  375. m.db.Close()
  376. err = m.registry.StoreSymbols(name, j.GetSymbols())
  377. } else {
  378. err = m.registry.StoreSymbols(name, []string{name})
  379. }
  380. }
  381. if err != nil {
  382. return err
  383. }
  384. zipPath := path.Join(m.pluginDir, name+".zip")
  385. var unzipFiles []string
  386. //clean up: delete zip file and unzip files in error
  387. defer os.Remove(zipPath)
  388. //download
  389. err = downloadFile(zipPath, uri)
  390. if err != nil {
  391. return fmt.Errorf("fail to download file %s: %s", uri, err)
  392. }
  393. //unzip and copy to destination
  394. unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
  395. if err == nil && len(j.GetSymbols()) > 0 {
  396. if err = m.db.Open(); err == nil {
  397. err = m.db.Set(name, j.GetSymbols())
  398. }
  399. }
  400. if err != nil { //Revert for any errors
  401. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  402. os.RemoveAll(unzipFiles[0])
  403. }
  404. if len(j.GetSymbols()) > 0 {
  405. m.db.Close()
  406. m.registry.RemoveSymbols(j.GetSymbols())
  407. } else {
  408. m.registry.RemoveSymbols([]string{name})
  409. }
  410. return fmt.Errorf("fail to install plugin: %s", err)
  411. }
  412. m.registry.Store(t, name, version)
  413. switch t {
  414. case SINK:
  415. if err := m.readSinkMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  416. common.Log.Errorf("readSinkFile:%v", err)
  417. }
  418. case SOURCE:
  419. if err := m.readSourceMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  420. common.Log.Errorf("readSourceFile:%v", err)
  421. }
  422. case FUNCTION:
  423. if err := m.readFuncMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  424. common.Log.Errorf("readFuncFile:%v", err)
  425. }
  426. }
  427. return nil
  428. }
  429. // prerequisite:function plugin of name exists
  430. func (m *Manager) RegisterFuncs(name string, functions []string) error {
  431. if len(functions) == 0 {
  432. return fmt.Errorf("property 'functions' must not be empty")
  433. }
  434. err := m.db.Open()
  435. if err != nil {
  436. return err
  437. }
  438. defer m.db.Close()
  439. old := make([]string, 0)
  440. if ok, err := m.db.Get(name, &old); err != nil {
  441. return err
  442. } else if ok {
  443. m.registry.RemoveSymbols(old)
  444. } else if !ok {
  445. m.registry.RemoveSymbols([]string{name})
  446. }
  447. err = m.db.Set(name, functions)
  448. if err != nil {
  449. return err
  450. }
  451. return m.registry.StoreSymbols(name, functions)
  452. }
  453. func (m *Manager) Delete(t PluginType, name string, stop bool) error {
  454. name = strings.Trim(name, " ")
  455. if name == "" {
  456. return fmt.Errorf("invalid name %s: should not be empty", name)
  457. }
  458. soPath, err := getSoFilePath(m, t, name, true)
  459. if err != nil {
  460. return err
  461. }
  462. var results []string
  463. paths := []string{
  464. soPath,
  465. }
  466. // Find etc folder
  467. etcPath := path.Join(m.etcDir, PluginTypes[t], name)
  468. if fi, err := os.Stat(etcPath); err == nil {
  469. if fi.Mode().IsDir() {
  470. paths = append(paths, etcPath)
  471. }
  472. }
  473. switch t {
  474. case SOURCE:
  475. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  476. m.uninstalSource(name)
  477. case SINK:
  478. m.uninstalSink(name)
  479. case FUNCTION:
  480. old := make([]string, 0)
  481. err = m.db.Open()
  482. if err != nil {
  483. return err
  484. }
  485. if ok, err := m.db.Get(name, &old); err != nil {
  486. return err
  487. } else if ok {
  488. m.registry.RemoveSymbols(old)
  489. err := m.db.Delete(name)
  490. if err != nil {
  491. return err
  492. }
  493. } else if !ok {
  494. m.registry.RemoveSymbols([]string{name})
  495. }
  496. m.db.Close()
  497. m.uninstalFunc(name)
  498. }
  499. for _, p := range paths {
  500. _, err := os.Stat(p)
  501. if err == nil {
  502. err = os.RemoveAll(p)
  503. if err != nil {
  504. results = append(results, err.Error())
  505. }
  506. } else {
  507. results = append(results, fmt.Sprintf("can't find %s", p))
  508. }
  509. }
  510. if len(results) > 0 {
  511. return errors.New(strings.Join(results, "\n"))
  512. } else {
  513. m.registry.Store(t, name, DELETED)
  514. if stop {
  515. go func() {
  516. time.Sleep(1 * time.Second)
  517. os.Exit(100)
  518. }()
  519. }
  520. return nil
  521. }
  522. }
  523. func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool) {
  524. v, ok := m.registry.Get(t, name)
  525. if strings.HasPrefix(v, "v") {
  526. v = v[1:]
  527. }
  528. if ok {
  529. r := map[string]interface{}{
  530. "name": name,
  531. "version": v,
  532. }
  533. if t == FUNCTION {
  534. if err := m.db.Open(); err == nil {
  535. l := make([]string, 0)
  536. if ok, _ := m.db.Get(name, &l); ok {
  537. r["functions"] = l
  538. }
  539. m.db.Close()
  540. }
  541. // ignore the error
  542. }
  543. return r, ok
  544. }
  545. return nil, false
  546. }
  547. // Return the lowercase version of so name. It may be upper case in path.
  548. func getSoFilePath(m *Manager, t PluginType, name string, isSoName bool) (string, error) {
  549. var (
  550. v string
  551. soname string
  552. ok bool
  553. )
  554. // We must identify plugin or symbol when deleting function plugin
  555. if isSoName {
  556. soname = name
  557. } else {
  558. soname, ok = m.registry.GetPluginBySymbol(t, name)
  559. if !ok {
  560. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  561. }
  562. }
  563. v, ok = m.registry.Get(t, soname)
  564. if !ok {
  565. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  566. }
  567. soFile := soname + ".so"
  568. if v != "" {
  569. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  570. }
  571. p := path.Join(m.pluginDir, PluginTypes[t], soFile)
  572. if _, err := os.Stat(p); err != nil {
  573. p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
  574. }
  575. if _, err := os.Stat(p); err != nil {
  576. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  577. }
  578. return p, nil
  579. }
  580. func (m *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
  581. var filenames []string
  582. var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
  583. defer os.RemoveAll(tempPath)
  584. r, err := zip.OpenReader(src)
  585. if err != nil {
  586. return filenames, "", err
  587. }
  588. defer r.Close()
  589. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  590. var yamlFile, yamlPath, version string
  591. expFiles := 1
  592. if t == SOURCE {
  593. yamlFile = name + ".yaml"
  594. yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
  595. expFiles = 2
  596. }
  597. var revokeFiles []string
  598. needInstall := false
  599. for _, file := range r.File {
  600. fileName := file.Name
  601. if yamlFile == fileName {
  602. err = unzipTo(file, yamlPath)
  603. if err != nil {
  604. return filenames, "", err
  605. }
  606. revokeFiles = append(revokeFiles, yamlPath)
  607. filenames = append(filenames, yamlPath)
  608. } else if fileName == name+".json" {
  609. jsonPath := path.Join(m.etcDir, PluginTypes[t], fileName)
  610. if err := unzipTo(file, jsonPath); nil != err {
  611. common.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  612. } else {
  613. revokeFiles = append(revokeFiles, jsonPath)
  614. }
  615. } else if soPrefix.Match([]byte(fileName)) {
  616. soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
  617. err = unzipTo(file, soPath)
  618. if err != nil {
  619. return filenames, "", err
  620. }
  621. filenames = append(filenames, soPath)
  622. revokeFiles = append(revokeFiles, soPath)
  623. _, version = parseName(fileName)
  624. } else if strings.HasPrefix(fileName, "etc/") {
  625. err = unzipTo(file, path.Join(m.etcDir, PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  626. if err != nil {
  627. return filenames, "", err
  628. }
  629. } else { //unzip other files
  630. err = unzipTo(file, path.Join(tempPath, fileName))
  631. if err != nil {
  632. return filenames, "", err
  633. }
  634. if fileName == "install.sh" {
  635. needInstall = true
  636. }
  637. }
  638. }
  639. if len(filenames) != expFiles {
  640. return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
  641. } else if needInstall {
  642. //run install script if there is
  643. spath := path.Join(tempPath, "install.sh")
  644. shellParas = append(shellParas, spath)
  645. if 1 != len(shellParas) {
  646. copy(shellParas[1:], shellParas[0:])
  647. shellParas[0] = spath
  648. }
  649. cmd := exec.Command("/bin/sh", shellParas...)
  650. var outb, errb bytes.Buffer
  651. cmd.Stdout = &outb
  652. cmd.Stderr = &errb
  653. err := cmd.Run()
  654. if err != nil {
  655. for _, f := range revokeFiles {
  656. os.RemoveAll(f)
  657. }
  658. common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  659. return filenames, "", err
  660. } else {
  661. common.Log.Infof(`run install script:%s`, outb.String())
  662. common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
  663. }
  664. }
  665. return filenames, version, nil
  666. }
  667. func parseName(n string) (string, string) {
  668. result := strings.Split(n, ".so")
  669. result = strings.Split(result[0], "@")
  670. name := lcFirst(result[0])
  671. if len(result) > 1 {
  672. return name, result[1]
  673. }
  674. return name, ""
  675. }
  676. func unzipTo(f *zip.File, fpath string) error {
  677. _, err := os.Stat(fpath)
  678. if err == nil || !os.IsNotExist(err) {
  679. if err = os.RemoveAll(fpath); err != nil {
  680. return fmt.Errorf("failed to delete file %s", fpath)
  681. }
  682. }
  683. if f.FileInfo().IsDir() {
  684. // Make Folder
  685. os.MkdirAll(fpath, os.ModePerm)
  686. return nil
  687. }
  688. if _, err := os.Stat(filepath.Dir(fpath)); os.IsNotExist(err) {
  689. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  690. return err
  691. }
  692. }
  693. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  694. if err != nil {
  695. return err
  696. }
  697. rc, err := f.Open()
  698. if err != nil {
  699. return err
  700. }
  701. _, err = io.Copy(outFile, rc)
  702. outFile.Close()
  703. rc.Close()
  704. return err
  705. }
  706. func isValidUrl(uri string) bool {
  707. pu, err := url.ParseRequestURI(uri)
  708. if err != nil {
  709. return false
  710. }
  711. switch pu.Scheme {
  712. case "http", "https":
  713. u, err := url.Parse(uri)
  714. if err != nil || u.Scheme == "" || u.Host == "" {
  715. return false
  716. }
  717. case "file":
  718. if pu.Host != "" || pu.Path == "" {
  719. return false
  720. }
  721. default:
  722. return false
  723. }
  724. return true
  725. }
  726. func downloadFile(filepath string, uri string) error {
  727. common.Log.Infof("Start to download file %s\n", uri)
  728. u, err := url.ParseRequestURI(uri)
  729. if err != nil {
  730. return err
  731. }
  732. var src io.Reader
  733. switch u.Scheme {
  734. case "file":
  735. // deal with windows path
  736. if strings.Index(u.Path, ":") == 2 {
  737. u.Path = u.Path[1:]
  738. }
  739. common.Log.Debugf(u.Path)
  740. sourceFileStat, err := os.Stat(u.Path)
  741. if err != nil {
  742. return err
  743. }
  744. if !sourceFileStat.Mode().IsRegular() {
  745. return fmt.Errorf("%s is not a regular file", u.Path)
  746. }
  747. srcFile, err := os.Open(u.Path)
  748. if err != nil {
  749. return err
  750. }
  751. defer srcFile.Close()
  752. src = srcFile
  753. case "http", "https":
  754. // Get the data
  755. timeout := time.Duration(5 * time.Minute)
  756. client := &http.Client{
  757. Timeout: timeout,
  758. Transport: &http.Transport{
  759. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  760. },
  761. }
  762. resp, err := client.Get(uri)
  763. if err != nil {
  764. return err
  765. }
  766. if resp.StatusCode != http.StatusOK {
  767. return fmt.Errorf("cannot download the file with status: %s", resp.Status)
  768. }
  769. defer resp.Body.Close()
  770. src = resp.Body
  771. default:
  772. return fmt.Errorf("unsupported url scheme %s", u.Scheme)
  773. }
  774. // Create the file
  775. out, err := os.Create(filepath)
  776. if err != nil {
  777. return err
  778. }
  779. defer out.Close()
  780. // Write the body to file
  781. _, err = io.Copy(out, src)
  782. return err
  783. }
  784. func ucFirst(str string) string {
  785. for i, v := range str {
  786. return string(unicode.ToUpper(v)) + str[i+1:]
  787. }
  788. return ""
  789. }
  790. func lcFirst(str string) string {
  791. for i, v := range str {
  792. return string(unicode.ToLower(v)) + str[i+1:]
  793. }
  794. return ""
  795. }