manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "encoding/json"
  18. "fmt"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strings"
  23. "sync"
  24. kconf "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  26. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  27. "github.com/lf-edge/ekuiper/internal/pkg/store"
  28. "github.com/lf-edge/ekuiper/internal/plugin"
  29. "github.com/lf-edge/ekuiper/pkg/api"
  30. "github.com/lf-edge/ekuiper/pkg/cast"
  31. "github.com/lf-edge/ekuiper/pkg/kv"
  32. )
  33. var (
  34. once sync.Once
  35. mutex sync.Mutex
  36. singleton *Manager // Do not call this directly, use GetServiceManager
  37. )
  38. type Manager struct {
  39. executorPool *sync.Map // The pool of executors
  40. loaded bool
  41. serviceBuf *sync.Map
  42. functionBuf *sync.Map
  43. etcDir string
  44. serviceInstallKV kv.KeyValue
  45. serviceStatusInstallKV kv.KeyValue
  46. serviceKV kv.KeyValue
  47. functionKV kv.KeyValue
  48. }
  49. func InitManager() (*Manager, error) {
  50. mutex.Lock()
  51. defer mutex.Unlock()
  52. if singleton == nil {
  53. dir := "data/services"
  54. if kconf.IsTesting {
  55. dir = "service/test"
  56. }
  57. etcDir, err := kconf.GetLoc(dir)
  58. if err != nil {
  59. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  60. }
  61. sdb, err := store.GetKV("services")
  62. if err != nil {
  63. return nil, fmt.Errorf("cannot open service db: %s", err)
  64. }
  65. fdb, err := store.GetKV("serviceFuncs")
  66. if err != nil {
  67. return nil, fmt.Errorf("cannot open function db: %s", err)
  68. }
  69. sInstallDb, err := store.GetKV("serviceInstall")
  70. if err != nil {
  71. return nil, fmt.Errorf("cannot open service db: %s", err)
  72. }
  73. statusDb, err := store.GetKV("serviceInstallStatus")
  74. if err != nil {
  75. return nil, fmt.Errorf("cannot open service db: %s", err)
  76. }
  77. singleton = &Manager{
  78. executorPool: &sync.Map{},
  79. serviceBuf: &sync.Map{},
  80. functionBuf: &sync.Map{},
  81. etcDir: etcDir,
  82. serviceStatusInstallKV: statusDb,
  83. serviceInstallKV: sInstallDb,
  84. serviceKV: sdb,
  85. functionKV: fdb,
  86. }
  87. }
  88. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  89. err := singleton.InitByFiles()
  90. return singleton, err
  91. }
  92. return singleton, nil
  93. }
  94. func GetManager() *Manager {
  95. return singleton
  96. }
  97. // InitByFiles
  98. /**
  99. * This function will parse the service definition json files in etc/services.
  100. * It will validate all json files and their schemaFiles. If invalid, it just prints
  101. * an error log and ignore. So it is possible that only valid service definition are
  102. * parsed and available.
  103. *
  104. * NOT threadsafe, must run in lock
  105. */
  106. func (m *Manager) InitByFiles() error {
  107. kconf.Log.Debugf("init service manager")
  108. files, err := os.ReadDir(m.etcDir)
  109. if nil != err {
  110. return err
  111. }
  112. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  113. for _, file := range files {
  114. baseName := filepath.Base(file.Name())
  115. if filepath.Ext(baseName) == ".json" {
  116. err := m.initFile(baseName)
  117. if err != nil {
  118. kconf.Log.Errorf("%v", err)
  119. continue
  120. }
  121. }
  122. }
  123. m.loaded = true
  124. return nil
  125. }
  126. func (m *Manager) initFile(baseName string) error {
  127. serviceConf := &conf{}
  128. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  129. if err != nil {
  130. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  131. }
  132. // TODO validate serviceConf
  133. serviceName := baseName[0 : len(baseName)-5]
  134. info := &serviceInfo{
  135. About: serviceConf.About,
  136. Interfaces: make(map[string]*interfaceInfo),
  137. }
  138. for name, binding := range serviceConf.Interfaces {
  139. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  140. if err != nil {
  141. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  142. }
  143. // setting function alias
  144. aliasMap := make(map[string]string)
  145. for _, finfo := range binding.Functions {
  146. aliasMap[finfo.ServiceName] = finfo.Name
  147. }
  148. methods := desc.GetFunctions()
  149. functions := make([]string, len(methods))
  150. for i, f := range methods {
  151. fname := f
  152. if a, ok := aliasMap[f]; ok {
  153. fname = a
  154. }
  155. functions[i] = fname
  156. }
  157. info.Interfaces[name] = &interfaceInfo{
  158. Desc: binding.Description,
  159. Addr: binding.Address,
  160. Protocol: binding.Protocol,
  161. Schema: &schemaInfo{
  162. SchemaType: binding.SchemaType,
  163. SchemaFile: binding.SchemaFile,
  164. },
  165. Functions: functions,
  166. Options: binding.Options,
  167. }
  168. for i, f := range functions {
  169. err := m.functionKV.Set(f, &functionContainer{
  170. ServiceName: serviceName,
  171. InterfaceName: name,
  172. MethodName: methods[i],
  173. })
  174. if err != nil {
  175. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  176. }
  177. }
  178. }
  179. err = m.serviceKV.Set(serviceName, info)
  180. if err != nil {
  181. return fmt.Errorf("fail to save the parsing result: %v", err)
  182. }
  183. return nil
  184. }
  185. // Start Implement FunctionFactory
  186. func (m *Manager) HasFunctionSet(_ string) bool {
  187. return false
  188. }
  189. func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
  190. funcContainer, ok := m.getFunction(funcName)
  191. if ok {
  192. installScript := ""
  193. m.serviceInstallKV.Get(funcContainer.ServiceName, &installScript)
  194. return plugin.SERVICE_EXTENSION, funcContainer.ServiceName, installScript
  195. } else {
  196. return plugin.NONE_EXTENSION, "", ""
  197. }
  198. }
  199. func (m *Manager) Function(name string) (api.Function, error) {
  200. f, ok := m.getFunction(name)
  201. if !ok {
  202. return nil, fmt.Errorf("service function %s not found", name)
  203. }
  204. s, ok := m.getService(f.ServiceName)
  205. if !ok {
  206. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  207. }
  208. i, ok := s.Interfaces[f.InterfaceName]
  209. if !ok {
  210. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  211. }
  212. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  213. e, err := m.getExecutor(f.InterfaceName, i)
  214. if err != nil {
  215. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  216. }
  217. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  218. }
  219. func (m *Manager) ConvName(funcName string) (string, bool) {
  220. _, ok := m.getFunction(funcName)
  221. return funcName, ok
  222. }
  223. // End Implement FunctionFactory
  224. func (m *Manager) HasService(name string) bool {
  225. _, ok := m.getService(name)
  226. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  227. return ok
  228. }
  229. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  230. var r *functionContainer
  231. if t, ok := m.functionBuf.Load(name); ok {
  232. r = t.(*functionContainer)
  233. return r, ok
  234. } else {
  235. r = &functionContainer{}
  236. ok, err := m.functionKV.Get(name, r)
  237. if err != nil {
  238. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  239. return nil, false
  240. }
  241. if ok {
  242. m.functionBuf.Store(name, r)
  243. }
  244. return r, ok
  245. }
  246. }
  247. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  248. var r *serviceInfo
  249. if t, ok := m.serviceBuf.Load(name); ok {
  250. r = t.(*serviceInfo)
  251. return r, ok
  252. } else {
  253. r = &serviceInfo{}
  254. ok, err := m.serviceKV.Get(name, r)
  255. if err != nil {
  256. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  257. return nil, false
  258. }
  259. if ok {
  260. m.serviceBuf.Store(name, r)
  261. }
  262. return r, ok
  263. }
  264. }
  265. // Each interface maps to an executor
  266. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  267. e, ok := m.executorPool.Load(name)
  268. if !ok {
  269. ne, err := NewExecutor(info)
  270. if err != nil {
  271. return nil, err
  272. }
  273. e, _ = m.executorPool.LoadOrStore(name, ne)
  274. }
  275. return e.(executor), nil
  276. }
  277. func (m *Manager) deleteServiceFuncs(service string) error {
  278. if s, ok := m.getService(service); ok {
  279. for _, i := range s.Interfaces {
  280. for _, f := range i.Functions {
  281. _ = m.deleteFunc(service, f)
  282. }
  283. }
  284. }
  285. return nil
  286. }
  287. func (m *Manager) deleteFunc(service, name string) error {
  288. f, err := m.GetFunction(name)
  289. if err != nil {
  290. return err
  291. }
  292. if f.ServiceName == service {
  293. m.functionBuf.Delete(name)
  294. m.functionKV.Delete(name)
  295. }
  296. return nil
  297. }
  298. // ** CRUD of the service files **
  299. type ServiceCreationRequest struct {
  300. Name string `json:"name"`
  301. File string `json:"file"`
  302. }
  303. func (s *ServiceCreationRequest) InstallScript() string {
  304. marshal, err := json.Marshal(s)
  305. if err != nil {
  306. return ""
  307. }
  308. return string(marshal)
  309. }
  310. func (m *Manager) List() ([]string, error) {
  311. return m.serviceKV.Keys()
  312. }
  313. func (m *Manager) Create(r *ServiceCreationRequest) error {
  314. name, uri := r.Name, r.File
  315. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  316. return fmt.Errorf("service %s exist", name)
  317. }
  318. if !httpx.IsValidUrl(uri) {
  319. return fmt.Errorf("invalid file path %s", uri)
  320. }
  321. zipPath := path.Join(m.etcDir, name+".zip")
  322. // clean up: delete zip file and unzip files in error
  323. defer os.Remove(zipPath)
  324. // download
  325. err := httpx.DownloadFile(zipPath, uri)
  326. if err != nil {
  327. return fmt.Errorf("fail to download file %s: %s", uri, err)
  328. }
  329. // unzip and copy to destination
  330. err = m.unzip(name, zipPath)
  331. if err != nil {
  332. return err
  333. }
  334. // save the install script
  335. m.serviceInstallKV.Set(name, r.InstallScript())
  336. // init file to serviceKV
  337. return m.initFile(name + ".json")
  338. }
  339. func (m *Manager) Delete(name string) error {
  340. name = strings.Trim(name, " ")
  341. if name == "" {
  342. return fmt.Errorf("invalid name %s: should not be empty", name)
  343. }
  344. m.deleteServiceFuncs(name)
  345. m.serviceBuf.Delete(name)
  346. err := m.serviceKV.Delete(name)
  347. if err != nil {
  348. return err
  349. }
  350. _ = m.serviceInstallKV.Delete(name)
  351. path := path.Join(m.etcDir, name+".json")
  352. err = os.Remove(path)
  353. if err != nil {
  354. kconf.Log.Errorf("remove service json fails: %v", err)
  355. }
  356. return nil
  357. }
  358. func (m *Manager) Get(name string) (*serviceInfo, error) {
  359. name = strings.Trim(name, " ")
  360. if name == "" {
  361. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  362. }
  363. r, ok := m.getService(name)
  364. if !ok {
  365. return nil, fmt.Errorf("can't get the service %s", name)
  366. }
  367. return r, nil
  368. }
  369. func (m *Manager) Update(req *ServiceCreationRequest) error {
  370. err := m.Delete(req.Name)
  371. if err != nil {
  372. return err
  373. }
  374. return m.Create(req)
  375. }
  376. func (m *Manager) unzip(name, src string) error {
  377. r, err := zip.OpenReader(src)
  378. if err != nil {
  379. return err
  380. }
  381. defer r.Close()
  382. baseName := name + ".json"
  383. // Try unzip
  384. found := false
  385. for _, file := range r.File {
  386. if strings.EqualFold(file.Name, baseName) {
  387. found = true
  388. break
  389. }
  390. }
  391. if !found {
  392. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  393. }
  394. // unzip
  395. for _, file := range r.File {
  396. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  397. if err != nil {
  398. return err
  399. }
  400. }
  401. return nil
  402. }
  403. func (m *Manager) ListFunctions() ([]string, error) {
  404. return m.functionKV.Keys()
  405. }
  406. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  407. name = strings.Trim(name, " ")
  408. if name == "" {
  409. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  410. }
  411. r, ok := m.getFunction(name)
  412. if !ok {
  413. return nil, fmt.Errorf("can't get the service function %s", name)
  414. }
  415. return r, nil
  416. }
  417. func (m *Manager) GetAllServices() map[string]string {
  418. all, err := m.serviceInstallKV.All()
  419. if err != nil {
  420. return nil
  421. }
  422. return all
  423. }
  424. func (m *Manager) GetAllServicesStatus() map[string]string {
  425. all, err := m.serviceStatusInstallKV.All()
  426. if err != nil {
  427. return nil
  428. }
  429. return all
  430. }
  431. func (m *Manager) UninstallAllServices() {
  432. keys, err := m.serviceInstallKV.Keys()
  433. if err != nil {
  434. return
  435. }
  436. for _, v := range keys {
  437. _ = m.Delete(v)
  438. }
  439. }
  440. func (m *Manager) servicesRegisterForImport(_, v string) error {
  441. req := &ServiceCreationRequest{}
  442. err := json.Unmarshal(cast.StringToBytes(v), req)
  443. if err != nil {
  444. return err
  445. }
  446. err = m.Create(req)
  447. if err != nil {
  448. return err
  449. }
  450. return nil
  451. }
  452. func (m *Manager) ImportServices(services map[string]string) map[string]string {
  453. errMap := map[string]string{}
  454. _ = m.serviceStatusInstallKV.Clean()
  455. for k, v := range services {
  456. err := m.servicesRegisterForImport(k, v)
  457. if err != nil {
  458. _ = m.serviceStatusInstallKV.Set(k, err.Error())
  459. errMap[k] = err.Error()
  460. }
  461. }
  462. return errMap
  463. }
  464. func (m *Manager) ImportPartialServices(services map[string]string) map[string]string {
  465. errMap := map[string]string{}
  466. for k, v := range services {
  467. err := m.servicesRegisterForImport(k, v)
  468. if err != nil {
  469. errMap[k] = err.Error()
  470. }
  471. }
  472. return errMap
  473. }