manager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "fmt"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "strings"
  22. "sync"
  23. kconf "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/pkg/filex"
  25. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  26. "github.com/lf-edge/ekuiper/internal/pkg/store"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. "github.com/lf-edge/ekuiper/pkg/kv"
  29. )
  30. var (
  31. once sync.Once
  32. mutex sync.Mutex
  33. singleton *Manager //Do not call this directly, use GetServiceManager
  34. )
  35. type Manager struct {
  36. executorPool *sync.Map // The pool of executors
  37. loaded bool
  38. serviceBuf *sync.Map
  39. functionBuf *sync.Map
  40. etcDir string
  41. serviceKV kv.KeyValue
  42. functionKV kv.KeyValue
  43. }
  44. func InitManager() (*Manager, error) {
  45. mutex.Lock()
  46. defer mutex.Unlock()
  47. if singleton == nil {
  48. dir := "data/services"
  49. if kconf.IsTesting {
  50. dir = "service/test"
  51. }
  52. etcDir, err := kconf.GetLoc(dir)
  53. if err != nil {
  54. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  55. }
  56. err, sdb := store.GetKV("services")
  57. if err != nil {
  58. return nil, fmt.Errorf("cannot open service db: %s", err)
  59. }
  60. err, fdb := store.GetKV("serviceFuncs")
  61. if err != nil {
  62. return nil, fmt.Errorf("cannot open function db: %s", err)
  63. }
  64. singleton = &Manager{
  65. executorPool: &sync.Map{},
  66. serviceBuf: &sync.Map{},
  67. functionBuf: &sync.Map{},
  68. etcDir: etcDir,
  69. serviceKV: sdb,
  70. functionKV: fdb,
  71. }
  72. }
  73. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  74. err := singleton.InitByFiles()
  75. return singleton, err
  76. }
  77. return singleton, nil
  78. }
  79. func GetManager() *Manager {
  80. return singleton
  81. }
  82. /**
  83. * This function will parse the service definition json files in etc/services.
  84. * It will validate all json files and their schemaFiles. If invalid, it just prints
  85. * an error log and ignore. So it is possible that only valid service definition are
  86. * parsed and available.
  87. *
  88. * NOT threadsafe, must run in lock
  89. */
  90. func (m *Manager) InitByFiles() error {
  91. kconf.Log.Debugf("init service manager")
  92. files, err := os.ReadDir(m.etcDir)
  93. if nil != err {
  94. return err
  95. }
  96. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  97. for _, file := range files {
  98. baseName := filepath.Base(file.Name())
  99. if filepath.Ext(baseName) == ".json" {
  100. err := m.initFile(baseName)
  101. if err != nil {
  102. kconf.Log.Errorf("%v", err)
  103. continue
  104. }
  105. }
  106. }
  107. m.loaded = true
  108. return nil
  109. }
  110. func (m *Manager) initFile(baseName string) error {
  111. serviceConf := &conf{}
  112. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  113. if err != nil {
  114. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  115. }
  116. //TODO validate serviceConf
  117. serviceName := baseName[0 : len(baseName)-5]
  118. info := &serviceInfo{
  119. About: serviceConf.About,
  120. Interfaces: make(map[string]*interfaceInfo),
  121. }
  122. for name, binding := range serviceConf.Interfaces {
  123. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  124. if err != nil {
  125. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  126. }
  127. // setting function alias
  128. aliasMap := make(map[string]string)
  129. for _, finfo := range binding.Functions {
  130. aliasMap[finfo.ServiceName] = finfo.Name
  131. }
  132. methods := desc.GetFunctions()
  133. functions := make([]string, len(methods))
  134. for i, f := range methods {
  135. fname := f
  136. if a, ok := aliasMap[f]; ok {
  137. fname = a
  138. }
  139. functions[i] = fname
  140. }
  141. info.Interfaces[name] = &interfaceInfo{
  142. Desc: binding.Description,
  143. Addr: binding.Address,
  144. Protocol: binding.Protocol,
  145. Schema: &schemaInfo{
  146. SchemaType: binding.SchemaType,
  147. SchemaFile: binding.SchemaFile,
  148. },
  149. Functions: functions,
  150. Options: binding.Options,
  151. }
  152. for i, f := range functions {
  153. err := m.functionKV.Set(f, &functionContainer{
  154. ServiceName: serviceName,
  155. InterfaceName: name,
  156. MethodName: methods[i],
  157. })
  158. if err != nil {
  159. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  160. }
  161. }
  162. }
  163. err = m.serviceKV.Set(serviceName, info)
  164. if err != nil {
  165. return fmt.Errorf("fail to save the parsing result: %v", err)
  166. }
  167. return nil
  168. }
  169. // Start Implement FunctionFactory
  170. func (m *Manager) HasFunctionSet(_ string) bool {
  171. return false
  172. }
  173. func (m *Manager) Function(name string) (api.Function, error) {
  174. f, ok := m.getFunction(name)
  175. if !ok {
  176. return nil, fmt.Errorf("service function %s not found", name)
  177. }
  178. s, ok := m.getService(f.ServiceName)
  179. if !ok {
  180. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  181. }
  182. i, ok := s.Interfaces[f.InterfaceName]
  183. if !ok {
  184. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  185. }
  186. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  187. e, err := m.getExecutor(f.InterfaceName, i)
  188. if err != nil {
  189. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  190. }
  191. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  192. }
  193. func (m *Manager) ConvName(funcName string) (string, bool) {
  194. _, ok := m.getFunction(funcName)
  195. return funcName, ok
  196. }
  197. // End Implement FunctionFactory
  198. func (m *Manager) HasService(name string) bool {
  199. _, ok := m.getService(name)
  200. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  201. return ok
  202. }
  203. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  204. var r *functionContainer
  205. if t, ok := m.functionBuf.Load(name); ok {
  206. r = t.(*functionContainer)
  207. return r, ok
  208. } else {
  209. r = &functionContainer{}
  210. ok, err := m.functionKV.Get(name, r)
  211. if err != nil {
  212. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  213. return nil, false
  214. }
  215. if ok {
  216. m.functionBuf.Store(name, r)
  217. }
  218. return r, ok
  219. }
  220. }
  221. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  222. var r *serviceInfo
  223. if t, ok := m.serviceBuf.Load(name); ok {
  224. r = t.(*serviceInfo)
  225. return r, ok
  226. } else {
  227. r = &serviceInfo{}
  228. ok, err := m.serviceKV.Get(name, r)
  229. if err != nil {
  230. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  231. return nil, false
  232. }
  233. if ok {
  234. m.serviceBuf.Store(name, r)
  235. }
  236. return r, ok
  237. }
  238. }
  239. // Each interface maps to an executor
  240. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  241. e, ok := m.executorPool.Load(name)
  242. if !ok {
  243. ne, err := NewExecutor(info)
  244. if err != nil {
  245. return nil, err
  246. }
  247. e, _ = m.executorPool.LoadOrStore(name, ne)
  248. }
  249. return e.(executor), nil
  250. }
  251. func (m *Manager) deleteServiceFuncs(service string) error {
  252. if s, ok := m.getService(service); ok {
  253. for _, i := range s.Interfaces {
  254. for _, f := range i.Functions {
  255. _ = m.deleteFunc(service, f)
  256. }
  257. }
  258. }
  259. return nil
  260. }
  261. func (m *Manager) deleteFunc(service, name string) error {
  262. f, err := m.GetFunction(name)
  263. if err != nil {
  264. return err
  265. }
  266. if f.ServiceName == service {
  267. m.functionBuf.Delete(name)
  268. m.functionKV.Delete(name)
  269. }
  270. return nil
  271. }
  272. // ** CRUD of the service files **
  273. type ServiceCreationRequest struct {
  274. Name string `json:"name"`
  275. File string `json:"file"`
  276. }
  277. func (m *Manager) List() ([]string, error) {
  278. return m.serviceKV.Keys()
  279. }
  280. func (m *Manager) Create(r *ServiceCreationRequest) error {
  281. name, uri := r.Name, r.File
  282. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  283. return fmt.Errorf("service %s exist", name)
  284. }
  285. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  286. return fmt.Errorf("invalid file path %s", uri)
  287. }
  288. zipPath := path.Join(m.etcDir, name+".zip")
  289. //clean up: delete zip file and unzip files in error
  290. defer os.Remove(zipPath)
  291. //download
  292. err := httpx.DownloadFile(zipPath, uri)
  293. if err != nil {
  294. return fmt.Errorf("fail to download file %s: %s", uri, err)
  295. }
  296. //unzip and copy to destination
  297. err = m.unzip(name, zipPath)
  298. if err != nil {
  299. return err
  300. }
  301. // init file to serviceKV
  302. return m.initFile(name + ".json")
  303. }
  304. func (m *Manager) Delete(name string) error {
  305. name = strings.Trim(name, " ")
  306. if name == "" {
  307. return fmt.Errorf("invalid name %s: should not be empty", name)
  308. }
  309. m.deleteServiceFuncs(name)
  310. m.serviceBuf.Delete(name)
  311. err := m.serviceKV.Delete(name)
  312. if err != nil {
  313. return err
  314. }
  315. path := path.Join(m.etcDir, name+".json")
  316. err = os.Remove(path)
  317. if err != nil {
  318. kconf.Log.Errorf("remove service json fails: %v", err)
  319. }
  320. return nil
  321. }
  322. func (m *Manager) Get(name string) (*serviceInfo, error) {
  323. name = strings.Trim(name, " ")
  324. if name == "" {
  325. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  326. }
  327. r, ok := m.getService(name)
  328. if !ok {
  329. return nil, fmt.Errorf("can't get the service %s", name)
  330. }
  331. return r, nil
  332. }
  333. func (m *Manager) Update(req *ServiceCreationRequest) error {
  334. err := m.Delete(req.Name)
  335. if err != nil {
  336. return err
  337. }
  338. return m.Create(req)
  339. }
  340. func (m *Manager) unzip(name, src string) error {
  341. r, err := zip.OpenReader(src)
  342. if err != nil {
  343. return err
  344. }
  345. defer r.Close()
  346. baseName := name + ".json"
  347. // Try unzip
  348. found := false
  349. for _, file := range r.File {
  350. if strings.EqualFold(file.Name, baseName) {
  351. found = true
  352. break
  353. }
  354. }
  355. if !found {
  356. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  357. }
  358. // unzip
  359. for _, file := range r.File {
  360. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  361. if err != nil {
  362. return err
  363. }
  364. }
  365. return nil
  366. }
  367. func (m *Manager) ListFunctions() ([]string, error) {
  368. return m.functionKV.Keys()
  369. }
  370. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  371. name = strings.Trim(name, " ")
  372. if name == "" {
  373. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  374. }
  375. r, ok := m.getFunction(name)
  376. if !ok {
  377. return nil, fmt.Errorf("can't get the service function %s", name)
  378. }
  379. return r, nil
  380. }