manager.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package service
  2. import (
  3. "archive/zip"
  4. "fmt"
  5. kconf "github.com/emqx/kuiper/internal/conf"
  6. "github.com/emqx/kuiper/internal/pkg/filex"
  7. "github.com/emqx/kuiper/internal/pkg/httpx"
  8. "github.com/emqx/kuiper/pkg/api"
  9. "github.com/emqx/kuiper/pkg/kv"
  10. "io/ioutil"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "strings"
  15. "sync"
  16. )
  // Package-level state backing the lazily created Manager singleton.
  var (
  	once      sync.Once  // NOTE(review): not referenced by the code visible in this file - confirm before removing
  	mutex     sync.Mutex // held by GetServiceManager while it creates and initializes the singleton
  	singleton *Manager   //Do not call this directly, use GetServiceManager
  )
  // Manager is the registry of external services and of the functions they
  // expose. Definitions are persisted in two key-value stores and cached in
  // sync.Map buffers; executors are pooled per interface name.
  type Manager struct {
  	executorPool *sync.Map // The pool of executors
  	loaded       bool      // set to true by InitByFiles once the etc directory has been scanned
  	serviceBuf   *sync.Map // in-memory cache: service name -> *serviceInfo
  	functionBuf  *sync.Map // in-memory cache: function name -> *functionContainer
  	etcDir       string    // directory holding the service definition json files
  	serviceKV    kv.KeyValue // persistent store: service name -> serviceInfo
  	functionKV   kv.KeyValue // persistent store: function name -> functionContainer
  }
  31. func GetServiceManager() (*Manager, error) {
  32. mutex.Lock()
  33. defer mutex.Unlock()
  34. if singleton == nil {
  35. dir := "etc/services"
  36. if kconf.IsTesting {
  37. dir = "service/test"
  38. }
  39. etcDir, err := kconf.GetLoc(dir)
  40. if err != nil {
  41. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  42. }
  43. dbDir, err := kconf.GetDataLoc()
  44. if err != nil {
  45. return nil, fmt.Errorf("cannot find db folder: %s", err)
  46. }
  47. sdb := kv.GetDefaultKVStore(path.Join(dbDir, "services"))
  48. fdb := kv.GetDefaultKVStore(path.Join(dbDir, "serviceFuncs"))
  49. err = sdb.Open()
  50. if err != nil {
  51. return nil, fmt.Errorf("cannot open service db: %s", err)
  52. }
  53. err = fdb.Open()
  54. if err != nil {
  55. return nil, fmt.Errorf("cannot open function db: %s", err)
  56. }
  57. singleton = &Manager{
  58. executorPool: &sync.Map{},
  59. serviceBuf: &sync.Map{},
  60. functionBuf: &sync.Map{},
  61. etcDir: etcDir,
  62. serviceKV: sdb,
  63. functionKV: fdb,
  64. }
  65. }
  66. if !singleton.loaded && !kconf.IsTesting { // To boost the testing perf
  67. err := singleton.InitByFiles()
  68. return singleton, err
  69. }
  70. return singleton, nil
  71. }
  72. /**
  73. * This function will parse the service definition json files in etc/services.
  74. * It will validate all json files and their schemaFiles. If invalid, it just prints
  75. * an error log and ignore. So it is possible that only valid service definition are
  76. * parsed and available.
  77. *
  78. * NOT threadsafe, must run in lock
  79. */
  80. func (m *Manager) InitByFiles() error {
  81. kconf.Log.Debugf("init service manager")
  82. files, err := ioutil.ReadDir(m.etcDir)
  83. if nil != err {
  84. return err
  85. }
  86. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  87. for _, file := range files {
  88. baseName := filepath.Base(file.Name())
  89. if filepath.Ext(baseName) == ".json" {
  90. err := m.initFile(baseName)
  91. if err != nil {
  92. kconf.Log.Errorf("%v", err)
  93. continue
  94. }
  95. }
  96. }
  97. m.loaded = true
  98. return nil
  99. }
  100. func (m *Manager) initFile(baseName string) error {
  101. serviceConf := &conf{}
  102. err := filex.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  103. if err != nil {
  104. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  105. }
  106. //TODO validate serviceConf
  107. serviceName := baseName[0 : len(baseName)-5]
  108. info := &serviceInfo{
  109. About: serviceConf.About,
  110. Interfaces: make(map[string]*interfaceInfo),
  111. }
  112. for name, binding := range serviceConf.Interfaces {
  113. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  114. if err != nil {
  115. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  116. }
  117. // setting function alias
  118. aliasMap := make(map[string]string)
  119. for _, finfo := range binding.Functions {
  120. aliasMap[finfo.ServiceName] = finfo.Name
  121. }
  122. methods := desc.GetFunctions()
  123. functions := make([]string, len(methods))
  124. for i, f := range methods {
  125. fname := f
  126. if a, ok := aliasMap[f]; ok {
  127. fname = a
  128. }
  129. functions[i] = fname
  130. }
  131. info.Interfaces[name] = &interfaceInfo{
  132. Desc: binding.Description,
  133. Addr: binding.Address,
  134. Protocol: binding.Protocol,
  135. Schema: &schemaInfo{
  136. SchemaType: binding.SchemaType,
  137. SchemaFile: binding.SchemaFile,
  138. },
  139. Functions: functions,
  140. Options: binding.Options,
  141. }
  142. for i, f := range functions {
  143. err := m.functionKV.Set(f, &functionContainer{
  144. ServiceName: serviceName,
  145. InterfaceName: name,
  146. MethodName: methods[i],
  147. })
  148. if err != nil {
  149. kconf.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  150. }
  151. }
  152. }
  153. err = m.serviceKV.Set(serviceName, info)
  154. if err != nil {
  155. return fmt.Errorf("fail to save the parsing result: %v", err)
  156. }
  157. return nil
  158. }
  159. // Start Implement FunctionRegister
  160. func (m *Manager) HasFunction(name string) bool {
  161. _, ok := m.getFunction(name)
  162. kconf.Log.Debugf("found external function %s? %v ", name, ok)
  163. return ok
  164. }
  165. func (m *Manager) Function(name string) (api.Function, error) {
  166. f, ok := m.getFunction(name)
  167. if !ok {
  168. return nil, fmt.Errorf("service function %s not found", name)
  169. }
  170. s, ok := m.getService(f.ServiceName)
  171. if !ok {
  172. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  173. }
  174. i, ok := s.Interfaces[f.InterfaceName]
  175. if !ok {
  176. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  177. }
  178. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  179. e, err := m.getExecutor(f.InterfaceName, i)
  180. if err != nil {
  181. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  182. }
  183. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  184. }
  185. // End Implement FunctionRegister
  186. func (m *Manager) HasService(name string) bool {
  187. _, ok := m.getService(name)
  188. kconf.Log.Debugf("found external service %s? %v ", name, ok)
  189. return ok
  190. }
  191. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  192. var r *functionContainer
  193. if t, ok := m.functionBuf.Load(name); ok {
  194. r = t.(*functionContainer)
  195. return r, ok
  196. } else {
  197. r = &functionContainer{}
  198. ok, err := m.functionKV.Get(name, r)
  199. if err != nil {
  200. kconf.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  201. return nil, false
  202. }
  203. if ok {
  204. m.functionBuf.Store(name, r)
  205. }
  206. return r, ok
  207. }
  208. }
  209. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  210. var r *serviceInfo
  211. if t, ok := m.serviceBuf.Load(name); ok {
  212. r = t.(*serviceInfo)
  213. return r, ok
  214. } else {
  215. r = &serviceInfo{}
  216. ok, err := m.serviceKV.Get(name, r)
  217. if err != nil {
  218. kconf.Log.Errorf("failed to get service %s from kv: %v", name, err)
  219. return nil, false
  220. }
  221. if ok {
  222. m.serviceBuf.Store(name, r)
  223. }
  224. return r, ok
  225. }
  226. }
  227. // Each interface maps to an executor
  228. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  229. e, ok := m.executorPool.Load(name)
  230. if !ok {
  231. ne, err := NewExecutor(info)
  232. if err != nil {
  233. return nil, err
  234. }
  235. e, _ = m.executorPool.LoadOrStore(name, ne)
  236. }
  237. return e.(executor), nil
  238. }
  239. func (m *Manager) deleteServiceFuncs(service string) error {
  240. if s, ok := m.getService(service); ok {
  241. for _, i := range s.Interfaces {
  242. for _, f := range i.Functions {
  243. _ = m.deleteFunc(service, f)
  244. }
  245. }
  246. }
  247. return nil
  248. }
  249. func (m *Manager) deleteFunc(service, name string) error {
  250. f, err := m.GetFunction(name)
  251. if err != nil {
  252. return err
  253. }
  254. if f.ServiceName == service {
  255. m.functionBuf.Delete(name)
  256. m.functionKV.Delete(name)
  257. }
  258. return nil
  259. }
  260. // ** CRUD of the service files **
  // ServiceCreationRequest is the input of Manager.Create and Manager.Update.
  type ServiceCreationRequest struct {
  	Name string `json:"name"` // service name; the zip is expected to contain <name>.json
  	File string `json:"file"` // location of the zip package; Create requires a valid URL ending in ".zip"
  }
  265. func (m *Manager) List() ([]string, error) {
  266. return m.serviceKV.Keys()
  267. }
  268. func (m *Manager) Create(r *ServiceCreationRequest) error {
  269. name, uri := r.Name, r.File
  270. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  271. return fmt.Errorf("service %s exist", name)
  272. }
  273. if !httpx.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  274. return fmt.Errorf("invalid file path %s", uri)
  275. }
  276. zipPath := path.Join(m.etcDir, name+".zip")
  277. //clean up: delete zip file and unzip files in error
  278. defer os.Remove(zipPath)
  279. //download
  280. err := httpx.DownloadFile(zipPath, uri)
  281. if err != nil {
  282. return fmt.Errorf("fail to download file %s: %s", uri, err)
  283. }
  284. //unzip and copy to destination
  285. err = m.unzip(name, zipPath)
  286. if err != nil {
  287. return err
  288. }
  289. // init file to serviceKV
  290. return m.initFile(name + ".json")
  291. }
  292. func (m *Manager) Delete(name string) error {
  293. name = strings.Trim(name, " ")
  294. if name == "" {
  295. return fmt.Errorf("invalid name %s: should not be empty", name)
  296. }
  297. m.deleteServiceFuncs(name)
  298. m.serviceBuf.Delete(name)
  299. err := m.serviceKV.Delete(name)
  300. if err != nil {
  301. return err
  302. }
  303. path := path.Join(m.etcDir, name+".json")
  304. err = os.Remove(path)
  305. if err != nil {
  306. kconf.Log.Errorf("remove service json fails: %v", err)
  307. }
  308. return nil
  309. }
  310. func (m *Manager) Get(name string) (*serviceInfo, error) {
  311. name = strings.Trim(name, " ")
  312. if name == "" {
  313. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  314. }
  315. r, ok := m.getService(name)
  316. if !ok {
  317. return nil, fmt.Errorf("can't get the service %s", name)
  318. }
  319. return r, nil
  320. }
  321. func (m *Manager) Update(req *ServiceCreationRequest) error {
  322. err := m.Delete(req.Name)
  323. if err != nil {
  324. return err
  325. }
  326. return m.Create(req)
  327. }
  328. func (m *Manager) unzip(name, src string) error {
  329. r, err := zip.OpenReader(src)
  330. if err != nil {
  331. return err
  332. }
  333. defer r.Close()
  334. baseName := strings.ToLower(name + ".json")
  335. // Try unzip
  336. found := false
  337. for _, file := range r.File {
  338. if strings.ToLower(file.Name) == baseName {
  339. found = true
  340. break
  341. }
  342. }
  343. if !found {
  344. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  345. }
  346. // unzip
  347. for _, file := range r.File {
  348. err := filex.UnzipTo(file, path.Join(m.etcDir, file.Name))
  349. if err != nil {
  350. return err
  351. }
  352. }
  353. return nil
  354. }
  355. func (m *Manager) ListFunctions() ([]string, error) {
  356. return m.functionKV.Keys()
  357. }
  358. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  359. name = strings.Trim(name, " ")
  360. if name == "" {
  361. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  362. }
  363. r, ok := m.getFunction(name)
  364. if !ok {
  365. return nil, fmt.Errorf("can't get the service function %s", name)
  366. }
  367. return r, nil
  368. }