manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "encoding/json"
  18. "fmt"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strings"
  23. "sync"
  24. kconf "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  26. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  27. "github.com/lf-edge/ekuiper/internal/pkg/store"
  28. "github.com/lf-edge/ekuiper/internal/plugin"
  29. "github.com/lf-edge/ekuiper/pkg/api"
  30. "github.com/lf-edge/ekuiper/pkg/kv"
  31. )
  32. var (
  33. once sync.Once
  34. mutex sync.Mutex
  35. singleton *Manager // Do not call this directly, use GetServiceManager
  36. )
  37. type Manager struct {
  38. executorPool *sync.Map // The pool of executors
  39. loaded bool
  40. serviceBuf *sync.Map
  41. functionBuf *sync.Map
  42. etcDir string
  43. serviceInstallKV kv.KeyValue
  44. serviceStatusInstallKV kv.KeyValue
  45. serviceKV kv.KeyValue
  46. functionKV kv.KeyValue
  47. }
  48. func InitManager() (*Manager, error) {
  49. mutex.Lock()
  50. defer mutex.Unlock()
  51. if singleton == nil {
  52. dir := "data/services"
  53. if kconf.IsTesting {
  54. dir = "service/test"
  55. }
  56. etcDir, err := kconf.GetLoc(dir)
  57. if err != nil {
  58. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  59. }
  60. sdb, err := store.GetKV("services")
  61. if err != nil {
  62. return nil, fmt.Errorf("cannot open service db: %s", err)
  63. }
  64. fdb, err := store.GetKV("serviceFuncs")
  65. if err != nil {
  66. return nil, fmt.Errorf("cannot open function db: %s", err)
  67. }
  68. sInstallDb, err := store.GetKV("serviceInstall")
  69. if err != nil {
  70. return nil, fmt.Errorf("cannot open service db: %s", err)
  71. }
  72. statusDb, err := store.GetKV("serviceInstallStatus")
  73. if err != nil {
  74. return nil, fmt.Errorf("cannot open service db: %s", err)
  75. }
  76. singleton = &Manager{
  77. executorPool: &sync.Map{},
  78. serviceBuf: &sync.Map{},
  79. functionBuf: &sync.Map{},
  80. etcDir: etcDir,
  81. serviceStatusInstallKV: statusDb,
  82. serviceInstallKV: sInstallDb,
  83. serviceKV: sdb,
  84. functionKV: fdb,
  85. }
  86. }
  87. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  88. err := singleton.InitByFiles()
  89. return singleton, err
  90. }
  91. return singleton, nil
  92. }
  93. func GetManager() *Manager {
  94. return singleton
  95. }
  96. // InitByFiles
  97. /**
  98. * This function will parse the service definition json files in etc/services.
  99. * It will validate all json files and their schemaFiles. If invalid, it just prints
  100. * an error log and ignore. So it is possible that only valid service definition are
  101. * parsed and available.
  102. *
  103. * NOT threadsafe, must run in lock
  104. */
  105. func (m *Manager) InitByFiles() error {
  106. kconf.Log.Debugf("init service manager")
  107. files, err := os.ReadDir(m.etcDir)
  108. if nil != err {
  109. return err
  110. }
  111. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  112. for _, file := range files {
  113. baseName := filepath.Base(file.Name())
  114. if filepath.Ext(baseName) == ".json" {
  115. err := m.initFile(baseName)
  116. if err != nil {
  117. kconf.Log.Errorf("%v", err)
  118. continue
  119. }
  120. }
  121. }
  122. m.loaded = true
  123. return nil
  124. }
  125. func (m *Manager) initFile(baseName string) error {
  126. serviceConf := &conf{}
  127. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  128. if err != nil {
  129. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  130. }
  131. // TODO validate serviceConf
  132. serviceName := baseName[0 : len(baseName)-5]
  133. info := &serviceInfo{
  134. About: serviceConf.About,
  135. Interfaces: make(map[string]*interfaceInfo),
  136. }
  137. for name, binding := range serviceConf.Interfaces {
  138. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  139. if err != nil {
  140. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  141. }
  142. // setting function alias
  143. aliasMap := make(map[string]string)
  144. for _, finfo := range binding.Functions {
  145. aliasMap[finfo.ServiceName] = finfo.Name
  146. }
  147. methods := desc.GetFunctions()
  148. functions := make([]string, len(methods))
  149. for i, f := range methods {
  150. fname := f
  151. if a, ok := aliasMap[f]; ok {
  152. fname = a
  153. }
  154. functions[i] = fname
  155. }
  156. info.Interfaces[name] = &interfaceInfo{
  157. Desc: binding.Description,
  158. Addr: binding.Address,
  159. Protocol: binding.Protocol,
  160. Schema: &schemaInfo{
  161. SchemaType: binding.SchemaType,
  162. SchemaFile: binding.SchemaFile,
  163. },
  164. Functions: functions,
  165. Options: binding.Options,
  166. }
  167. for i, f := range functions {
  168. err := m.functionKV.Set(f, &functionContainer{
  169. ServiceName: serviceName,
  170. InterfaceName: name,
  171. MethodName: methods[i],
  172. })
  173. if err != nil {
  174. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  175. }
  176. }
  177. }
  178. err = m.serviceKV.Set(serviceName, info)
  179. if err != nil {
  180. return fmt.Errorf("fail to save the parsing result: %v", err)
  181. }
  182. return nil
  183. }
  184. // Start Implement FunctionFactory
  185. func (m *Manager) HasFunctionSet(_ string) bool {
  186. return false
  187. }
  188. func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
  189. funcContainer, ok := m.getFunction(funcName)
  190. if ok {
  191. installScript := ""
  192. m.serviceInstallKV.Get(funcContainer.ServiceName, &installScript)
  193. return plugin.SERVICE_EXTENSION, funcContainer.ServiceName, installScript
  194. } else {
  195. return plugin.NONE_EXTENSION, "", ""
  196. }
  197. }
  198. func (m *Manager) Function(name string) (api.Function, error) {
  199. f, ok := m.getFunction(name)
  200. if !ok {
  201. return nil, fmt.Errorf("service function %s not found", name)
  202. }
  203. s, ok := m.getService(f.ServiceName)
  204. if !ok {
  205. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  206. }
  207. i, ok := s.Interfaces[f.InterfaceName]
  208. if !ok {
  209. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  210. }
  211. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  212. e, err := m.getExecutor(f.InterfaceName, i)
  213. if err != nil {
  214. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  215. }
  216. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  217. }
  218. func (m *Manager) ConvName(funcName string) (string, bool) {
  219. _, ok := m.getFunction(funcName)
  220. return funcName, ok
  221. }
  222. // End Implement FunctionFactory
  223. func (m *Manager) HasService(name string) bool {
  224. _, ok := m.getService(name)
  225. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  226. return ok
  227. }
  228. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  229. var r *functionContainer
  230. if t, ok := m.functionBuf.Load(name); ok {
  231. r = t.(*functionContainer)
  232. return r, ok
  233. } else {
  234. r = &functionContainer{}
  235. ok, err := m.functionKV.Get(name, r)
  236. if err != nil {
  237. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  238. return nil, false
  239. }
  240. if ok {
  241. m.functionBuf.Store(name, r)
  242. }
  243. return r, ok
  244. }
  245. }
  246. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  247. var r *serviceInfo
  248. if t, ok := m.serviceBuf.Load(name); ok {
  249. r = t.(*serviceInfo)
  250. return r, ok
  251. } else {
  252. r = &serviceInfo{}
  253. ok, err := m.serviceKV.Get(name, r)
  254. if err != nil {
  255. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  256. return nil, false
  257. }
  258. if ok {
  259. m.serviceBuf.Store(name, r)
  260. }
  261. return r, ok
  262. }
  263. }
  264. // Each interface maps to an executor
  265. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  266. e, ok := m.executorPool.Load(name)
  267. if !ok {
  268. ne, err := NewExecutor(info)
  269. if err != nil {
  270. return nil, err
  271. }
  272. e, _ = m.executorPool.LoadOrStore(name, ne)
  273. }
  274. return e.(executor), nil
  275. }
  276. func (m *Manager) deleteServiceFuncs(service string) error {
  277. if s, ok := m.getService(service); ok {
  278. for _, i := range s.Interfaces {
  279. for _, f := range i.Functions {
  280. _ = m.deleteFunc(service, f)
  281. }
  282. }
  283. }
  284. return nil
  285. }
  286. func (m *Manager) deleteFunc(service, name string) error {
  287. f, err := m.GetFunction(name)
  288. if err != nil {
  289. return err
  290. }
  291. if f.ServiceName == service {
  292. m.functionBuf.Delete(name)
  293. m.functionKV.Delete(name)
  294. }
  295. return nil
  296. }
  297. // ** CRUD of the service files **
  // ServiceCreationRequest is the payload used to install a service: the
  // service name and the location of the zip file that holds its definition.
  type ServiceCreationRequest struct {
  	Name string `json:"name"`
  	File string `json:"file"`
  }

  // InstallScript returns the request encoded as JSON, or an empty string when
  // the encoding fails.
  func (s *ServiceCreationRequest) InstallScript() string {
  	if encoded, err := json.Marshal(s); err == nil {
  		return string(encoded)
  	}
  	return ""
  }
  309. func (m *Manager) List() ([]string, error) {
  310. return m.serviceKV.Keys()
  311. }
  312. func (m *Manager) Create(r *ServiceCreationRequest) error {
  313. name, uri := r.Name, r.File
  314. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  315. return fmt.Errorf("service %s exist", name)
  316. }
  317. if !httpx.IsValidUrl(uri) {
  318. return fmt.Errorf("invalid file path %s", uri)
  319. }
  320. zipPath := path.Join(m.etcDir, name+".zip")
  321. // clean up: delete zip file and unzip files in error
  322. defer os.Remove(zipPath)
  323. // download
  324. err := httpx.DownloadFile(zipPath, uri)
  325. if err != nil {
  326. return fmt.Errorf("fail to download file %s: %s", uri, err)
  327. }
  328. // unzip and copy to destination
  329. err = m.unzip(name, zipPath)
  330. if err != nil {
  331. return err
  332. }
  333. // save the install script
  334. m.serviceInstallKV.Set(name, r.InstallScript())
  335. // init file to serviceKV
  336. return m.initFile(name + ".json")
  337. }
  338. func (m *Manager) Delete(name string) error {
  339. name = strings.Trim(name, " ")
  340. if name == "" {
  341. return fmt.Errorf("invalid name %s: should not be empty", name)
  342. }
  343. m.deleteServiceFuncs(name)
  344. m.serviceBuf.Delete(name)
  345. err := m.serviceKV.Delete(name)
  346. if err != nil {
  347. return err
  348. }
  349. _ = m.serviceInstallKV.Delete(name)
  350. path := path.Join(m.etcDir, name+".json")
  351. err = os.Remove(path)
  352. if err != nil {
  353. kconf.Log.Errorf("remove service json fails: %v", err)
  354. }
  355. return nil
  356. }
  357. func (m *Manager) Get(name string) (*serviceInfo, error) {
  358. name = strings.Trim(name, " ")
  359. if name == "" {
  360. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  361. }
  362. r, ok := m.getService(name)
  363. if !ok {
  364. return nil, fmt.Errorf("can't get the service %s", name)
  365. }
  366. return r, nil
  367. }
  368. func (m *Manager) Update(req *ServiceCreationRequest) error {
  369. err := m.Delete(req.Name)
  370. if err != nil {
  371. return err
  372. }
  373. return m.Create(req)
  374. }
  375. func (m *Manager) unzip(name, src string) error {
  376. r, err := zip.OpenReader(src)
  377. if err != nil {
  378. return err
  379. }
  380. defer r.Close()
  381. baseName := name + ".json"
  382. // Try unzip
  383. found := false
  384. for _, file := range r.File {
  385. if strings.EqualFold(file.Name, baseName) {
  386. found = true
  387. break
  388. }
  389. }
  390. if !found {
  391. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  392. }
  393. // unzip
  394. for _, file := range r.File {
  395. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  396. if err != nil {
  397. return err
  398. }
  399. }
  400. return nil
  401. }
  402. func (m *Manager) ListFunctions() ([]string, error) {
  403. return m.functionKV.Keys()
  404. }
  405. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  406. name = strings.Trim(name, " ")
  407. if name == "" {
  408. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  409. }
  410. r, ok := m.getFunction(name)
  411. if !ok {
  412. return nil, fmt.Errorf("can't get the service function %s", name)
  413. }
  414. return r, nil
  415. }
  416. func (m *Manager) GetAllServices() map[string]string {
  417. all, err := m.serviceInstallKV.All()
  418. if err != nil {
  419. return nil
  420. }
  421. return all
  422. }
  423. func (m *Manager) GetAllServicesStatus() map[string]string {
  424. all, err := m.serviceStatusInstallKV.All()
  425. if err != nil {
  426. return nil
  427. }
  428. return all
  429. }
  430. func (m *Manager) UninstallAllServices() {
  431. keys, err := m.serviceInstallKV.Keys()
  432. if err != nil {
  433. return
  434. }
  435. for _, v := range keys {
  436. _ = m.Delete(v)
  437. }
  438. }
  439. func (m *Manager) servicesRegisterForImport(_, v string) error {
  440. req := &ServiceCreationRequest{}
  441. err := json.Unmarshal([]byte(v), req)
  442. if err != nil {
  443. return err
  444. }
  445. err = m.Create(req)
  446. if err != nil {
  447. return err
  448. }
  449. return nil
  450. }
  451. func (m *Manager) ImportServices(services map[string]string) map[string]string {
  452. errMap := map[string]string{}
  453. _ = m.serviceStatusInstallKV.Clean()
  454. for k, v := range services {
  455. err := m.servicesRegisterForImport(k, v)
  456. if err != nil {
  457. _ = m.serviceStatusInstallKV.Set(k, err.Error())
  458. errMap[k] = err.Error()
  459. }
  460. }
  461. return errMap
  462. }
  463. func (m *Manager) ImportPartialServices(services map[string]string) map[string]string {
  464. errMap := map[string]string{}
  465. for k, v := range services {
  466. err := m.servicesRegisterForImport(k, v)
  467. if err != nil {
  468. errMap[k] = err.Error()
  469. }
  470. }
  471. return errMap
  472. }