manager.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Manage the loading of both native and portable plugins
  15. package native
  16. import (
  17. "archive/zip"
  18. "bytes"
  19. "encoding/json"
  20. "fmt"
  21. "os"
  22. "os/exec"
  23. "path"
  24. "path/filepath"
  25. "plugin"
  26. "regexp"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unicode"
  31. "github.com/lf-edge/ekuiper/internal/conf"
  32. "github.com/lf-edge/ekuiper/internal/meta"
  33. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
  37. "github.com/lf-edge/ekuiper/pkg/api"
  38. "github.com/lf-edge/ekuiper/pkg/errorx"
  39. "github.com/lf-edge/ekuiper/pkg/kv"
  40. )
  41. // Manager Initialized in the binder
  42. var manager *Manager
  43. const DELETED = "$deleted"
  44. // Manager is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  45. type Manager struct {
  46. sync.RWMutex
  47. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  48. plugins []map[string]string
  49. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  50. symbols map[string]string
  51. // loaded symbols in current runtime
  52. runtime map[string]*plugin.Plugin
  53. // dirs
  54. pluginDir string
  55. pluginConfDir string
  56. // the access to func symbols db
  57. funcSymbolsDb kv.KeyValue
  58. // the access to plugin install script db
  59. plgInstallDb kv.KeyValue
  60. // the access to plugin install status db
  61. plgStatusDb kv.KeyValue
  62. }
  63. // InitManager must only be called once
  64. func InitManager() (*Manager, error) {
  65. pluginDir, err := conf.GetPluginsLoc()
  66. if err != nil {
  67. return nil, fmt.Errorf("cannot find plugins folder: %s", err)
  68. }
  69. dataDir, err := conf.GetDataLoc()
  70. if err != nil {
  71. return nil, fmt.Errorf("cannot find data folder: %s", err)
  72. }
  73. err, func_db := store.GetKV("pluginFuncs")
  74. if err != nil {
  75. return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
  76. }
  77. err, plg_db := store.GetKV("nativePlugin")
  78. if err != nil {
  79. return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
  80. }
  81. err, plg_status_db := store.GetKV("nativePluginStatus")
  82. if err != nil {
  83. return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
  84. }
  85. registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
  86. manager = registry
  87. if manager.hasInstallFlag() {
  88. manager.plugins = make([]map[string]string, 3)
  89. for i := range manager.plugins {
  90. manager.plugins[i] = make(map[string]string)
  91. }
  92. manager.pluginInstallWhenReboot()
  93. manager.clearInstallFlag()
  94. } else {
  95. plugins := make([]map[string]string, 3)
  96. for i := range plugins {
  97. names, err := findAll(plugin2.PluginType(i), pluginDir)
  98. if err != nil {
  99. return nil, fmt.Errorf("fail to find existing plugins: %s", err)
  100. }
  101. plugins[i] = names
  102. }
  103. registry.plugins = plugins
  104. for pf := range plugins[plugin2.FUNCTION] {
  105. l := make([]string, 0)
  106. if ok, err := func_db.Get(pf, &l); ok {
  107. registry.storeSymbols(pf, l)
  108. } else if err != nil {
  109. return nil, fmt.Errorf("error when querying kv: %s", err)
  110. } else {
  111. registry.storeSymbols(pf, []string{pf})
  112. }
  113. }
  114. }
  115. return registry, nil
  116. }
  117. func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string, err error) {
  118. result = make(map[string]string)
  119. dir := path.Join(pluginDir, plugin2.PluginTypes[t])
  120. files, err := os.ReadDir(dir)
  121. if err != nil {
  122. return
  123. }
  124. for _, file := range files {
  125. baseName := filepath.Base(file.Name())
  126. if strings.HasSuffix(baseName, ".so") {
  127. n, v := parseName(baseName)
  128. //load the plugins when ekuiper set up
  129. if !conf.IsTesting {
  130. if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
  131. continue
  132. }
  133. }
  134. result[n] = v
  135. }
  136. }
  137. return
  138. }
  139. func GetManager() *Manager {
  140. return manager
  141. }
  142. func (rr *Manager) get(t plugin2.PluginType, name string) (string, bool) {
  143. rr.RLock()
  144. result := rr.plugins[t]
  145. rr.RUnlock()
  146. r, ok := result[name]
  147. return r, ok
  148. }
  149. func (rr *Manager) store(t plugin2.PluginType, name string, version string) {
  150. rr.Lock()
  151. rr.plugins[t][name] = version
  152. rr.Unlock()
  153. }
  154. func (rr *Manager) storeSymbols(name string, symbols []string) error {
  155. rr.Lock()
  156. defer rr.Unlock()
  157. for _, s := range symbols {
  158. if _, ok := rr.symbols[s]; ok {
  159. return fmt.Errorf("function name %s already exists", s)
  160. } else {
  161. rr.symbols[s] = name
  162. }
  163. }
  164. return nil
  165. }
  166. func (rr *Manager) removeSymbols(symbols []string) {
  167. rr.Lock()
  168. for _, s := range symbols {
  169. delete(rr.symbols, s)
  170. }
  171. rr.Unlock()
  172. }
  173. // API for management
  174. func (rr *Manager) List(t plugin2.PluginType) []string {
  175. rr.RLock()
  176. result := rr.plugins[t]
  177. rr.RUnlock()
  178. keys := make([]string, 0, len(result))
  179. for k := range result {
  180. keys = append(keys, k)
  181. }
  182. return keys
  183. }
  184. func (rr *Manager) ListSymbols() []string {
  185. rr.RLock()
  186. result := rr.symbols
  187. rr.RUnlock()
  188. keys := make([]string, 0, len(result))
  189. for k := range result {
  190. keys = append(keys, k)
  191. }
  192. return keys
  193. }
  194. func (rr *Manager) GetPluginVersionBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  195. switch t {
  196. case plugin2.FUNCTION:
  197. rr.RLock()
  198. result := rr.plugins[t]
  199. name, ok := rr.symbols[symbolName]
  200. rr.RUnlock()
  201. if ok {
  202. r, nok := result[name]
  203. return r, nok
  204. } else {
  205. return "", false
  206. }
  207. default:
  208. return rr.get(t, symbolName)
  209. }
  210. }
  211. func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  212. switch t {
  213. case plugin2.FUNCTION:
  214. rr.RLock()
  215. defer rr.RUnlock()
  216. name, ok := rr.symbols[symbolName]
  217. return name, ok
  218. default:
  219. return symbolName, true
  220. }
  221. }
  222. func (rr *Manager) storePluginInstallScript(name string, t plugin2.PluginType, j plugin2.Plugin) {
  223. key := plugin2.PluginTypes[t] + "_" + name
  224. val := string(j.GetInstallScripts())
  225. _ = rr.plgInstallDb.Set(key, val)
  226. }
  227. func (rr *Manager) removePluginInstallScript(name string, t plugin2.PluginType) {
  228. key := plugin2.PluginTypes[t] + "_" + name
  229. _ = rr.plgInstallDb.Delete(key)
  230. }
  231. func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
  232. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  233. //Validation
  234. name = strings.Trim(name, " ")
  235. if name == "" {
  236. return fmt.Errorf("invalid name %s: should not be empty", name)
  237. }
  238. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  239. return fmt.Errorf("invalid uri %s", uri)
  240. }
  241. if v, ok := rr.get(t, name); ok {
  242. if v == DELETED {
  243. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  244. } else {
  245. return fmt.Errorf("invalid name %s: duplicate", name)
  246. }
  247. }
  248. var err error
  249. zipPath := path.Join(rr.pluginDir, name+".zip")
  250. //clean up: delete zip file and unzip files in error
  251. defer os.Remove(zipPath)
  252. //download
  253. err = httpx.DownloadFile(zipPath, uri)
  254. if err != nil {
  255. return fmt.Errorf("fail to download file %s: %s", uri, err)
  256. }
  257. if t == plugin2.FUNCTION {
  258. if len(j.GetSymbols()) > 0 {
  259. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  260. if err != nil {
  261. return err
  262. }
  263. err = rr.storeSymbols(name, j.GetSymbols())
  264. } else {
  265. err = rr.storeSymbols(name, []string{name})
  266. }
  267. }
  268. if err != nil {
  269. return err
  270. }
  271. //unzip and copy to destination
  272. version, err := rr.install(t, name, zipPath, shellParas)
  273. if err == nil && len(j.GetSymbols()) > 0 {
  274. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  275. }
  276. if err != nil { //Revert for any errors
  277. if len(j.GetSymbols()) > 0 {
  278. rr.removeSymbols(j.GetSymbols())
  279. } else {
  280. rr.removeSymbols([]string{name})
  281. }
  282. return fmt.Errorf("fail to install plugin: %s", err)
  283. }
  284. rr.store(t, name, version)
  285. rr.storePluginInstallScript(name, t, j)
  286. switch t {
  287. case plugin2.SINK:
  288. if err := meta.ReadSinkMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
  289. conf.Log.Errorf("readSinkFile:%v", err)
  290. }
  291. case plugin2.SOURCE:
  292. isScan := true
  293. isLookup := true
  294. _, err := rr.Source(name)
  295. if err != nil {
  296. isScan = false
  297. }
  298. _, err = rr.LookupSource(name)
  299. if err != nil {
  300. isLookup = false
  301. }
  302. if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
  303. conf.Log.Errorf("readSourceFile:%v", err)
  304. }
  305. }
  306. return nil
  307. }
  308. // RegisterFuncs prerequisite:function plugin of name exists
  309. func (rr *Manager) RegisterFuncs(name string, functions []string) error {
  310. if len(functions) == 0 {
  311. return fmt.Errorf("property 'functions' must not be empty")
  312. }
  313. old := make([]string, 0)
  314. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  315. return err
  316. } else if ok {
  317. rr.removeSymbols(old)
  318. } else if !ok {
  319. rr.removeSymbols([]string{name})
  320. }
  321. err := rr.funcSymbolsDb.Set(name, functions)
  322. if err != nil {
  323. return err
  324. }
  325. return rr.storeSymbols(name, functions)
  326. }
  327. func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
  328. name = strings.Trim(name, " ")
  329. if name == "" {
  330. return fmt.Errorf("invalid name %s: should not be empty", name)
  331. }
  332. soPath, err := rr.getSoFilePath(t, name, true)
  333. if err != nil {
  334. return err
  335. }
  336. var results []string
  337. paths := []string{
  338. soPath,
  339. }
  340. // Find etc folder
  341. etcPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name)
  342. if fi, err := os.Stat(etcPath); err == nil {
  343. if fi.Mode().IsDir() {
  344. paths = append(paths, etcPath)
  345. }
  346. }
  347. switch t {
  348. case plugin2.SOURCE:
  349. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".yaml")
  350. _ = os.Remove(yamlPaths)
  351. srcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".json")
  352. _ = os.Remove(srcJsonPath)
  353. meta.UninstallSource(name)
  354. case plugin2.SINK:
  355. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".yaml")
  356. _ = os.Remove(yamlPaths)
  357. sinkJsonPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".json")
  358. _ = os.Remove(sinkJsonPaths)
  359. meta.UninstallSink(name)
  360. case plugin2.FUNCTION:
  361. funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
  362. _ = os.Remove(funcJsonPath)
  363. old := make([]string, 0)
  364. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  365. return err
  366. } else if ok {
  367. rr.removeSymbols(old)
  368. err := rr.funcSymbolsDb.Delete(name)
  369. if err != nil {
  370. return err
  371. }
  372. } else if !ok {
  373. rr.removeSymbols([]string{name})
  374. }
  375. }
  376. for _, p := range paths {
  377. _, err := os.Stat(p)
  378. if err == nil {
  379. err = os.RemoveAll(p)
  380. if err != nil {
  381. results = append(results, err.Error())
  382. }
  383. } else {
  384. results = append(results, fmt.Sprintf("can't find %s", p))
  385. }
  386. }
  387. rr.removePluginInstallScript(name, t)
  388. if len(results) > 0 {
  389. return fmt.Errorf(strings.Join(results, "\n"))
  390. } else {
  391. rr.store(t, name, DELETED)
  392. if stop {
  393. go func() {
  394. time.Sleep(1 * time.Second)
  395. os.Exit(100)
  396. }()
  397. }
  398. return nil
  399. }
  400. }
  401. func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]interface{}, bool) {
  402. v, ok := rr.get(t, name)
  403. if strings.HasPrefix(v, "v") {
  404. v = v[1:]
  405. }
  406. if ok {
  407. r := map[string]interface{}{
  408. "name": name,
  409. "version": v,
  410. }
  411. if t == plugin2.FUNCTION {
  412. l := make([]string, 0)
  413. if ok, _ := rr.funcSymbolsDb.Get(name, &l); ok {
  414. r["functions"] = l
  415. }
  416. // ignore the error
  417. }
  418. return r, ok
  419. }
  420. return nil, false
  421. }
  422. func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
  423. var filenames []string
  424. var tempPath = path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
  425. defer os.RemoveAll(tempPath)
  426. r, err := zip.OpenReader(src)
  427. if err != nil {
  428. return "", err
  429. }
  430. defer r.Close()
  431. haveInstallFile := false
  432. for _, file := range r.File {
  433. fileName := file.Name
  434. if fileName == "install.sh" {
  435. haveInstallFile = true
  436. }
  437. }
  438. if len(shellParas) != 0 && !haveInstallFile {
  439. return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
  440. }
  441. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  442. var soPath string
  443. var yamlFile, yamlPath, version, soName string
  444. expFiles := 1
  445. if t == plugin2.SOURCE {
  446. yamlFile = name + ".yaml"
  447. yamlPath = path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], yamlFile)
  448. expFiles = 2
  449. }
  450. var revokeFiles []string
  451. defer func() {
  452. if err != nil {
  453. for _, f := range revokeFiles {
  454. os.RemoveAll(f)
  455. }
  456. }
  457. }()
  458. for _, file := range r.File {
  459. fileName := file.Name
  460. if yamlFile == fileName {
  461. err = filex.UnzipTo(file, yamlPath)
  462. if err != nil {
  463. return version, err
  464. }
  465. revokeFiles = append(revokeFiles, yamlPath)
  466. filenames = append(filenames, yamlPath)
  467. } else if fileName == name+".json" {
  468. jsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], fileName)
  469. if err := filex.UnzipTo(file, jsonPath); nil != err {
  470. conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  471. } else {
  472. revokeFiles = append(revokeFiles, jsonPath)
  473. }
  474. } else if soPrefix.Match([]byte(fileName)) {
  475. soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
  476. err = filex.UnzipTo(file, soPath)
  477. if err != nil {
  478. return version, err
  479. }
  480. filenames = append(filenames, soPath)
  481. revokeFiles = append(revokeFiles, soPath)
  482. soName, version = parseName(fileName)
  483. } else if strings.HasPrefix(fileName, "etc/") {
  484. err = filex.UnzipTo(file, path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  485. if err != nil {
  486. return version, err
  487. }
  488. } else { //unzip other files
  489. err = filex.UnzipTo(file, path.Join(tempPath, fileName))
  490. if err != nil {
  491. return version, err
  492. }
  493. }
  494. }
  495. if len(filenames) != expFiles {
  496. err = fmt.Errorf("invalid zip file: so file or conf file is missing")
  497. return version, err
  498. } else if haveInstallFile {
  499. //run install script if there is
  500. var shell = make([]string, len(shellParas))
  501. copy(shell, shellParas)
  502. spath := path.Join(tempPath, "install.sh")
  503. shell = append(shell, spath)
  504. if 1 != len(shell) {
  505. copy(shell[1:], shell[0:])
  506. shell[0] = spath
  507. }
  508. conf.Log.Infof("run install script %s", strings.Join(shell, " "))
  509. cmd := exec.Command("/bin/sh", shell...)
  510. var outb, errb bytes.Buffer
  511. cmd.Stdout = &outb
  512. cmd.Stderr = &errb
  513. err := cmd.Run()
  514. if err != nil {
  515. conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  516. return version, err
  517. }
  518. conf.Log.Infof(`run install script:%s`, outb.String())
  519. }
  520. if !conf.IsTesting {
  521. // load the runtime first
  522. _, err = manager.loadRuntime(t, soName, soPath, "")
  523. if err != nil {
  524. return version, err
  525. }
  526. }
  527. conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
  528. return version, nil
  529. }
  530. // binder factory implementations
  531. func (rr *Manager) Source(name string) (api.Source, error) {
  532. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
  533. if err != nil {
  534. return nil, err
  535. }
  536. if nf == nil {
  537. return nil, nil
  538. }
  539. switch t := nf.(type) {
  540. case api.Source:
  541. return t, nil
  542. case func() api.Source:
  543. return t(), nil
  544. default:
  545. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  546. }
  547. }
  548. func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
  549. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
  550. if err != nil {
  551. return nil, err
  552. }
  553. if nf == nil {
  554. return nil, nil
  555. }
  556. switch t := nf.(type) {
  557. case api.LookupSource:
  558. return t, nil
  559. case func() api.LookupSource:
  560. return t(), nil
  561. default:
  562. return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
  563. }
  564. }
  565. func (rr *Manager) Sink(name string) (api.Sink, error) {
  566. nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
  567. if err != nil {
  568. return nil, err
  569. }
  570. if nf == nil {
  571. return nil, nil
  572. }
  573. var s api.Sink
  574. switch t := nf.(type) {
  575. case api.Sink:
  576. s = t
  577. case func() api.Sink:
  578. s = t()
  579. default:
  580. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  581. }
  582. return s, nil
  583. }
  584. func (rr *Manager) Function(name string) (api.Function, error) {
  585. nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
  586. if err != nil {
  587. return nil, err
  588. }
  589. if nf == nil {
  590. return nil, nil
  591. }
  592. var s api.Function
  593. switch t := nf.(type) {
  594. case api.Function:
  595. s = t
  596. case func() api.Function:
  597. s = t()
  598. default:
  599. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  600. }
  601. return s, nil
  602. }
  603. func (rr *Manager) HasFunctionSet(name string) bool {
  604. _, ok := rr.get(plugin2.FUNCTION, name)
  605. return ok
  606. }
  607. func (rr *Manager) ConvName(name string) (string, bool) {
  608. _, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, name)
  609. if ok {
  610. return name, true
  611. }
  612. return name, false
  613. }
  614. // If not found, return nil,nil; Other errors return nil, err
  615. func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
  616. ptype := plugin2.PluginTypes[t]
  617. key := ptype + "/" + soName
  618. var (
  619. plug *plugin.Plugin
  620. ok bool
  621. err error
  622. )
  623. rr.RLock()
  624. plug, ok = rr.runtime[key]
  625. rr.RUnlock()
  626. if !ok {
  627. var soPath string
  628. if soFilepath != "" {
  629. soPath = soFilepath
  630. } else {
  631. mod, err := rr.getSoFilePath(t, soName, false)
  632. if err != nil {
  633. conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
  634. return nil, nil
  635. }
  636. soPath = mod
  637. }
  638. conf.Log.Debugf("Opening plugin %s", soPath)
  639. plug, err = plugin.Open(soPath)
  640. if err != nil {
  641. conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
  642. return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
  643. }
  644. rr.Lock()
  645. rr.runtime[key] = plug
  646. rr.Unlock()
  647. conf.Log.Debugf("Successfully open plugin %s", soPath)
  648. }
  649. if symbolName == "" {
  650. symbolName = ucFirst(soName)
  651. }
  652. conf.Log.Debugf("Loading symbol %s", symbolName)
  653. nf, err := plug.Lookup(symbolName)
  654. if err != nil {
  655. conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
  656. return nil, nil
  657. }
  658. conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
  659. return nf, nil
  660. }
  661. // Return the lowercase version of so name. It may be upper case in path.
  662. func (rr *Manager) getSoFilePath(t plugin2.PluginType, name string, isSoName bool) (string, error) {
  663. var (
  664. v string
  665. soname string
  666. ok bool
  667. )
  668. // We must identify plugin or symbol when deleting function plugin
  669. if isSoName {
  670. soname = name
  671. } else {
  672. soname, ok = rr.GetPluginBySymbol(t, name)
  673. if !ok {
  674. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  675. }
  676. }
  677. v, ok = rr.get(t, soname)
  678. if !ok {
  679. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  680. }
  681. soFile := soname + ".so"
  682. if v != "" {
  683. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  684. }
  685. p := path.Join(rr.pluginDir, plugin2.PluginTypes[t], soFile)
  686. if _, err := os.Stat(p); err != nil {
  687. p = path.Join(rr.pluginDir, plugin2.PluginTypes[t], ucFirst(soFile))
  688. }
  689. if _, err := os.Stat(p); err != nil {
  690. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  691. }
  692. return p, nil
  693. }
  694. func parseName(n string) (string, string) {
  695. result := strings.Split(n, ".so")
  696. result = strings.Split(result[0], "@")
  697. name := lcFirst(result[0])
  698. if len(result) > 1 {
  699. return name, result[1]
  700. }
  701. return name, ""
  702. }
  // ucFirst returns str with its first rune converted to upper case. The empty
  // string is returned unchanged.
  //
  // The remainder is taken from the byte offset of the second rune rather than
  // from offset 1, so a first rune encoded in several bytes is not cut in half.
  func ucFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	var first rune
  	// width is the encoded size of the first rune; it stays len(str) when the
  	// string holds a single rune.
  	width := len(str)
  	for i, r := range str {
  		if i == 0 {
  			first = r
  			continue
  		}
  		width = i
  		break
  	}
  	return string(unicode.ToUpper(first)) + str[width:]
  }
  // lcFirst returns str with its first rune converted to lower case. The empty
  // string is returned unchanged.
  //
  // The remainder is taken from the byte offset of the second rune rather than
  // from offset 1, so a first rune encoded in several bytes is not cut in half.
  func lcFirst(str string) string {
  	if str == "" {
  		return ""
  	}
  	var first rune
  	// width is the encoded size of the first rune; it stays len(str) when the
  	// string holds a single rune.
  	width := len(str)
  	for i, r := range str {
  		if i == 0 {
  			first = r
  			continue
  		}
  		width = i
  		break
  	}
  	return string(unicode.ToLower(first)) + str[width:]
  }
  715. func (rr *Manager) UninstallAllPlugins() {
  716. keys, err := rr.plgInstallDb.Keys()
  717. if err != nil {
  718. return
  719. }
  720. for _, v := range keys {
  721. plgType := plugin2.PluginTypeMap[strings.Split(v, "_")[0]]
  722. plgName := strings.Split(v, "_")[1]
  723. _ = rr.Delete(plgType, plgName, false)
  724. }
  725. }
  726. func (rr *Manager) GetAllPlugins() map[string]string {
  727. allPlgs, err := rr.plgInstallDb.All()
  728. if err != nil {
  729. return nil
  730. }
  731. delete(allPlgs, BOOT_INSTALL)
  732. return allPlgs
  733. }
  734. func (rr *Manager) GetAllPluginsStatus() map[string]string {
  735. allPlgs, err := rr.plgStatusDb.All()
  736. if err != nil {
  737. return nil
  738. }
  739. return allPlgs
  740. }
  // BOOT_INSTALL is the key (and value) of the flag entry that PluginImport
  // writes to the install script store; InitManager checks for it at start-up.
  const BOOT_INSTALL = "$boot_install"
  742. func (rr *Manager) PluginImport(plugins map[string]string) error {
  743. if len(plugins) == 0 {
  744. return nil
  745. }
  746. for k, v := range plugins {
  747. err := rr.plgInstallDb.Set(k, v)
  748. if err != nil {
  749. return err
  750. }
  751. }
  752. //set the flag to install the plugins when eKuiper reboot
  753. err := rr.plgInstallDb.Set(BOOT_INSTALL, BOOT_INSTALL)
  754. if err != nil {
  755. return err
  756. }
  757. return nil
  758. }
  759. func (rr *Manager) hasInstallFlag() bool {
  760. var val = ""
  761. found, _ := rr.plgInstallDb.Get(BOOT_INSTALL, &val)
  762. return found
  763. }
  764. func (rr *Manager) clearInstallFlag() {
  765. _ = rr.plgInstallDb.Delete(BOOT_INSTALL)
  766. }
  767. func (rr *Manager) pluginInstallWhenReboot() {
  768. allPlgs, err := rr.plgInstallDb.All()
  769. if err != nil {
  770. return
  771. }
  772. delete(allPlgs, BOOT_INSTALL)
  773. _ = rr.plgStatusDb.Clean()
  774. for k, v := range allPlgs {
  775. plgType := plugin2.PluginTypeMap[strings.Split(k, "_")[0]]
  776. sd := plugin2.NewPluginByType(plgType)
  777. err := json.Unmarshal([]byte(v), &sd)
  778. if err != nil {
  779. _ = rr.plgStatusDb.Set(k, err.Error())
  780. continue
  781. }
  782. err = rr.Register(plgType, sd)
  783. if err != nil {
  784. _ = rr.plgStatusDb.Set(k, err.Error())
  785. continue
  786. }
  787. }
  788. }