manager.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package services
  2. import (
  3. "archive/zip"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "io/ioutil"
  9. "os"
  10. "path"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. )
  15. var (
  16. once sync.Once
  17. mutex sync.Mutex
  18. singleton *Manager //Do not call this directly, use GetServiceManager
  19. )
  20. type Manager struct {
  21. executorPool *sync.Map // The pool of executors
  22. loaded bool
  23. serviceBuf *sync.Map
  24. functionBuf *sync.Map
  25. etcDir string
  26. serviceKV kv.KeyValue
  27. functionKV kv.KeyValue
  28. }
  29. func GetServiceManager() (*Manager, error) {
  30. mutex.Lock()
  31. defer mutex.Unlock()
  32. if singleton == nil {
  33. dir := "etc/services"
  34. if common.IsTesting {
  35. dir = "services/test"
  36. }
  37. etcDir, err := common.GetLoc(dir)
  38. if err != nil {
  39. return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
  40. }
  41. dbDir, err := common.GetDataLoc()
  42. if err != nil {
  43. return nil, fmt.Errorf("cannot find db folder: %s", err)
  44. }
  45. sdb := kv.GetDefaultKVStore(path.Join(dbDir, "services"))
  46. fdb := kv.GetDefaultKVStore(path.Join(dbDir, "serviceFuncs"))
  47. err = sdb.Open()
  48. if err != nil {
  49. return nil, fmt.Errorf("cannot open service db: %s", err)
  50. }
  51. err = fdb.Open()
  52. if err != nil {
  53. return nil, fmt.Errorf("cannot open function db: %s", err)
  54. }
  55. singleton = &Manager{
  56. executorPool: &sync.Map{},
  57. serviceBuf: &sync.Map{},
  58. functionBuf: &sync.Map{},
  59. etcDir: etcDir,
  60. serviceKV: sdb,
  61. functionKV: fdb,
  62. }
  63. }
  64. if !singleton.loaded && !common.IsTesting { // To boost the testing perf
  65. err := singleton.InitByFiles()
  66. return singleton, err
  67. }
  68. return singleton, nil
  69. }
  70. /**
  71. * This function will parse the service definition json files in etc/services.
  72. * It will validate all json files and their schemaFiles. If invalid, it just prints
  73. * an error log and ignore. So it is possible that only valid service definition are
  74. * parsed and available.
  75. *
  76. * NOT threadsafe, must run in lock
  77. */
  78. func (m *Manager) InitByFiles() error {
  79. common.Log.Debugf("init service manager")
  80. files, err := ioutil.ReadDir(m.etcDir)
  81. if nil != err {
  82. return err
  83. }
  84. // Parse schemas in batch. So we have 2 loops. First loop to collect files and the second to save the result.
  85. for _, file := range files {
  86. baseName := filepath.Base(file.Name())
  87. if filepath.Ext(baseName) == ".json" {
  88. err := m.initFile(baseName)
  89. if err != nil {
  90. common.Log.Errorf("%v", err)
  91. continue
  92. }
  93. }
  94. }
  95. m.loaded = true
  96. return nil
  97. }
  98. func (m *Manager) initFile(baseName string) error {
  99. serviceConf := &conf{}
  100. err := common.ReadJsonUnmarshal(filepath.Join(m.etcDir, baseName), serviceConf)
  101. if err != nil {
  102. return fmt.Errorf("parse services file %s failed: %v", baseName, err)
  103. }
  104. //TODO validate serviceConf
  105. serviceName := baseName[0 : len(baseName)-5]
  106. info := &serviceInfo{
  107. About: serviceConf.About,
  108. Interfaces: make(map[string]*interfaceInfo),
  109. }
  110. for name, binding := range serviceConf.Interfaces {
  111. desc, err := parse(binding.SchemaType, binding.SchemaFile)
  112. if err != nil {
  113. return fmt.Errorf("Fail to parse schema file %s: %v", binding.SchemaFile, err)
  114. }
  115. // setting function alias
  116. aliasMap := make(map[string]string)
  117. for _, finfo := range binding.Functions {
  118. aliasMap[finfo.ServiceName] = finfo.Name
  119. }
  120. methods := desc.GetFunctions()
  121. functions := make([]string, len(methods))
  122. for i, f := range methods {
  123. fname := f
  124. if a, ok := aliasMap[f]; ok {
  125. fname = a
  126. }
  127. functions[i] = fname
  128. }
  129. info.Interfaces[name] = &interfaceInfo{
  130. Desc: binding.Description,
  131. Addr: binding.Address,
  132. Protocol: binding.Protocol,
  133. Schema: &schemaInfo{
  134. SchemaType: binding.SchemaType,
  135. SchemaFile: binding.SchemaFile,
  136. },
  137. Functions: functions,
  138. Options: binding.Options,
  139. }
  140. for i, f := range functions {
  141. err := m.functionKV.Set(f, &functionContainer{
  142. ServiceName: serviceName,
  143. InterfaceName: name,
  144. MethodName: methods[i],
  145. })
  146. if err != nil {
  147. common.Log.Errorf("fail to save the function mapping for %s, the function is not available: %v", f, err)
  148. }
  149. }
  150. }
  151. err = m.serviceKV.Set(serviceName, info)
  152. if err != nil {
  153. return fmt.Errorf("fail to save the parsing result: %v", err)
  154. }
  155. return nil
  156. }
  157. // Start Implement FunctionRegister
  158. func (m *Manager) HasFunction(name string) bool {
  159. _, ok := m.getFunction(name)
  160. common.Log.Debugf("found external function %s? %v ", name, ok)
  161. return ok
  162. }
  163. func (m *Manager) Function(name string) (api.Function, error) {
  164. f, ok := m.getFunction(name)
  165. if !ok {
  166. return nil, fmt.Errorf("service function %s not found", name)
  167. }
  168. s, ok := m.getService(f.ServiceName)
  169. if !ok {
  170. return nil, fmt.Errorf("service function %s's service %s not found", name, f.ServiceName)
  171. }
  172. i, ok := s.Interfaces[f.InterfaceName]
  173. if !ok {
  174. return nil, fmt.Errorf("service function %s's interface %s not found", name, f.InterfaceName)
  175. }
  176. // executor is gotten from pool, so all externalFuncs with the same interface share the same executor instance
  177. e, err := m.getExecutor(f.InterfaceName, i)
  178. if err != nil {
  179. return nil, fmt.Errorf("fail to initiate the executor for %s: %v", f.InterfaceName, err)
  180. }
  181. return &ExternalFunc{exe: e, methodName: f.MethodName}, nil
  182. }
  183. // End Implement FunctionRegister
  184. func (m *Manager) HasService(name string) bool {
  185. _, ok := m.getService(name)
  186. common.Log.Debugf("found external service %s? %v ", name, ok)
  187. return ok
  188. }
  189. func (m *Manager) getFunction(name string) (*functionContainer, bool) {
  190. var r *functionContainer
  191. if t, ok := m.functionBuf.Load(name); ok {
  192. r = t.(*functionContainer)
  193. return r, ok
  194. } else {
  195. r = &functionContainer{}
  196. ok, err := m.functionKV.Get(name, r)
  197. if err != nil {
  198. common.Log.Errorf("failed to get service function %s from kv: %v", name, err)
  199. return nil, false
  200. }
  201. if ok {
  202. m.functionBuf.Store(name, r)
  203. }
  204. return r, ok
  205. }
  206. }
  207. func (m *Manager) getService(name string) (*serviceInfo, bool) {
  208. var r *serviceInfo
  209. if t, ok := m.serviceBuf.Load(name); ok {
  210. r = t.(*serviceInfo)
  211. return r, ok
  212. } else {
  213. r = &serviceInfo{}
  214. ok, err := m.serviceKV.Get(name, r)
  215. if err != nil {
  216. common.Log.Errorf("failed to get service %s from kv: %v", name, err)
  217. return nil, false
  218. }
  219. if ok {
  220. m.serviceBuf.Store(name, r)
  221. }
  222. return r, ok
  223. }
  224. }
  225. // Each interface maps to an executor
  226. func (m *Manager) getExecutor(name string, info *interfaceInfo) (executor, error) {
  227. e, ok := m.executorPool.Load(name)
  228. if !ok {
  229. ne, err := NewExecutor(info)
  230. if err != nil {
  231. return nil, err
  232. }
  233. e, _ = m.executorPool.LoadOrStore(name, ne)
  234. }
  235. return e.(executor), nil
  236. }
  237. func (m *Manager) deleteServiceFuncs(service string) error {
  238. if s, ok := m.getService(service); ok {
  239. for _, i := range s.Interfaces {
  240. for _, f := range i.Functions {
  241. _ = m.deleteFunc(service, f)
  242. }
  243. }
  244. }
  245. return nil
  246. }
  247. func (m *Manager) deleteFunc(service, name string) error {
  248. f, err := m.GetFunction(name)
  249. if err != nil {
  250. return err
  251. }
  252. if f.ServiceName == service {
  253. m.functionBuf.Delete(name)
  254. m.functionKV.Delete(name)
  255. }
  256. return nil
  257. }
  258. // ** CRUD of the service files **
  // ServiceCreationRequest is the payload used to create or update a service.
  type ServiceCreationRequest struct {
  	// Name is the service name; it is serialized as "name".
  	Name string `json:"name"`
  	// File is the location of the service zip package; it is serialized as "file".
  	File string `json:"file"`
  }
  263. func (m *Manager) List() ([]string, error) {
  264. return m.serviceKV.Keys()
  265. }
  266. func (m *Manager) Create(r *ServiceCreationRequest) error {
  267. name, uri := r.Name, r.File
  268. if ok, _ := m.serviceKV.Get(name, &serviceInfo{}); ok {
  269. return fmt.Errorf("service %s exist", name)
  270. }
  271. if !common.IsValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
  272. return fmt.Errorf("invalid file path %s", uri)
  273. }
  274. zipPath := path.Join(m.etcDir, name+".zip")
  275. //clean up: delete zip file and unzip files in error
  276. defer os.Remove(zipPath)
  277. //download
  278. err := common.DownloadFile(zipPath, uri)
  279. if err != nil {
  280. return fmt.Errorf("fail to download file %s: %s", uri, err)
  281. }
  282. //unzip and copy to destination
  283. err = m.unzip(name, zipPath)
  284. if err != nil {
  285. return err
  286. }
  287. // init file to serviceKV
  288. return m.initFile(name + ".json")
  289. }
  290. func (m *Manager) Delete(name string) error {
  291. name = strings.Trim(name, " ")
  292. if name == "" {
  293. return fmt.Errorf("invalid name %s: should not be empty", name)
  294. }
  295. m.deleteServiceFuncs(name)
  296. m.serviceBuf.Delete(name)
  297. err := m.serviceKV.Delete(name)
  298. if err != nil {
  299. return err
  300. }
  301. path := path.Join(m.etcDir, name+".json")
  302. err = os.Remove(path)
  303. if err != nil {
  304. common.Log.Errorf("remove service json fails: %v", err)
  305. }
  306. return nil
  307. }
  308. func (m *Manager) Get(name string) (*serviceInfo, error) {
  309. name = strings.Trim(name, " ")
  310. if name == "" {
  311. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  312. }
  313. r, ok := m.getService(name)
  314. if !ok {
  315. return nil, fmt.Errorf("can't get the service %s", name)
  316. }
  317. return r, nil
  318. }
  319. func (m *Manager) Update(req *ServiceCreationRequest) error {
  320. err := m.Delete(req.Name)
  321. if err != nil {
  322. return err
  323. }
  324. return m.Create(req)
  325. }
  326. func (m *Manager) unzip(name, src string) error {
  327. r, err := zip.OpenReader(src)
  328. if err != nil {
  329. return err
  330. }
  331. defer r.Close()
  332. baseName := strings.ToLower(name + ".json")
  333. // Try unzip
  334. found := false
  335. for _, file := range r.File {
  336. if strings.ToLower(file.Name) == baseName {
  337. found = true
  338. break
  339. }
  340. }
  341. if !found {
  342. return fmt.Errorf("cannot find the json descriptor file %s for service", baseName)
  343. }
  344. // unzip
  345. for _, file := range r.File {
  346. err := common.UnzipTo(file, path.Join(m.etcDir, file.Name))
  347. if err != nil {
  348. return err
  349. }
  350. }
  351. return nil
  352. }
  353. func (m *Manager) ListFunctions() ([]string, error) {
  354. return m.functionKV.Keys()
  355. }
  356. func (m *Manager) GetFunction(name string) (*functionContainer, error) {
  357. name = strings.Trim(name, " ")
  358. if name == "" {
  359. return nil, fmt.Errorf("invalid name %s: should not be empty", name)
  360. }
  361. r, ok := m.getFunction(name)
  362. if !ok {
  363. return nil, fmt.Errorf("can't get the service function %s", name)
  364. }
  365. return r, nil
  366. }