manager.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Manage the loading of both native and portable plugins
  15. package native
  16. import (
  17. "archive/zip"
  18. "bytes"
  19. "encoding/json"
  20. "fmt"
  21. "os"
  22. "os/exec"
  23. "path"
  24. "path/filepath"
  25. "plugin"
  26. "regexp"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unicode"
  31. "github.com/lf-edge/ekuiper/internal/conf"
  32. "github.com/lf-edge/ekuiper/internal/meta"
  33. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
  37. "github.com/lf-edge/ekuiper/pkg/api"
  38. "github.com/lf-edge/ekuiper/pkg/errorx"
  39. "github.com/lf-edge/ekuiper/pkg/kv"
  40. )
// manager is the package-level Manager instance. It is assigned by
// InitManager and handed out by GetManager.
var manager *Manager

// DELETED is the version value recorded for a plugin that has been removed by
// Delete; Register refuses to reuse such a name until the server is restarted.
const DELETED = "$deleted"
// Manager is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
type Manager struct {
	// Guards plugins, symbols and runtime.
	sync.RWMutex
	// 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
	plugins []map[string]string
	// A map from function name to its plugin file name. It is constructed during initialization by reading kv info.
	// Every function must have at least one entry, even when it resides in a single-function plugin.
	symbols map[string]string
	// loaded symbols in current runtime, keyed by "<plugin type dir>/<so name>"
	runtime map[string]*plugin.Plugin
	// dirs
	pluginDir     string
	pluginConfDir string
	// the access to func symbols db
	funcSymbolsDb kv.KeyValue
	// the access to plugin install script db
	plgInstallDb kv.KeyValue
	// the access to plugin install status db
	plgStatusDb kv.KeyValue
}
  63. // InitManager must only be called once
  64. func InitManager() (*Manager, error) {
  65. pluginDir, err := conf.GetPluginsLoc()
  66. if err != nil {
  67. return nil, fmt.Errorf("cannot find plugins folder: %s", err)
  68. }
  69. dataDir, err := conf.GetDataLoc()
  70. if err != nil {
  71. return nil, fmt.Errorf("cannot find data folder: %s", err)
  72. }
  73. func_db, err := store.GetKV("pluginFuncs")
  74. if err != nil {
  75. return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
  76. }
  77. plg_db, err := store.GetKV("nativePlugin")
  78. if err != nil {
  79. return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
  80. }
  81. plg_status_db, err := store.GetKV("nativePluginStatus")
  82. if err != nil {
  83. return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
  84. }
  85. registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
  86. manager = registry
  87. plugins := make([]map[string]string, 3)
  88. for i := range plugins {
  89. names, err := findAll(plugin2.PluginType(i), pluginDir)
  90. if err != nil {
  91. return nil, fmt.Errorf("fail to find existing plugins: %s", err)
  92. }
  93. plugins[i] = names
  94. }
  95. registry.plugins = plugins
  96. for pf := range plugins[plugin2.FUNCTION] {
  97. l := make([]string, 0)
  98. if ok, err := func_db.Get(pf, &l); ok {
  99. registry.storeSymbols(pf, l)
  100. } else if err != nil {
  101. return nil, fmt.Errorf("error when querying kv: %s", err)
  102. } else {
  103. registry.storeSymbols(pf, []string{pf})
  104. }
  105. }
  106. if manager.hasInstallFlag() {
  107. manager.pluginInstallWhenReboot()
  108. manager.clearInstallFlag()
  109. }
  110. return registry, nil
  111. }
  112. func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string, err error) {
  113. result = make(map[string]string)
  114. dir := path.Join(pluginDir, plugin2.PluginTypes[t])
  115. files, err := os.ReadDir(dir)
  116. if err != nil {
  117. return
  118. }
  119. for _, file := range files {
  120. baseName := filepath.Base(file.Name())
  121. if strings.HasSuffix(baseName, ".so") {
  122. n, v := parseName(baseName)
  123. // load the plugins when ekuiper set up
  124. if !conf.IsTesting {
  125. if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
  126. continue
  127. }
  128. }
  129. result[n] = v
  130. }
  131. }
  132. return
  133. }
  134. func GetManager() *Manager {
  135. return manager
  136. }
  137. func (rr *Manager) get(t plugin2.PluginType, name string) (string, bool) {
  138. rr.RLock()
  139. result := rr.plugins[t]
  140. rr.RUnlock()
  141. r, ok := result[name]
  142. return r, ok
  143. }
  144. func (rr *Manager) store(t plugin2.PluginType, name string, version string) {
  145. rr.Lock()
  146. rr.plugins[t][name] = version
  147. rr.Unlock()
  148. }
  149. func (rr *Manager) storeSymbols(name string, symbols []string) error {
  150. rr.Lock()
  151. defer rr.Unlock()
  152. for _, s := range symbols {
  153. if _, ok := rr.symbols[s]; ok {
  154. return fmt.Errorf("function name %s already exists", s)
  155. } else {
  156. rr.symbols[s] = name
  157. }
  158. }
  159. return nil
  160. }
  161. func (rr *Manager) removeSymbols(symbols []string) {
  162. rr.Lock()
  163. for _, s := range symbols {
  164. delete(rr.symbols, s)
  165. }
  166. rr.Unlock()
  167. }
  168. // API for management
  169. func (rr *Manager) List(t plugin2.PluginType) []string {
  170. rr.RLock()
  171. result := rr.plugins[t]
  172. rr.RUnlock()
  173. keys := make([]string, 0, len(result))
  174. for k := range result {
  175. keys = append(keys, k)
  176. }
  177. return keys
  178. }
  179. func (rr *Manager) ListSymbols() []string {
  180. rr.RLock()
  181. result := rr.symbols
  182. rr.RUnlock()
  183. keys := make([]string, 0, len(result))
  184. for k := range result {
  185. keys = append(keys, k)
  186. }
  187. return keys
  188. }
  189. func (rr *Manager) GetPluginVersionBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  190. switch t {
  191. case plugin2.FUNCTION:
  192. rr.RLock()
  193. result := rr.plugins[t]
  194. name, ok := rr.symbols[symbolName]
  195. rr.RUnlock()
  196. if ok {
  197. r, nok := result[name]
  198. return r, nok
  199. } else {
  200. return "", false
  201. }
  202. default:
  203. return rr.get(t, symbolName)
  204. }
  205. }
  206. func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  207. switch t {
  208. case plugin2.FUNCTION:
  209. rr.RLock()
  210. defer rr.RUnlock()
  211. name, ok := rr.symbols[symbolName]
  212. return name, ok
  213. default:
  214. return symbolName, true
  215. }
  216. }
  217. func (rr *Manager) storePluginInstallScript(name string, t plugin2.PluginType, j plugin2.Plugin) {
  218. key := plugin2.PluginTypes[t] + "_" + name
  219. val := string(j.GetInstallScripts())
  220. _ = rr.plgInstallDb.Set(key, val)
  221. }
  222. func (rr *Manager) removePluginInstallScript(name string, t plugin2.PluginType) {
  223. key := plugin2.PluginTypes[t] + "_" + name
  224. _ = rr.plgInstallDb.Delete(key)
  225. }
  226. func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
  227. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  228. // Validation
  229. name = strings.Trim(name, " ")
  230. if name == "" {
  231. return fmt.Errorf("invalid name %s: should not be empty", name)
  232. }
  233. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  234. return fmt.Errorf("invalid uri %s", uri)
  235. }
  236. if v, ok := rr.get(t, name); ok {
  237. if v == DELETED {
  238. return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
  239. } else {
  240. return fmt.Errorf("invalid name %s: duplicate", name)
  241. }
  242. }
  243. var err error
  244. zipPath := path.Join(rr.pluginDir, name+".zip")
  245. // clean up: delete zip file and unzip files in error
  246. defer os.Remove(zipPath)
  247. // download
  248. err = httpx.DownloadFile(zipPath, uri)
  249. if err != nil {
  250. return fmt.Errorf("fail to download file %s: %s", uri, err)
  251. }
  252. if t == plugin2.FUNCTION {
  253. if len(j.GetSymbols()) > 0 {
  254. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  255. if err != nil {
  256. return err
  257. }
  258. err = rr.storeSymbols(name, j.GetSymbols())
  259. } else {
  260. err = rr.storeSymbols(name, []string{name})
  261. }
  262. }
  263. if err != nil {
  264. return err
  265. }
  266. // unzip and copy to destination
  267. version, err := rr.install(t, name, zipPath, shellParas)
  268. if err == nil && len(j.GetSymbols()) > 0 {
  269. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  270. }
  271. if err != nil { // Revert for any errors
  272. if len(j.GetSymbols()) > 0 {
  273. rr.removeSymbols(j.GetSymbols())
  274. } else {
  275. rr.removeSymbols([]string{name})
  276. }
  277. return fmt.Errorf("fail to install plugin: %s", err)
  278. }
  279. rr.store(t, name, version)
  280. rr.storePluginInstallScript(name, t, j)
  281. switch t {
  282. case plugin2.SINK:
  283. if err := meta.ReadSinkMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
  284. conf.Log.Errorf("readSinkFile:%v", err)
  285. }
  286. case plugin2.SOURCE:
  287. isScan := true
  288. isLookup := true
  289. _, err := rr.Source(name)
  290. if err != nil {
  291. isScan = false
  292. }
  293. _, err = rr.LookupSource(name)
  294. if err != nil {
  295. isLookup = false
  296. }
  297. if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
  298. conf.Log.Errorf("readSourceFile:%v", err)
  299. }
  300. }
  301. return nil
  302. }
  303. // RegisterFuncs prerequisite:function plugin of name exists
  304. func (rr *Manager) RegisterFuncs(name string, functions []string) error {
  305. if len(functions) == 0 {
  306. return fmt.Errorf("property 'functions' must not be empty")
  307. }
  308. old := make([]string, 0)
  309. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  310. return err
  311. } else if ok {
  312. rr.removeSymbols(old)
  313. } else if !ok {
  314. rr.removeSymbols([]string{name})
  315. }
  316. err := rr.funcSymbolsDb.Set(name, functions)
  317. if err != nil {
  318. return err
  319. }
  320. return rr.storeSymbols(name, functions)
  321. }
  322. func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
  323. name = strings.Trim(name, " ")
  324. if name == "" {
  325. return fmt.Errorf("invalid name %s: should not be empty", name)
  326. }
  327. soPath, err := rr.getSoFilePath(t, name, true)
  328. if err != nil {
  329. return err
  330. }
  331. var results []string
  332. paths := []string{
  333. soPath,
  334. }
  335. // Find etc folder
  336. etcPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name)
  337. if fi, err := os.Stat(etcPath); err == nil {
  338. if fi.Mode().IsDir() {
  339. paths = append(paths, etcPath)
  340. }
  341. }
  342. switch t {
  343. case plugin2.SOURCE:
  344. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".yaml")
  345. _ = os.Remove(yamlPaths)
  346. srcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".json")
  347. _ = os.Remove(srcJsonPath)
  348. meta.UninstallSource(name)
  349. case plugin2.SINK:
  350. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".yaml")
  351. _ = os.Remove(yamlPaths)
  352. sinkJsonPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".json")
  353. _ = os.Remove(sinkJsonPaths)
  354. meta.UninstallSink(name)
  355. case plugin2.FUNCTION:
  356. funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
  357. _ = os.Remove(funcJsonPath)
  358. old := make([]string, 0)
  359. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  360. return err
  361. } else if ok {
  362. rr.removeSymbols(old)
  363. err := rr.funcSymbolsDb.Delete(name)
  364. if err != nil {
  365. return err
  366. }
  367. } else if !ok {
  368. rr.removeSymbols([]string{name})
  369. }
  370. }
  371. for _, p := range paths {
  372. _, err := os.Stat(p)
  373. if err == nil {
  374. err = os.RemoveAll(p)
  375. if err != nil {
  376. results = append(results, err.Error())
  377. }
  378. } else {
  379. results = append(results, fmt.Sprintf("can't find %s", p))
  380. }
  381. }
  382. rr.removePluginInstallScript(name, t)
  383. if len(results) > 0 {
  384. return fmt.Errorf(strings.Join(results, "\n"))
  385. } else {
  386. rr.store(t, name, DELETED)
  387. if stop {
  388. go func() {
  389. time.Sleep(1 * time.Second)
  390. os.Exit(100)
  391. }()
  392. }
  393. return nil
  394. }
  395. }
  396. func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]interface{}, bool) {
  397. v, ok := rr.get(t, name)
  398. if strings.HasPrefix(v, "v") {
  399. v = v[1:]
  400. }
  401. if ok {
  402. r := map[string]interface{}{
  403. "name": name,
  404. "version": v,
  405. }
  406. if t == plugin2.FUNCTION {
  407. l := make([]string, 0)
  408. if ok, _ := rr.funcSymbolsDb.Get(name, &l); ok {
  409. r["functions"] = l
  410. }
  411. // ignore the error
  412. }
  413. return r, ok
  414. }
  415. return nil, false
  416. }
  417. func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
  418. var filenames []string
  419. tempPath := path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
  420. defer os.RemoveAll(tempPath)
  421. r, err := zip.OpenReader(src)
  422. if err != nil {
  423. return "", err
  424. }
  425. defer r.Close()
  426. haveInstallFile := false
  427. for _, file := range r.File {
  428. fileName := file.Name
  429. if fileName == "install.sh" {
  430. haveInstallFile = true
  431. }
  432. }
  433. if len(shellParas) != 0 && !haveInstallFile {
  434. return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
  435. }
  436. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  437. var soPath string
  438. var yamlFile, yamlPath, version, soName string
  439. expFiles := 1
  440. if t == plugin2.SOURCE {
  441. yamlFile = name + ".yaml"
  442. yamlPath = path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], yamlFile)
  443. expFiles = 2
  444. }
  445. var revokeFiles []string
  446. defer func() {
  447. if err != nil {
  448. for _, f := range revokeFiles {
  449. os.RemoveAll(f)
  450. }
  451. }
  452. }()
  453. for _, file := range r.File {
  454. fileName := file.Name
  455. if yamlFile == fileName {
  456. err = filex.UnzipTo(file, yamlPath)
  457. if err != nil {
  458. return version, err
  459. }
  460. revokeFiles = append(revokeFiles, yamlPath)
  461. filenames = append(filenames, yamlPath)
  462. } else if fileName == name+".json" {
  463. jsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], fileName)
  464. if err := filex.UnzipTo(file, jsonPath); nil != err {
  465. conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  466. } else {
  467. revokeFiles = append(revokeFiles, jsonPath)
  468. }
  469. } else if soPrefix.Match([]byte(fileName)) {
  470. soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
  471. err = filex.UnzipTo(file, soPath)
  472. if err != nil {
  473. return version, err
  474. }
  475. filenames = append(filenames, soPath)
  476. revokeFiles = append(revokeFiles, soPath)
  477. soName, version = parseName(fileName)
  478. } else if strings.HasPrefix(fileName, "etc/") {
  479. err = filex.UnzipTo(file, path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  480. if err != nil {
  481. return version, err
  482. }
  483. } else { // unzip other files
  484. err = filex.UnzipTo(file, path.Join(tempPath, fileName))
  485. if err != nil {
  486. return version, err
  487. }
  488. }
  489. }
  490. if len(filenames) != expFiles {
  491. err = fmt.Errorf("invalid zip file: so file or conf file is missing")
  492. return version, err
  493. } else if haveInstallFile {
  494. // run install script if there is
  495. shell := make([]string, len(shellParas))
  496. copy(shell, shellParas)
  497. spath := path.Join(tempPath, "install.sh")
  498. shell = append(shell, spath)
  499. if 1 != len(shell) {
  500. copy(shell[1:], shell[0:])
  501. shell[0] = spath
  502. }
  503. conf.Log.Infof("run install script %s", strings.Join(shell, " "))
  504. cmd := exec.Command("/bin/sh", shell...)
  505. var outb, errb bytes.Buffer
  506. cmd.Stdout = &outb
  507. cmd.Stderr = &errb
  508. err := cmd.Run()
  509. if err != nil {
  510. conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  511. return version, err
  512. }
  513. conf.Log.Infof(`run install script:%s`, outb.String())
  514. }
  515. if !conf.IsTesting {
  516. // load the runtime first
  517. _, err = manager.loadRuntime(t, soName, soPath, "")
  518. if err != nil {
  519. return version, err
  520. }
  521. }
  522. conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
  523. return version, nil
  524. }
  525. // binder factory implementations
  526. func (rr *Manager) Source(name string) (api.Source, error) {
  527. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
  528. if err != nil {
  529. return nil, err
  530. }
  531. if nf == nil {
  532. return nil, nil
  533. }
  534. switch t := nf.(type) {
  535. case api.Source:
  536. return t, nil
  537. case func() api.Source:
  538. return t(), nil
  539. default:
  540. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  541. }
  542. }
  543. func (rr *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
  544. _, ok := rr.GetPluginVersionBySymbol(plugin2.SOURCE, name)
  545. if ok {
  546. pluginName, _ := rr.GetPluginBySymbol(plugin2.SOURCE, name)
  547. installScript := ""
  548. pluginKey := plugin2.PluginTypes[plugin2.SOURCE] + "_" + pluginName
  549. rr.plgInstallDb.Get(pluginKey, &installScript)
  550. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  551. } else {
  552. return plugin2.NONE_EXTENSION, "", ""
  553. }
  554. }
  555. func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
  556. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
  557. if err != nil {
  558. return nil, err
  559. }
  560. if nf == nil {
  561. return nil, nil
  562. }
  563. switch t := nf.(type) {
  564. case api.LookupSource:
  565. return t, nil
  566. case func() api.LookupSource:
  567. return t(), nil
  568. default:
  569. return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
  570. }
  571. }
  572. func (rr *Manager) Sink(name string) (api.Sink, error) {
  573. nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
  574. if err != nil {
  575. return nil, err
  576. }
  577. if nf == nil {
  578. return nil, nil
  579. }
  580. var s api.Sink
  581. switch t := nf.(type) {
  582. case api.Sink:
  583. s = t
  584. case func() api.Sink:
  585. s = t()
  586. default:
  587. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  588. }
  589. return s, nil
  590. }
  591. func (rr *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
  592. _, ok := rr.GetPluginVersionBySymbol(plugin2.SINK, name)
  593. if ok {
  594. pluginName, _ := rr.GetPluginBySymbol(plugin2.SINK, name)
  595. installScript := ""
  596. pluginKey := plugin2.PluginTypes[plugin2.SINK] + "_" + pluginName
  597. rr.plgInstallDb.Get(pluginKey, &installScript)
  598. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  599. } else {
  600. return plugin2.NONE_EXTENSION, "", ""
  601. }
  602. }
  603. func (rr *Manager) Function(name string) (api.Function, error) {
  604. nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
  605. if err != nil {
  606. return nil, err
  607. }
  608. if nf == nil {
  609. return nil, nil
  610. }
  611. var s api.Function
  612. switch t := nf.(type) {
  613. case api.Function:
  614. s = t
  615. case func() api.Function:
  616. s = t()
  617. default:
  618. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  619. }
  620. return s, nil
  621. }
  622. func (rr *Manager) HasFunctionSet(name string) bool {
  623. _, ok := rr.get(plugin2.FUNCTION, name)
  624. return ok
  625. }
  626. func (rr *Manager) FunctionPluginInfo(funcName string) (plugin2.EXTENSION_TYPE, string, string) {
  627. pluginName, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, funcName)
  628. if ok {
  629. installScript := ""
  630. pluginKey := plugin2.PluginTypes[plugin2.FUNCTION] + "_" + pluginName
  631. rr.plgInstallDb.Get(pluginKey, &installScript)
  632. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  633. } else {
  634. return plugin2.NONE_EXTENSION, "", ""
  635. }
  636. }
  637. func (rr *Manager) ConvName(name string) (string, bool) {
  638. _, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, name)
  639. if ok {
  640. return name, true
  641. }
  642. return name, false
  643. }
  644. // If not found, return nil,nil; Other errors return nil, err
  645. func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
  646. ptype := plugin2.PluginTypes[t]
  647. key := ptype + "/" + soName
  648. var (
  649. plug *plugin.Plugin
  650. ok bool
  651. err error
  652. )
  653. rr.RLock()
  654. plug, ok = rr.runtime[key]
  655. rr.RUnlock()
  656. if !ok {
  657. var soPath string
  658. if soFilepath != "" {
  659. soPath = soFilepath
  660. } else {
  661. mod, err := rr.getSoFilePath(t, soName, false)
  662. if err != nil {
  663. conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
  664. return nil, nil
  665. }
  666. soPath = mod
  667. }
  668. conf.Log.Debugf("Opening plugin %s", soPath)
  669. plug, err = plugin.Open(soPath)
  670. if err != nil {
  671. conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
  672. return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
  673. }
  674. rr.Lock()
  675. rr.runtime[key] = plug
  676. rr.Unlock()
  677. conf.Log.Debugf("Successfully open plugin %s", soPath)
  678. }
  679. if symbolName == "" {
  680. symbolName = ucFirst(soName)
  681. }
  682. conf.Log.Debugf("Loading symbol %s", symbolName)
  683. nf, err := plug.Lookup(symbolName)
  684. if err != nil {
  685. conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
  686. return nil, nil
  687. }
  688. conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
  689. return nf, nil
  690. }
  691. // Return the lowercase version of so name. It may be upper case in path.
  692. func (rr *Manager) getSoFilePath(t plugin2.PluginType, name string, isSoName bool) (string, error) {
  693. var (
  694. v string
  695. soname string
  696. ok bool
  697. )
  698. // We must identify plugin or symbol when deleting function plugin
  699. if isSoName {
  700. soname = name
  701. } else {
  702. soname, ok = rr.GetPluginBySymbol(t, name)
  703. if !ok {
  704. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  705. }
  706. }
  707. v, ok = rr.get(t, soname)
  708. if !ok {
  709. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  710. }
  711. soFile := soname + ".so"
  712. if v != "" {
  713. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  714. }
  715. p := path.Join(rr.pluginDir, plugin2.PluginTypes[t], soFile)
  716. if _, err := os.Stat(p); err != nil {
  717. p = path.Join(rr.pluginDir, plugin2.PluginTypes[t], ucFirst(soFile))
  718. }
  719. if _, err := os.Stat(p); err != nil {
  720. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  721. }
  722. return p, nil
  723. }
  724. func parseName(n string) (string, string) {
  725. result := strings.Split(n, ".so")
  726. result = strings.Split(result[0], "@")
  727. name := lcFirst(result[0])
  728. if len(result) > 1 {
  729. return name, result[1]
  730. }
  731. return name, ""
  732. }
  733. func ucFirst(str string) string {
  734. for i, v := range str {
  735. return string(unicode.ToUpper(v)) + str[i+1:]
  736. }
  737. return ""
  738. }
  739. func lcFirst(str string) string {
  740. for i, v := range str {
  741. return string(unicode.ToLower(v)) + str[i+1:]
  742. }
  743. return ""
  744. }
  745. func (rr *Manager) UninstallAllPlugins() {
  746. keys, err := rr.plgInstallDb.Keys()
  747. if err != nil {
  748. return
  749. }
  750. for _, v := range keys {
  751. plgType := plugin2.PluginTypeMap[strings.Split(v, "_")[0]]
  752. plgName := strings.Split(v, "_")[1]
  753. _ = rr.Delete(plgType, plgName, false)
  754. }
  755. }
  756. func (rr *Manager) GetAllPlugins() map[string]string {
  757. allPlgs, err := rr.plgInstallDb.All()
  758. if err != nil {
  759. return nil
  760. }
  761. delete(allPlgs, BOOT_INSTALL)
  762. return allPlgs
  763. }
  764. func (rr *Manager) GetAllPluginsStatus() map[string]string {
  765. allPlgs, err := rr.plgStatusDb.All()
  766. if err != nil {
  767. return nil
  768. }
  769. return allPlgs
  770. }
// BOOT_INSTALL is the key (and value) of the flag entry that PluginImport
// writes into the install-script store; InitManager checks it to decide
// whether the stored plugins have to be installed at start-up.
const BOOT_INSTALL = "$boot_install"
  772. // PluginImport save the plugin install information and wait for restart
  773. func (rr *Manager) PluginImport(plugins map[string]string) error {
  774. if len(plugins) == 0 {
  775. return nil
  776. }
  777. for k, v := range plugins {
  778. err := rr.plgInstallDb.Set(k, v)
  779. if err != nil {
  780. return err
  781. }
  782. }
  783. // set the flag to install the plugins when eKuiper reboot
  784. err := rr.plgInstallDb.Set(BOOT_INSTALL, BOOT_INSTALL)
  785. if err != nil {
  786. return err
  787. }
  788. return nil
  789. }
  790. // PluginPartialImport compare the plugin to be installed and the one in database
  791. // if not exist in database, install;
  792. // if exist, ignore
  793. func (rr *Manager) PluginPartialImport(plugins map[string]string) map[string]string {
  794. errMap := map[string]string{}
  795. for k, v := range plugins {
  796. plugInScript := ""
  797. found, _ := rr.plgInstallDb.Get(k, &plugInScript)
  798. if !found {
  799. err := rr.pluginRegisterForImport(k, v)
  800. if err != nil {
  801. errMap[k] = err.Error()
  802. }
  803. }
  804. }
  805. return errMap
  806. }
  807. func (rr *Manager) hasInstallFlag() bool {
  808. val := ""
  809. found, _ := rr.plgInstallDb.Get(BOOT_INSTALL, &val)
  810. return found
  811. }
  812. func (rr *Manager) clearInstallFlag() {
  813. _ = rr.plgInstallDb.Delete(BOOT_INSTALL)
  814. }
  815. func (rr *Manager) pluginRegisterForImport(key, script string) error {
  816. plgType := plugin2.PluginTypeMap[strings.Split(key, "_")[0]]
  817. sd := plugin2.NewPluginByType(plgType)
  818. err := json.Unmarshal([]byte(script), &sd)
  819. if err != nil {
  820. return err
  821. }
  822. err = rr.Register(plgType, sd)
  823. if err != nil {
  824. conf.Log.Errorf(`install native plugin %s error: %v`, key, err)
  825. return err
  826. }
  827. return nil
  828. }
  829. func (rr *Manager) pluginInstallWhenReboot() {
  830. allPlgs, err := rr.plgInstallDb.All()
  831. if err != nil {
  832. return
  833. }
  834. delete(allPlgs, BOOT_INSTALL)
  835. _ = rr.plgStatusDb.Clean()
  836. for k, v := range allPlgs {
  837. err := rr.pluginRegisterForImport(k, v)
  838. _ = rr.plgStatusDb.Set(k, err.Error())
  839. }
  840. }