manager.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Manage the loading of both native and portable plugins
  15. package native
  16. import (
  17. "archive/zip"
  18. "bytes"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "path"
  23. "path/filepath"
  24. "plugin"
  25. "regexp"
  26. "strings"
  27. "sync"
  28. "time"
  29. "unicode"
  30. "github.com/lf-edge/ekuiper/internal/conf"
  31. "github.com/lf-edge/ekuiper/internal/meta"
  32. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  33. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  34. "github.com/lf-edge/ekuiper/internal/pkg/store"
  35. plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
  36. "github.com/lf-edge/ekuiper/pkg/api"
  37. "github.com/lf-edge/ekuiper/pkg/errorx"
  38. "github.com/lf-edge/ekuiper/pkg/kv"
  39. )
  40. // Manager Initialized in the binder
  41. var manager *Manager
  42. const DELETED = "$deleted"
  43. // Manager is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  44. type Manager struct {
  45. sync.RWMutex
  46. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  47. plugins []map[string]string
  48. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  49. symbols map[string]string
  50. // loaded symbols in current runtime
  51. runtime map[string]*plugin.Plugin
  52. // dirs
  53. pluginDir string
  54. pluginConfDir string
  55. // the access to db
  56. db kv.KeyValue
  57. }
  58. // InitManager must only be called once
  59. func InitManager() (*Manager, error) {
  60. pluginDir, err := conf.GetPluginsLoc()
  61. if err != nil {
  62. return nil, fmt.Errorf("cannot find plugins folder: %s", err)
  63. }
  64. dataDir, err := conf.GetDataLoc()
  65. if err != nil {
  66. return nil, fmt.Errorf("cannot find data folder: %s", err)
  67. }
  68. err, db := store.GetKV("pluginFuncs")
  69. if err != nil {
  70. return nil, fmt.Errorf("error when opening db: %v", err)
  71. }
  72. registry := &Manager{symbols: make(map[string]string), db: db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
  73. manager = registry
  74. plugins := make([]map[string]string, 3)
  75. for i := range plugin2.PluginTypes {
  76. names, err := findAll(plugin2.PluginType(i), pluginDir)
  77. if err != nil {
  78. return nil, fmt.Errorf("fail to find existing plugins: %s", err)
  79. }
  80. plugins[i] = names
  81. }
  82. registry.plugins = plugins
  83. for pf := range plugins[plugin2.FUNCTION] {
  84. l := make([]string, 0)
  85. if ok, err := db.Get(pf, &l); ok {
  86. registry.storeSymbols(pf, l)
  87. } else if err != nil {
  88. return nil, fmt.Errorf("error when querying kv: %s", err)
  89. } else {
  90. registry.storeSymbols(pf, []string{pf})
  91. }
  92. }
  93. return registry, nil
  94. }
  95. func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string, err error) {
  96. result = make(map[string]string)
  97. dir := path.Join(pluginDir, plugin2.PluginTypes[t])
  98. files, err := os.ReadDir(dir)
  99. if err != nil {
  100. return
  101. }
  102. for _, file := range files {
  103. baseName := filepath.Base(file.Name())
  104. if strings.HasSuffix(baseName, ".so") {
  105. n, v := parseName(baseName)
  106. //load the plugins when ekuiper set up
  107. if !conf.IsTesting {
  108. if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
  109. continue
  110. }
  111. }
  112. result[n] = v
  113. }
  114. }
  115. return
  116. }
  117. func GetManager() *Manager {
  118. return manager
  119. }
  120. func (rr *Manager) get(t plugin2.PluginType, name string) (string, bool) {
  121. rr.RLock()
  122. result := rr.plugins[t]
  123. rr.RUnlock()
  124. r, ok := result[name]
  125. return r, ok
  126. }
  127. func (rr *Manager) store(t plugin2.PluginType, name string, version string) {
  128. rr.Lock()
  129. rr.plugins[t][name] = version
  130. rr.Unlock()
  131. }
  132. func (rr *Manager) storeSymbols(name string, symbols []string) error {
  133. rr.Lock()
  134. defer rr.Unlock()
  135. for _, s := range symbols {
  136. if _, ok := rr.symbols[s]; ok {
  137. return fmt.Errorf("function name %s already exists", s)
  138. } else {
  139. rr.symbols[s] = name
  140. }
  141. }
  142. return nil
  143. }
  144. func (rr *Manager) removeSymbols(symbols []string) {
  145. rr.Lock()
  146. for _, s := range symbols {
  147. delete(rr.symbols, s)
  148. }
  149. rr.Unlock()
  150. }
  151. // API for management
  152. func (rr *Manager) List(t plugin2.PluginType) []string {
  153. rr.RLock()
  154. result := rr.plugins[t]
  155. rr.RUnlock()
  156. keys := make([]string, 0, len(result))
  157. for k := range result {
  158. keys = append(keys, k)
  159. }
  160. return keys
  161. }
  162. func (rr *Manager) ListSymbols() []string {
  163. rr.RLock()
  164. result := rr.symbols
  165. rr.RUnlock()
  166. keys := make([]string, 0, len(result))
  167. for k := range result {
  168. keys = append(keys, k)
  169. }
  170. return keys
  171. }
  172. func (rr *Manager) GetPluginVersionBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  173. switch t {
  174. case plugin2.FUNCTION:
  175. rr.RLock()
  176. result := rr.plugins[t]
  177. name, ok := rr.symbols[symbolName]
  178. rr.RUnlock()
  179. if ok {
  180. r, nok := result[name]
  181. return r, nok
  182. } else {
  183. return "", false
  184. }
  185. default:
  186. return rr.get(t, symbolName)
  187. }
  188. }
  189. func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  190. switch t {
  191. case plugin2.FUNCTION:
  192. rr.RLock()
  193. defer rr.RUnlock()
  194. name, ok := rr.symbols[symbolName]
  195. return name, ok
  196. default:
  197. return symbolName, true
  198. }
  199. }
  200. func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
  201. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  202. //Validation
  203. name = strings.Trim(name, " ")
  204. if name == "" {
  205. return fmt.Errorf("invalid name %s: should not be empty", name)
  206. }
  207. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  208. return fmt.Errorf("invalid uri %s", uri)
  209. }
  210. if v, ok := rr.get(t, name); ok {
  211. if v == DELETED {
  212. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  213. } else {
  214. return fmt.Errorf("invalid name %s: duplicate", name)
  215. }
  216. }
  217. var err error
  218. zipPath := path.Join(rr.pluginDir, name+".zip")
  219. //clean up: delete zip file and unzip files in error
  220. defer os.Remove(zipPath)
  221. //download
  222. err = httpx.DownloadFile(zipPath, uri)
  223. if err != nil {
  224. return fmt.Errorf("fail to download file %s: %s", uri, err)
  225. }
  226. if t == plugin2.FUNCTION {
  227. if len(j.GetSymbols()) > 0 {
  228. err = rr.db.Set(name, j.GetSymbols())
  229. if err != nil {
  230. return err
  231. }
  232. err = rr.storeSymbols(name, j.GetSymbols())
  233. } else {
  234. err = rr.storeSymbols(name, []string{name})
  235. }
  236. }
  237. if err != nil {
  238. return err
  239. }
  240. //unzip and copy to destination
  241. version, err := rr.install(t, name, zipPath, shellParas)
  242. if err == nil && len(j.GetSymbols()) > 0 {
  243. err = rr.db.Set(name, j.GetSymbols())
  244. }
  245. if err != nil { //Revert for any errors
  246. if len(j.GetSymbols()) > 0 {
  247. rr.removeSymbols(j.GetSymbols())
  248. } else {
  249. rr.removeSymbols([]string{name})
  250. }
  251. return fmt.Errorf("fail to install plugin: %s", err)
  252. }
  253. rr.store(t, name, version)
  254. switch t {
  255. case plugin2.SINK:
  256. if err := meta.ReadSinkMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
  257. conf.Log.Errorf("readSinkFile:%v", err)
  258. }
  259. case plugin2.SOURCE:
  260. isScan := true
  261. isLookup := true
  262. _, err := rr.Source(name)
  263. if err != nil {
  264. isScan = false
  265. }
  266. _, err = rr.LookupSource(name)
  267. if err != nil {
  268. isLookup = false
  269. }
  270. if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
  271. conf.Log.Errorf("readSourceFile:%v", err)
  272. }
  273. }
  274. return nil
  275. }
  276. // RegisterFuncs prerequisite:function plugin of name exists
  277. func (rr *Manager) RegisterFuncs(name string, functions []string) error {
  278. if len(functions) == 0 {
  279. return fmt.Errorf("property 'functions' must not be empty")
  280. }
  281. old := make([]string, 0)
  282. if ok, err := rr.db.Get(name, &old); err != nil {
  283. return err
  284. } else if ok {
  285. rr.removeSymbols(old)
  286. } else if !ok {
  287. rr.removeSymbols([]string{name})
  288. }
  289. err := rr.db.Set(name, functions)
  290. if err != nil {
  291. return err
  292. }
  293. return rr.storeSymbols(name, functions)
  294. }
  295. func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
  296. name = strings.Trim(name, " ")
  297. if name == "" {
  298. return fmt.Errorf("invalid name %s: should not be empty", name)
  299. }
  300. soPath, err := rr.getSoFilePath(t, name, true)
  301. if err != nil {
  302. return err
  303. }
  304. var results []string
  305. paths := []string{
  306. soPath,
  307. }
  308. // Find etc folder
  309. etcPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name)
  310. if fi, err := os.Stat(etcPath); err == nil {
  311. if fi.Mode().IsDir() {
  312. paths = append(paths, etcPath)
  313. }
  314. }
  315. switch t {
  316. case plugin2.SOURCE:
  317. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".yaml")
  318. _ = os.Remove(yamlPaths)
  319. srcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".json")
  320. _ = os.Remove(srcJsonPath)
  321. meta.UninstallSource(name)
  322. case plugin2.SINK:
  323. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".yaml")
  324. _ = os.Remove(yamlPaths)
  325. sinkJsonPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".json")
  326. _ = os.Remove(sinkJsonPaths)
  327. meta.UninstallSink(name)
  328. case plugin2.FUNCTION:
  329. funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
  330. _ = os.Remove(funcJsonPath)
  331. old := make([]string, 0)
  332. if ok, err := rr.db.Get(name, &old); err != nil {
  333. return err
  334. } else if ok {
  335. rr.removeSymbols(old)
  336. err := rr.db.Delete(name)
  337. if err != nil {
  338. return err
  339. }
  340. } else if !ok {
  341. rr.removeSymbols([]string{name})
  342. }
  343. }
  344. for _, p := range paths {
  345. _, err := os.Stat(p)
  346. if err == nil {
  347. err = os.RemoveAll(p)
  348. if err != nil {
  349. results = append(results, err.Error())
  350. }
  351. } else {
  352. results = append(results, fmt.Sprintf("can't find %s", p))
  353. }
  354. }
  355. if len(results) > 0 {
  356. return fmt.Errorf(strings.Join(results, "\n"))
  357. } else {
  358. rr.store(t, name, DELETED)
  359. if stop {
  360. go func() {
  361. time.Sleep(1 * time.Second)
  362. os.Exit(100)
  363. }()
  364. }
  365. return nil
  366. }
  367. }
  368. func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]interface{}, bool) {
  369. v, ok := rr.get(t, name)
  370. if strings.HasPrefix(v, "v") {
  371. v = v[1:]
  372. }
  373. if ok {
  374. r := map[string]interface{}{
  375. "name": name,
  376. "version": v,
  377. }
  378. if t == plugin2.FUNCTION {
  379. l := make([]string, 0)
  380. if ok, _ := rr.db.Get(name, &l); ok {
  381. r["functions"] = l
  382. }
  383. // ignore the error
  384. }
  385. return r, ok
  386. }
  387. return nil, false
  388. }
  389. func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
  390. var filenames []string
  391. var tempPath = path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
  392. defer os.RemoveAll(tempPath)
  393. r, err := zip.OpenReader(src)
  394. if err != nil {
  395. return "", err
  396. }
  397. defer r.Close()
  398. haveInstallFile := false
  399. for _, file := range r.File {
  400. fileName := file.Name
  401. if fileName == "install.sh" {
  402. haveInstallFile = true
  403. }
  404. }
  405. if len(shellParas) != 0 && !haveInstallFile {
  406. return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
  407. }
  408. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  409. var soPath string
  410. var yamlFile, yamlPath, version, soName string
  411. expFiles := 1
  412. if t == plugin2.SOURCE {
  413. yamlFile = name + ".yaml"
  414. yamlPath = path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], yamlFile)
  415. expFiles = 2
  416. }
  417. var revokeFiles []string
  418. defer func() {
  419. if err != nil {
  420. for _, f := range revokeFiles {
  421. os.RemoveAll(f)
  422. }
  423. }
  424. }()
  425. for _, file := range r.File {
  426. fileName := file.Name
  427. if yamlFile == fileName {
  428. err = filex.UnzipTo(file, yamlPath)
  429. if err != nil {
  430. return version, err
  431. }
  432. revokeFiles = append(revokeFiles, yamlPath)
  433. filenames = append(filenames, yamlPath)
  434. } else if fileName == name+".json" {
  435. jsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], fileName)
  436. if err := filex.UnzipTo(file, jsonPath); nil != err {
  437. conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  438. } else {
  439. revokeFiles = append(revokeFiles, jsonPath)
  440. }
  441. } else if soPrefix.Match([]byte(fileName)) {
  442. soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
  443. err = filex.UnzipTo(file, soPath)
  444. if err != nil {
  445. return version, err
  446. }
  447. filenames = append(filenames, soPath)
  448. revokeFiles = append(revokeFiles, soPath)
  449. soName, version = parseName(fileName)
  450. } else if strings.HasPrefix(fileName, "etc/") {
  451. err = filex.UnzipTo(file, path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  452. if err != nil {
  453. return version, err
  454. }
  455. } else { //unzip other files
  456. err = filex.UnzipTo(file, path.Join(tempPath, fileName))
  457. if err != nil {
  458. return version, err
  459. }
  460. }
  461. }
  462. if len(filenames) != expFiles {
  463. err = fmt.Errorf("invalid zip file: so file or conf file is missing")
  464. return version, err
  465. } else if haveInstallFile {
  466. //run install script if there is
  467. spath := path.Join(tempPath, "install.sh")
  468. shellParas = append(shellParas, spath)
  469. if 1 != len(shellParas) {
  470. copy(shellParas[1:], shellParas[0:])
  471. shellParas[0] = spath
  472. }
  473. cmd := exec.Command("/bin/sh", shellParas...)
  474. var outb, errb bytes.Buffer
  475. cmd.Stdout = &outb
  476. cmd.Stderr = &errb
  477. err := cmd.Run()
  478. if err != nil {
  479. conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  480. return version, err
  481. }
  482. conf.Log.Infof(`run install script:%s`, outb.String())
  483. }
  484. if !conf.IsTesting {
  485. // load the runtime first
  486. _, err = manager.loadRuntime(t, soName, soPath, "")
  487. if err != nil {
  488. return version, err
  489. }
  490. }
  491. conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
  492. return version, nil
  493. }
  494. // binder factory implementations
  495. func (rr *Manager) Source(name string) (api.Source, error) {
  496. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
  497. if err != nil {
  498. return nil, err
  499. }
  500. if nf == nil {
  501. return nil, nil
  502. }
  503. switch t := nf.(type) {
  504. case api.Source:
  505. return t, nil
  506. case func() api.Source:
  507. return t(), nil
  508. default:
  509. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  510. }
  511. }
  512. func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
  513. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
  514. if err != nil {
  515. return nil, err
  516. }
  517. if nf == nil {
  518. return nil, nil
  519. }
  520. switch t := nf.(type) {
  521. case api.LookupSource:
  522. return t, nil
  523. case func() api.LookupSource:
  524. return t(), nil
  525. default:
  526. return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
  527. }
  528. }
  529. func (rr *Manager) Sink(name string) (api.Sink, error) {
  530. nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
  531. if err != nil {
  532. return nil, err
  533. }
  534. if nf == nil {
  535. return nil, nil
  536. }
  537. var s api.Sink
  538. switch t := nf.(type) {
  539. case api.Sink:
  540. s = t
  541. case func() api.Sink:
  542. s = t()
  543. default:
  544. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  545. }
  546. return s, nil
  547. }
  548. func (rr *Manager) Function(name string) (api.Function, error) {
  549. nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
  550. if err != nil {
  551. return nil, err
  552. }
  553. if nf == nil {
  554. return nil, nil
  555. }
  556. var s api.Function
  557. switch t := nf.(type) {
  558. case api.Function:
  559. s = t
  560. case func() api.Function:
  561. s = t()
  562. default:
  563. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  564. }
  565. return s, nil
  566. }
  567. func (rr *Manager) HasFunctionSet(name string) bool {
  568. _, ok := rr.get(plugin2.FUNCTION, name)
  569. return ok
  570. }
  571. func (rr *Manager) ConvName(name string) (string, bool) {
  572. _, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, name)
  573. if ok {
  574. return name, true
  575. }
  576. return name, false
  577. }
  578. // If not found, return nil,nil; Other errors return nil, err
  579. func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
  580. ptype := plugin2.PluginTypes[t]
  581. key := ptype + "/" + soName
  582. var (
  583. plug *plugin.Plugin
  584. ok bool
  585. err error
  586. )
  587. rr.RLock()
  588. plug, ok = rr.runtime[key]
  589. rr.RUnlock()
  590. if !ok {
  591. var soPath string
  592. if soFilepath != "" {
  593. soPath = soFilepath
  594. } else {
  595. mod, err := rr.getSoFilePath(t, soName, false)
  596. if err != nil {
  597. conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
  598. return nil, nil
  599. }
  600. soPath = mod
  601. }
  602. conf.Log.Debugf("Opening plugin %s", soPath)
  603. plug, err = plugin.Open(soPath)
  604. if err != nil {
  605. conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
  606. return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
  607. }
  608. rr.Lock()
  609. rr.runtime[key] = plug
  610. rr.Unlock()
  611. conf.Log.Debugf("Successfully open plugin %s", soPath)
  612. }
  613. if symbolName == "" {
  614. symbolName = ucFirst(soName)
  615. }
  616. conf.Log.Debugf("Loading symbol %s", symbolName)
  617. nf, err := plug.Lookup(symbolName)
  618. if err != nil {
  619. conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
  620. return nil, nil
  621. }
  622. conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
  623. return nf, nil
  624. }
  625. // Return the lowercase version of so name. It may be upper case in path.
  626. func (rr *Manager) getSoFilePath(t plugin2.PluginType, name string, isSoName bool) (string, error) {
  627. var (
  628. v string
  629. soname string
  630. ok bool
  631. )
  632. // We must identify plugin or symbol when deleting function plugin
  633. if isSoName {
  634. soname = name
  635. } else {
  636. soname, ok = rr.GetPluginBySymbol(t, name)
  637. if !ok {
  638. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  639. }
  640. }
  641. v, ok = rr.get(t, soname)
  642. if !ok {
  643. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  644. }
  645. soFile := soname + ".so"
  646. if v != "" {
  647. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  648. }
  649. p := path.Join(rr.pluginDir, plugin2.PluginTypes[t], soFile)
  650. if _, err := os.Stat(p); err != nil {
  651. p = path.Join(rr.pluginDir, plugin2.PluginTypes[t], ucFirst(soFile))
  652. }
  653. if _, err := os.Stat(p); err != nil {
  654. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  655. }
  656. return p, nil
  657. }
  658. func parseName(n string) (string, string) {
  659. result := strings.Split(n, ".so")
  660. result = strings.Split(result[0], "@")
  661. name := lcFirst(result[0])
  662. if len(result) > 1 {
  663. return name, result[1]
  664. }
  665. return name, ""
  666. }
  // ucFirst returns str with its first rune converted to upper case. An empty
  // string is returned unchanged.
  func ucFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	var first rune
  	for i, v := range str {
  		if i > 0 {
  			// i is the byte offset of the second rune, so str[i:] is the
  			// remainder whatever the encoded width of the first rune is.
  			return string(unicode.ToUpper(first)) + str[i:]
  		}
  		first = v
  	}
  	// str consists of a single rune
  	return string(unicode.ToUpper(first))
  }
  // lcFirst returns str with its first rune converted to lower case. An empty
  // string is returned unchanged.
  func lcFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	var first rune
  	for i, v := range str {
  		if i > 0 {
  			// i is the byte offset of the second rune, so str[i:] is the
  			// remainder whatever the encoded width of the first rune is.
  			return string(unicode.ToLower(first)) + str[i:]
  		}
  		first = v
  	}
  	// str consists of a single rune
  	return string(unicode.ToLower(first))
  }