manager.go 19 KB


  1. package plugins
  2. import (
  3. "archive/zip"
  4. "bytes"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "github.com/emqx/kuiper/common"
  9. "github.com/emqx/kuiper/common/kv"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "os/exec"
  17. "path"
  18. "path/filepath"
  19. "plugin"
  20. "regexp"
  21. "strings"
  22. "sync"
  23. "time"
  24. "unicode"
  25. )
  26. type Plugin interface {
  27. GetName() string
  28. GetFile() string
  29. GetShellParas() []string
  30. GetSymbols() []string
  31. SetName(n string)
  32. }
  33. type IOPlugin struct {
  34. Name string `json:"name"`
  35. File string `json:"file"`
  36. ShellParas []string `json:"shellParas"`
  37. }
  38. func (p *IOPlugin) GetName() string {
  39. return p.Name
  40. }
  41. func (p *IOPlugin) GetFile() string {
  42. return p.File
  43. }
  44. func (p *IOPlugin) GetShellParas() []string {
  45. return p.ShellParas
  46. }
  47. func (p *IOPlugin) GetSymbols() []string {
  48. return nil
  49. }
  50. func (p *IOPlugin) SetName(n string) {
  51. p.Name = n
  52. }
  53. type FuncPlugin struct {
  54. IOPlugin
  55. // Optional, if not specified, a default element with the same name of the file will be registered
  56. Functions []string `json:"functions"`
  57. }
  58. func (fp *FuncPlugin) GetSymbols() []string {
  59. return fp.Functions
  60. }
  61. type PluginType int
  62. func NewPluginByType(t PluginType) Plugin {
  63. switch t {
  64. case FUNCTION:
  65. return &FuncPlugin{}
  66. default:
  67. return &IOPlugin{}
  68. }
  69. }
// The plugin kinds. Their numeric values are used as indexes into PluginTypes
// and into Registry.plugins, so the order of the three must stay in sync.
const (
	SOURCE PluginType = iota
	SINK
	FUNCTION
)

// DELETED is stored in the registry in place of the version of a plugin that
// has been deleted; Register rejects such a name.
const DELETED = "$deleted"

var (
	// PluginTypes holds the sub-folder name used for each PluginType, indexed
	// by the PluginType value.
	PluginTypes = []string{"sources", "sinks", "functions"}
	// once guards the one-time construction of singleton.
	once sync.Once
	// singleton is the shared Manager built by NewPluginManager.
	singleton *Manager
)
  81. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  82. type Registry struct {
  83. sync.RWMutex
  84. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  85. plugins []map[string]string
  86. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  87. symbols map[string]string
  88. }
  89. func (rr *Registry) Store(t PluginType, name string, version string) {
  90. rr.Lock()
  91. rr.plugins[t][name] = version
  92. rr.Unlock()
  93. }
  94. func (rr *Registry) StoreSymbols(name string, symbols []string) error {
  95. rr.Lock()
  96. defer rr.Unlock()
  97. for _, s := range symbols {
  98. if _, ok := rr.symbols[s]; ok {
  99. return fmt.Errorf("function name %s already exists", s)
  100. } else {
  101. rr.symbols[s] = name
  102. }
  103. }
  104. return nil
  105. }
  106. func (rr *Registry) RemoveSymbols(symbols []string) {
  107. rr.Lock()
  108. for _, s := range symbols {
  109. delete(rr.symbols, s)
  110. }
  111. rr.Unlock()
  112. }
  113. func (rr *Registry) List(t PluginType) []string {
  114. rr.RLock()
  115. result := rr.plugins[t]
  116. rr.RUnlock()
  117. keys := make([]string, 0, len(result))
  118. for k := range result {
  119. keys = append(keys, k)
  120. }
  121. return keys
  122. }
  123. func (rr *Registry) ListSymbols() []string {
  124. rr.RLock()
  125. result := rr.symbols
  126. rr.RUnlock()
  127. keys := make([]string, 0, len(result))
  128. for k := range result {
  129. keys = append(keys, k)
  130. }
  131. return keys
  132. }
  133. func (rr *Registry) Get(t PluginType, name string) (string, bool) {
  134. rr.RLock()
  135. result := rr.plugins[t]
  136. rr.RUnlock()
  137. r, ok := result[name]
  138. return r, ok
  139. }
  140. func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
  141. switch t {
  142. case FUNCTION:
  143. rr.RLock()
  144. result := rr.plugins[t]
  145. name, ok := rr.symbols[symbolName]
  146. rr.RUnlock()
  147. if ok {
  148. r, nok := result[name]
  149. return r, nok
  150. } else {
  151. return "", false
  152. }
  153. default:
  154. return rr.Get(t, symbolName)
  155. }
  156. }
  157. func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
  158. switch t {
  159. case FUNCTION:
  160. rr.RLock()
  161. defer rr.RUnlock()
  162. name, ok := rr.symbols[symbolName]
  163. return name, ok
  164. default:
  165. return symbolName, true
  166. }
  167. }
  168. var symbolRegistry = make(map[string]plugin.Symbol)
  169. var mu sync.RWMutex
  170. func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
  171. ut := ucFirst(t)
  172. ptype := PluginTypes[pt]
  173. key := ptype + "/" + t
  174. mu.Lock()
  175. defer mu.Unlock()
  176. var nf plugin.Symbol
  177. nf, ok := symbolRegistry[key]
  178. if !ok {
  179. m, err := NewPluginManager()
  180. if err != nil {
  181. return nil, fmt.Errorf("fail to initialize the plugin manager")
  182. }
  183. mod, err := getSoFilePath(m, pt, t, false)
  184. if err != nil {
  185. return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
  186. }
  187. common.Log.Debugf("Opening plugin %s", mod)
  188. plug, err := plugin.Open(mod)
  189. if err != nil {
  190. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  191. }
  192. common.Log.Debugf("Successfully open plugin %s", mod)
  193. nf, err = plug.Lookup(ut)
  194. if err != nil {
  195. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  196. }
  197. common.Log.Debugf("Successfully look-up plugin %s", mod)
  198. symbolRegistry[key] = nf
  199. }
  200. return nf, nil
  201. }
  202. func GetSource(t string) (api.Source, error) {
  203. nf, err := getPlugin(t, SOURCE)
  204. if err != nil {
  205. return nil, err
  206. }
  207. var s api.Source
  208. switch t := nf.(type) {
  209. case api.Source:
  210. s = t
  211. case func() api.Source:
  212. s = t()
  213. default:
  214. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  215. }
  216. return s, nil
  217. }
  218. func GetSink(t string) (api.Sink, error) {
  219. nf, err := getPlugin(t, SINK)
  220. if err != nil {
  221. return nil, err
  222. }
  223. var s api.Sink
  224. switch t := nf.(type) {
  225. case api.Sink:
  226. s = t
  227. case func() api.Sink:
  228. s = t()
  229. default:
  230. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  231. }
  232. return s, nil
  233. }
  234. func GetFunction(t string) (api.Function, error) {
  235. nf, err := getPlugin(t, FUNCTION)
  236. if err != nil {
  237. return nil, err
  238. }
  239. var s api.Function
  240. switch t := nf.(type) {
  241. case api.Function:
  242. s = t
  243. case func() api.Function:
  244. s = t()
  245. default:
  246. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  247. }
  248. return s, nil
  249. }
// Manager installs, deletes and looks up plugins. A single shared instance is
// built by NewPluginManager.
type Manager struct {
	// pluginDir is the root plugin folder; the plugin files live in one
	// sub-folder per entry of PluginTypes.
	pluginDir string
	// etcDir is the root folder of the plugin configuration and metadata
	// files, also split by PluginTypes.
	etcDir string
	// registry is the in-memory index of plugins and function symbols.
	registry *Registry
	// db stores, per function plugin name, the list of functions it exports.
	// It is opened and closed around each use.
	db kv.KeyValue
}
  256. func NewPluginManager() (*Manager, error) {
  257. var outerErr error
  258. once.Do(func() {
  259. dir, err := common.GetLoc("/plugins")
  260. if err != nil {
  261. outerErr = fmt.Errorf("cannot find plugins folder: %s", err)
  262. return
  263. }
  264. etcDir, err := common.GetLoc("/etc")
  265. if err != nil {
  266. outerErr = fmt.Errorf("cannot find etc folder: %s", err)
  267. return
  268. }
  269. dbDir, err := common.GetDataLoc()
  270. if err != nil {
  271. outerErr = fmt.Errorf("cannot find db folder: %s", err)
  272. return
  273. }
  274. db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
  275. err = db.Open()
  276. if err != nil {
  277. outerErr = fmt.Errorf("error when opening db: %v.", err)
  278. }
  279. defer db.Close()
  280. plugins := make([]map[string]string, 3)
  281. for i := 0; i < 3; i++ {
  282. names, err := findAll(PluginType(i), dir)
  283. if err != nil {
  284. outerErr = fmt.Errorf("fail to find existing plugins: %s", err)
  285. return
  286. }
  287. plugins[i] = names
  288. }
  289. registry := &Registry{plugins: plugins, symbols: make(map[string]string)}
  290. for pf, _ := range plugins[FUNCTION] {
  291. l := make([]string, 0)
  292. if ok, err := db.Get(pf, &l); ok {
  293. registry.StoreSymbols(pf, l)
  294. } else if err != nil {
  295. outerErr = fmt.Errorf("error when querying kv: %s", err)
  296. return
  297. } else {
  298. registry.StoreSymbols(pf, []string{pf})
  299. }
  300. }
  301. singleton = &Manager{
  302. pluginDir: dir,
  303. etcDir: etcDir,
  304. registry: registry,
  305. db: db,
  306. }
  307. if err := singleton.readSourceMetaDir(); nil != err {
  308. common.Log.Errorf("readSourceMetaDir:%v", err)
  309. }
  310. if err := singleton.readSinkMetaDir(); nil != err {
  311. common.Log.Errorf("readSinkMetaDir:%v", err)
  312. }
  313. if err := singleton.readFuncMetaDir(); nil != err {
  314. common.Log.Errorf("readFuncMetaDir:%v", err)
  315. }
  316. if err := singleton.readUiMsgDir(); nil != err {
  317. common.Log.Errorf("readUiMsgDir:%v", err)
  318. }
  319. })
  320. return singleton, outerErr
  321. }
  322. func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
  323. result = make(map[string]string)
  324. dir := path.Join(pluginDir, PluginTypes[t])
  325. files, err := ioutil.ReadDir(dir)
  326. if err != nil {
  327. return
  328. }
  329. for _, file := range files {
  330. baseName := filepath.Base(file.Name())
  331. if strings.HasSuffix(baseName, ".so") {
  332. n, v := parseName(baseName)
  333. result[n] = v
  334. }
  335. }
  336. return
  337. }
  338. func (m *Manager) List(t PluginType) (result []string, err error) {
  339. return m.registry.List(t), nil
  340. }
  341. func (m *Manager) ListSymbols() (result []string, err error) {
  342. return m.registry.ListSymbols(), nil
  343. }
  344. func (m *Manager) GetSymbol(s string) (result string, ok bool) {
  345. return m.registry.GetPluginBySymbol(FUNCTION, s)
  346. }
  347. func (m *Manager) Register(t PluginType, j Plugin) error {
  348. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  349. //Validation
  350. name = strings.Trim(name, " ")
  351. if name == "" {
  352. return fmt.Errorf("invalid name %s: should not be empty", name)
  353. }
  354. if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  355. return fmt.Errorf("invalid uri %s", uri)
  356. }
  357. if v, ok := m.registry.Get(t, name); ok {
  358. if v == DELETED {
  359. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  360. } else {
  361. return fmt.Errorf("invalid name %s: duplicate", name)
  362. }
  363. }
  364. var err error
  365. if t == FUNCTION {
  366. if len(j.GetSymbols()) > 0 {
  367. err = m.db.Open()
  368. if err != nil {
  369. return err
  370. }
  371. err = m.db.Set(name, j.GetSymbols())
  372. if err != nil {
  373. return err
  374. }
  375. m.db.Close()
  376. err = m.registry.StoreSymbols(name, j.GetSymbols())
  377. } else {
  378. err = m.registry.StoreSymbols(name, []string{name})
  379. }
  380. }
  381. if err != nil {
  382. return err
  383. }
  384. zipPath := path.Join(m.pluginDir, name+".zip")
  385. var unzipFiles []string
  386. //clean up: delete zip file and unzip files in error
  387. defer os.Remove(zipPath)
  388. //download
  389. err = downloadFile(zipPath, uri)
  390. if err != nil {
  391. return fmt.Errorf("fail to download file %s: %s", uri, err)
  392. }
  393. //unzip and copy to destination
  394. unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
  395. if err == nil && len(j.GetSymbols()) > 0 {
  396. if err = m.db.Open(); err == nil {
  397. err = m.db.Set(name, j.GetSymbols())
  398. }
  399. }
  400. if err != nil { //Revert for any errors
  401. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  402. os.Remove(unzipFiles[0])
  403. }
  404. if len(j.GetSymbols()) > 0 {
  405. m.db.Close()
  406. m.registry.RemoveSymbols(j.GetSymbols())
  407. } else {
  408. m.registry.RemoveSymbols([]string{name})
  409. }
  410. return fmt.Errorf("fail to install plugin: %s", err)
  411. }
  412. m.registry.Store(t, name, version)
  413. switch t {
  414. case SINK:
  415. if err := m.readSinkMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  416. common.Log.Errorf("readSinkFile:%v", err)
  417. }
  418. case SOURCE:
  419. if err := m.readSourceMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  420. common.Log.Errorf("readSourceFile:%v", err)
  421. }
  422. case FUNCTION:
  423. if err := m.readFuncMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  424. common.Log.Errorf("readFuncFile:%v", err)
  425. }
  426. }
  427. return nil
  428. }
  429. // prerequisite:function plugin of name exists
  430. func (m *Manager) RegisterFuncs(name string, functions []string) error {
  431. if len(functions) == 0 {
  432. return fmt.Errorf("property 'functions' must not be empty")
  433. }
  434. err := m.db.Open()
  435. if err != nil {
  436. return err
  437. }
  438. defer m.db.Close()
  439. old := make([]string, 0)
  440. if ok, err := m.db.Get(name, &old); err != nil {
  441. return err
  442. } else if ok {
  443. m.registry.RemoveSymbols(old)
  444. } else if !ok {
  445. m.registry.RemoveSymbols([]string{name})
  446. }
  447. err = m.db.Set(name, functions)
  448. if err != nil {
  449. return err
  450. }
  451. return m.registry.StoreSymbols(name, functions)
  452. }
  453. func (m *Manager) Delete(t PluginType, name string, stop bool) error {
  454. name = strings.Trim(name, " ")
  455. if name == "" {
  456. return fmt.Errorf("invalid name %s: should not be empty", name)
  457. }
  458. soPath, err := getSoFilePath(m, t, name, true)
  459. if err != nil {
  460. return err
  461. }
  462. var results []string
  463. paths := []string{
  464. soPath,
  465. }
  466. switch t {
  467. case SOURCE:
  468. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  469. m.uninstalSource(name)
  470. case SINK:
  471. m.uninstalSink(name)
  472. case FUNCTION:
  473. old := make([]string, 0)
  474. err = m.db.Open()
  475. if err != nil {
  476. return err
  477. }
  478. if ok, err := m.db.Get(name, &old); err != nil {
  479. return err
  480. } else if ok {
  481. m.registry.RemoveSymbols(old)
  482. err := m.db.Delete(name)
  483. if err != nil {
  484. return err
  485. }
  486. } else if !ok {
  487. m.registry.RemoveSymbols([]string{name})
  488. }
  489. m.db.Close()
  490. m.uninstalFunc(name)
  491. }
  492. for _, p := range paths {
  493. _, err := os.Stat(p)
  494. if err == nil {
  495. err = os.Remove(p)
  496. if err != nil {
  497. results = append(results, err.Error())
  498. }
  499. } else {
  500. results = append(results, fmt.Sprintf("can't find %s", p))
  501. }
  502. }
  503. if len(results) > 0 {
  504. return errors.New(strings.Join(results, "\n"))
  505. } else {
  506. m.registry.Store(t, name, DELETED)
  507. if stop {
  508. go func() {
  509. time.Sleep(1 * time.Second)
  510. os.Exit(100)
  511. }()
  512. }
  513. return nil
  514. }
  515. }
  516. func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool) {
  517. v, ok := m.registry.Get(t, name)
  518. if strings.HasPrefix(v, "v") {
  519. v = v[1:]
  520. }
  521. if ok {
  522. r := map[string]interface{}{
  523. "name": name,
  524. "version": v,
  525. }
  526. if t == FUNCTION {
  527. if err := m.db.Open(); err == nil {
  528. l := make([]string, 0)
  529. if ok, _ := m.db.Get(name, &l); ok {
  530. r["functions"] = l
  531. }
  532. m.db.Close()
  533. }
  534. // ignore the error
  535. }
  536. return r, ok
  537. }
  538. return nil, false
  539. }
  540. // Return the lowercase version of so name. It may be upper case in path.
  541. func getSoFilePath(m *Manager, t PluginType, name string, isSoName bool) (string, error) {
  542. var (
  543. v string
  544. soname string
  545. ok bool
  546. )
  547. // We must identify plugin or symbol when deleting function plugin
  548. if isSoName {
  549. soname = name
  550. } else {
  551. soname, ok = m.registry.GetPluginBySymbol(t, name)
  552. if !ok {
  553. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  554. }
  555. }
  556. v, ok = m.registry.Get(t, soname)
  557. if !ok {
  558. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  559. }
  560. soFile := soname + ".so"
  561. if v != "" {
  562. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  563. }
  564. p := path.Join(m.pluginDir, PluginTypes[t], soFile)
  565. if _, err := os.Stat(p); err != nil {
  566. p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
  567. }
  568. if _, err := os.Stat(p); err != nil {
  569. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  570. }
  571. return p, nil
  572. }
  573. func (m *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
  574. var filenames []string
  575. var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
  576. defer os.RemoveAll(tempPath)
  577. r, err := zip.OpenReader(src)
  578. if err != nil {
  579. return filenames, "", err
  580. }
  581. defer r.Close()
  582. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  583. var yamlFile, yamlPath, version string
  584. expFiles := 1
  585. if t == SOURCE {
  586. yamlFile = name + ".yaml"
  587. yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
  588. expFiles = 2
  589. }
  590. var revokeFiles []string
  591. needInstall := false
  592. for _, file := range r.File {
  593. fileName := file.Name
  594. if yamlFile == fileName {
  595. err = unzipTo(file, yamlPath)
  596. if err != nil {
  597. return filenames, "", err
  598. }
  599. revokeFiles = append(revokeFiles, yamlPath)
  600. filenames = append(filenames, yamlPath)
  601. } else if fileName == name+".json" {
  602. jsonPath := path.Join(m.etcDir, PluginTypes[t], fileName)
  603. if err := unzipTo(file, jsonPath); nil != err {
  604. common.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  605. } else {
  606. revokeFiles = append(revokeFiles, jsonPath)
  607. }
  608. } else if soPrefix.Match([]byte(fileName)) {
  609. soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
  610. err = unzipTo(file, soPath)
  611. if err != nil {
  612. return filenames, "", err
  613. }
  614. filenames = append(filenames, soPath)
  615. revokeFiles = append(revokeFiles, soPath)
  616. _, version = parseName(fileName)
  617. } else { //unzip other files
  618. err = unzipTo(file, path.Join(tempPath, fileName))
  619. if err != nil {
  620. return filenames, "", err
  621. }
  622. if fileName == "install.sh" {
  623. needInstall = true
  624. }
  625. }
  626. }
  627. if len(filenames) != expFiles {
  628. return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
  629. } else if needInstall {
  630. //run install script if there is
  631. spath := path.Join(tempPath, "install.sh")
  632. shellParas = append(shellParas, spath)
  633. if 1 != len(shellParas) {
  634. copy(shellParas[1:], shellParas[0:])
  635. shellParas[0] = spath
  636. }
  637. cmd := exec.Command("/bin/sh", shellParas...)
  638. var outb, errb bytes.Buffer
  639. cmd.Stdout = &outb
  640. cmd.Stderr = &errb
  641. err := cmd.Run()
  642. if err != nil {
  643. for _, f := range revokeFiles {
  644. os.Remove(f)
  645. }
  646. common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  647. return filenames, "", err
  648. } else {
  649. common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
  650. }
  651. }
  652. return filenames, version, nil
  653. }
  654. func parseName(n string) (string, string) {
  655. result := strings.Split(n, ".so")
  656. result = strings.Split(result[0], "@")
  657. name := lcFirst(result[0])
  658. if len(result) > 1 {
  659. return name, result[1]
  660. }
  661. return name, ""
  662. }
  663. func unzipTo(f *zip.File, fpath string) error {
  664. _, err := os.Stat(fpath)
  665. if err == nil || !os.IsNotExist(err) {
  666. if err = os.Remove(fpath); err != nil {
  667. return fmt.Errorf("failed to delete file %s", fpath)
  668. }
  669. }
  670. if f.FileInfo().IsDir() {
  671. return fmt.Errorf("%s: not a file, but a directory", fpath)
  672. }
  673. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  674. return err
  675. }
  676. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  677. if err != nil {
  678. return err
  679. }
  680. rc, err := f.Open()
  681. if err != nil {
  682. return err
  683. }
  684. _, err = io.Copy(outFile, rc)
  685. outFile.Close()
  686. rc.Close()
  687. return err
  688. }
  689. func isValidUrl(uri string) bool {
  690. pu, err := url.ParseRequestURI(uri)
  691. if err != nil {
  692. return false
  693. }
  694. switch pu.Scheme {
  695. case "http", "https":
  696. u, err := url.Parse(uri)
  697. if err != nil || u.Scheme == "" || u.Host == "" {
  698. return false
  699. }
  700. case "file":
  701. if pu.Host != "" || pu.Path == "" {
  702. return false
  703. }
  704. default:
  705. return false
  706. }
  707. return true
  708. }
  709. func downloadFile(filepath string, uri string) error {
  710. common.Log.Infof("Start to download file %s\n", uri)
  711. u, err := url.ParseRequestURI(uri)
  712. if err != nil {
  713. return err
  714. }
  715. var src io.Reader
  716. switch u.Scheme {
  717. case "file":
  718. // deal with windows path
  719. if strings.Index(u.Path, ":") == 2 {
  720. u.Path = u.Path[1:]
  721. }
  722. common.Log.Debugf(u.Path)
  723. sourceFileStat, err := os.Stat(u.Path)
  724. if err != nil {
  725. return err
  726. }
  727. if !sourceFileStat.Mode().IsRegular() {
  728. return fmt.Errorf("%s is not a regular file", u.Path)
  729. }
  730. srcFile, err := os.Open(u.Path)
  731. if err != nil {
  732. return err
  733. }
  734. defer srcFile.Close()
  735. src = srcFile
  736. case "http", "https":
  737. // Get the data
  738. timeout := time.Duration(5 * time.Minute)
  739. client := &http.Client{
  740. Timeout: timeout,
  741. Transport: &http.Transport{
  742. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  743. },
  744. }
  745. resp, err := client.Get(uri)
  746. if err != nil {
  747. return err
  748. }
  749. if resp.StatusCode != http.StatusOK {
  750. return fmt.Errorf("cannot download the file with status: %s", resp.Status)
  751. }
  752. defer resp.Body.Close()
  753. src = resp.Body
  754. default:
  755. return fmt.Errorf("unsupported url scheme %s", u.Scheme)
  756. }
  757. // Create the file
  758. out, err := os.Create(filepath)
  759. if err != nil {
  760. return err
  761. }
  762. defer out.Close()
  763. // Write the body to file
  764. _, err = io.Copy(out, src)
  765. return err
  766. }
  767. func ucFirst(str string) string {
  768. for i, v := range str {
  769. return string(unicode.ToUpper(v)) + str[i+1:]
  770. }
  771. return ""
  772. }
  773. func lcFirst(str string) string {
  774. for i, v := range str {
  775. return string(unicode.ToLower(v)) + str[i+1:]
  776. }
  777. return ""
  778. }