manager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "fmt"
  18. kconf "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  20. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/kv"
  23. "io/ioutil"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "strings"
  28. "sync"
  29. )
  30. var (
  31. once sync.Once
  32. mutex sync.Mutex
  33. singleton *Manager //Do not call this directly, use GetServiceManager
  34. )
  35. type Manager struct {
  36. executorPool *sync.Map // The pool of executors
  37. loaded bool
  38. serviceBuf *sync.Map
  39. functionBuf *sync.Map
  40. etcDir string
  41. serviceKV kv.KeyValue
  42. functionKV kv.KeyValue
  43. }
  44. func GetServiceManager() (*Manager, error) {
  45. mutex.Lock()
  46. defer mutex.Unlock()
  47. if singleton == nil {
  48. dir := "etc/services"
  49. if kconf.IsTesting {
  50. dir = "service/test"
  51. }
  52. etcDir, err := kconf.GetLoc(dir)
  53. if err != nil {
  54. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  55. }
  56. dbDir, err := kconf.GetDataLoc()
  57. if err != nil {
  58. return nil, fmt.Errorf("cannot find db folder: %s", err)
  59. }
  60. sdb := kv.GetDefaultKVStore(path.Join(dbDir, "services"))
  61. fdb := kv.GetDefaultKVStore(path.Join(dbDir, "serviceFuncs"))
  62. err = sdb.Open()
  63. if err != nil {
  64. return nil, fmt.Errorf("cannot open service db: %s", err)
  65. }
  66. err = fdb.Open()
  67. if err != nil {
  68. return nil, fmt.Errorf("cannot open function db: %s", err)
  69. }
  70. singleton = &Manager{
  71. executorPool: &sync.Map{},
  72. serviceBuf: &sync.Map{},
  73. functionBuf: &sync.Map{},
  74. etcDir: etcDir,
  75. serviceKV: sdb,
  76. functionKV: fdb,
  77. }
  78. }
  79. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  80. err := singleton.InitByFiles()
  81. return singleton, err
  82. }
  83. return singleton, nil
  84. }
  85. /**
  86. * This function will parse the service definition json files in etc/services.
  87. * It will validate all json files and their schemaFiles. If invalid, it just prints
  88. * an error log and ignore. So it is possible that only valid service definition are
  89. * parsed and available.
  90. *
  91. * NOT threadsafe, must run in lock
  92. */
  93. func (m *Manager) InitByFiles() error {
  94. kconf.Log.Debugf("init service manager")
  95. files, err := ioutil.ReadDir(m.etcDir)
  96. if nil != err {
  97. return err
  98. }
  99. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  100. for _, file := range files {
  101. baseName := filepath.Base(file.Name())
  102. if filepath.Ext(baseName) == ".json" {
  103. err := m.initFile(baseName)
  104. if err != nil {
  105. kconf.Log.Errorf("%v", err)
  106. continue
  107. }
  108. }
  109. }
  110. m.loaded = true
  111. return nil
  112. }
  113. func (m *Manager) initFile(baseName string) error {
  114. serviceConf := &conf{}
  115. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  116. if err != nil {
  117. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  118. }
  119. //TODO validate serviceConf
  120. serviceName := baseName[0 : len(baseName)-5]
  121. info := &serviceInfo{
  122. About: serviceConf.About,
  123. Interfaces: make(map[string]*interfaceInfo),
  124. }
  125. for name, binding := range serviceConf.Interfaces {
  126. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  127. if err != nil {
  128. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  129. }
  130. // setting function alias
  131. aliasMap := make(map[string]string)
  132. for _, finfo := range binding.Functions {
  133. aliasMap[finfo.ServiceName] = finfo.Name
  134. }
  135. methods := desc.GetFunctions()
  136. functions := make([]string, len(methods))
  137. for i, f := range methods {
  138. fname := f
  139. if a, ok := aliasMap[f]; ok {
  140. fname = a
  141. }
  142. functions[i] = fname
  143. }
  144. info.Interfaces[name] = &interfaceInfo{
  145. Desc: binding.Description,
  146. Addr: binding.Address,
  147. Protocol: binding.Protocol,
  148. Schema: &schemaInfo{
  149. SchemaType: binding.SchemaType,
  150. SchemaFile: binding.SchemaFile,
  151. },
  152. Functions: functions,
  153. Options: binding.Options,
  154. }
  155. for i, f := range functions {
  156. err := m.functionKV.Set(f, &functionContainer{
  157. ServiceName: serviceName,
  158. InterfaceName: name,
  159. MethodName: methods[i],
  160. })
  161. if err != nil {
  162. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  163. }
  164. }
  165. }
  166. err = m.serviceKV.Set(serviceName, info)
  167. if err != nil {
  168. return fmt.Errorf("fail to save the parsing result: %v", err)
  169. }
  170. return nil
  171. }
  172. // Start Implement FunctionRegister
  173. func (m *Manager) HasFunction(name string) bool {
  174. _, ok := m.getFunction(name)
  175. kconf.Log.Debugf("found external function %s? %v ", name, ok)
  176. return ok
  177. }
  178. func (m *Manager) Function(name string) (api.Function, error) {
  179. f, ok := m.getFunction(name)
  180. if !ok {
  181. return nil, fmt.Errorf("service function %s not found", name)
  182. }
  183. s, ok := m.getService(f.ServiceName)
  184. if !ok {
  185. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  186. }
  187. i, ok := s.Interfaces[f.InterfaceName]
  188. if !ok {
  189. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  190. }
  191. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  192. e, err := m.getExecutor(f.InterfaceName, i)
  193. if err != nil {
  194. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  195. }
  196. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  197. }
  198. // End Implement FunctionRegister
  199. func (m *Manager) HasService(name string) bool {
  200. _, ok := m.getService(name)
  201. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  202. return ok
  203. }
  204. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  205. var r *functionContainer
  206. if t, ok := m.functionBuf.Load(name); ok {
  207. r = t.(*functionContainer)
  208. return r, ok
  209. } else {
  210. r = &functionContainer{}
  211. ok, err := m.functionKV.Get(name, r)
  212. if err != nil {
  213. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  214. return nil, false
  215. }
  216. if ok {
  217. m.functionBuf.Store(name, r)
  218. }
  219. return r, ok
  220. }
  221. }
  222. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  223. var r *serviceInfo
  224. if t, ok := m.serviceBuf.Load(name); ok {
  225. r = t.(*serviceInfo)
  226. return r, ok
  227. } else {
  228. r = &serviceInfo{}
  229. ok, err := m.serviceKV.Get(name, r)
  230. if err != nil {
  231. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  232. return nil, false
  233. }
  234. if ok {
  235. m.serviceBuf.Store(name, r)
  236. }
  237. return r, ok
  238. }
  239. }
  240. // Each interface maps to an executor
  241. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  242. e, ok := m.executorPool.Load(name)
  243. if !ok {
  244. ne, err := NewExecutor(info)
  245. if err != nil {
  246. return nil, err
  247. }
  248. e, _ = m.executorPool.LoadOrStore(name, ne)
  249. }
  250. return e.(executor), nil
  251. }
  252. func (m *Manager) deleteServiceFuncs(service string) error {
  253. if s, ok := m.getService(service); ok {
  254. for _, i := range s.Interfaces {
  255. for _, f := range i.Functions {
  256. _ = m.deleteFunc(service, f)
  257. }
  258. }
  259. }
  260. return nil
  261. }
  262. func (m *Manager) deleteFunc(service, name string) error {
  263. f, err := m.GetFunction(name)
  264. if err != nil {
  265. return err
  266. }
  267. if f.ServiceName == service {
  268. m.functionBuf.Delete(name)
  269. m.functionKV.Delete(name)
  270. }
  271. return nil
  272. }
  273. // ** CRUD of the service files **
  274. type ServiceCreationRequest struct {
  275. Name string `json:"name"`
  276. File string `json:"file"`
  277. }
  278. func (m *Manager) List() ([]string, error) {
  279. return m.serviceKV.Keys()
  280. }
  281. func (m *Manager) Create(r *ServiceCreationRequest) error {
  282. name, uri := r.Name, r.File
  283. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  284. return fmt.Errorf("service %s exist", name)
  285. }
  286. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  287. return fmt.Errorf("invalid file path %s", uri)
  288. }
  289. zipPath := path.Join(m.etcDir, name+".zip")
  290. //clean up: delete zip file and unzip files in error
  291. defer os.Remove(zipPath)
  292. //download
  293. err := httpx.DownloadFile(zipPath, uri)
  294. if err != nil {
  295. return fmt.Errorf("fail to download file %s: %s", uri, err)
  296. }
  297. //unzip and copy to destination
  298. err = m.unzip(name, zipPath)
  299. if err != nil {
  300. return err
  301. }
  302. // init file to serviceKV
  303. return m.initFile(name + ".json")
  304. }
  305. func (m *Manager) Delete(name string) error {
  306. name = strings.Trim(name, " ")
  307. if name == "" {
  308. return fmt.Errorf("invalid name %s: should not be empty", name)
  309. }
  310. m.deleteServiceFuncs(name)
  311. m.serviceBuf.Delete(name)
  312. err := m.serviceKV.Delete(name)
  313. if err != nil {
  314. return err
  315. }
  316. path := path.Join(m.etcDir, name+".json")
  317. err = os.Remove(path)
  318. if err != nil {
  319. kconf.Log.Errorf("remove service json fails: %v", err)
  320. }
  321. return nil
  322. }
  323. func (m *Manager) Get(name string) (*serviceInfo, error) {
  324. name = strings.Trim(name, " ")
  325. if name == "" {
  326. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  327. }
  328. r, ok := m.getService(name)
  329. if !ok {
  330. return nil, fmt.Errorf("can't get the service %s", name)
  331. }
  332. return r, nil
  333. }
  334. func (m *Manager) Update(req *ServiceCreationRequest) error {
  335. err := m.Delete(req.Name)
  336. if err != nil {
  337. return err
  338. }
  339. return m.Create(req)
  340. }
  341. func (m *Manager) unzip(name, src string) error {
  342. r, err := zip.OpenReader(src)
  343. if err != nil {
  344. return err
  345. }
  346. defer r.Close()
  347. baseName := strings.ToLower(name + ".json")
  348. // Try unzip
  349. found := false
  350. for _, file := range r.File {
  351. if strings.ToLower(file.Name) == baseName {
  352. found = true
  353. break
  354. }
  355. }
  356. if !found {
  357. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  358. }
  359. // unzip
  360. for _, file := range r.File {
  361. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  362. if err != nil {
  363. return err
  364. }
  365. }
  366. return nil
  367. }
  368. func (m *Manager) ListFunctions() ([]string, error) {
  369. return m.functionKV.Keys()
  370. }
  371. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  372. name = strings.Trim(name, " ")
  373. if name == "" {
  374. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  375. }
  376. r, ok := m.getFunction(name)
  377. if !ok {
  378. return nil, fmt.Errorf("can't get the service function %s", name)
  379. }
  380. return r, nil
  381. }