manager.go 18 KB


  1. package plugins
  2. import (
  3. "archive/zip"
  4. "bytes"
  5. "errors"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/common/kv"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "io/ioutil"
  11. "os"
  12. "os/exec"
  13. "path"
  14. "path/filepath"
  15. "plugin"
  16. "regexp"
  17. "strings"
  18. "sync"
  19. "time"
  20. "unicode"
  21. )
  22. type Plugin interface {
  23. GetName() string
  24. GetFile() string
  25. GetShellParas() []string
  26. GetSymbols() []string
  27. SetName(n string)
  28. }
  29. type IOPlugin struct {
  30. Name string `json:"name"`
  31. File string `json:"file"`
  32. ShellParas []string `json:"shellParas"`
  33. }
  34. func (p *IOPlugin) GetName() string {
  35. return p.Name
  36. }
  37. func (p *IOPlugin) GetFile() string {
  38. return p.File
  39. }
  40. func (p *IOPlugin) GetShellParas() []string {
  41. return p.ShellParas
  42. }
  43. func (p *IOPlugin) GetSymbols() []string {
  44. return nil
  45. }
  46. func (p *IOPlugin) SetName(n string) {
  47. p.Name = n
  48. }
  49. type FuncPlugin struct {
  50. IOPlugin
  51. // Optional, if not specified, a default element with the same name of the file will be registered
  52. Functions []string `json:"functions"`
  53. }
  54. func (fp *FuncPlugin) GetSymbols() []string {
  55. return fp.Functions
  56. }
  57. type PluginType int
  58. func NewPluginByType(t PluginType) Plugin {
  59. switch t {
  60. case FUNCTION:
  61. return &FuncPlugin{}
  62. default:
  63. return &IOPlugin{}
  64. }
  65. }
  66. const (
  67. SOURCE PluginType = iota
  68. SINK
  69. FUNCTION
  70. )
  71. const DELETED = "$deleted"
  72. var (
  73. PluginTypes = []string{"sources", "sinks", "functions"}
  74. once sync.Once
  75. singleton *Manager
  76. )
  77. //Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  78. type Registry struct {
  79. sync.RWMutex
  80. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  81. plugins []map[string]string
  82. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  83. symbols map[string]string
  84. }
  85. func (rr *Registry) Store(t PluginType, name string, version string) {
  86. rr.Lock()
  87. rr.plugins[t][name] = version
  88. rr.Unlock()
  89. }
  90. func (rr *Registry) StoreSymbols(name string, symbols []string) error {
  91. rr.Lock()
  92. defer rr.Unlock()
  93. for _, s := range symbols {
  94. if _, ok := rr.symbols[s]; ok {
  95. return fmt.Errorf("function name %s already exists", s)
  96. } else {
  97. rr.symbols[s] = name
  98. }
  99. }
  100. return nil
  101. }
  102. func (rr *Registry) RemoveSymbols(symbols []string) {
  103. rr.Lock()
  104. for _, s := range symbols {
  105. delete(rr.symbols, s)
  106. }
  107. rr.Unlock()
  108. }
  109. func (rr *Registry) List(t PluginType) []string {
  110. rr.RLock()
  111. result := rr.plugins[t]
  112. rr.RUnlock()
  113. keys := make([]string, 0, len(result))
  114. for k := range result {
  115. keys = append(keys, k)
  116. }
  117. return keys
  118. }
  119. func (rr *Registry) ListSymbols() []string {
  120. rr.RLock()
  121. result := rr.symbols
  122. rr.RUnlock()
  123. keys := make([]string, 0, len(result))
  124. for k := range result {
  125. keys = append(keys, k)
  126. }
  127. return keys
  128. }
  129. func (rr *Registry) Get(t PluginType, name string) (string, bool) {
  130. rr.RLock()
  131. result := rr.plugins[t]
  132. rr.RUnlock()
  133. r, ok := result[name]
  134. return r, ok
  135. }
  136. func (rr *Registry) GetPluginVersionBySymbol(t PluginType, symbolName string) (string, bool) {
  137. switch t {
  138. case FUNCTION:
  139. rr.RLock()
  140. result := rr.plugins[t]
  141. name, ok := rr.symbols[symbolName]
  142. rr.RUnlock()
  143. if ok {
  144. r, nok := result[name]
  145. return r, nok
  146. } else {
  147. return "", false
  148. }
  149. default:
  150. return rr.Get(t, symbolName)
  151. }
  152. }
  153. func (rr *Registry) GetPluginBySymbol(t PluginType, symbolName string) (string, bool) {
  154. switch t {
  155. case FUNCTION:
  156. rr.RLock()
  157. defer rr.RUnlock()
  158. name, ok := rr.symbols[symbolName]
  159. return name, ok
  160. default:
  161. return symbolName, true
  162. }
  163. }
  164. var symbolRegistry = make(map[string]plugin.Symbol)
  165. var mu sync.RWMutex
  166. func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
  167. ut := ucFirst(t)
  168. ptype := PluginTypes[pt]
  169. key := ptype + "/" + t
  170. mu.Lock()
  171. defer mu.Unlock()
  172. var nf plugin.Symbol
  173. nf, ok := symbolRegistry[key]
  174. if !ok {
  175. m, err := NewPluginManager()
  176. if err != nil {
  177. return nil, fmt.Errorf("fail to initialize the plugin manager")
  178. }
  179. mod, err := getSoFilePath(m, pt, t, false)
  180. if err != nil {
  181. return nil, fmt.Errorf("cannot get the plugin file path: %v", err)
  182. }
  183. common.Log.Debugf("Opening plugin %s", mod)
  184. plug, err := plugin.Open(mod)
  185. if err != nil {
  186. return nil, fmt.Errorf("cannot open %s: %v", mod, err)
  187. }
  188. common.Log.Debugf("Successfully open plugin %s", mod)
  189. nf, err = plug.Lookup(ut)
  190. if err != nil {
  191. return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  192. }
  193. common.Log.Debugf("Successfully look-up plugin %s", mod)
  194. symbolRegistry[key] = nf
  195. }
  196. return nf, nil
  197. }
  198. func GetSource(t string) (api.Source, error) {
  199. nf, err := getPlugin(t, SOURCE)
  200. if err != nil {
  201. return nil, err
  202. }
  203. var s api.Source
  204. switch t := nf.(type) {
  205. case api.Source:
  206. s = t
  207. case func() api.Source:
  208. s = t()
  209. default:
  210. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  211. }
  212. return s, nil
  213. }
  214. func GetSink(t string) (api.Sink, error) {
  215. nf, err := getPlugin(t, SINK)
  216. if err != nil {
  217. return nil, err
  218. }
  219. var s api.Sink
  220. switch t := nf.(type) {
  221. case api.Sink:
  222. s = t
  223. case func() api.Sink:
  224. s = t()
  225. default:
  226. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  227. }
  228. return s, nil
  229. }
  230. type Manager struct {
  231. pluginDir string
  232. etcDir string
  233. registry *Registry
  234. db kv.KeyValue
  235. }
  236. func NewPluginManager() (*Manager, error) {
  237. var outerErr error
  238. once.Do(func() {
  239. dir, err := common.GetPluginsLoc()
  240. if err != nil {
  241. outerErr = fmt.Errorf("cannot find plugins folder: %s", err)
  242. return
  243. }
  244. etcDir, err := common.GetConfLoc()
  245. if err != nil {
  246. outerErr = fmt.Errorf("cannot find etc folder: %s", err)
  247. return
  248. }
  249. dbDir, err := common.GetDataLoc()
  250. if err != nil {
  251. outerErr = fmt.Errorf("cannot find db folder: %s", err)
  252. return
  253. }
  254. db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
  255. err = db.Open()
  256. if err != nil {
  257. outerErr = fmt.Errorf("error when opening db: %v.", err)
  258. }
  259. defer db.Close()
  260. plugins := make([]map[string]string, 3)
  261. for i := 0; i < 3; i++ {
  262. names, err := findAll(PluginType(i), dir)
  263. if err != nil {
  264. outerErr = fmt.Errorf("fail to find existing plugins: %s", err)
  265. return
  266. }
  267. plugins[i] = names
  268. }
  269. registry := &Registry{plugins: plugins, symbols: make(map[string]string)}
  270. for pf, _ := range plugins[FUNCTION] {
  271. l := make([]string, 0)
  272. if ok, err := db.Get(pf, &l); ok {
  273. registry.StoreSymbols(pf, l)
  274. } else if err != nil {
  275. outerErr = fmt.Errorf("error when querying kv: %s", err)
  276. return
  277. } else {
  278. registry.StoreSymbols(pf, []string{pf})
  279. }
  280. }
  281. singleton = &Manager{
  282. pluginDir: dir,
  283. etcDir: etcDir,
  284. registry: registry,
  285. db: db,
  286. }
  287. if err := singleton.readSourceMetaDir(); nil != err {
  288. common.Log.Errorf("readSourceMetaDir:%v", err)
  289. }
  290. if err := singleton.readSinkMetaDir(); nil != err {
  291. common.Log.Errorf("readSinkMetaDir:%v", err)
  292. }
  293. if err := singleton.readFuncMetaDir(); nil != err {
  294. common.Log.Errorf("readFuncMetaDir:%v", err)
  295. }
  296. if err := singleton.readUiMsgDir(); nil != err {
  297. common.Log.Errorf("readUiMsgDir:%v", err)
  298. }
  299. })
  300. return singleton, outerErr
  301. }
  302. func findAll(t PluginType, pluginDir string) (result map[string]string, err error) {
  303. result = make(map[string]string)
  304. dir := path.Join(pluginDir, PluginTypes[t])
  305. files, err := ioutil.ReadDir(dir)
  306. if err != nil {
  307. return
  308. }
  309. for _, file := range files {
  310. baseName := filepath.Base(file.Name())
  311. if strings.HasSuffix(baseName, ".so") {
  312. n, v := parseName(baseName)
  313. result[n] = v
  314. }
  315. }
  316. return
  317. }
  318. func (m *Manager) List(t PluginType) (result []string, err error) {
  319. return m.registry.List(t), nil
  320. }
  321. func (m *Manager) ListSymbols() (result []string, err error) {
  322. return m.registry.ListSymbols(), nil
  323. }
  324. func (m *Manager) GetSymbol(s string) (result string, ok bool) {
  325. return m.registry.GetPluginBySymbol(FUNCTION, s)
  326. }
  327. func (m *Manager) Register(t PluginType, j Plugin) error {
  328. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  329. //Validation
  330. name = strings.Trim(name, " ")
  331. if name == "" {
  332. return fmt.Errorf("invalid name %s: should not be empty", name)
  333. }
  334. if !common.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  335. return fmt.Errorf("invalid uri %s", uri)
  336. }
  337. if v, ok := m.registry.Get(t, name); ok {
  338. if v == DELETED {
  339. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  340. } else {
  341. return fmt.Errorf("invalid name %s: duplicate", name)
  342. }
  343. }
  344. var err error
  345. if t == FUNCTION {
  346. if len(j.GetSymbols()) > 0 {
  347. err = m.db.Open()
  348. if err != nil {
  349. return err
  350. }
  351. err = m.db.Set(name, j.GetSymbols())
  352. if err != nil {
  353. return err
  354. }
  355. m.db.Close()
  356. err = m.registry.StoreSymbols(name, j.GetSymbols())
  357. } else {
  358. err = m.registry.StoreSymbols(name, []string{name})
  359. }
  360. }
  361. if err != nil {
  362. return err
  363. }
  364. zipPath := path.Join(m.pluginDir, name+".zip")
  365. var unzipFiles []string
  366. //clean up: delete zip file and unzip files in error
  367. defer os.Remove(zipPath)
  368. //download
  369. err = common.DownloadFile(zipPath, uri)
  370. if err != nil {
  371. return fmt.Errorf("fail to download file %s: %s", uri, err)
  372. }
  373. //unzip and copy to destination
  374. unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
  375. if err == nil && len(j.GetSymbols()) > 0 {
  376. if err = m.db.Open(); err == nil {
  377. err = m.db.Set(name, j.GetSymbols())
  378. }
  379. }
  380. if err != nil { //Revert for any errors
  381. if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
  382. os.RemoveAll(unzipFiles[0])
  383. }
  384. if len(j.GetSymbols()) > 0 {
  385. m.db.Close()
  386. m.registry.RemoveSymbols(j.GetSymbols())
  387. } else {
  388. m.registry.RemoveSymbols([]string{name})
  389. }
  390. return fmt.Errorf("fail to install plugin: %s", err)
  391. }
  392. m.registry.Store(t, name, version)
  393. switch t {
  394. case SINK:
  395. if err := m.readSinkMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  396. common.Log.Errorf("readSinkFile:%v", err)
  397. }
  398. case SOURCE:
  399. if err := m.readSourceMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  400. common.Log.Errorf("readSourceFile:%v", err)
  401. }
  402. case FUNCTION:
  403. if err := m.readFuncMetaFile(path.Join(m.etcDir, PluginTypes[t], name+`.json`)); nil != err {
  404. common.Log.Errorf("readFuncFile:%v", err)
  405. }
  406. }
  407. return nil
  408. }
  409. // prerequisite:function plugin of name exists
  410. func (m *Manager) RegisterFuncs(name string, functions []string) error {
  411. if len(functions) == 0 {
  412. return fmt.Errorf("property 'functions' must not be empty")
  413. }
  414. err := m.db.Open()
  415. if err != nil {
  416. return err
  417. }
  418. defer m.db.Close()
  419. old := make([]string, 0)
  420. if ok, err := m.db.Get(name, &old); err != nil {
  421. return err
  422. } else if ok {
  423. m.registry.RemoveSymbols(old)
  424. } else if !ok {
  425. m.registry.RemoveSymbols([]string{name})
  426. }
  427. err = m.db.Set(name, functions)
  428. if err != nil {
  429. return err
  430. }
  431. return m.registry.StoreSymbols(name, functions)
  432. }
  433. func (m *Manager) Delete(t PluginType, name string, stop bool) error {
  434. name = strings.Trim(name, " ")
  435. if name == "" {
  436. return fmt.Errorf("invalid name %s: should not be empty", name)
  437. }
  438. soPath, err := getSoFilePath(m, t, name, true)
  439. if err != nil {
  440. return err
  441. }
  442. var results []string
  443. paths := []string{
  444. soPath,
  445. }
  446. // Find etc folder
  447. etcPath := path.Join(m.etcDir, PluginTypes[t], name)
  448. if fi, err := os.Stat(etcPath); err == nil {
  449. if fi.Mode().IsDir() {
  450. paths = append(paths, etcPath)
  451. }
  452. }
  453. switch t {
  454. case SOURCE:
  455. paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
  456. m.uninstalSource(name)
  457. case SINK:
  458. m.uninstalSink(name)
  459. case FUNCTION:
  460. old := make([]string, 0)
  461. err = m.db.Open()
  462. if err != nil {
  463. return err
  464. }
  465. if ok, err := m.db.Get(name, &old); err != nil {
  466. return err
  467. } else if ok {
  468. m.registry.RemoveSymbols(old)
  469. err := m.db.Delete(name)
  470. if err != nil {
  471. return err
  472. }
  473. } else if !ok {
  474. m.registry.RemoveSymbols([]string{name})
  475. }
  476. m.db.Close()
  477. m.uninstalFunc(name)
  478. }
  479. for _, p := range paths {
  480. _, err := os.Stat(p)
  481. if err == nil {
  482. err = os.RemoveAll(p)
  483. if err != nil {
  484. results = append(results, err.Error())
  485. }
  486. } else {
  487. results = append(results, fmt.Sprintf("can't find %s", p))
  488. }
  489. }
  490. if len(results) > 0 {
  491. return errors.New(strings.Join(results, "\n"))
  492. } else {
  493. m.registry.Store(t, name, DELETED)
  494. if stop {
  495. go func() {
  496. time.Sleep(1 * time.Second)
  497. os.Exit(100)
  498. }()
  499. }
  500. return nil
  501. }
  502. }
  503. func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool) {
  504. v, ok := m.registry.Get(t, name)
  505. if strings.HasPrefix(v, "v") {
  506. v = v[1:]
  507. }
  508. if ok {
  509. r := map[string]interface{}{
  510. "name": name,
  511. "version": v,
  512. }
  513. if t == FUNCTION {
  514. if err := m.db.Open(); err == nil {
  515. l := make([]string, 0)
  516. if ok, _ := m.db.Get(name, &l); ok {
  517. r["functions"] = l
  518. }
  519. m.db.Close()
  520. }
  521. // ignore the error
  522. }
  523. return r, ok
  524. }
  525. return nil, false
  526. }
  527. // Start implement xsql.FunctionRegister
  528. func (m *Manager) HasFunction(name string) bool {
  529. _, ok := m.GetSymbol(name)
  530. return ok
  531. }
  532. func (m *Manager) Function(name string) (api.Function, error) {
  533. nf, err := getPlugin(name, FUNCTION)
  534. if err != nil {
  535. return nil, err
  536. }
  537. var s api.Function
  538. switch t := nf.(type) {
  539. case api.Function:
  540. s = t
  541. case func() api.Function:
  542. s = t()
  543. default:
  544. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  545. }
  546. return s, nil
  547. }
  548. // End Implement FunctionRegister
  549. // Return the lowercase version of so name. It may be upper case in path.
  550. func getSoFilePath(m *Manager, t PluginType, name string, isSoName bool) (string, error) {
  551. var (
  552. v string
  553. soname string
  554. ok bool
  555. )
  556. // We must identify plugin or symbol when deleting function plugin
  557. if isSoName {
  558. soname = name
  559. } else {
  560. soname, ok = m.registry.GetPluginBySymbol(t, name)
  561. if !ok {
  562. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  563. }
  564. }
  565. v, ok = m.registry.Get(t, soname)
  566. if !ok {
  567. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  568. }
  569. soFile := soname + ".so"
  570. if v != "" {
  571. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  572. }
  573. p := path.Join(m.pluginDir, PluginTypes[t], soFile)
  574. if _, err := os.Stat(p); err != nil {
  575. p = path.Join(m.pluginDir, PluginTypes[t], ucFirst(soFile))
  576. }
  577. if _, err := os.Stat(p); err != nil {
  578. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  579. }
  580. return p, nil
  581. }
  582. func (m *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
  583. var filenames []string
  584. var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
  585. defer os.RemoveAll(tempPath)
  586. r, err := zip.OpenReader(src)
  587. if err != nil {
  588. return filenames, "", err
  589. }
  590. defer r.Close()
  591. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  592. var yamlFile, yamlPath, version string
  593. expFiles := 1
  594. if t == SOURCE {
  595. yamlFile = name + ".yaml"
  596. yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
  597. expFiles = 2
  598. }
  599. var revokeFiles []string
  600. needInstall := false
  601. for _, file := range r.File {
  602. fileName := file.Name
  603. if yamlFile == fileName {
  604. err = common.UnzipTo(file, yamlPath)
  605. if err != nil {
  606. return filenames, "", err
  607. }
  608. revokeFiles = append(revokeFiles, yamlPath)
  609. filenames = append(filenames, yamlPath)
  610. } else if fileName == name+".json" {
  611. jsonPath := path.Join(m.etcDir, PluginTypes[t], fileName)
  612. if err := common.UnzipTo(file, jsonPath); nil != err {
  613. common.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  614. } else {
  615. revokeFiles = append(revokeFiles, jsonPath)
  616. }
  617. } else if soPrefix.Match([]byte(fileName)) {
  618. soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
  619. err = common.UnzipTo(file, soPath)
  620. if err != nil {
  621. return filenames, "", err
  622. }
  623. filenames = append(filenames, soPath)
  624. revokeFiles = append(revokeFiles, soPath)
  625. _, version = parseName(fileName)
  626. } else if strings.HasPrefix(fileName, "etc/") {
  627. err = common.UnzipTo(file, path.Join(m.etcDir, PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  628. if err != nil {
  629. return filenames, "", err
  630. }
  631. } else { //unzip other files
  632. err = common.UnzipTo(file, path.Join(tempPath, fileName))
  633. if err != nil {
  634. return filenames, "", err
  635. }
  636. if fileName == "install.sh" {
  637. needInstall = true
  638. }
  639. }
  640. }
  641. if len(filenames) != expFiles {
  642. return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
  643. } else if needInstall {
  644. //run install script if there is
  645. spath := path.Join(tempPath, "install.sh")
  646. shellParas = append(shellParas, spath)
  647. if 1 != len(shellParas) {
  648. copy(shellParas[1:], shellParas[0:])
  649. shellParas[0] = spath
  650. }
  651. cmd := exec.Command("/bin/sh", shellParas...)
  652. var outb, errb bytes.Buffer
  653. cmd.Stdout = &outb
  654. cmd.Stderr = &errb
  655. err := cmd.Run()
  656. if err != nil {
  657. for _, f := range revokeFiles {
  658. os.RemoveAll(f)
  659. }
  660. common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  661. return filenames, "", err
  662. } else {
  663. common.Log.Infof(`run install script:%s`, outb.String())
  664. common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
  665. }
  666. }
  667. return filenames, version, nil
  668. }
  669. func parseName(n string) (string, string) {
  670. result := strings.Split(n, ".so")
  671. result = strings.Split(result[0], "@")
  672. name := lcFirst(result[0])
  673. if len(result) > 1 {
  674. return name, result[1]
  675. }
  676. return name, ""
  677. }
  678. func ucFirst(str string) string {
  679. for i, v := range str {
  680. return string(unicode.ToUpper(v)) + str[i+1:]
  681. }
  682. return ""
  683. }
  684. func lcFirst(str string) string {
  685. for i, v := range str {
  686. return string(unicode.ToLower(v)) + str[i+1:]
  687. }
  688. return ""
  689. }