manager.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Manage the loading of both native and portable plugins
  15. package native
  16. import (
  17. "archive/zip"
  18. "bytes"
  19. "encoding/json"
  20. "fmt"
  21. "os"
  22. "os/exec"
  23. "path"
  24. "path/filepath"
  25. "plugin"
  26. "regexp"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unicode"
  31. "github.com/lf-edge/ekuiper/internal/conf"
  32. "github.com/lf-edge/ekuiper/internal/meta"
  33. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
  37. "github.com/lf-edge/ekuiper/pkg/api"
  38. "github.com/lf-edge/ekuiper/pkg/cast"
  39. "github.com/lf-edge/ekuiper/pkg/errorx"
  40. "github.com/lf-edge/ekuiper/pkg/kv"
  41. )
  42. // Manager Initialized in the binder
  43. var manager *Manager
  44. const DELETED = "$deleted"
  45. // Manager is appended only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
  46. type Manager struct {
  47. sync.RWMutex
  48. // 3 maps for source/sink/function. In each map, key is the plugin name, value is the version
  49. plugins []map[string]string
  50. // A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
  51. symbols map[string]string
  52. // loaded symbols in current runtime
  53. runtime map[string]*plugin.Plugin
  54. // dirs
  55. pluginDir string
  56. pluginConfDir string
  57. // the access to func symbols db
  58. funcSymbolsDb kv.KeyValue
  59. // the access to plugin install script db
  60. plgInstallDb kv.KeyValue
  61. // the access to plugin install status db
  62. plgStatusDb kv.KeyValue
  63. }
  64. // InitManager must only be called once
  65. func InitManager() (*Manager, error) {
  66. pluginDir, err := conf.GetPluginsLoc()
  67. if err != nil {
  68. return nil, fmt.Errorf("cannot find plugins folder: %s", err)
  69. }
  70. dataDir, err := conf.GetDataLoc()
  71. if err != nil {
  72. return nil, fmt.Errorf("cannot find data folder: %s", err)
  73. }
  74. func_db, err := store.GetKV("pluginFuncs")
  75. if err != nil {
  76. return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
  77. }
  78. plg_db, err := store.GetKV("nativePlugin")
  79. if err != nil {
  80. return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
  81. }
  82. plg_status_db, err := store.GetKV("nativePluginStatus")
  83. if err != nil {
  84. return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
  85. }
  86. registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
  87. manager = registry
  88. plugins := make([]map[string]string, 3)
  89. for i := range plugins {
  90. names, err := findAll(plugin2.PluginType(i), pluginDir)
  91. if err != nil {
  92. return nil, fmt.Errorf("fail to find existing plugins: %s", err)
  93. }
  94. plugins[i] = names
  95. }
  96. registry.plugins = plugins
  97. for pf := range plugins[plugin2.FUNCTION] {
  98. l := make([]string, 0)
  99. if ok, err := func_db.Get(pf, &l); ok {
  100. _ = registry.storeSymbols(pf, l)
  101. } else if err != nil {
  102. return nil, fmt.Errorf("error when querying kv: %s", err)
  103. } else {
  104. _ = registry.storeSymbols(pf, []string{pf})
  105. }
  106. }
  107. if manager.hasInstallFlag() {
  108. manager.pluginInstallWhenReboot()
  109. manager.clearInstallFlag()
  110. }
  111. return registry, nil
  112. }
  113. func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string, err error) {
  114. result = make(map[string]string)
  115. dir := path.Join(pluginDir, plugin2.PluginTypes[t])
  116. files, err := os.ReadDir(dir)
  117. if err != nil {
  118. return
  119. }
  120. for _, file := range files {
  121. baseName := filepath.Base(file.Name())
  122. if strings.HasSuffix(baseName, ".so") {
  123. n, v := parseName(baseName)
  124. // load the plugins when ekuiper set up
  125. if !conf.IsTesting {
  126. if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
  127. continue
  128. }
  129. }
  130. result[n] = v
  131. }
  132. }
  133. return
  134. }
  135. func (rr *Manager) get(t plugin2.PluginType, name string) (string, bool) {
  136. rr.RLock()
  137. result := rr.plugins[t]
  138. rr.RUnlock()
  139. r, ok := result[name]
  140. return r, ok
  141. }
  142. func (rr *Manager) store(t plugin2.PluginType, name string, version string) {
  143. rr.Lock()
  144. rr.plugins[t][name] = version
  145. rr.Unlock()
  146. }
  147. func (rr *Manager) storeSymbols(name string, symbols []string) error {
  148. rr.Lock()
  149. defer rr.Unlock()
  150. for _, s := range symbols {
  151. if _, ok := rr.symbols[s]; ok {
  152. return fmt.Errorf("function name %s already exists", s)
  153. } else {
  154. rr.symbols[s] = name
  155. }
  156. }
  157. return nil
  158. }
  159. func (rr *Manager) removeSymbols(symbols []string) {
  160. rr.Lock()
  161. for _, s := range symbols {
  162. delete(rr.symbols, s)
  163. }
  164. rr.Unlock()
  165. }
  166. // API for management
  167. func (rr *Manager) List(t plugin2.PluginType) []string {
  168. rr.RLock()
  169. result := rr.plugins[t]
  170. rr.RUnlock()
  171. keys := make([]string, 0, len(result))
  172. for k := range result {
  173. keys = append(keys, k)
  174. }
  175. return keys
  176. }
  177. func (rr *Manager) ListSymbols() []string {
  178. rr.RLock()
  179. result := rr.symbols
  180. rr.RUnlock()
  181. keys := make([]string, 0, len(result))
  182. for k := range result {
  183. keys = append(keys, k)
  184. }
  185. return keys
  186. }
  187. func (rr *Manager) GetPluginVersionBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  188. switch t {
  189. case plugin2.FUNCTION:
  190. rr.RLock()
  191. result := rr.plugins[t]
  192. name, ok := rr.symbols[symbolName]
  193. rr.RUnlock()
  194. if ok {
  195. r, nok := result[name]
  196. return r, nok
  197. } else {
  198. return "", false
  199. }
  200. default:
  201. return rr.get(t, symbolName)
  202. }
  203. }
  204. func (rr *Manager) GetPluginBySymbol(t plugin2.PluginType, symbolName string) (string, bool) {
  205. switch t {
  206. case plugin2.FUNCTION:
  207. rr.RLock()
  208. defer rr.RUnlock()
  209. name, ok := rr.symbols[symbolName]
  210. return name, ok
  211. default:
  212. return symbolName, true
  213. }
  214. }
  215. func (rr *Manager) storePluginInstallScript(name string, t plugin2.PluginType, j plugin2.Plugin) {
  216. key := plugin2.PluginTypes[t] + "_" + name
  217. val := string(j.GetInstallScripts())
  218. _ = rr.plgInstallDb.Set(key, val)
  219. }
  220. func (rr *Manager) removePluginInstallScript(name string, t plugin2.PluginType) {
  221. key := plugin2.PluginTypes[t] + "_" + name
  222. _ = rr.plgInstallDb.Delete(key)
  223. }
  224. func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
  225. name, uri, shellParas := j.GetName(), j.GetFile(), j.GetShellParas()
  226. // Validation
  227. name = strings.Trim(name, " ")
  228. if name == "" {
  229. return fmt.Errorf("invalid name %s: should not be empty", name)
  230. }
  231. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  232. return fmt.Errorf("invalid uri %s", uri)
  233. }
  234. if v, ok := rr.get(t, name); ok {
  235. if v == DELETED {
  236. conf.Log.Debugf("update the plugin %s is marked as deleted but eKuiper is not restarted for the change to take effect yet", name)
  237. } else {
  238. return fmt.Errorf("invalid name %s: duplicate", name)
  239. }
  240. }
  241. var err error
  242. zipPath := path.Join(rr.pluginDir, name+".zip")
  243. // clean up: delete zip file and unzip files in error
  244. defer func(name string) { _ = os.Remove(name) }(zipPath)
  245. // download
  246. err = httpx.DownloadFile(zipPath, uri)
  247. if err != nil {
  248. return fmt.Errorf("fail to download file %s: %s", uri, err)
  249. }
  250. if t == plugin2.FUNCTION {
  251. if len(j.GetSymbols()) > 0 {
  252. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  253. if err != nil {
  254. return err
  255. }
  256. err = rr.storeSymbols(name, j.GetSymbols())
  257. } else {
  258. err = rr.storeSymbols(name, []string{name})
  259. }
  260. }
  261. if err != nil {
  262. return err
  263. }
  264. // unzip and copy to destination
  265. version, err := rr.install(t, name, zipPath, shellParas)
  266. if err == nil && len(j.GetSymbols()) > 0 {
  267. err = rr.funcSymbolsDb.Set(name, j.GetSymbols())
  268. }
  269. if err != nil { // Revert for any errors
  270. if len(j.GetSymbols()) > 0 {
  271. rr.removeSymbols(j.GetSymbols())
  272. } else {
  273. rr.removeSymbols([]string{name})
  274. }
  275. return fmt.Errorf("fail to install plugin: %s", err)
  276. }
  277. rr.store(t, name, version)
  278. rr.storePluginInstallScript(name, t, j)
  279. switch t {
  280. case plugin2.SINK:
  281. if err := meta.ReadSinkMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
  282. conf.Log.Errorf("readSinkFile:%v", err)
  283. }
  284. case plugin2.SOURCE:
  285. isScan := true
  286. isLookup := true
  287. _, err := rr.Source(name)
  288. if err != nil {
  289. isScan = false
  290. }
  291. _, err = rr.LookupSource(name)
  292. if err != nil {
  293. isLookup = false
  294. }
  295. if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
  296. conf.Log.Errorf("readSourceFile:%v", err)
  297. }
  298. }
  299. return nil
  300. }
  301. // RegisterFuncs prerequisite:function plugin of name exists
  302. func (rr *Manager) RegisterFuncs(name string, functions []string) error {
  303. if len(functions) == 0 {
  304. return fmt.Errorf("property 'functions' must not be empty")
  305. }
  306. old := make([]string, 0)
  307. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  308. return err
  309. } else if ok {
  310. rr.removeSymbols(old)
  311. } else if !ok {
  312. rr.removeSymbols([]string{name})
  313. }
  314. err := rr.funcSymbolsDb.Set(name, functions)
  315. if err != nil {
  316. return err
  317. }
  318. return rr.storeSymbols(name, functions)
  319. }
  320. func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
  321. name = strings.Trim(name, " ")
  322. if name == "" {
  323. return fmt.Errorf("invalid name %s: should not be empty", name)
  324. }
  325. if v, ok := rr.get(t, name); ok && v == DELETED {
  326. conf.Log.Debugf("plugin %s is already deleted", name)
  327. return nil
  328. }
  329. soPath, err := rr.getSoFilePath(t, name, true)
  330. if err != nil {
  331. return err
  332. }
  333. var results []string
  334. paths := []string{
  335. soPath,
  336. }
  337. // Find etc folder
  338. etcPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name)
  339. if fi, err := os.Stat(etcPath); err == nil {
  340. if fi.Mode().IsDir() {
  341. paths = append(paths, etcPath)
  342. }
  343. }
  344. switch t {
  345. case plugin2.SOURCE:
  346. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".yaml")
  347. _ = os.Remove(yamlPaths)
  348. srcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SOURCE], name+".json")
  349. _ = os.Remove(srcJsonPath)
  350. meta.UninstallSource(name)
  351. case plugin2.SINK:
  352. yamlPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".yaml")
  353. _ = os.Remove(yamlPaths)
  354. sinkJsonPaths := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.SINK], name+".json")
  355. _ = os.Remove(sinkJsonPaths)
  356. meta.UninstallSink(name)
  357. case plugin2.FUNCTION:
  358. funcJsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[plugin2.FUNCTION], name+".json")
  359. _ = os.Remove(funcJsonPath)
  360. old := make([]string, 0)
  361. if ok, err := rr.funcSymbolsDb.Get(name, &old); err != nil {
  362. return err
  363. } else if ok {
  364. rr.removeSymbols(old)
  365. err := rr.funcSymbolsDb.Delete(name)
  366. if err != nil {
  367. return err
  368. }
  369. } else if !ok {
  370. rr.removeSymbols([]string{name})
  371. }
  372. }
  373. for _, p := range paths {
  374. _, err := os.Stat(p)
  375. if err == nil {
  376. err = os.RemoveAll(p)
  377. if err != nil {
  378. results = append(results, err.Error())
  379. }
  380. } else {
  381. results = append(results, fmt.Sprintf("can't find %s", p))
  382. }
  383. }
  384. rr.removePluginInstallScript(name, t)
  385. if len(results) > 0 {
  386. return fmt.Errorf(strings.Join(results, "\n"))
  387. } else {
  388. rr.store(t, name, DELETED)
  389. if stop {
  390. go func() {
  391. time.Sleep(1 * time.Second)
  392. os.Exit(100)
  393. }()
  394. }
  395. return nil
  396. }
  397. }
  398. func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]interface{}, bool) {
  399. v, ok := rr.get(t, name)
  400. if strings.HasPrefix(v, "v") {
  401. v = v[1:]
  402. }
  403. if ok {
  404. r := map[string]interface{}{
  405. "name": name,
  406. "version": v,
  407. }
  408. if t == plugin2.FUNCTION {
  409. l := make([]string, 0)
  410. if ok, _ := rr.funcSymbolsDb.Get(name, &l); ok {
  411. r["functions"] = l
  412. }
  413. // ignore the error
  414. }
  415. return r, ok
  416. }
  417. return nil, false
  418. }
  419. func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
  420. var filenames []string
  421. tempPath := path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
  422. defer func(path string) {
  423. _ = os.RemoveAll(path)
  424. }(tempPath)
  425. r, err := zip.OpenReader(src)
  426. if err != nil {
  427. return "", err
  428. }
  429. defer func(r *zip.ReadCloser) {
  430. _ = r.Close()
  431. }(r)
  432. haveInstallFile := false
  433. for _, file := range r.File {
  434. fileName := file.Name
  435. if fileName == "install.sh" {
  436. haveInstallFile = true
  437. }
  438. }
  439. if len(shellParas) != 0 && !haveInstallFile {
  440. return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
  441. }
  442. soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
  443. var soPath string
  444. var yamlFile, yamlPath, version, soName string
  445. expFiles := 1
  446. if t == plugin2.SOURCE {
  447. yamlFile = name + ".yaml"
  448. yamlPath = path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], yamlFile)
  449. expFiles = 2
  450. }
  451. var revokeFiles []string
  452. defer func() {
  453. if err != nil {
  454. for _, f := range revokeFiles {
  455. _ = os.RemoveAll(f)
  456. }
  457. }
  458. }()
  459. for _, file := range r.File {
  460. fileName := file.Name
  461. if yamlFile == fileName {
  462. err = filex.UnzipTo(file, yamlPath)
  463. if err != nil {
  464. return version, err
  465. }
  466. revokeFiles = append(revokeFiles, yamlPath)
  467. filenames = append(filenames, yamlPath)
  468. } else if fileName == name+".json" {
  469. jsonPath := path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], fileName)
  470. if err := filex.UnzipTo(file, jsonPath); nil != err {
  471. conf.Log.Errorf("Failed to decompress the metadata %s file", fileName)
  472. } else {
  473. revokeFiles = append(revokeFiles, jsonPath)
  474. }
  475. } else if soPrefix.Match([]byte(fileName)) {
  476. soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
  477. err = filex.UnzipTo(file, soPath)
  478. if err != nil {
  479. return version, err
  480. }
  481. filenames = append(filenames, soPath)
  482. revokeFiles = append(revokeFiles, soPath)
  483. soName, version = parseName(fileName)
  484. } else if strings.HasPrefix(fileName, "etc/") {
  485. err = filex.UnzipTo(file, path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
  486. if err != nil {
  487. return version, err
  488. }
  489. } else { // unzip other files
  490. err = filex.UnzipTo(file, path.Join(tempPath, fileName))
  491. if err != nil {
  492. return version, err
  493. }
  494. }
  495. }
  496. if len(filenames) != expFiles {
  497. err = fmt.Errorf("invalid zip file: so file or conf file is missing")
  498. return version, err
  499. } else if haveInstallFile {
  500. // run install script if there is
  501. shell := make([]string, len(shellParas))
  502. copy(shell, shellParas)
  503. spath := path.Join(tempPath, "install.sh")
  504. shell = append(shell, spath)
  505. if 1 != len(shell) {
  506. copy(shell[1:], shell[0:])
  507. shell[0] = spath
  508. }
  509. conf.Log.Infof("run install script %s", strings.Join(shell, " "))
  510. cmd := exec.Command("/bin/sh", shell...)
  511. var outb, errb bytes.Buffer
  512. cmd.Stdout = &outb
  513. cmd.Stderr = &errb
  514. cmd.Dir = tempPath
  515. err := cmd.Run()
  516. if err != nil {
  517. conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
  518. return version, err
  519. }
  520. conf.Log.Infof(`run install script:%s`, outb.String())
  521. }
  522. if !conf.IsTesting {
  523. // load the runtime first
  524. _, err = manager.loadRuntime(t, soName, soPath, "")
  525. if err != nil {
  526. return version, err
  527. }
  528. }
  529. conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
  530. return version, nil
  531. }
  532. // binder factory implementations
  533. func (rr *Manager) Source(name string) (api.Source, error) {
  534. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
  535. if err != nil {
  536. return nil, err
  537. }
  538. if nf == nil {
  539. return nil, nil
  540. }
  541. switch t := nf.(type) {
  542. case api.Source:
  543. return t, nil
  544. case func() api.Source:
  545. return t(), nil
  546. default:
  547. return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
  548. }
  549. }
  550. func (rr *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
  551. _, ok := rr.GetPluginVersionBySymbol(plugin2.SOURCE, name)
  552. if ok {
  553. pluginName, _ := rr.GetPluginBySymbol(plugin2.SOURCE, name)
  554. installScript := ""
  555. pluginKey := plugin2.PluginTypes[plugin2.SOURCE] + "_" + pluginName
  556. _, _ = rr.plgInstallDb.Get(pluginKey, &installScript)
  557. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  558. } else {
  559. return plugin2.NONE_EXTENSION, "", ""
  560. }
  561. }
  562. func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
  563. nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
  564. if err != nil {
  565. return nil, err
  566. }
  567. if nf == nil {
  568. return nil, nil
  569. }
  570. switch t := nf.(type) {
  571. case api.LookupSource:
  572. return t, nil
  573. case func() api.LookupSource:
  574. return t(), nil
  575. default:
  576. return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
  577. }
  578. }
  579. func (rr *Manager) Sink(name string) (api.Sink, error) {
  580. nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
  581. if err != nil {
  582. return nil, err
  583. }
  584. if nf == nil {
  585. return nil, nil
  586. }
  587. var s api.Sink
  588. switch t := nf.(type) {
  589. case api.Sink:
  590. s = t
  591. case func() api.Sink:
  592. s = t()
  593. default:
  594. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
  595. }
  596. return s, nil
  597. }
  598. func (rr *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
  599. _, ok := rr.GetPluginVersionBySymbol(plugin2.SINK, name)
  600. if ok {
  601. pluginName, _ := rr.GetPluginBySymbol(plugin2.SINK, name)
  602. installScript := ""
  603. pluginKey := plugin2.PluginTypes[plugin2.SINK] + "_" + pluginName
  604. _, _ = rr.plgInstallDb.Get(pluginKey, &installScript)
  605. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  606. } else {
  607. return plugin2.NONE_EXTENSION, "", ""
  608. }
  609. }
  610. func (rr *Manager) Function(name string) (api.Function, error) {
  611. nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
  612. if err != nil {
  613. return nil, err
  614. }
  615. if nf == nil {
  616. return nil, nil
  617. }
  618. var s api.Function
  619. switch t := nf.(type) {
  620. case api.Function:
  621. s = t
  622. case func() api.Function:
  623. s = t()
  624. default:
  625. return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
  626. }
  627. return s, nil
  628. }
  629. func (rr *Manager) HasFunctionSet(name string) bool {
  630. _, ok := rr.get(plugin2.FUNCTION, name)
  631. return ok
  632. }
  633. func (rr *Manager) FunctionPluginInfo(funcName string) (plugin2.EXTENSION_TYPE, string, string) {
  634. pluginName, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, funcName)
  635. if ok {
  636. installScript := ""
  637. pluginKey := plugin2.PluginTypes[plugin2.FUNCTION] + "_" + pluginName
  638. _, _ = rr.plgInstallDb.Get(pluginKey, &installScript)
  639. return plugin2.NATIVE_EXTENSION, pluginKey, installScript
  640. } else {
  641. return plugin2.NONE_EXTENSION, "", ""
  642. }
  643. }
  644. func (rr *Manager) ConvName(name string) (string, bool) {
  645. _, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, name)
  646. if ok {
  647. return name, true
  648. }
  649. return name, false
  650. }
  651. // If not found, return nil,nil; Other errors return nil, err
  652. func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
  653. ptype := plugin2.PluginTypes[t]
  654. key := ptype + "/" + soName
  655. var (
  656. plug *plugin.Plugin
  657. ok bool
  658. err error
  659. )
  660. rr.RLock()
  661. plug, ok = rr.runtime[key]
  662. rr.RUnlock()
  663. if !ok {
  664. var soPath string
  665. if soFilepath != "" {
  666. soPath = soFilepath
  667. } else {
  668. mod, err := rr.getSoFilePath(t, soName, false)
  669. if err != nil {
  670. conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
  671. return nil, nil
  672. }
  673. soPath = mod
  674. }
  675. conf.Log.Debugf("Opening plugin %s", soPath)
  676. plug, err = plugin.Open(soPath)
  677. if err != nil {
  678. conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
  679. return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
  680. }
  681. rr.Lock()
  682. rr.runtime[key] = plug
  683. rr.Unlock()
  684. conf.Log.Debugf("Successfully open plugin %s", soPath)
  685. }
  686. if symbolName == "" {
  687. symbolName = ucFirst(soName)
  688. }
  689. conf.Log.Debugf("Loading symbol %s", symbolName)
  690. nf, err := plug.Lookup(symbolName)
  691. if err != nil {
  692. conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
  693. return nil, nil
  694. }
  695. conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
  696. return nf, nil
  697. }
  698. // Return the lowercase version of so name. It may be upper case in path.
  699. func (rr *Manager) getSoFilePath(t plugin2.PluginType, name string, isSoName bool) (string, error) {
  700. var (
  701. v string
  702. soname string
  703. ok bool
  704. )
  705. // We must identify plugin or symbol when deleting function plugin
  706. if isSoName {
  707. soname = name
  708. } else {
  709. soname, ok = rr.GetPluginBySymbol(t, name)
  710. if !ok {
  711. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid symbol name %s: not exist", name))
  712. }
  713. }
  714. v, ok = rr.get(t, soname)
  715. if !ok {
  716. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("invalid name %s: not exist", soname))
  717. }
  718. soFile := soname + ".so"
  719. if v != "" {
  720. soFile = fmt.Sprintf("%s@%s.so", soname, v)
  721. }
  722. p := path.Join(rr.pluginDir, plugin2.PluginTypes[t], soFile)
  723. if _, err := os.Stat(p); err != nil {
  724. p = path.Join(rr.pluginDir, plugin2.PluginTypes[t], ucFirst(soFile))
  725. }
  726. if _, err := os.Stat(p); err != nil {
  727. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("cannot find .so file for plugin %s", soname))
  728. }
  729. return p, nil
  730. }
  731. func parseName(n string) (string, string) {
  732. result := strings.Split(n, ".so")
  733. result = strings.Split(result[0], "@")
  734. name := lcFirst(result[0])
  735. if len(result) > 1 {
  736. return name, result[1]
  737. }
  738. return name, ""
  739. }
  740. func ucFirst(str string) string {
  741. for i, v := range str {
  742. return string(unicode.ToUpper(v)) + str[i+1:]
  743. }
  744. return ""
  745. }
  746. func lcFirst(str string) string {
  747. for i, v := range str {
  748. return string(unicode.ToLower(v)) + str[i+1:]
  749. }
  750. return ""
  751. }
  752. func (rr *Manager) UninstallAllPlugins() {
  753. keys, err := rr.plgInstallDb.Keys()
  754. if err != nil {
  755. return
  756. }
  757. for _, v := range keys {
  758. plgType := plugin2.PluginTypeMap[strings.Split(v, "_")[0]]
  759. plgName := strings.Split(v, "_")[1]
  760. _ = rr.Delete(plgType, plgName, false)
  761. }
  762. }
  763. func (rr *Manager) GetAllPlugins() map[string]string {
  764. allPlgs, err := rr.plgInstallDb.All()
  765. if err != nil {
  766. return nil
  767. }
  768. delete(allPlgs, BOOT_INSTALL)
  769. return allPlgs
  770. }
  771. func (rr *Manager) GetAllPluginsStatus() map[string]string {
  772. allPlgs, err := rr.plgStatusDb.All()
  773. if err != nil {
  774. return nil
  775. }
  776. return allPlgs
  777. }
  778. const BOOT_INSTALL = "$boot_install"
  779. // PluginImport save the plugin install information and wait for restart
  780. func (rr *Manager) PluginImport(plugins map[string]string) error {
  781. if len(plugins) == 0 {
  782. return nil
  783. }
  784. for k, v := range plugins {
  785. err := rr.plgInstallDb.Set(k, v)
  786. if err != nil {
  787. return err
  788. }
  789. }
  790. // set the flag to install the plugins when eKuiper reboot
  791. err := rr.plgInstallDb.Set(BOOT_INSTALL, BOOT_INSTALL)
  792. if err != nil {
  793. return err
  794. }
  795. return nil
  796. }
  797. // PluginPartialImport compare the plugin to be installed and the one in database
  798. // if not exist in database, install;
  799. // if exist, ignore
  800. func (rr *Manager) PluginPartialImport(plugins map[string]string) map[string]string {
  801. errMap := map[string]string{}
  802. for k, v := range plugins {
  803. plugInScript := ""
  804. found, _ := rr.plgInstallDb.Get(k, &plugInScript)
  805. if !found {
  806. err := rr.pluginRegisterForImport(k, v)
  807. if err != nil {
  808. errMap[k] = err.Error()
  809. }
  810. }
  811. }
  812. return errMap
  813. }
  814. func (rr *Manager) hasInstallFlag() bool {
  815. val := ""
  816. found, _ := rr.plgInstallDb.Get(BOOT_INSTALL, &val)
  817. return found
  818. }
  819. func (rr *Manager) clearInstallFlag() {
  820. _ = rr.plgInstallDb.Delete(BOOT_INSTALL)
  821. }
  822. func (rr *Manager) pluginRegisterForImport(key, script string) error {
  823. plgType := plugin2.PluginTypeMap[strings.Split(key, "_")[0]]
  824. sd := plugin2.NewPluginByType(plgType)
  825. err := json.Unmarshal(cast.StringToBytes(script), &sd)
  826. if err != nil {
  827. return err
  828. }
  829. err = rr.Register(plgType, sd)
  830. if err != nil {
  831. conf.Log.Errorf(`install native plugin %s error: %v`, key, err)
  832. return err
  833. }
  834. return nil
  835. }
  836. func (rr *Manager) pluginInstallWhenReboot() {
  837. allPlgs, err := rr.plgInstallDb.All()
  838. if err != nil {
  839. return
  840. }
  841. delete(allPlgs, BOOT_INSTALL)
  842. _ = rr.plgStatusDb.Clean()
  843. for k, v := range allPlgs {
  844. err := rr.pluginRegisterForImport(k, v)
  845. _ = rr.plgStatusDb.Set(k, err.Error())
  846. }
  847. }