manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "encoding/json"
  18. "fmt"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strings"
  23. "sync"
  24. kconf "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  26. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  27. "github.com/lf-edge/ekuiper/internal/pkg/store"
  28. "github.com/lf-edge/ekuiper/pkg/api"
  29. "github.com/lf-edge/ekuiper/pkg/kv"
  30. )
  31. var (
  32. once sync.Once
  33. mutex sync.Mutex
  34. singleton *Manager //Do not call this directly, use GetServiceManager
  35. )
  36. type Manager struct {
  37. executorPool *sync.Map // The pool of executors
  38. loaded bool
  39. serviceBuf *sync.Map
  40. functionBuf *sync.Map
  41. etcDir string
  42. serviceInstallKV kv.KeyValue
  43. serviceStatusInstallKV kv.KeyValue
  44. serviceKV kv.KeyValue
  45. functionKV kv.KeyValue
  46. }
  47. func InitManager() (*Manager, error) {
  48. mutex.Lock()
  49. defer mutex.Unlock()
  50. if singleton == nil {
  51. dir := "data/services"
  52. if kconf.IsTesting {
  53. dir = "service/test"
  54. }
  55. etcDir, err := kconf.GetLoc(dir)
  56. if err != nil {
  57. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  58. }
  59. err, sdb := store.GetKV("services")
  60. if err != nil {
  61. return nil, fmt.Errorf("cannot open service db: %s", err)
  62. }
  63. err, fdb := store.GetKV("serviceFuncs")
  64. if err != nil {
  65. return nil, fmt.Errorf("cannot open function db: %s", err)
  66. }
  67. err, sInstallDb := store.GetKV("serviceInstall")
  68. if err != nil {
  69. return nil, fmt.Errorf("cannot open service db: %s", err)
  70. }
  71. err, statusDb := store.GetKV("serviceInstallStatus")
  72. if err != nil {
  73. return nil, fmt.Errorf("cannot open service db: %s", err)
  74. }
  75. singleton = &Manager{
  76. executorPool: &sync.Map{},
  77. serviceBuf: &sync.Map{},
  78. functionBuf: &sync.Map{},
  79. etcDir: etcDir,
  80. serviceStatusInstallKV: statusDb,
  81. serviceInstallKV: sInstallDb,
  82. serviceKV: sdb,
  83. functionKV: fdb,
  84. }
  85. }
  86. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  87. err := singleton.InitByFiles()
  88. return singleton, err
  89. }
  90. return singleton, nil
  91. }
  92. func GetManager() *Manager {
  93. return singleton
  94. }
  95. // InitByFiles
  96. /**
  97. * This function will parse the service definition json files in etc/services.
  98. * It will validate all json files and their schemaFiles. If invalid, it just prints
  99. * an error log and ignore. So it is possible that only valid service definition are
  100. * parsed and available.
  101. *
  102. * NOT threadsafe, must run in lock
  103. */
  104. func (m *Manager) InitByFiles() error {
  105. kconf.Log.Debugf("init service manager")
  106. files, err := os.ReadDir(m.etcDir)
  107. if nil != err {
  108. return err
  109. }
  110. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  111. for _, file := range files {
  112. baseName := filepath.Base(file.Name())
  113. if filepath.Ext(baseName) == ".json" {
  114. err := m.initFile(baseName)
  115. if err != nil {
  116. kconf.Log.Errorf("%v", err)
  117. continue
  118. }
  119. }
  120. }
  121. m.loaded = true
  122. return nil
  123. }
  124. func (m *Manager) initFile(baseName string) error {
  125. serviceConf := &conf{}
  126. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  127. if err != nil {
  128. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  129. }
  130. //TODO validate serviceConf
  131. serviceName := baseName[0 : len(baseName)-5]
  132. info := &serviceInfo{
  133. About: serviceConf.About,
  134. Interfaces: make(map[string]*interfaceInfo),
  135. }
  136. for name, binding := range serviceConf.Interfaces {
  137. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  138. if err != nil {
  139. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  140. }
  141. // setting function alias
  142. aliasMap := make(map[string]string)
  143. for _, finfo := range binding.Functions {
  144. aliasMap[finfo.ServiceName] = finfo.Name
  145. }
  146. methods := desc.GetFunctions()
  147. functions := make([]string, len(methods))
  148. for i, f := range methods {
  149. fname := f
  150. if a, ok := aliasMap[f]; ok {
  151. fname = a
  152. }
  153. functions[i] = fname
  154. }
  155. info.Interfaces[name] = &interfaceInfo{
  156. Desc: binding.Description,
  157. Addr: binding.Address,
  158. Protocol: binding.Protocol,
  159. Schema: &schemaInfo{
  160. SchemaType: binding.SchemaType,
  161. SchemaFile: binding.SchemaFile,
  162. },
  163. Functions: functions,
  164. Options: binding.Options,
  165. }
  166. for i, f := range functions {
  167. err := m.functionKV.Set(f, &functionContainer{
  168. ServiceName: serviceName,
  169. InterfaceName: name,
  170. MethodName: methods[i],
  171. })
  172. if err != nil {
  173. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  174. }
  175. }
  176. }
  177. err = m.serviceKV.Set(serviceName, info)
  178. if err != nil {
  179. return fmt.Errorf("fail to save the parsing result: %v", err)
  180. }
  181. return nil
  182. }
  183. // Start Implement FunctionFactory
  184. func (m *Manager) HasFunctionSet(_ string) bool {
  185. return false
  186. }
  187. func (m *Manager) Function(name string) (api.Function, error) {
  188. f, ok := m.getFunction(name)
  189. if !ok {
  190. return nil, fmt.Errorf("service function %s not found", name)
  191. }
  192. s, ok := m.getService(f.ServiceName)
  193. if !ok {
  194. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  195. }
  196. i, ok := s.Interfaces[f.InterfaceName]
  197. if !ok {
  198. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  199. }
  200. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  201. e, err := m.getExecutor(f.InterfaceName, i)
  202. if err != nil {
  203. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  204. }
  205. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  206. }
  207. func (m *Manager) ConvName(funcName string) (string, bool) {
  208. _, ok := m.getFunction(funcName)
  209. return funcName, ok
  210. }
  211. // End Implement FunctionFactory
  212. func (m *Manager) HasService(name string) bool {
  213. _, ok := m.getService(name)
  214. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  215. return ok
  216. }
  217. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  218. var r *functionContainer
  219. if t, ok := m.functionBuf.Load(name); ok {
  220. r = t.(*functionContainer)
  221. return r, ok
  222. } else {
  223. r = &functionContainer{}
  224. ok, err := m.functionKV.Get(name, r)
  225. if err != nil {
  226. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  227. return nil, false
  228. }
  229. if ok {
  230. m.functionBuf.Store(name, r)
  231. }
  232. return r, ok
  233. }
  234. }
  235. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  236. var r *serviceInfo
  237. if t, ok := m.serviceBuf.Load(name); ok {
  238. r = t.(*serviceInfo)
  239. return r, ok
  240. } else {
  241. r = &serviceInfo{}
  242. ok, err := m.serviceKV.Get(name, r)
  243. if err != nil {
  244. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  245. return nil, false
  246. }
  247. if ok {
  248. m.serviceBuf.Store(name, r)
  249. }
  250. return r, ok
  251. }
  252. }
  253. // Each interface maps to an executor
  254. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  255. e, ok := m.executorPool.Load(name)
  256. if !ok {
  257. ne, err := NewExecutor(info)
  258. if err != nil {
  259. return nil, err
  260. }
  261. e, _ = m.executorPool.LoadOrStore(name, ne)
  262. }
  263. return e.(executor), nil
  264. }
  265. func (m *Manager) deleteServiceFuncs(service string) error {
  266. if s, ok := m.getService(service); ok {
  267. for _, i := range s.Interfaces {
  268. for _, f := range i.Functions {
  269. _ = m.deleteFunc(service, f)
  270. }
  271. }
  272. }
  273. return nil
  274. }
  275. func (m *Manager) deleteFunc(service, name string) error {
  276. f, err := m.GetFunction(name)
  277. if err != nil {
  278. return err
  279. }
  280. if f.ServiceName == service {
  281. m.functionBuf.Delete(name)
  282. m.functionKV.Delete(name)
  283. }
  284. return nil
  285. }
  286. // ** CRUD of the service files **
  287. type ServiceCreationRequest struct {
  288. Name string `json:"name"`
  289. File string `json:"file"`
  290. }
  291. func (s *ServiceCreationRequest) InstallScript() string {
  292. marshal, err := json.Marshal(s)
  293. if err != nil {
  294. return ""
  295. }
  296. return string(marshal)
  297. }
  298. func (m *Manager) List() ([]string, error) {
  299. return m.serviceKV.Keys()
  300. }
  301. func (m *Manager) Create(r *ServiceCreationRequest) error {
  302. name, uri := r.Name, r.File
  303. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  304. return fmt.Errorf("service %s exist", name)
  305. }
  306. if !httpx.IsValidUrl(uri) {
  307. return fmt.Errorf("invalid file path %s", uri)
  308. }
  309. zipPath := path.Join(m.etcDir, name+".zip")
  310. //clean up: delete zip file and unzip files in error
  311. defer os.Remove(zipPath)
  312. //download
  313. err := httpx.DownloadFile(zipPath, uri)
  314. if err != nil {
  315. return fmt.Errorf("fail to download file %s: %s", uri, err)
  316. }
  317. //unzip and copy to destination
  318. err = m.unzip(name, zipPath)
  319. if err != nil {
  320. return err
  321. }
  322. //save the install script
  323. m.serviceInstallKV.Set(name, r.InstallScript())
  324. // init file to serviceKV
  325. return m.initFile(name + ".json")
  326. }
  327. func (m *Manager) Delete(name string) error {
  328. name = strings.Trim(name, " ")
  329. if name == "" {
  330. return fmt.Errorf("invalid name %s: should not be empty", name)
  331. }
  332. m.deleteServiceFuncs(name)
  333. m.serviceBuf.Delete(name)
  334. err := m.serviceKV.Delete(name)
  335. if err != nil {
  336. return err
  337. }
  338. _ = m.serviceInstallKV.Delete(name)
  339. path := path.Join(m.etcDir, name+".json")
  340. err = os.Remove(path)
  341. if err != nil {
  342. kconf.Log.Errorf("remove service json fails: %v", err)
  343. }
  344. return nil
  345. }
  346. func (m *Manager) Get(name string) (*serviceInfo, error) {
  347. name = strings.Trim(name, " ")
  348. if name == "" {
  349. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  350. }
  351. r, ok := m.getService(name)
  352. if !ok {
  353. return nil, fmt.Errorf("can't get the service %s", name)
  354. }
  355. return r, nil
  356. }
  357. func (m *Manager) Update(req *ServiceCreationRequest) error {
  358. err := m.Delete(req.Name)
  359. if err != nil {
  360. return err
  361. }
  362. return m.Create(req)
  363. }
  364. func (m *Manager) unzip(name, src string) error {
  365. r, err := zip.OpenReader(src)
  366. if err != nil {
  367. return err
  368. }
  369. defer r.Close()
  370. baseName := name + ".json"
  371. // Try unzip
  372. found := false
  373. for _, file := range r.File {
  374. if strings.EqualFold(file.Name, baseName) {
  375. found = true
  376. break
  377. }
  378. }
  379. if !found {
  380. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  381. }
  382. // unzip
  383. for _, file := range r.File {
  384. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  385. if err != nil {
  386. return err
  387. }
  388. }
  389. return nil
  390. }
  391. func (m *Manager) ListFunctions() ([]string, error) {
  392. return m.functionKV.Keys()
  393. }
  394. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  395. name = strings.Trim(name, " ")
  396. if name == "" {
  397. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  398. }
  399. r, ok := m.getFunction(name)
  400. if !ok {
  401. return nil, fmt.Errorf("can't get the service function %s", name)
  402. }
  403. return r, nil
  404. }
  405. func (m *Manager) GetAllServices() map[string]string {
  406. all, err := m.serviceInstallKV.All()
  407. if err != nil {
  408. return nil
  409. }
  410. return all
  411. }
  412. func (m *Manager) GetAllServicesStatus() map[string]string {
  413. all, err := m.serviceStatusInstallKV.All()
  414. if err != nil {
  415. return nil
  416. }
  417. return all
  418. }
  419. func (m *Manager) UninstallAllServices() {
  420. keys, err := m.serviceInstallKV.Keys()
  421. if err != nil {
  422. return
  423. }
  424. for _, v := range keys {
  425. _ = m.Delete(v)
  426. }
  427. }
  428. func (m *Manager) ImportServices(services map[string]string) {
  429. _ = m.serviceStatusInstallKV.Clean()
  430. for k, v := range services {
  431. req := &ServiceCreationRequest{}
  432. err := json.Unmarshal([]byte(v), req)
  433. if err != nil {
  434. m.serviceStatusInstallKV.Set(k, err.Error())
  435. continue
  436. }
  437. err = m.Create(req)
  438. if err != nil {
  439. m.serviceStatusInstallKV.Set(k, err.Error())
  440. continue
  441. }
  442. }
  443. }